Data Quality Design Patterns: WAP and AWAP
Overview
This tutorial demonstrates how to implement Write-Audit-Publish (WAP) and Audit-Write-Audit-Publish (AWAP) data quality patterns using CoginitiScript. These patterns, borrowed from software engineering, provide controlled gates that ensure only validated data reaches your production tables.
What You'll Learn:
- Implement WAP pattern for post-transformation validation
- Implement AWAP pattern for pre and post-transformation validation
- Use CoginitiScript's test framework to enforce data quality gates
- Orchestrate multi-phase pipelines with fail-fast validation
Prerequisites:
- Familiarity with CoginitiScript basics
- Understanding of SQL and data transformations
- Access to a data warehouse (Snowflake, BigQuery, Redshift, etc.)
Understanding the Patterns
WAP (Write-Audit-Publish)
WAP introduces a staging layer between raw data and production tables. Data is transformed and written to staging, validated through quality tests, and promoted to production only if it passes all checks.
Flow: Raw Data → Write to Staging → Audit Staging → Publish to Production
Benefits:
- Prevents bad data from reaching production
- Maintains production table integrity
- Provides clear rollback points
When to Use:
- Transformation logic is well-tested
- Source data is generally reliable
- You need efficient validation gates
AWAP (Audit-Write-Audit-Publish)
AWAP extends WAP by adding input validation before transformation. This catches data quality issues at the source, saving compute resources and providing earlier failure signals.
Flow: Raw Data → Audit Input → Write to Staging → Audit Output → Publish to Production
Benefits:
- Catches source data issues early (fail-fast)
- Validates both input and output
- Reduces wasted compute on bad data
When to Use:
- Source data quality is inconsistent
- Transformations are computationally expensive
- You need comprehensive quality gates
Project Setup
Create the following directory structure:
data_quality_tutorial/
├── sources/
│   └── sources                  # Source data abstractions
├── staging/
│   ├── wap/
│   │   ├── customers_staging    # WAP staging layer
│   │   └── customers_tests      # WAP validation tests
│   └── awap/
│       ├── input_tests          # AWAP input validation
│       ├── customers_staging    # AWAP staging layer
│       └── output_tests         # AWAP output validation
├── production/
│   ├── wap_customers            # WAP production tables
│   └── awap_customers           # AWAP production tables
├── wap_pipeline                 # WAP orchestration
└── awap_pipeline                # AWAP orchestration
Implementation: WAP Pattern
Step 1: Source Data Abstraction
Create a source layer to abstract raw tables and provide consistent column naming.
File: sources/sources
#+src sql RawCustomers()
#+meta {
:doc "Abstracts raw customer data from source system"
}
#+begin
SELECT
customer_id,
email,
first_name,
last_name,
country_code,
phone,
created_at,
updated_at
FROM raw.customers;
#+end
#+src sql RawOrders()
#+meta {
:doc "Abstracts raw order data from source system"
}
#+begin
SELECT
order_id,
customer_id,
order_date,
order_status,
total_amount
FROM raw.orders;
#+end
Step 2: Staging Layer with Transformations
Transform and materialize data to a staging table.
File: staging/wap/customers_staging
#+import "data_quality_tutorial/sources"
#+src sql Customers()
#+meta {
:doc "Stages customer data with business logic transformations",
:publication {
:type "table",
:name "wap_customers_staging"
}
}
#+begin
SELECT
customer_id,
LOWER(TRIM(email)) AS email,
INITCAP(first_name) AS first_name,
INITCAP(last_name) AS last_name,
CASE
WHEN country_code = 'US' THEN 'USA'
WHEN country_code = 'UK' THEN 'GBR'
ELSE UPPER(country_code)
END AS country_code,
REGEXP_REPLACE(phone, '[^0-9]', '') AS phone_clean,
created_at,
updated_at
FROM {{ sources.RawCustomers() }}
WHERE updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01')
FROM wap_customers_staging
);
#+end
Step 3: Quality Tests
Define tests that return an empty result set on success and the failing rows on failure.
File: staging/wap/customers_tests
#+import "data_quality_tutorial/staging/wap/customers_staging"
#+test sql NoNullKeys()
#+meta {
:doc "Verify no null primary keys or required fields"
}
#+begin
SELECT
customer_id,
email,
created_at
FROM {{ customers_staging.Customers() }}
WHERE customer_id IS NULL
OR email IS NULL
OR created_at IS NULL;
#+end
#+test sql ValidEmailFormat()
#+meta {
:doc "Ensure all emails match expected pattern"
}
#+begin
SELECT
customer_id,
email
FROM {{ customers_staging.Customers() }}
WHERE email NOT LIKE '%@%.%'
OR email LIKE '%..%'
OR email LIKE '@%'
OR email LIKE '%@';
#+end
#+test sql NoDuplicates()
#+meta {
:doc "Check for duplicate customer IDs in staging"
}
#+begin
SELECT
customer_id,
COUNT(*) AS duplicate_count
FROM {{ customers_staging.Customers() }}
GROUP BY customer_id
HAVING COUNT(*) > 1;
#+end
#+test sql ValidCountryCodes()
#+meta {
:doc "Verify country codes are ISO standard (2-3 chars, uppercase)"
}
#+begin
SELECT
customer_id,
country_code
FROM {{ customers_staging.Customers() }}
WHERE LENGTH(country_code) NOT BETWEEN 2 AND 3
OR country_code != UPPER(country_code)
OR country_code ~ '[^A-Z]';
#+end
#+test sql ValidPhoneNumbers()
#+meta {
:doc "Check phone numbers are cleaned and valid length"
}
#+begin
SELECT
customer_id,
phone_clean
FROM {{ customers_staging.Customers() }}
WHERE phone_clean IS NOT NULL
AND LENGTH(phone_clean) NOT BETWEEN 10 AND 15;
#+end
Step 4: Production Layer
The production layer references staging blocks directly. CoginitiScript handles materialization through publication metadata.
File: production/wap_customers
#+import "data_quality_tutorial/staging/wap/customers_staging"
#+src sql Customers()
#+meta {
:doc "Production customer table - contains only validated data",
:publication {
:type "table",
:name "customers"
}
}
#+begin
SELECT
customer_id,
email,
first_name,
last_name,
country_code,
phone_clean AS phone,
created_at,
updated_at
FROM {{ customers_staging.Customers() }};
#+end
Step 5: WAP Orchestration
Orchestrate the three-phase WAP pipeline: Write → Audit → Publish.
File: wap_pipeline
#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/wap/customers_staging"
#+import "data_quality_tutorial/staging/wap/customers_tests"
#+import "data_quality_tutorial/production/wap_customers"
-- Phase 1: WRITE to staging
{{ publication.Run(packages=[customers_staging]) }}
-- Phase 2: AUDIT staging data (fail fast if any test fails)
{{ test.Run(packages=[customers_tests], onFailure=test.Stop) }}
-- Phase 3: PUBLISH to production (only runs if tests pass)
{{ publication.Run(packages=[wap_customers]) }}
Key Points:
- publication.Run() materializes staging tables based on publication metadata
- test.Run() with onFailure=test.Stop implements fail-fast behavior
- Production publication only executes if all tests pass
Implementation: AWAP Pattern
Step 1: Input Validation Tests
Create tests that validate raw data before transformation.
File: staging/awap/input_tests
#+import "data_quality_tutorial/sources"
#+test sql InputSchemaValid()
#+meta {
:doc "Verify raw data has all required columns without nulls"
}
#+begin
SELECT
customer_id,
email,
created_at
FROM {{ sources.RawCustomers() }}
WHERE customer_id IS NULL
OR email IS NULL
OR created_at IS NULL;
#+end
#+test sql InputVolumeCheck()
#+meta {
:doc "Ensure minimum expected daily volume of new records"
}
#+begin
SELECT
COUNT(*) AS new_records,
CURRENT_DATE - 1 AS check_date
FROM {{ sources.RawCustomers() }}
WHERE DATE(updated_at) = CURRENT_DATE - 1
HAVING COUNT(*) < 10; -- Expect at least 10 records daily
#+end
#+test sql InputDataFreshness()
#+meta {
:doc "Check that source data is not stale"
}
#+begin
SELECT
MAX(updated_at) AS latest_update,
CURRENT_TIMESTAMP AS check_time,
EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - MAX(updated_at)) / 3600 AS hours_stale
FROM {{ sources.RawCustomers() }}
HAVING EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - MAX(updated_at)) / 3600 > 25; -- Alert if more than 25 hours stale
#+end
#+test sql InputAnomalyDetection()
#+meta {
:doc "Flag suspicious patterns in raw data (potential bot/spam)"
}
#+begin
WITH email_domains AS (
SELECT
SUBSTRING(email FROM '@(.*)$') AS domain,
COUNT(*) AS domain_count
FROM {{ sources.RawCustomers() }}
WHERE DATE(updated_at) >= CURRENT_DATE - 1
GROUP BY SUBSTRING(email FROM '@(.*)$')
)
SELECT
domain,
domain_count
FROM email_domains
WHERE domain_count > 1000 -- Flag if single domain has >1000 signups/day
OR domain IS NULL;
#+end
#+test sql InputReferentialIntegrity()
#+meta {
:doc "Validate foreign keys exist in reference tables"
}
#+begin
SELECT
r.customer_id,
r.country_code
FROM {{ sources.RawCustomers() }} r
LEFT JOIN ref.country_codes c
ON r.country_code = c.code
WHERE r.country_code IS NOT NULL
AND c.code IS NULL;
#+end
Step 2: Staging Layer (same as WAP)
File: staging/awap/customers_staging
#+import "data_quality_tutorial/sources"
#+src sql Customers()
#+meta {
:doc "Stages customer data with business logic transformations",
:publication {
:type "table",
:name "awap_customers_staging"
}
}
#+begin
SELECT
customer_id,
LOWER(TRIM(email)) AS email,
INITCAP(first_name) AS first_name,
INITCAP(last_name) AS last_name,
CASE
WHEN country_code = 'US' THEN 'USA'
WHEN country_code = 'UK' THEN 'GBR'
ELSE UPPER(country_code)
END AS country_code,
REGEXP_REPLACE(phone, '[^0-9]', '') AS phone_clean,
created_at,
updated_at
FROM {{ sources.RawCustomers() }}
WHERE updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01')
FROM awap_customers_staging
);
#+end
Step 3: Output Validation Tests
Create tests that validate transformed data in staging.
File: staging/awap/output_tests
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+test sql OutputBusinessRules()
#+meta {
:doc "Verify transformed data meets all business requirements"
}
#+begin
SELECT
customer_id,
email,
country_code,
created_at,
updated_at
FROM {{ customers_staging.Customers() }}
WHERE email NOT LIKE '%@%.%'
OR LENGTH(country_code) NOT IN (2, 3)
OR created_at > CURRENT_TIMESTAMP
OR updated_at < created_at;
#+end
#+test sql OutputReferentialIntegrity()
#+meta {
:doc "Ensure transformed country codes match dimension table"
}
#+begin
SELECT
s.customer_id,
s.country_code
FROM {{ customers_staging.Customers() }} s
LEFT JOIN dim.countries c
ON s.country_code = c.country_code
WHERE c.country_code IS NULL;
#+end
#+test sql OutputDuplicates()
#+meta {
:doc "Check for duplicates introduced during transformation"
}
#+begin
SELECT
customer_id,
COUNT(*) AS duplicate_count,
ARRAY_AGG(updated_at) AS update_timestamps
FROM {{ customers_staging.Customers() }}
GROUP BY customer_id
HAVING COUNT(*) > 1;
#+end
#+test sql OutputMetrics()
#+meta {
:doc "Validate output metrics are within expected ranges"
}
#+begin
WITH metrics AS (
SELECT
COUNT(*) AS total_records,
COUNT(DISTINCT email) AS unique_emails,
COUNT(DISTINCT email)::FLOAT / NULLIF(COUNT(*), 0) AS email_uniqueness,
AVG(LENGTH(email)) AS avg_email_length
FROM {{ customers_staging.Customers() }}
)
SELECT *
FROM metrics
WHERE email_uniqueness < 0.98 -- Expect 98%+ unique emails
OR avg_email_length < 5 -- Suspicious if avg email < 5 chars
OR avg_email_length > 100; -- Suspicious if avg email > 100 chars
#+end
Step 4: Production Layer (same as WAP)
File: production/awap_customers
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+src sql Customers()
#+meta {
:doc "Production customer table - validated through AWAP pipeline",
:publication {
:type "table",
:name "customers_awap"
}
}
#+begin
SELECT
customer_id,
email,
first_name,
last_name,
country_code,
phone_clean AS phone,
created_at,
updated_at
FROM {{ customers_staging.Customers() }};
#+end
Step 5: AWAP Orchestration
Orchestrate the four-phase AWAP pipeline: Audit Input → Write → Audit Output → Publish.
File: awap_pipeline
#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/output_tests"
#+import "data_quality_tutorial/production/awap_customers"
-- Phase 1: AUDIT INPUT (validate raw data before transformation)
{{ test.Run(packages=[input_tests], onFailure=test.Stop) }}
-- Phase 2: WRITE to staging (only runs if input validation passes)
{{ publication.Run(packages=[customers_staging]) }}
-- Phase 3: AUDIT OUTPUT (validate transformed data)
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}
-- Phase 4: PUBLISH to production (only runs if all tests pass)
{{ publication.Run(packages=[awap_customers]) }}
Key Difference from WAP:
- Input validation runs before publication.Run() for staging
- Catches source issues early, preventing wasted transformation compute
- Both input and output must pass for production promotion
Advanced Patterns
Selective Test Execution
Run specific tests instead of entire packages:
#+import "std/test"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/output_tests"
-- Run only critical input tests
{{ test.Run(
tests=[
input_tests.InputSchemaValid,
input_tests.InputVolumeCheck
],
onFailure=test.Stop
) }}
-- Run all output tests
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}
Multi-Table Parallel Pipeline
Process multiple entities with parallel execution:
File: multi_table_awap_pipeline
#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/customers_input_tests"
#+import "data_quality_tutorial/staging/awap/orders_input_tests"
#+import "data_quality_tutorial/staging/awap/products_input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/orders_staging"
#+import "data_quality_tutorial/staging/awap/products_staging"
#+import "data_quality_tutorial/staging/awap/customers_output_tests"
#+import "data_quality_tutorial/staging/awap/orders_output_tests"
#+import "data_quality_tutorial/staging/awap/products_output_tests"
#+import "data_quality_tutorial/production"
-- Phase 1: AUDIT all inputs in parallel
{{ test.Run(
packages=[
customers_input_tests,
orders_input_tests,
products_input_tests
],
onFailure=test.Stop
) }}
-- Phase 2: WRITE all staging tables in parallel
{{ publication.Run(
packages=[
customers_staging,
orders_staging,
products_staging
],
parallelism=3
) }}
-- Phase 3: AUDIT all outputs in parallel
{{ test.Run(
packages=[
customers_output_tests,
orders_output_tests,
products_output_tests
],
onFailure=test.Stop
) }}
-- Phase 4: PUBLISH to production
{{ publication.Run(packages=[production]) }}
Monitoring-Friendly Pipeline
Use test.Continue for non-critical checks that should log warnings but not block:
#+import "std/test"
#+import "std/publication"
#+import "data_quality_tutorial/staging/awap/input_tests"
#+import "data_quality_tutorial/staging/awap/customers_staging"
#+import "data_quality_tutorial/staging/awap/output_tests"
#+import "data_quality_tutorial/staging/awap/monitoring_tests"
#+import "data_quality_tutorial/production/awap_customers"
-- Critical input validation (must pass)
{{ test.Run(packages=[input_tests], onFailure=test.Stop) }}
-- Transform
{{ publication.Run(packages=[customers_staging]) }}
-- Critical output validation (must pass)
{{ test.Run(packages=[output_tests], onFailure=test.Stop) }}
-- Non-critical monitoring checks (log but don't block)
{{ test.Run(packages=[monitoring_tests], onFailure=test.Continue) }}
-- Publish
{{ publication.Run(packages=[awap_customers]) }}
Testing Your Implementation
Execute WAP Pipeline
#+import "data_quality_tutorial/wap_pipeline"
{{ wap_pipeline.Run() }}
Execute AWAP Pipeline
#+import "data_quality_tutorial/awap_pipeline"
{{ awap_pipeline.Run() }}
Verify Results
-- Check WAP production table
SELECT
COUNT(*) AS total_customers,
COUNT(DISTINCT email) AS unique_emails,
MIN(created_at) AS earliest_customer,
MAX(updated_at) AS latest_update
FROM customers;
-- Check AWAP production table
SELECT
COUNT(*) AS total_customers,
COUNT(DISTINCT email) AS unique_emails,
MIN(created_at) AS earliest_customer,
MAX(updated_at) AS latest_update
FROM customers_awap;
-- Compare staging vs production counts
SELECT
'WAP' AS pipeline,
(SELECT COUNT(*) FROM wap_customers_staging) AS staging_count,
(SELECT COUNT(*) FROM customers) AS production_count,
(SELECT COUNT(*) FROM wap_customers_staging) -
(SELECT COUNT(*) FROM customers) AS diff
UNION ALL
SELECT
'AWAP' AS pipeline,
(SELECT COUNT(*) FROM awap_customers_staging) AS staging_count,
(SELECT COUNT(*) FROM customers_awap) AS production_count,
(SELECT COUNT(*) FROM awap_customers_staging) -
(SELECT COUNT(*) FROM customers_awap) AS diff;
Simulate Test Failures
Insert bad data to verify your quality gates work:
-- Insert invalid email
INSERT INTO raw.customers (customer_id, email, created_at, updated_at)
VALUES (99999, 'invalid-email', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
-- Run pipeline - should fail at validation stage
#+import "data_quality_tutorial/wap_pipeline"
{{ wap_pipeline.Run() }}
-- Clean up
DELETE FROM raw.customers WHERE customer_id = 99999;
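The duplicate gate can be exercised the same way. A minimal sketch, assuming the same raw.customers columns as above (customer_id 99998 and the email are hypothetical values):
-- Insert two rows sharing a customer_id to trip the NoDuplicates test
INSERT INTO raw.customers (customer_id, email, created_at, updated_at)
VALUES
  (99998, 'dup.test@example.com', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP),
  (99998, 'dup.test@example.com', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);
-- Run the pipeline - it should stop at the audit phase
-- Clean up
DELETE FROM raw.customers WHERE customer_id = 99998;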
Best Practices
1. Test Naming Conventions
Use descriptive prefixes to organize tests:
- Input* for pre-transformation validation
- Output* for post-transformation validation
- NoNull*, Valid*, Check* for specific validation types
2. Incremental Loading
Use WHERE clauses to process only new/changed records:
WHERE updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01')
FROM staging_table
)
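Some warehouses will not implicitly cast the string fallback to a timestamp, and the self-referencing subquery requires the staging table to exist (even if empty) before the first run. A hedged variant with an explicit cast (staging_table is a placeholder for your staging table name):
WHERE updated_at > (
  SELECT COALESCE(MAX(updated_at), CAST('1900-01-01' AS TIMESTAMP))
  FROM staging_table -- must exist before the first run, even if empty
)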
3. Fail-Fast vs Fail-Tolerant
- Use onFailure=test.Stop for critical quality gates
- Use onFailure=test.Continue for monitoring and alerting
- Always use test.Stop for input validation in AWAP
4. Test Completeness
Design tests that:
- Return empty results on success
- Return failing rows with diagnostic information
- Include row counts and affected IDs for debugging
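As an illustration of this contract, here is a hypothetical test against the RawOrders source from Step 1; the non-negative rule and the failing_row_count column are illustrative additions, not part of the tutorial's pipeline:
#+import "data_quality_tutorial/sources"
#+test sql OrdersNonNegativeAmounts()
#+meta {
  :doc "Return failing orders with diagnostic columns; empty result means pass"
}
#+begin
SELECT
  order_id,     -- affected ID for debugging
  customer_id,
  total_amount, -- the offending value
  COUNT(*) OVER () AS failing_row_count -- total failures, repeated on each row
FROM {{ sources.RawOrders() }}
WHERE total_amount < 0;
#+end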
5. Documentation
Use #+meta blocks to document:
- Purpose of each transformation
- Business rules being validated
- Expected data volumes and SLAs
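The :doc string is a natural place to record expected volumes and SLAs so they travel with the block. A minimal sketch (the figures are illustrative):
#+import "data_quality_tutorial/sources"
#+src sql Customers()
#+meta {
  :doc "Stages customers. Rule: emails lowercased and trimmed. Expected volume: ~10k new rows/day. SLA: source fresh within 25 hours.",
  :publication {
    :type "table",
    :name "wap_customers_staging"
  }
}
#+begin
SELECT customer_id, LOWER(TRIM(email)) AS email, created_at, updated_at
FROM {{ sources.RawCustomers() }};
#+end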
When to Use Each Pattern
Choose WAP When:
- Source data quality is generally good
- Transformation logic is the primary concern
- You need efficient, single-stage validation
- Compute costs are not a primary concern
Choose AWAP When:
- Source data quality is inconsistent or unknown
- Transformations are computationally expensive
- Early failure detection saves significant resources
- You need comprehensive end-to-end validation
- Regulatory compliance requires input validation
Performance Considerations
| Aspect | WAP | AWAP |
|---|---|---|
| Compute Cost | Moderate | Higher |
| Storage Cost | Comparable (staging + production copies) | Comparable (staging + production copies) |
| Failure Detection | Post-transformation | Pre and post-transformation |
| Wasted Compute | Higher (transforms bad data) | Lower (fails before transform) |
| Complexity | Lower | Higher |
| Best For | Stable sources | Volatile sources |
Scheduling and Automation
For Coginiti Team and Enterprise editions, schedule pipelines through the UI:
- Navigate to your pipeline file (wap_pipeline or awap_pipeline)
- Click "Schedule" in the toolbar
- Configure frequency (hourly, daily, weekly)
- Set timezone and execution window
- Enable notifications for test failures
The scheduler automatically resolves dependencies and executes blocks in the correct order.
Troubleshooting
Pipeline Fails at Test Stage
Check test results:
-- Tests return failing rows, not error messages
-- Review the actual data that failed validation
Common issues:
- Null values in required fields → Check source data quality
- Duplicate keys → Review transformation logic for unintended duplicates
- Invalid formats → Add cleaning logic in the transformation block (see the sketch below)
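For the invalid-format case, one hedged option is to exclude unrepairable rows in the staging transformation rather than letting them fail the audit (the email rule is illustrative; excluded rows could also be routed to a rejects table):
SELECT
  customer_id,
  LOWER(TRIM(email)) AS email
FROM {{ sources.RawCustomers() }}
WHERE email LIKE '%@%.%'    -- exclude emails that cleaning cannot repair
  AND email NOT LIKE '%..%';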
Staging Populates but Production is Empty
Verify publication metadata:
- Ensure production blocks have #+meta { :publication {...} }
- Check that publication.Run() includes production packages
Tests Pass but Data Quality is Poor
Expand test coverage:
- Add statistical checks (mean, median, outliers)
- Implement cross-field validation
- Add trend analysis (compare to historical patterns)
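As a sketch of a statistical check, the test below flags days whose load volume deviates more than three standard deviations from the trailing mean; the 28-day window and 3-sigma threshold are illustrative:
#+import "data_quality_tutorial/sources"
#+test sql DailyVolumeOutlier()
#+meta {
  :doc "Flag days whose row count deviates more than 3 sigma from the 28-day mean"
}
#+begin
WITH daily AS (
  SELECT DATE(updated_at) AS load_date, COUNT(*) AS row_count
  FROM {{ sources.RawCustomers() }}
  WHERE updated_at >= CURRENT_DATE - 28
  GROUP BY DATE(updated_at)
),
stats AS (
  SELECT AVG(row_count) AS mean_count, STDDEV(row_count) AS sd_count
  FROM daily
)
SELECT d.load_date, d.row_count, s.mean_count, s.sd_count
FROM daily d
CROSS JOIN stats s
WHERE s.sd_count > 0 -- skip when there is no variance to measure
  AND ABS(d.row_count - s.mean_count) > 3 * s.sd_count;
#+end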
Summary
This tutorial demonstrated how to implement enterprise-grade data quality patterns using CoginitiScript:
- WAP provides efficient post-transformation validation
- AWAP extends WAP with pre-transformation validation for fail-fast behavior
- CoginitiScript's test framework enables declarative quality gates
- Block composition and publication metadata create maintainable pipelines
- Parallel execution optimizes multi-table workflows
These patterns ensure that only validated, high-quality data reaches your production tables, reducing downstream errors and increasing trust in your analytics.
Next Steps
- Extend patterns to your organization's data entities
- Implement custom tests for business-specific rules
- Create reusable test libraries for common validations
- Integrate with alerting systems for test failure notifications
- Measure pipeline success rates and data quality metrics over time