Data Quality Design Patterns: WAP and AWAP

Overview

This tutorial demonstrates how to implement Write-Audit-Publish (WAP) and Audit-Write-Audit-Publish (AWAP) data quality patterns using CoginitiScript. These patterns, borrowed from software engineering, provide controlled gates that ensure only validated data reaches your production tables.

What You'll Learn:

  • Implement the WAP pattern for post-transformation validation
  • Implement the AWAP pattern for pre- and post-transformation validation
  • Use CoginitiScript's test framework to enforce data quality gates
  • Orchestrate multi-phase pipelines with fail-fast validation

Prerequisites:

  • Familiarity with CoginitiScript basics
  • Understanding of SQL and data transformations
  • Access to a data warehouse (Snowflake, BigQuery, Redshift, etc.); the examples use PostgreSQL-style SQL, so adapt dialect-specific functions (e.g., regex operators) to your warehouse

Understanding the Patterns

WAP (Write-Audit-Publish)

WAP introduces a staging layer between raw data and production tables. Data is transformed into staging, validated through quality tests, and only promoted to production if it passes all checks.

Flow: Raw Data → Write to Staging → Audit Staging → Publish to Production

Benefits:

  • Prevents bad data from reaching production
  • Maintains production table integrity
  • Provides clear rollback points

When to Use:

  • Transformation logic is well-tested
  • Source data is generally reliable
  • You need efficient validation gates

AWAP (Audit-Write-Audit-Publish)

AWAP extends WAP by adding input validation before transformation. This catches data quality issues at the source, saving compute resources and providing earlier failure signals.

Flow: Raw Data → Audit Input → Write to Staging → Audit Output → Publish to Production

Benefits:

  • Catches source data issues early (fail-fast)
  • Validates both input and output
  • Reduces wasted compute on bad data

When to Use:

  • Source data quality is inconsistent
  • Transformations are computationally expensive
  • You need comprehensive quality gates

Project Setup

Create the following directory structure:

data_quality_tutorial/
├── sources/
│   └── sources                  # Source data abstractions
├── staging/
│   ├── wap/
│   │   ├── customers_staging    # WAP staging layer
│   │   └── customers_tests      # WAP validation tests
│   └── awap/
│       ├── input_tests          # AWAP input validation
│       ├── customers_staging    # AWAP staging layer
│       └── output_tests         # AWAP output validation
├── production/
│   ├── wap_customers            # WAP production tables
│   └── awap_customers           # AWAP production tables
├── wap_pipeline                 # WAP orchestration
└── awap_pipeline                # AWAP orchestration

Implementation: WAP Pattern

Step 1: Source Data Abstraction

Create a source layer to abstract raw tables and provide consistent column naming.

File: sources/sources

#+src sql RawCustomers()
#+meta {
  :doc "Abstracts raw customer data from source system"
}
#+begin
SELECT
    customer_id,
    email,
    first_name,
    last_name,
    country_code,
    phone,
    created_at,
    updated_at
FROM raw.customers;
#+end

#+src sql RawOrders()
#+meta {
  :doc "Abstracts raw order data from source system"
}
#+begin
SELECT
    order_id,
    customer_id,
    order_date,
    order_status,
    total_amount
FROM raw.orders;
#+end

Step 2: Staging Layer with Transformations

Transform and materialize data to a staging table.

File: staging/wap/customers_staging

#+import "data_quality_tutorial/sources"

#+src sql Customers()
#+meta {
  :doc "Stages customer data with business logic transformations",
  :publication {
    :type "table",
    :name "wap_customers_staging"
  }
}
#+begin
SELECT
    customer_id,
    LOWER(TRIM(email)) AS email,
    INITCAP(first_name) AS first_name,
    INITCAP(last_name) AS last_name,
    CASE
        WHEN country_code = 'US' THEN 'USA'
        WHEN country_code = 'UK' THEN 'GBR'
        ELSE UPPER(country_code)
    END AS country_code,
    REGEXP_REPLACE(phone, '[^0-9]', '') AS phone_clean,
    created_at,
    updated_at
FROM {{ sources.RawCustomers() }}
WHERE updated_at > (
    SELECT COALESCE(MAX(updated_at), '1900-01-01')
    FROM wap_customers_staging
);
#+end

Step 3: Quality Tests

Define tests that return empty results on success, or failing rows on failure.

File: staging/wap/customers_tests

#+import "data_quality_tutorial/staging/wap/customers_staging"

#+test sql NoNullKeys()
#+meta {
  :doc "Verify no null primary keys or required fields"
}
#+begin
SELECT
    customer_id,
    email,
    created_at
FROM {{ customers_staging.Customers() }}
WHERE customer_id IS NULL
   OR email IS NULL
   OR created_at IS NULL;
#+end

#+test sql ValidEmailFormat()
#+meta {
  :doc "Ensure all emails match expected pattern"
}
#+begin
SELECT
    customer_id,
    email
FROM {{ customers_staging.Customers() }}
WHERE email NOT LIKE '%@%.%'
   OR email LIKE '%..%'
   OR email LIKE '@%'
   OR email LIKE '%@';
#+end

#+test sql NoDuplicates()
#+meta {
  :doc "Check for duplicate customer IDs in staging"
}
#+begin
SELECT
    customer_id,
    COUNT(*) AS duplicate_count
FROM {{ customers_staging.Customers() }}
GROUP BY customer_id
HAVING COUNT(*) > 1;
#+end

#+test sql ValidCountryCodes()
#+meta {
  :doc "Verify country codes are ISO standard (2-3 chars, uppercase)"
}
#+begin
SELECT
    customer_id,
    country_code
FROM {{ customers_staging.Customers() }}
WHERE LENGTH(country_code) NOT BETWEEN 2 AND 3
   OR country_code != UPPER(country_code)
   OR country_code ~ '[^A-Z]';
#+end

#+test sql ValidPhoneNumbers()
#+meta {
  :doc "Check phone numbers are cleaned and valid length"
}
#+begin
SELECT
    customer_id,
    phone_clean
FROM {{ customers_staging.Customers() }}
WHERE phone_clean IS NOT NULL
  AND LENGTH(phone_clean) NOT BETWEEN 10 AND 15;
#+end

Step 4: Production Layer

The production layer references staging blocks directly. CoginitiScript handles materialization through publication metadata.

File: production/wap_customers

#+import "data_quality_tutorial/staging/wap/customers_staging"

#+src sql Customers()
#+meta {
  :doc "Production customer table - contains only validated data",
  :publication {
    :type "table",
    :name "customers"
  }
}
#+begin
SELECT
    customer_id,
    email,
    first_name,
    last_name,
    country_code,
    phone_clean AS phone,
    created_at,
    updated_at
FROM {{ customers_staging.Customers() }};
#+end

Step 5: WAP Orchestration

Orchestrate the three-phase WAP pipeline: Write → Audit → Publish.

File: wap_pipeline

#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/wap/customers_staging"
#+import "data_quality_tutorial/staging/wap/customers_tests"
#+import "data_quality_tutorial/production/wap_customers"

-- Phase 1: WRITE to staging
{{ publication.Run(packages=[customers_staging]) }}

-- Phase 2: AUDIT staging data (fail fast if any test fails)
{{ test.Run(packages=[customers_tests], onFailure=test.Stop) }}

-- Phase 3: PUBLISH to production (only runs if tests pass)
{{ publication.Run(packages=[wap_customers]) }}

Key Points:

  • publication.Run() materializes staging tables based on publication metadata
  • test.Run() with onFailure=test.Stop implements fail-fast behavior
  • Production publication only executes if all tests pass

Implementation: AWAP Pattern

Step 1: Input Validation Tests

Create tests that validate raw data before transformation.

File: staging/awap/input_tests

#+import "data_quality_tutorial/sources"

#+test sql InputSchemaValid()
#+meta {
  :doc "Verify raw data has all required columns without nulls"
}
#+begin
SELECT
    customer_id,
    email,
    created_at
FROM {{ sources.RawCustomers() }}
WHERE customer_id IS NULL
   OR email IS NULL
   OR created_at IS NULL;
#+end

#+test sql InputVolumeCheck()
#+meta {
  :doc "Ensure minimum expected daily volume of new records"
}
#+begin
SELECT
    COUNT(*) AS new_records,
    CURRENT_DATE - 1 AS check_date
FROM {{ sources.RawCustomers() }}
WHERE DATE(updated_at) = CURRENT_DATE - 1
HAVING COUNT(*) < 10; -- Expect at least 10 records daily
#+end

#+test sql InputDataFreshness()
#+meta {
  :doc "Check that source data is not stale"
}
#+begin
SELECT
    MAX(updated_at) AS latest_update,
    CURRENT_TIMESTAMP AS check_time,
    EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - MAX(updated_at)) / 3600 AS hours_stale
FROM {{ sources.RawCustomers() }}
HAVING MAX(updated_at) < CURRENT_TIMESTAMP - INTERVAL '25 hours'; -- Alert if > 25 hours old
#+end

#+test sql InputAnomalyDetection()
#+meta {
  :doc "Flag suspicious patterns in raw data (potential bot/spam)"
}
#+begin
WITH email_domains AS (
    SELECT
        SUBSTRING(email FROM '@(.*)$') AS domain,
        COUNT(*) AS domain_count
    FROM {{ sources.RawCustomers() }}
    WHERE DATE(updated_at) >= CURRENT_DATE - 1
    GROUP BY SUBSTRING(email FROM '@(.*)$')
)
SELECT
    domain,
    domain_count
FROM email_domains
WHERE domain_count > 1000 -- Flag if single domain has >1000 signups/day
   OR domain IS NULL;
#+end

#+test sql InputReferentialIntegrity()
#+meta {
  :doc "Validate foreign keys exist in reference tables"
}
#+begin
SELECT
    r.customer_id,
    r.country_code
FROM {{ sources.RawCustomers() }} r
LEFT JOIN ref.country_codes c
    ON r.country_code = c.code
WHERE r.country_code IS NOT NULL
  AND c.code IS NULL;
#+end

Step 2: Staging Layer (same as WAP)

File: staging/awap/customers_staging

#+import "data_quality_tutorial/sources"

#+src sql Customers()
#+meta {
  :doc "Stages customer data with business logic transformations",
  :publication {
    :type "table",
    :name "awap_customers_staging"
  }
}
#+begin
SELECT
    customer_id,
    LOWER(TRIM(email)) AS email,
    INITCAP(first_name) AS first_name,
    INITCAP(last_name) AS last_name,
    CASE
        WHEN country_code = 'US' THEN 'USA'
        WHEN country_code = 'UK' THEN 'GBR'
        ELSE UPPER(country_code)
    END AS country_code,
    REGEXP_REPLACE(phone, '[^0-9]', '') AS phone_clean,
    created_at,
    updated_at
FROM {{ sources.RawCustomers() }}
WHERE updated_at > (
    SELECT COALESCE(MAX(updated_at), '1900-01-01')
    FROM awap_customers_staging
);
#+end

Step 3: Output Validation Tests

Create tests that validate transformed data in staging.

File: staging/awap/output_tests

#+import "data_quality_tutorial/staging/awap/customers_staging"

#+test sql OutputBusinessRules()
#+meta {
  :doc "Verify transformed data meets all business requirements"
}
#+begin
SELECT
    customer_id,
    email,
    country_code,
    created_at,
    updated_at
FROM {{ customers_staging.Customers() }}
WHERE email NOT LIKE '%@%.%'
   OR LENGTH(country_code) NOT IN (2, 3)
   OR created_at > CURRENT_TIMESTAMP
   OR updated_at < created_at;
#+end

#+test sql OutputReferentialIntegrity()
#+meta {
  :doc "Ensure transformed country codes match dimension table"
}
#+begin
SELECT
    s.customer_id,
    s.country_code
FROM {{ customers_staging.Customers() }} s
LEFT JOIN dim.countries c
    ON s.country_code = c.country_code
WHERE c.country_code IS NULL;
#+end

#+test sql OutputDuplicates()
#+meta {
  :doc "Check for duplicates introduced during transformation"
}
#+begin
SELECT
    customer_id,
    COUNT(*) AS duplicate_count,
    ARRAY_AGG(updated_at) AS update_timestamps
FROM {{ customers_staging.Customers() }}
GROUP BY customer_id
HAVING COUNT(*) > 1;
#+end

#+test sql OutputMetrics()
#+meta {
  :doc "Validate output metrics are within expected ranges"
}
#+begin
WITH metrics AS (
    SELECT
        COUNT(*) AS total_records,
        COUNT(DISTINCT email) AS unique_emails,
        COUNT(DISTINCT email)::FLOAT / NULLIF(COUNT(*), 0) AS email_uniqueness,
        AVG(LENGTH(email)) AS avg_email_length
    FROM {{ customers_staging.Customers() }}
)
SELECT *
FROM metrics
WHERE email_uniqueness < 0.98 -- Expect 98%+ unique emails
   OR avg_email_length < 5    -- Suspicious if avg email < 5 chars
   OR avg_email_length > 100; -- Suspicious if avg email > 100 chars
#+end

Step 4: Production Layer (same as WAP)

File: production/awap_customers

#+import "data_quality_tutorial/staging/awap/customers_staging"

#+src sql Customers()
#+meta {
  :doc "Production customer table - validated through AWAP pipeline",
  :publication {
    :type "table",
    :name "customers_awap"
  }
}
#+begin
SELECT
    customer_id,
    email,
    first_name,
    last_name,
    country_code,
    phone_clean AS phone,
    created_at,
    updated_at
FROM {{ customers_staging.Customers() }};
#+end

Step 5: AWAP Orchestration

Orchestrate the four-phase AWAP pipeline: Audit Input → Write → Audit Output → Publish.

File: awap_pipeline

#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/output_tests"
#+import "data_quality_tutorial/production/awap_customers"

-- Phase 1: AUDIT INPUT (validate raw data before transformation)
{{ test.Run(packages=[input_tests], onFailure=test.Stop) }}

-- Phase 2: WRITE to staging (only runs if input validation passes)
{{ publication.Run(packages=[customers_staging]) }}

-- Phase 3: AUDIT OUTPUT (validate transformed data)
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}

-- Phase 4: PUBLISH to production (only runs if all tests pass)
{{ publication.Run(packages=[awap_customers]) }}

Key Differences from WAP:

  • Input validation runs before publication.Run() for staging
  • Catches source issues early, preventing wasted transformation compute
  • Both input and output must pass for production promotion

Advanced Patterns

Selective Test Execution

Run specific tests instead of entire packages:

#+import "std/test"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/output_tests"

-- Run only critical input tests
{{ test.Run(
    tests=[
        input_tests.InputSchemaValid,
        input_tests.InputVolumeCheck
    ],
    onFailure=test.Stop
) }}

-- Run all output tests
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}

Multi-Table Parallel Pipeline

Process multiple entities with parallel execution:

File: multi_table_awap_pipeline

#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/customers_input_tests"
#+import "data_quality_tutorial/staging/awap/orders_input_tests"
#+import "data_quality_tutorial/staging/awap/products_input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/orders_staging"
#+import "data_quality_tutorial/staging/awap/products_staging"
#+import "data_quality_tutorial/staging/awap/customers_output_tests"
#+import "data_quality_tutorial/staging/awap/orders_output_tests"
#+import "data_quality_tutorial/staging/awap/products_output_tests"
#+import "data_quality_tutorial/production"

-- Phase 1: AUDIT all inputs in parallel
{{ test.Run(
    packages=[
        customers_input_tests,
        orders_input_tests,
        products_input_tests
    ],
    onFailure=test.Stop
) }}

-- Phase 2: WRITE all staging tables in parallel
{{ publication.Run(
    packages=[
        customers_staging,
        orders_staging,
        products_staging
    ],
    parallelism=3
) }}

-- Phase 3: AUDIT all outputs in parallel
{{ test.Run(
    packages=[
        customers_output_tests,
        orders_output_tests,
        products_output_tests
    ],
    onFailure=test.Stop
) }}

-- Phase 4: PUBLISH to production
{{ publication.Run(packages=[production]) }}

Monitoring-Friendly Pipeline

Use test.Continue for non-critical checks that should log warnings but not block:

#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/output_tests"
#+import "data_quality_tutorial/staging/awap/monitoring_tests"
#+import "data_quality_tutorial/production/awap_customers"

-- Critical input validation (must pass)
{{ test.Run(packages=[input_tests], onFailure=test.Stop) }}

-- Transform
{{ publication.Run(packages=[customers_staging]) }}

-- Critical output validation (must pass)
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}

-- Non-critical monitoring checks (log but don't block)
{{ test.Run(packages=[monitoring_tests], onFailure=test.Continue) }}

-- Publish
{{ publication.Run(packages=[awap_customers]) }}

Testing Your Implementation

Execute WAP Pipeline

#+import "data_quality_tutorial/wap_pipeline"

{{ wap_pipeline.Run() }}

Execute AWAP Pipeline

#+import "data_quality_tutorial/awap_pipeline"

{{ awap_pipeline.Run() }}

Verify Results

-- Check WAP production table
SELECT
    COUNT(*) AS total_customers,
    COUNT(DISTINCT email) AS unique_emails,
    MIN(created_at) AS earliest_customer,
    MAX(updated_at) AS latest_update
FROM customers;

-- Check AWAP production table
SELECT
    COUNT(*) AS total_customers,
    COUNT(DISTINCT email) AS unique_emails,
    MIN(created_at) AS earliest_customer,
    MAX(updated_at) AS latest_update
FROM customers_awap;

-- Compare staging vs production counts
SELECT
    'WAP' AS pipeline,
    (SELECT COUNT(*) FROM wap_customers_staging) AS staging_count,
    (SELECT COUNT(*) FROM customers) AS production_count,
    (SELECT COUNT(*) FROM wap_customers_staging) -
    (SELECT COUNT(*) FROM customers) AS diff
UNION ALL
SELECT
    'AWAP' AS pipeline,
    (SELECT COUNT(*) FROM awap_customers_staging) AS staging_count,
    (SELECT COUNT(*) FROM customers_awap) AS production_count,
    (SELECT COUNT(*) FROM awap_customers_staging) -
    (SELECT COUNT(*) FROM customers_awap) AS diff;

Simulate Test Failures

Insert bad data to verify your quality gates work:

-- Insert invalid email
INSERT INTO raw.customers (customer_id, email, created_at, updated_at)
VALUES (99999, 'invalid-email', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);

-- Run pipeline - should fail at validation stage
#+import "data_quality_tutorial/wap_pipeline"
{{ wap_pipeline.Run() }}

-- Clean up
DELETE FROM raw.customers WHERE customer_id = 99999;

Best Practices

1. Test Naming Conventions

Use descriptive prefixes to organize tests:

  • Input* for pre-transformation validation
  • Output* for post-transformation validation
  • NoNull*, Valid*, Check* for specific validation types
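
As an illustration, a null check following the NoNull* convention might look like the sketch below. The block name and the checked columns are hypothetical; it reuses the RawOrders source from Step 1:

#+import "data_quality_tutorial/sources"

#+test sql NoNullOrderKeys()
#+meta {
  :doc "Illustrative null check following the NoNull* naming convention"
}
#+begin
SELECT order_id
FROM {{ sources.RawOrders() }}
WHERE order_id IS NULL
   OR customer_id IS NULL;
#+end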

2. Incremental Loading

Use WHERE clauses to process only new/changed records:

WHERE updated_at > (
    SELECT COALESCE(MAX(updated_at), '1900-01-01')
    FROM staging_table
)

3. Fail-Fast vs Fail-Tolerant

  • Use onFailure=test.Stop for critical quality gates
  • Use onFailure=test.Continue for monitoring and alerting
  • Always use test.Stop for input validation in AWAP

4. Test Completeness

Design tests that:

  • Return empty results on success
  • Return failing rows with diagnostic information
  • Include row counts and affected IDs for debugging
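
A hedged sketch of a diagnostics-friendly test, assuming the WAP staging block from Step 2; the test name is illustrative, and the extra columns exist purely to speed up debugging:

#+import "data_quality_tutorial/staging/wap/customers_staging"

#+test sql ValidSignupDates()
#+meta {
  :doc "Illustrative test: returns offending rows plus diagnostic context"
}
#+begin
SELECT
    customer_id,                           -- affected ID for debugging
    created_at,                            -- the value that failed
    CURRENT_TIMESTAMP AS checked_at,       -- when the check ran
    COUNT(*) OVER () AS failing_row_count  -- total failing rows at a glance
FROM {{ customers_staging.Customers() }}
WHERE created_at > CURRENT_TIMESTAMP;      -- future signup dates are invalid
#+end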

5. Documentation

Use #+meta blocks to document:

  • Purpose of each transformation
  • Business rules being validated
  • Expected data volumes and SLAs
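
For example, a more fully documented block might carry metadata like this sketch; the :doc wording and the volume/SLA figures are illustrative, not a prescribed schema:

#+import "data_quality_tutorial/sources"

#+src sql Customers()
#+meta {
  :doc "Stages customers. Rules: ISO-3 country codes, lowercased emails. Expected volume ~10k rows/day; freshness SLA 24h.",
  :publication {
    :type "table",
    :name "wap_customers_staging"
  }
}
#+begin
SELECT customer_id, LOWER(TRIM(email)) AS email
FROM {{ sources.RawCustomers() }};
#+end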

When to Use Each Pattern

Choose WAP When:

  • Source data quality is generally good
  • Transformation logic is the primary concern
  • You need efficient, single-stage validation
  • Compute costs are not a primary concern

Choose AWAP When:

  • Source data quality is inconsistent or unknown
  • Transformations are computationally expensive
  • Early failure detection saves significant resources
  • You need comprehensive end-to-end validation
  • Regulatory compliance requires input validation

Performance Considerations

Aspect             WAP                           AWAP
Compute Cost       Moderate                      Higher
Storage Cost       Lower                         Lower
Failure Detection  Post-transformation           Pre- and post-transformation
Wasted Compute     Higher (transforms bad data)  Lower (fails before transform)
Complexity         Lower                         Higher
Best For           Stable sources                Volatile sources

Scheduling and Automation

For CoginitiScript Team/Enterprise editions, schedule pipelines through the UI:

  1. Navigate to your pipeline file (wap_pipeline or awap_pipeline)
  2. Click "Schedule" in the toolbar
  3. Configure frequency (hourly, daily, weekly)
  4. Set timezone and execution window
  5. Enable notifications for test failures

The scheduler automatically resolves dependencies and executes blocks in the correct order.

Troubleshooting

Pipeline Fails at Test Stage

Check test results:

-- Tests return failing rows, not error messages
-- Review the actual data that failed validation
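
One way to inspect the offending rows is to re-run just the failing test with onFailure=test.Continue, so the run reports the rows without aborting. A sketch using the WAP tests from Step 3:

#+import "std/test"
#+import "data_quality_tutorial/staging/wap/customers_tests"

-- Re-run the single failing test; Continue reports failures without stopping
{{ test.Run(tests=[customers_tests.NoNullKeys], onFailure=test.Continue) }}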

Common issues:

  • Null values in required fields → Check source data quality
  • Duplicate keys → Review transformation logic for unintended duplicates
  • Invalid formats → Add cleaning logic in transformation block

Staging Populates but Production is Empty

Verify publication metadata:

  • Ensure production blocks have #+meta { :publication {...} }
  • Check that publication.Run() includes production packages
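
A quick checklist sketch covering both points, using the WAP production layer from Step 4 as the example:

-- In production/wap_customers: the block must declare where it materializes
#+meta {
  :doc "Production customer table",
  :publication {
    :type "table",
    :name "customers"
  }
}

-- In wap_pipeline: the production package must be passed to the publish phase
{{ publication.Run(packages=[wap_customers]) }}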

Tests Pass but Data Quality is Poor

Expand test coverage:

  • Add statistical checks (mean, median, outliers)
  • Implement cross-field validation
  • Add trend analysis (compare to historical patterns)
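
As a starting point for statistical checks, here is a hedged sketch that flags order totals more than three standard deviations from the mean. It assumes the RawOrders source from Step 1; the test name and threshold are illustrative:

#+import "data_quality_tutorial/sources"

#+test sql OrderAmountOutliers()
#+meta {
  :doc "Illustrative statistical check: flag order totals > 3 std devs from mean"
}
#+begin
WITH stats AS (
    SELECT
        AVG(total_amount) AS mean_amount,
        STDDEV(total_amount) AS stddev_amount
    FROM {{ sources.RawOrders() }}
)
SELECT
    o.order_id,
    o.total_amount
FROM {{ sources.RawOrders() }} o
CROSS JOIN stats s
WHERE ABS(o.total_amount - s.mean_amount) > 3 * s.stddev_amount;
#+end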

Summary

This tutorial demonstrated how to implement enterprise-grade data quality patterns using CoginitiScript:

  • WAP provides efficient post-transformation validation
  • AWAP extends WAP with pre-transformation validation for fail-fast behavior
  • CoginitiScript's test framework enables declarative quality gates
  • Block composition and publication metadata create maintainable pipelines
  • Parallel execution optimizes multi-table workflows

These patterns ensure that only validated, high-quality data reaches your production tables, reducing downstream errors and increasing trust in your analytics.

Next Steps

  • Extend patterns to your organization's data entities
  • Implement custom tests for business-specific rules
  • Create reusable test libraries for common validations
  • Integrate with alerting systems for test failure notifications
  • Measure pipeline success rates and data quality metrics over time

Additional Resources