Skip to main content

Building an E-commerce Analytics Pipeline with CoginitiScript

Welcome to this comprehensive tutorial on building data transformation pipelines with CoginitiScript! We'll use a fictional e-commerce store called Varenyk Shop to demonstrate how to transform raw sales data into analytics-ready datasets.

What You'll Learn

By the end of this tutorial, you'll understand how to:

  • Structure CoginitiScript projects using packages and layers
  • Create reusable source data abstractions
  • Build analytical datasets with RFM (Recency, Frequency, Monetary) analysis
  • Use dynamic SQL generation for flexible reporting
  • Implement publication strategies for data materialization
  • Set up automated data pipelines

Prerequisites

  • Basic understanding of SQL
  • Access to a PostgreSQL database
  • Coginiti Team or Enterprise account
  • Familiarity with CoginitiScript basics (see CoginitiScript Reference)

About Varenyk Shop

Varenyk Shop is a fictional e-commerce store that sells varenyky - traditional Ukrainian dumplings. The business needs to analyze customer behavior, order patterns, and payment preferences to make data-driven decisions.

Raw Data Overview

Our source system contains the following tables with raw transactional data:

Source Tables:

  • src_customers - Customer information
  • src_products - Product catalog
  • src_orders - Order headers
  • src_order_lines - Order line items
  • src_order_statuses - Order status lookup
  • src_payment_types - Payment method lookup

Analytics Goals: We want to transform this raw data into two main analytics datasets:

  • Customers dataset - Customer analysis with RFM metrics
  • Orders dataset - Order analysis with payment breakdowns

Tutorial Setup

Step 1: Download and Import the Project

  1. Download varenyk_shop.coginitipkg from the coginiti-dev/varenyk_shop repository
  2. Import this package into your Coginiti catalog root folder
  3. Navigate to the imported varenyk_shop project in your catalog

Step 2: Create Sample Data

  1. Open varenyk_shop/run_seed in Coginiti
  2. Select your PostgreSQL database connection
  3. Execute the script to generate sample data

run_seed is a thin entry point that simply invokes the seed package:

#+import "varenyk_shop/seed"

{{ seed.Run() }};

This creates all source tables and populates them with sample e-commerce data.

Step 3: Execute the Pipeline

  1. Open varenyk_shop/schedule_nightly
  2. Execute it on the same PostgreSQL database
  3. Verify that the source tests pass and that customers and orders tables are created

Now let's dive into how this pipeline is structured and implemented!

Project Structure

The tutorial project follows a layered architecture pattern:

varenyk_shop/
├── reporting/ # Analytics layer
│ ├── customers # Customer analytics dataset
│ └── orders # Order analytics dataset
├── sources/ # Source data abstraction layer
│ ├── sources # Source table definitions
│ └── sources_test # Data quality tests for the sources package
├── seed/ # Sample data generation
│ ├── gen/
│ │ └── person # Person data generator
│ ├── clean # Data cleanup utilities
│ └── source_data # Source data creation
├── run_seed # Entry point: populates source data
├── schedule_nightly # Pipeline orchestration: tests + publications
├── linter.toml # CoginitiScript SQL linter rules
└── project.toml # Project manifest and environment configuration

This structure demonstrates several CoginitiScript best practices:

  • Layered architecture for maintainability
  • Package organization for code reuse
  • Separation of concerns between data layers
  • Tests live next to the data they validate, in the sources package

Layer 1: Source Data Abstraction

Why Create a Source Layer?

The source layer provides several key benefits:

  • Abstraction: Downstream transformations reference blocks, not tables directly
  • Maintainability: Changes to source systems only affect the source layer
  • Cleanliness: Transformation logic stays focused on business rules
  • Consistency: Standardized column names and data types

Source Block Definitions

Let's examine the source blocks in varenyk_shop/sources/sources:

#+src sql Customers()
#+begin
SELECT
id AS customer_id,
first_name,
last_name,
email
FROM
src_customers;
#+end

#+src sql Products()
#+begin
SELECT
id AS product_id,
name
FROM
src_products;
#+end

#+src sql Orders()
#+begin
SELECT
id AS order_id,
order_date,
customer_id,
payment_type_id,
order_status
FROM
src_orders;
#+end

#+src sql OrderLines()
#+begin
SELECT
id AS order_line_id,
order_id,
product_id,
quantity,
price
FROM
src_order_lines;
#+end

#+src sql OrderStatuses()
#+begin
SELECT
id AS order_status_id,
status_name
FROM
src_order_statuses;
#+end

#+src sql PaymentTypes()
#+begin
SELECT
id AS payment_type_id,
payment_type_name
FROM
src_payment_types;
#+end

Key Features:

  • Column renaming: Standardize ID column names across tables
  • Column selection: Only expose needed columns
  • Clean interface: Downstream consumers don't need to know about source table structure

Source Data Quality Tests

The sources package also ships with a companion file, varenyk_shop/sources/sources_test, that asserts invariants on the raw data before it flows downstream. Tests are declared with #+test sql, and any rows the query returns are treated as failures.

#+test sql Test_ShoultNotHaveCustomersWithNullEmail()
#+meta {
:doc "Should not have customers with invalid email"
}
#+begin
SELECT
*
FROM
{{ Customers() }}
WHERE
email ~* '^[A-Za-z0-9._+%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$' = false;
#+end

#+test sql Test_OrdersHaveExistingProducts()
#+meta {
:doc "Verify that order lines references only existing products"
}
#+begin
SELECT
*
FROM
{{ OrderLines() }}
WHERE
product_id NOT IN (
SELECT product_id FROM {{ Products() }}
);
#+end

These tests are executed automatically by schedule_nightly (covered later) so that bad source data fails the pipeline before the reporting layer is rebuilt.

Layer 2: Analytics Models

Customer Analytics with RFM Analysis

RFM analysis is a proven method for customer segmentation:

  • Recency: How recently did the customer make a purchase?
  • Frequency: How often does the customer purchase?
  • Monetary Value: How much does the customer spend?

Implementation Strategy

We split the customer analytics into two blocks for better maintainability:

File: varenyk_shop/reporting/customers

#+import "varenyk_shop/sources"

#+src sql rfmValues()
#+meta {
:doc "Calculates RFM metrics for each customer:
- first_order: Date of first purchase
- recent_order: Date of most recent purchase
- total_orders: Total number of orders (Frequency)
- total_sales: Total amount spent (Monetary Value)"
}
#+begin
SELECT
customer_id,
MIN(o.order_date) AS first_order,
MAX(o.order_date) AS recent_order,
COUNT(o.order_id) AS total_orders,
SUM(ol.price) AS total_sales
FROM
{{ sources.Orders() }} AS o
INNER JOIN {{ sources.OrderLines() }} AS ol
ON ol.order_id = o.order_id
GROUP BY
customer_id;
#+end

#+src sql Customers()
#+meta {
:publication {
:type "table",
:name "customers"
}
}
#+begin
SELECT
c.customer_id,
c.first_name,
c.last_name,
r.first_order,
r.recent_order,
r.total_orders,
r.total_sales AS ltv -- Lifetime Value
FROM
{{ sources.Customers() }} AS c
LEFT JOIN {{ rfmValues() }} AS r
ON r.customer_id = c.customer_id;
#+end

Design Highlights:

  • Block composition: rfmValues() calculates metrics, Customers() assembles final result
  • Source references: Uses sources.Orders() and sources.OrderLines() blocks
  • Publication strategy: Materializes result as customers table
  • Documentation: Clear metadata explaining business logic

Order Analytics with Dynamic Payment Columns

The orders dataset demonstrates CoginitiScript's dynamic SQL generation capabilities.

File: varenyk_shop/reporting/orders

#+import "varenyk_shop/sources"

#+src sql Orders()
#+meta {
:publication {
:type "table",
:name "orders"
}
}
#+begin
SELECT
o.order_id,
o.customer_id,
o.order_date,
os.status_name AS order_status,
#+for t : iterator(sources.PaymentTypes()) do
SUM(CASE WHEN pt.payment_type_name = '{{ t["payment_type_name"] }}' THEN ol.price ELSE 0 END) AS {{ t["payment_type_name"] }}_amount,
#+end
SUM(ol.price) AS amount
FROM
{{ sources.Orders() }} AS o
INNER JOIN {{ sources.OrderLines() }} AS ol
ON ol.order_id = o.order_id
INNER JOIN {{ sources.PaymentTypes() }} AS pt
ON pt.payment_type_id = o.payment_type_id
INNER JOIN {{ sources.OrderStatuses() }} AS os
ON os.order_status_id = o.order_status
GROUP BY
o.order_id,
o.customer_id,
o.order_date,
os.status_name;
#+end

Generated SQL Preview:

iterator(sources.PaymentTypes()) runs the PaymentTypes() block at compile time and loops over each row. With the seeded payment types (credit_card, paypal, wire_transfer), the template expands to:

SELECT
o.order_id,
o.customer_id,
o.order_date,
os.status_name AS order_status,
SUM(CASE WHEN pt.payment_type_name = 'credit_card' THEN ol.price ELSE 0 END) AS credit_card_amount,
SUM(CASE WHEN pt.payment_type_name = 'paypal' THEN ol.price ELSE 0 END) AS paypal_amount,
SUM(CASE WHEN pt.payment_type_name = 'wire_transfer' THEN ol.price ELSE 0 END) AS wire_transfer_amount,
SUM(ol.price) AS amount
FROM ...

Benefits of Dynamic Generation:

  • Driven by data: New payment types in src_payment_types automatically produce new columns - no code change required
  • DRY Principle: No repetitive CASE expressions
  • Flexibility: Easy to evolve the pivot without touching reporting code

Pipeline Orchestration

Automated Test and Publication Execution

The pipeline orchestration is handled by varenyk_shop/schedule_nightly. It runs the source data quality tests first, then publishes the reporting layer:

#+import "std/publication"
#+import "std/test"
#+import "varenyk_shop/sources"
#+import "varenyk_shop/reporting"

{{ test.Run(packages = [sources]) }};

{{ publication.Run(packages = [reporting]) }};

How it works:

  1. Import required packages: Standard library (std/test, std/publication), the sources package (for its tests), and the reporting package (for its publications)
  2. Run tests: test.Run executes every #+test block in the sources package and fails fast if any test returns rows
  3. Execute publications: publication.Run materializes every block with :publication metadata in the reporting package
  4. Dependency resolution: CoginitiScript handles execution order automatically

Scheduling (Enterprise Feature)

In Coginiti Team/Enterprise, you can schedule this asset to run nightly:

  1. Right-click on schedule_nightly
  2. Select "Schedule"
  3. Configure timing and database connection
  4. Enable the schedule

Exploring the Results

Query the Customer Dataset

SELECT 
customer_id,
first_name,
last_name,
first_order,
recent_order,
total_orders,
ltv
FROM customers
ORDER BY ltv DESC
LIMIT 10;

Sample Results:

customer_idfirst_namelast_namefirst_orderrecent_ordertotal_ordersltv
15MariaPetrenko2024-01-152024-03-208245.50
7IvanKovalenko2024-01-202024-03-186198.75

Query the Orders Dataset

SELECT 
order_id,
customer_id,
order_date,
order_status,
credit_card_amount,
paypal_amount,
wire_transfer_amount,
amount
FROM orders
WHERE order_date >= '2024-03-01'
ORDER BY order_date DESC;

Advanced Patterns Demonstrated

1. Package Organization

-- Clean imports at the top of each file
#+import "varenyk_shop/sources"
#+import "std/publication"

2. Block Composition

-- Breaking complex logic into manageable pieces
SELECT * FROM {{ rfmValues() }} -- Intermediate calculation
-- Used in final result assembly

3. Dynamic SQL Generation

-- Iterate over a source block at compile time to generate columns dynamically
#+for t : iterator(sources.PaymentTypes()) do
-- Use t["payment_type_name"] to reference each row's columns
#+end

4. Publication Strategies

-- Materializing results for performance
#+meta {
:publication {
:type "table",
:name "customers"
}
}

Best Practices Demonstrated

1. Layered Architecture

  • Source layer: Abstract raw data
  • Reporting layer: Business logic and analytics
  • Orchestration: Pipeline execution

2. Documentation

#+meta {
:doc "Clear description of what this block does,
including parameter explanations and business context."
}

3. Naming Conventions

  • Block names: PascalCase (Customers(), OrderLines())
  • Column names: snake_case (customer_id, total_orders)
  • Package names: lowercase (sources, reporting)

4. Error Prevention

  • LEFT JOIN: Handle customers without orders gracefully
  • Standardized IDs: Consistent naming across all sources

Extending the Tutorial

Add New Metrics

Want to add customer acquisition cost? Add to rfmValues():

#+src sql rfmValues()
#+begin
SELECT
customer_id,
-- Existing metrics...
COUNT(DISTINCT DATE_TRUNC('month', o.order_date)) AS active_months,
AVG(ol.price) AS avg_order_value
FROM ...
#+end

Add New Payment Types

Because reporting/orders iterates over sources.PaymentTypes() at compile time, adding a new payment type is a data change, not a code change. Insert a new row into the lookup table:

INSERT INTO src_payment_types(payment_type_name) VALUES ('crypto'), ('bank_transfer');

Re-running schedule_nightly regenerates the orders publication with new crypto_amount and bank_transfer_amount columns automatically.

Create Product Analytics

Add a new reporting block:

#+src sql ProductAnalysis()
#+meta {
:publication {
:type "table",
:name "product_metrics"
}
}
#+begin
SELECT
p.product_id,
p.name,
COUNT(ol.order_line_id) AS total_orders,
SUM(ol.quantity) AS total_quantity_sold,
SUM(ol.price) AS total_revenue
FROM
{{ sources.Products() }} AS p
LEFT JOIN {{ sources.OrderLines() }} AS ol
ON ol.product_id = p.product_id
GROUP BY
p.product_id, p.name;
#+end

Troubleshooting

Common Issues

Issue: "Block not found" error

Solution: Check import statements and block names
- Ensure packages are imported correctly
- Verify block names match exactly (case-sensitive)

Issue: Publication fails

Solution: Check database permissions and table names
- Ensure database user can CREATE TABLE
- Verify table names don't conflict with existing objects

Issue: Dynamic SQL doesn't generate correctly

Solution: Test template expansion
- Use simple test queries to verify loop output
- Check constant definitions and data types

Next Steps

Learn More CoginitiScript Features

Explore Advanced Patterns

  • Incremental processing: Update only changed data
  • Data quality testing: Implement validation rules
  • Multi-environment deployment: Promote code through dev/test/prod

Build Your Own Pipeline

Apply these patterns to your data:

  1. Identify your source systems
  2. Design your analytics requirements
  3. Create a source abstraction layer
  4. Build transformation logic
  5. Set up publication and scheduling

Summary

This tutorial demonstrated how to build a complete analytics pipeline using CoginitiScript:

Structured project organization with packages and layers ✅ Source data abstraction for maintainability ✅ Business logic implementation with RFM analysis ✅ Dynamic SQL generation for flexible reporting ✅ Publication strategies for data materialization ✅ Pipeline orchestration for automation

CoginitiScript's features—blocks, packages, templates, and publication—provide a powerful foundation for building scalable, maintainable data transformation pipelines. The patterns shown here can be adapted to virtually any analytics use case.

Start with this foundation and gradually add complexity as your requirements grow. Happy coding with CoginitiScript!