
CoginitiScript Reference

CoginitiScript is a block-based scripting language designed for data engineering and analytics workflows. It encourages code reuse and modular design, and it follows software engineering best practices such as the DRY (Don't Repeat Yourself) principle.

Overview

CoginitiScript is built around blocks - self-contained units of source code where each block represents code in a specific language. This design makes CoginitiScript files backward-compatible with existing language files:

  • Any SQL file is a valid CoginitiScript file
  • Any Python file is a valid CoginitiScript file
  • Any supported language file is a valid CoginitiScript file

Block Structure

Each block is defined using the following syntax:

#+src language name([arg=val...])[: returntype]
#+begin
... source code here
#+end

Source code outside defined blocks is treated as a default block (typically SQL). For example:

SELECT 1;
SELECT 2;

#+src sql foo()
#+begin
SELECT 3;
#+end

SELECT * FROM {{ foo() }};

This represents three blocks:

  • Default SQL block with "SELECT 1" and "SELECT 2"
  • Named SQL block "foo" with "SELECT 3"
  • Default SQL block with "SELECT * FROM {{ foo() }}"
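The split into default and named blocks can be pictured with a small Python sketch. It is an illustration only, not Coginiti's parser: the function name split_blocks and the "<default>" and "<anonymous>" labels are hypothetical, and the sketch does not handle #+if ... #+end directives nested inside a block.

```python
import re

# Matches "#+src <language>" with an optional "<name>(" after it.
SRC_LINE = re.compile(r"^#\+src\s+(\w+)(?:\s+(\w+)\s*\()?")


def split_blocks(script):
    """Split a script into (label, source) pairs in document order."""
    blocks, pending = [], []
    name, state = None, "default"  # states: "default", "header", "body"

    def flush(label):
        # Emit the collected lines as one block, skipping empty ones.
        text = "\n".join(pending).strip()
        if text:
            blocks.append((label, text))
        pending.clear()

    for line in script.splitlines():
        stripped = line.strip()
        match = SRC_LINE.match(stripped)
        if state == "default" and match:
            flush("<default>")
            name, state = match.group(2) or "<anonymous>", "header"
        elif state == "header":
            # Skip header lines (such as #+meta) until the body starts.
            if stripped == "#+begin":
                state = "body"
        elif state == "body" and stripped == "#+end":
            flush(name)
            state = "default"
        else:
            pending.append(line)
    flush("<default>")
    return blocks


script = """SELECT 1;
SELECT 2;

#+src sql foo()
#+begin
SELECT 3;
#+end

SELECT * FROM {{ foo() }};"""

blocks = split_blocks(script)
for label, source in blocks:
    print(label, "->", source.replace("\n", " "))
```

Running it on the script above yields the same three blocks as the list: a default block, the named block foo, and a second default block.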

Code Blocks

Block Definition

Blocks are defined with a language, optional name, optional parameters, and optional return type:

#+src sql SalesPerStore()
#+begin
SELECT
    ds.name AS store_name,
    SUM(s.quantity * s.price) AS total_sales_amount
FROM
    fact_sales AS s
    INNER JOIN dim_store AS ds
        ON s.store_id = ds.id
GROUP BY
    ds.name
#+end

Block Invocation

Reference blocks using {{ block-name(args) }} syntax:

SELECT * FROM {{ SalesPerStore() }} WHERE store_name = 'Main Store';

Block Execution

  • Anonymous blocks: Execute immediately when the script runs
  • Named blocks: Execute only when referenced via {{ block-name(args) }}
  • Development execution: A block also executes when the cursor is inside it and run-at-cursor is used

-- This executes immediately
#+src sql
#+begin
SELECT 1;
#+end

-- This executes only when called
#+src sql selectOne()
#+begin
SELECT 1;
#+end

SELECT * FROM {{ selectOne() }}; -- Execution happens here

Block Parameters

Blocks can accept zero or more parameters:

#+src sql GetCustomerByEmail(country, domain)
#+begin
SELECT *
FROM dim_customers
WHERE
country = '{{ country }}'
AND email LIKE '%{{ domain }}'
#+end

Calling with positional arguments:

SELECT * FROM {{ GetCustomerByEmail("USA", "gmail.com") }};

Calling with named arguments:

SELECT * FROM {{ GetCustomerByEmail(country="USA", domain="gmail.com") }};

Note: You cannot mix positional and named arguments in a single block call.

Return Types

Blocks support two return types:

recordset (default)

Returns the result of the last SELECT statement:

#+src sql CustomerData()
#+begin
-- Multiple statements allowed
CREATE TEMP TABLE temp_customers AS SELECT * FROM customers;

-- This result is returned
SELECT * FROM temp_customers WHERE active = true;
#+end

void

Performs actions without returning values:

#+src sql GenCustomersTable(cnt, minAge, maxAge): void
#+meta {
:doc "Generate customers table with random data"
}
#+begin
DROP TABLE IF EXISTS customers;
CREATE TABLE customers(
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER
);

INSERT INTO customers
SELECT
generate_series AS id,
'Customer ' || generate_series AS name,
FLOOR(RANDOM() * ({{ maxAge }} - {{ minAge }} + 1))::INTEGER + {{ minAge }} AS age
FROM generate_series(1, {{ cnt }});
#+end

Execution Mechanics

When a block is referenced, CoginitiScript materializes the block's results using one of several strategies, depending on the SQL complexity and platform capabilities:

Default Strategy: CTE (Common Table Expression)

By default, CoginitiScript uses CTEs for referenced blocks:

-- Original code
#+src sql selectOne()
#+begin
SELECT 1 AS one;
#+end

SELECT * FROM {{ selectOne() }};

-- Generated SQL (CTE approach)
WITH _selectOne AS (
SELECT 1 AS one
)
SELECT * FROM _selectOne;
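The CTE rewrite can be sketched as plain string manipulation in Python. This is a simplified model under stated assumptions, not the actual code generator: the helper name inline_as_cte is hypothetical, it handles a single parameterless block, and the "_" name prefix is taken from the generated SQL shown above.

```python
def inline_as_cte(block_name, block_sql, query):
    """Wrap a block body in a CTE and point the reference at it."""
    cte_name = f"_{block_name}"
    # A CTE body cannot end with a statement terminator.
    body = block_sql.strip().rstrip(";")
    reference = "{{ " + block_name + "() }}"
    rewritten = query.replace(reference, cte_name)
    return f"WITH {cte_name} AS (\n{body}\n)\n{rewritten}"


generated = inline_as_cte(
    "selectOne",
    "SELECT 1 AS one;",
    "SELECT * FROM {{ selectOne() }};",
)
print(generated)
```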

Fallback Strategy: Temporary Tables

When the final SQL becomes too complex for CTEs or platform limitations prevent CTE usage, CoginitiScript falls back to temporary tables:

-- Generated SQL (temp table approach)
CREATE TEMPORARY TABLE _random_name_ AS
SELECT 1 AS one;

SELECT * FROM _random_name_;

Ephemeral Tables

For certain platforms or complex scenarios, CoginitiScript may create standard tables that are automatically cleaned up after execution (ephemeral tables):

-- Generated SQL (ephemeral table approach)  
CREATE TABLE _ephemeral_random_name_ AS
SELECT 1 AS one;

SELECT * FROM _ephemeral_random_name_;

-- Automatic cleanup
DROP TABLE _ephemeral_random_name_;

CoginitiScript selects the strategy automatically. The choice is transparent to the user, and a block reference behaves consistently across database platforms.

Limitations

  • Snowflake: Requires explicit column aliases in SELECT statements for referenced blocks
  • Explain plans: Cannot be executed for statements with block references

Packages

A package is a directory containing CoginitiScript files and/or other packages. Files in the catalog root belong to the "main" package.

Public/Private Blocks

CoginitiScript follows Go-like identifier export rules:

  • Public: Identifiers starting with uppercase letters
  • Private: All other identifiers

#+src sql foo()  -- Private
#+begin
SELECT 1;
#+end

#+src sql Bar() -- Public
#+begin
SELECT 2;
#+end
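The export rule reduces to a check on the first character. A minimal Python sketch follows; the function name is_public is hypothetical, and how CoginitiScript treats non-ASCII or underscore-prefixed names is an assumption here (they are treated as private unless the first character is an uppercase letter).

```python
def is_public(identifier):
    """Return True when the identifier starts with an uppercase letter."""
    return identifier[:1].isupper()


for name in ["foo", "Bar", "_helper"]:
    visibility = "public" if is_public(name) else "private"
    print(f"{name}: {visibility}")
```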

Importing Packages

Use the #+import directive:

#+import "sales/fact_sales"
#+import "sales/customer/dim_customer"
#+import "sales/fact_sales" as sales -- With alias

Reference imported blocks with package qualification:

#+import "sales/customer"
#+import "sales/fact_sales" as sales

SELECT * FROM {{ customer.CustomerDimension() }};
SELECT * FROM {{ sales.SalesInteraction() }};

Note: Package names with spaces or special characters require aliases:

#+import "Users/username/Sales Interaction" as sales
SELECT * FROM {{ sales.BlockName() }};

Metadata Properties

Blocks support arbitrary metadata as key-value pairs:

#+src sql SalesPerCountry()
#+meta {
:author "John Doe",
:version "1.2.0",
:last_updated "2024-01-15"
}
#+begin
SELECT country, SUM(sales) FROM fact_sales GROUP BY country;
#+end

Documentation

Use the :doc metadata key for documentation:

#+src sql CalculateMetrics(start_date, end_date)
#+meta {
:doc "Calculate performance metrics for a date range.

This function computes key performance indicators including revenue,
conversion rates, and customer acquisition costs for the specified
date range.

Parameters:
- `start_date`: Start date for the analysis (YYYY-MM-DD format)
- `end_date`: End date for the analysis (YYYY-MM-DD format)

Returns a recordset with metrics aggregated by day."
}
#+begin
SELECT
DATE(created_at) as metric_date,
SUM(revenue) as total_revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM fact_orders
WHERE DATE(created_at) BETWEEN '{{ start_date }}' AND '{{ end_date }}'
GROUP BY DATE(created_at)
ORDER BY metric_date;
#+end

Multiline String Literals

CoginitiScript supports multiline strings with automatic indentation handling:

SELECT * FROM {{ block("
line 1
line 2
line 3
line 4
line 5
") }}

The re-indentation algorithm removes consistent leading whitespace to produce:

line 1
line 2
line 3
line 4
line 5
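"Removing consistent leading whitespace" is the same idea as Python's textwrap.dedent, which strips the indentation shared by all non-blank lines and keeps any extra indentation relative to it. The sketch below uses dedent as a stand-in; whether CoginitiScript's algorithm matches it in every edge case (tabs, blank lines) is an assumption.

```python
import textwrap

# The literal is indented to fit the surrounding code; line 2 carries
# two extra spaces of its own.
raw = """
    line 1
      line 2
    line 3
"""

# dedent removes the 4 spaces common to every non-blank line.
dedented = textwrap.dedent(raw).strip("\n")
print(dedented)
```

Only the shared four-space prefix is removed, so the relative indentation of line 2 survives.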

Publication

Publication materializes block results as external database objects or files.

Database Publication

Table Publication

#+src sql salesFact()
#+meta {
:publication {
:type "table",
:name "mart_sales_fact",
:schema "analytics" -- Optional
}
}
#+begin
SELECT * FROM raw_sales_data;
#+end

View Publication

#+src sql customerSummary()
#+meta {
:publication {
:type "view",
:name "customer_summary_view"
}
}
#+begin
SELECT customer_id, SUM(order_value) as total_value
FROM orders GROUP BY customer_id;
#+end

File Publication

CSV Publication

#+src sql salesReport()
#+meta {
:publication {
:type "csv",
:path "/reports/sales_report.csv",
:connection "S3 Connection", -- Optional
:options {
:delimiter ",",
:null_value "",
:quote_char "\"",
:overwrite true,
:header true,
:encoding "UTF-8",
:compression "GZIP"
}
}
}
#+begin
SELECT * FROM sales_summary;
#+end

Note (Windows paths): Use raw strings (backticks) for Windows paths to avoid escaping issues:

:path `C:\Projects\Reports\sales_report.csv`

Parquet Publication

#+src sql salesData()
#+meta {
:publication {
:type "parquet",
:path "/data/sales_data.parquet",
:options {
:row_group_size 134217728, -- 128 MiB
:page_size 1048576, -- 1 MiB
:overwrite false,
:compression "snappy" -- snappy, gzip, none
}
}
}
#+begin
SELECT * FROM fact_sales;
#+end

Object Store Publication

CoginitiScript supports publishing to cloud object stores like Amazon S3, Azure Blob Storage, and Google Cloud Storage:

#+src sql DataExport()
#+meta {
:publication {
:type "csv",
:path "analytics-bucket/exports/daily_report.csv",
:connection "AWS S3 Connection"
}
}
#+begin
SELECT
customer_id,
order_date,
total_amount
FROM fact_orders
WHERE order_date = CURRENT_DATE;
#+end

Connection Configuration: Object store connections must be configured with appropriate credentials and permissions in Coginiti before use. The connection name references the configured connection profile.

Referencing Published Blocks

When referencing blocks with database publication, the published table/view is used instead of executing the block:

-- This query
SELECT * FROM {{ salesFact() }};

-- Becomes this
SELECT * FROM mart_sales_fact;

Warning: Referencing blocks with file publication (CSV/Parquet) is not supported and will result in a runtime error.

Incremental Publication

Incremental publication publishes only new data since the last run. Three strategies are supported:

Append Strategy

#+src sql DailyActiveUsers()
#+meta {
:publication {
:type "table",
:name "daily_active_users",
:incremental "append"
}
}
#+begin
SELECT
DATE_TRUNC('day', visit_date) as date_day,
COUNT(DISTINCT user_id) AS users_count
FROM visits
#+if publication.Incremental() then
WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
#+end
GROUP BY date_day;
#+end

Merge Strategy

#+src sql DailyMetrics()
#+meta {
:publication {
:type "table",
:name "daily_metrics",
:incremental "merge",
:unique_key ["date_day"]
}
}
#+begin
SELECT
DATE_TRUNC('day', event_date) as date_day,
COUNT(*) as event_count
FROM events
#+if publication.Incremental() then
WHERE event_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
#+end
GROUP BY date_day;
#+end

Conditional Merge Strategy

#+src sql CustomerDimension()
#+meta {
:publication {
:type "table",
:name "dim_customer",
:incremental "merge_conditionally",
:unique_key ["customer_id"],
:update_on_changes_in ["first_name", "last_name", "email"]
}
}
#+begin
SELECT
customer_id,
first_name,
last_name,
email,
updated_at
FROM source_customers;
#+end
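The difference between the three strategies can be modeled on in-memory rows. The Python sketch below is a guess at the semantics based on the option names, not a description of the SQL Coginiti generates: it assumes "merge" replaces any existing row with the same unique key, and "merge_conditionally" replaces it only when one of the :update_on_changes_in columns differs. The function name publish is hypothetical.

```python
def publish(target, new_rows, strategy, unique_key=(), update_on_changes_in=()):
    """Return the target rows after applying one incremental run."""
    if strategy not in ("append", "merge", "merge_conditionally"):
        raise ValueError(f"unknown strategy: {strategy}")
    if strategy == "append":
        return target + new_rows

    def key(row):
        return tuple(row[column] for column in unique_key)

    merged = {key(row): row for row in target}
    for row in new_rows:
        existing = merged.get(key(row))
        if existing is None or strategy == "merge":
            merged[key(row)] = row  # insert, or unconditional update
        elif any(existing[c] != row[c] for c in update_on_changes_in):
            merged[key(row)] = row  # update only on a tracked change
    return list(merged.values())


target = [{"customer_id": 1, "email": "a@x.com", "updated_at": "day1"}]
incoming = [
    {"customer_id": 1, "email": "a@x.com", "updated_at": "day2"},
    {"customer_id": 2, "email": "b@x.com", "updated_at": "day2"},
]

appended = publish(target, incoming, "append")
merged = publish(target, incoming, "merge", unique_key=["customer_id"])
conditional = publish(
    target, incoming, "merge_conditionally",
    unique_key=["customer_id"], update_on_changes_in=["email"],
)
print(len(appended), merged[0]["updated_at"], conditional[0]["updated_at"])
```

Under these assumptions, append keeps both versions of customer 1, merge overwrites the existing row, and the conditional merge leaves it alone because the tracked email column did not change.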

Executing Publications

Use the std/publication package to execute publications programmatically:

#+import "std/publication"
#+import "analytics/sales"
#+import "analytics/customers"

{{
publication.Run(
blocks=[sales.SalesDetail, sales.SalesHeader],
packages=[customers],
afterEach=grantSelectPermissions
)
}}

#+src sql grantSelectPermissions(): void
#+begin
#+if publication.Type() == "table" || publication.Type() == "view" then
GRANT SELECT ON {{ publication.Target() }} TO GROUP "analysts";
#+end
#+end

Parameters:

  • blocks: List of specific code blocks to publish (each block must contain publication metadata configuration)
  • packages: List of packages to publish (all blocks with publication metadata)
  • parallelism: Maximum number of publication blocks executed concurrently within each dependency step (default: 1, i.e. sequential execution)
  • fullRefresh: Boolean flag to force full refresh regardless of incremental settings (default: false)
  • beforeAll: Block to execute before any publications start (must return void)
  • beforeEach: Block to execute before each publication starts (must return void)
  • afterEach: Block to execute after each publication completes (must return void)
  • afterAll: Block to execute after all publications complete (must return void)
  • postOp: [DEPRECATED] Alias for afterEach; use afterEach instead

Parameter Requirements

  • At least one of blocks or packages must be provided
  • All lifecycle blocks (beforeAll, beforeEach, afterEach, afterAll, postOp) must return void
  • Only user-defined packages are allowed (not built-in/standard library packages)

Parallelism parameter

The parallelism parameter controls the level of concurrent execution when running publication blocks. It determines how many publication blocks can be executed simultaneously within each execution step of the publication dependency graph.

Note: A step in the execution block graph represents a set of blocks that have no dependencies on each other and can execute safely in parallel. The system automatically analyzes block dependencies and groups blocks into sequential steps, where:

  • All blocks within a step can run concurrently
  • All dependencies from previous steps must complete before the next step begins
  • Each step respects the dependency constraints defined by block references

Consider publication blocks with the following dependencies, where x -> y means that y depends on x:

  • a -> d -> g
  • b -> d -> g
  • c -> e
  • f -> h
  • i (no dependencies)

Step grouping with parallelism > 1:

  • Step 1: [a, b, c, f, i] - No dependencies
  • Step 2: [d, e, h] - Dependencies from Step 1 satisfied
  • Step 3: [g] - Depends on d from Step 2
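The grouping above can be reproduced by repeatedly peeling off the blocks whose dependencies are already satisfied. The Python sketch below is one way to do that and is not necessarily how Coginiti implements it; the function name group_into_steps and the dictionary layout are hypothetical.

```python
def group_into_steps(dependencies):
    """Group blocks into steps; `dependencies` maps block -> prerequisites."""
    remaining = {block: set(deps) for block, deps in dependencies.items()}
    steps = []
    while remaining:
        # Blocks with no unsatisfied prerequisites form the next step.
        ready = sorted(block for block, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle detected")
        steps.append(ready)
        for block in ready:
            del remaining[block]
        for deps in remaining.values():
            deps.difference_update(ready)
    return steps


dependencies = {
    "a": set(), "b": set(), "c": set(), "f": set(), "i": set(),
    "d": {"a", "b"},
    "e": {"c"},
    "h": {"f"},
    "g": {"d"},
}

steps = group_into_steps(dependencies)
for number, step in enumerate(steps, start=1):
    print(f"Step {number}: {step}")
```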

Accepted Values:

  • Range: Integer between 1 and 32 (inclusive)
  • Default behavior: When null or not provided, defaults to 1 (sequential execution)

Execution Behavior:

  • parallelism = 1: blocks execute one after another
  • parallelism > 1: blocks within each step execute concurrently using a thread pool

Performance Benefits:

  1. Faster Execution: When parallelism > 1, independent publication blocks within the same execution step run concurrently, significantly reducing overall execution time
  2. Resource Utilization: Better utilizes available system resources (CPU, database connections) by executing multiple blocks simultaneously
  3. Scalability: Allows scaling publication execution based on system capacity and workload requirements

Use Cases:

  • Large Publications: When publishing many independent tables/views, parallel execution can dramatically reduce total runtime
  • Resource-Rich Environments: Systems with sufficient database connections and processing power can benefit from higher parallelism
  • Time-Sensitive Workflows: Critical data pipelines that need to complete within specific time windows

Example Usage:

-- Execute up to 4 blocks concurrently
{{
publication.Run(
blocks=[customers.CustomerAnalysis, sales.SalesReport],
packages=[inventory, finance],
parallelism=4
)
}}

Important Considerations:

  • Dependencies: Blocks with dependencies still execute in the correct order - parallelism only affects independent blocks within the same execution step
  • Resource Limits: Higher parallelism requires more database connections and system resources
  • Error Handling: If any block fails during parallel execution, the entire publication run is aborted
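The execution model described in this section (sequential steps, a thread pool within each step, abort on failure) can be sketched in Python with concurrent.futures. This is a model under assumptions, not Coginiti's scheduler: run_steps and run_block are hypothetical names, and what happens to blocks already in flight when a sibling fails is not stated in this reference; in this sketch they are allowed to finish, and no later step starts.

```python
from concurrent.futures import ThreadPoolExecutor


def run_steps(steps, run_block, parallelism=1):
    """Run each step's blocks concurrently; steps run one after another."""
    if not 1 <= parallelism <= 32:
        raise ValueError("parallelism must be between 1 and 32")
    completed = []
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for step in steps:
            # list() waits for the whole step and re-raises the first
            # failure, which stops the loop before the next step starts.
            completed.append(list(pool.map(run_block, step)))
    return completed


started = []


def publish_block(name):
    # Stand-in for publishing one block; block "d" simulates a failure.
    started.append(name)
    if name == "d":
        raise RuntimeError(f"publication of {name} failed")
    return name.upper()


ok = run_steps([["a", "b"], ["c"]], publish_block, parallelism=4)
print(ok)

started.clear()
try:
    run_steps([["a", "b"], ["d"], ["g"]], publish_block, parallelism=4)
    aborted = False
except RuntimeError:
    aborted = True
print(aborted, "g" in started)
```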

Built-in functions available:

  • publication.Target(): Returns the target table/view name
  • publication.Type(): Returns publication type ("table", "view", "csv", "parquet")
  • publication.Incremental(): Returns true if block should execute in incremental mode, false otherwise

publication.Incremental() Function

The publication.Incremental() function is a built-in function available within publication blocks that returns a boolean indicating whether the current block should execute in incremental mode.

Function Behavior:

Returns: true if the block should execute incrementally, false for full refresh

Incremental mode is enabled when ALL of the following conditions are met:

  1. The block has incremental publication configuration (:incremental "append" or :incremental "merge")
  2. The fullRefresh parameter is not set to true in publication.Run()
  3. The target table/view already exists in the database

Usage Context:

  • Only available within publication blocks during execution
  • Cannot be called outside of publication context
  • Typically used in conditional #+if statements to modify query behavior

Example Usage:

The function is commonly used to implement different logic for initial vs. incremental runs:

#+src sql DailyActiveUsers()
#+meta {
:publication {
:type "table",
:name "daily_active_users",
:incremental "append"
}
}
#+begin
SELECT
DATE_TRUNC('day', visit_date) as date_day,
COUNT(DISTINCT user_id) AS users_count,
'{{ publication.Incremental() }}' AS was_incremental -- Shows true/false
FROM visits
#+if publication.Incremental() then
WHERE visit_date >= (SELECT MAX(date_day) FROM {{ publication.Target() }})
#+end
GROUP BY date_day;
#+end

Truth Table:

Incremental Config   fullRefresh   Target Exists   Result
"append"             false         true            true
"append"             false         false           false
"append"             true          false           false
"append"             true          true            false
"merge"              false         true            true
"merge"              false         false           false
"merge"              true          false           false
"merge"              true          true            false
(none)               any           any             false
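The truth table is a three-way conjunction, which the following Python sketch reproduces. The function name is_incremental is hypothetical, and the sketch accepts only the two configuration values that the conditions list names ("append" and "merge"); how other values are treated is not covered here.

```python
from itertools import product


def is_incremental(incremental_config, full_refresh, target_exists):
    """Mirror the three conditions listed for publication.Incremental()."""
    has_config = incremental_config in ("append", "merge")
    return bool(has_config and not full_refresh and target_exists)


# Print every row of the truth table.
for config, full_refresh, exists in product(
    ["append", "merge", None], [False, True], [False, True]
):
    print(config, full_refresh, exists, "->",
          is_incremental(config, full_refresh, exists))
```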

Error Conditions:

  • Execution Context Error: Function must be called from within an execution context
  • Publication Context Error: Function must be called from within a publication block context

Tests

CoginitiScript supports built-in testing for data quality and validation.

Defining Tests

#+test sql TestCustomerEmailFormat()
#+begin
SELECT * FROM {{ customerData() }} WHERE email IS NULL OR email NOT LIKE '%@%';
#+end

Test Results

A test block returns one of three kinds of result (nil, a dataset, or an error), and the result determines whether the test passes:

  • Nil or empty Dataset: Test passes
  • Non-empty Dataset or Error: Test fails
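The pass/fail rule can be written as a small predicate. In this Python sketch, None stands in for nil, a list of rows for a dataset, and an Exception instance for an error; these stand-ins and the function name is_passing are assumptions made for illustration.

```python
def is_passing(result):
    """Apply the pass/fail rule to a test block's result."""
    if result is None:
        return True  # nil: nothing was returned
    if isinstance(result, Exception):
        return False  # the block raised an error
    return len(result) == 0  # a dataset passes only when it is empty


print(is_passing(None))
print(is_passing([]))
print(is_passing([{"email": None}]))
print(is_passing(RuntimeError("relation does not exist")))
```

This is why a test query is written to select the offending rows: any row that comes back is treated as a violation.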

Running Tests

Individual test: Place cursor on test block and execute

All tests: Use run-all command

Programmatic execution:

#+import "std/test"
#+import "data_quality/customers"
#+import "data_quality/orders"

-- Run specific tests
{{ test.Run(tests=[customers.TestEmailFormat, customers.TestPhoneNumbers]) }}

-- Run all tests in packages
{{ test.Run(packages=[customers, orders]) }}

-- Continue execution even if tests fail
{{ test.Run(
packages=[customers],
onFailure=test.Continue
) }}

Options for onFailure:

  • test.Stop: Stop execution on test failure (default)
  • test.Continue: Continue execution despite test failures

Expressions, Loops, and Conditions

CoginitiScript supports template preprocessing with control flow constructs.

Expressions

Interpolate values using {{ }} delimiters:

SELECT {{ 1 + 1 }}     -- SELECT 2
SELECT {{ 2 > 5 }} -- SELECT false
SELECT {{ "string" }} -- SELECT string

Note: String values are inserted without quotes to allow dynamic identifier generation:

#+const
columnPrefix = "sales";
#+end

SELECT {{ columnPrefix }}_amount FROM fact_sales;
-- Generates: SELECT sales_amount FROM fact_sales;

For quoted strings in SQL:

SELECT '{{ paymentType }}';  -- SELECT 'credit_card';

Conditions

Use #+if statements for conditional code execution:

#+if debug == true then
SELECT 'Debug mode enabled';
#+else
SELECT 'Production mode';
#+end

Loops

Iterate over collections with #+for:

#+for item : ["col1", "col2", "col3"] separator "," do
{{ item }}
#+end

-- Usage in SQL
SELECT
#+for field : ["name", "email", "phone"] separator "," do
{{ field }}
#+end
FROM customers;
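A #+for loop with a separator behaves like a string join: the body is rendered once per item and the separator is placed between items, not after the last one. The Python sketch below shows the equivalent expansion; the function name expand_for is hypothetical, and the exact whitespace CoginitiScript emits around each item is an assumption.

```python
def expand_for(items, separator, render=str):
    """Render each item and join the results with the separator."""
    return separator.join(render(item) for item in items)


columns = expand_for(["name", "email", "phone"], ",\n")
sql = f"SELECT\n{columns}\nFROM customers;"
print(sql)
```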

Iterator Function

Process block results dynamically:

#+src sql GetReportDates()
#+begin
SELECT report_date FROM active_reports;
#+end

SELECT *
FROM fact_sales
WHERE transaction_date IN (
#+for row : iterator(GetReportDates()) separator ", " do
'{{ row["report_date"] }}'
#+end
);

Constants

Declare reusable constants with #+const:

#+const
userLimit = 100;
emailDomain = "company.com";
reportFields = ["name", "email", "signup_date"];
config = { :environment "production", :debug false };
#+end

SELECT
#+for field : reportFields separator "," do
{{ field }}
#+end
FROM users
WHERE email LIKE '%@{{ emailDomain }}'
LIMIT {{ userLimit }};

Supported data types:

  • Integers: 42, 1000
  • Floats: 3.14, 0.75
  • Strings: "hello world"
  • Keywords: :environment, :debug
  • Lists: ["item1", "item2"]
  • Maps: { :key "value", :count 5 }

Public/Private constants:

  • Public: Constants starting with uppercase letters
  • Private: All other constants

Macros

Macros provide code inlining for reusable snippets:

#+macro countryGroup(country)
#+meta {
:doc "Returns CASE statement for country grouping"
}
#+begin
CASE
WHEN {{ country }} IN ('US', 'CA') THEN 'North America'
WHEN {{ country }} IN ('GB', 'FR', 'DE') THEN 'Europe'
ELSE 'Other'
END
#+end

SELECT
country,
{{ countryGroup(country="country") }} AS region,
SUM(revenue) AS total_revenue
FROM sales
GROUP BY country, region;

Query Tags

Query tags add metadata to SQL queries for monitoring, cost allocation, and audit purposes.

Defining Query Tags

#+src sql AnalyticsQuery()
#+meta {
:query_tags {
:department "analytics",
:project "q4_report",
:priority 1,
:automated true
}
}
#+begin
SELECT * FROM sales_data;
#+end

Platform-Specific Implementation

Snowflake:

ALTER SESSION SET query_tag = '{"department":"analytics","project":"q4_report","priority":1,"automated":true}';
SELECT * FROM sales_data;
ALTER SESSION UNSET query_tag;

BigQuery:

SET @@query_label = "department:analytics,project:q4_report,priority:1,automated:true";
SELECT * FROM sales_data;

Redshift:

SET query_group TO '{"department":"analytics","project":"q4_report","priority":1,"automated":true}';
SELECT * FROM sales_data;
SET query_group TO 'default';
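The tag strings in the statements above follow two encodings: compact JSON (Snowflake and Redshift) and comma-separated key:value pairs (BigQuery). The Python sketch below rebuilds both from the same :query_tags map. The helper names are hypothetical, and the sketch only mirrors the strings shown above; it does not cover escaping rules or platform limits on tag length.

```python
import json

tags = {
    "department": "analytics",
    "project": "q4_report",
    "priority": 1,
    "automated": True,
}


def as_json_tag(tags):
    """Compact JSON, as in the Snowflake and Redshift statements."""
    return json.dumps(tags, separators=(",", ":"))


def as_label_list(tags):
    """Comma-separated key:value pairs, as in the BigQuery statement."""
    def scalar(value):
        if isinstance(value, bool):
            return "true" if value else "false"
        return str(value)

    return ",".join(f"{key}:{scalar(value)}" for key, value in tags.items())


print(as_json_tag(tags))
print(as_label_list(tags))
```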

Querying Tagged Queries

Snowflake:

SELECT query_tag, query_id, execution_time 
FROM snowflake.account_usage.query_history
WHERE query_tag IS NOT NULL
ORDER BY start_time DESC;

BigQuery:

SELECT j.labels, j.job_id, j.total_bytes_processed 
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_USER j
WHERE j.labels IS NOT NULL
ORDER BY creation_time DESC;

Standard Library Packages

std/time Package

The std/time package provides date and time manipulation functions:

#+import "std/time"

#+src sql DailyReport()
#+meta {
:publication {
:type "table",
:name "report_" + time.Format(time.Now(), time.IsoBasicDate)
}
}
#+begin
SELECT
'{{ time.Format(time.NowUTC(), time.IsoDateTime) }}' as generated_at,
COUNT(*) as record_count
FROM source_data;
#+end

Key Functions:

  • time.Now(tz): Current time in specified timezone
  • time.NowUTC(): Current UTC time
  • time.Format(time, format): Format time value

Supported Formats:

  • time.IsoBasicDate: 20111203
  • time.IsoLocalDate: 2011-12-03
  • time.IsoLocalTime: 10:15:30
  • time.IsoDateTime: 2011-12-03T10:15:30+01:00[Europe/Paris]
  • time.IsoOrdinalDate: 2012-337
  • time.IsoWeekDate: 2012-W48-6
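To see what each pattern encodes, the same instant can be formatted with Python's datetime module. This is a stand-in for illustration, not the std/time API, and it assumes the constants follow the ISO 8601 layouts their names and sample values suggest. time.IsoDateTime is omitted because it depends on a named time zone.

```python
from datetime import datetime

moment = datetime(2011, 12, 3, 10, 15, 30)
iso_year, iso_week, iso_weekday = moment.isocalendar()

formatted = {
    "IsoBasicDate": moment.strftime("%Y%m%d"),      # year, month, day
    "IsoLocalDate": moment.date().isoformat(),      # calendar date
    "IsoLocalTime": moment.time().isoformat(),      # time of day
    "IsoOrdinalDate": moment.strftime("%Y-%j"),     # year and day of year
    "IsoWeekDate": f"{iso_year}-W{iso_week:02d}-{iso_weekday}",
}

for name, value in formatted.items():
    print(f"{name}: {value}")
```

The ordinal form counts days from January 1, and the week form gives the ISO week number followed by the weekday (1 = Monday, 7 = Sunday).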

std/publication Package

Provides programmatic publication execution:

#+import "std/publication"

{{
publication.Run(
blocks=[sales.MonthlySummary],
packages=[analytics],
afterEach=notifyCompletion
)
}}

std/test Package

Enables programmatic test execution:

#+import "std/test"

{{
test.Run(
tests=[validation.TestDataQuality],
packages=[compliance],
onFailure=test.Continue
)
}}

Best Practices

Code Organization

  1. Use meaningful block names that describe their purpose
  2. Group related blocks into packages
  3. Document blocks with :doc metadata
  4. Follow DRY principles by reusing blocks instead of duplicating code

Performance

  1. Use incremental publication for large datasets
  2. Consider publication strategy for frequently referenced blocks
  3. Optimize block dependencies to minimize execution time
  4. Use appropriate return types (void vs recordset)

Testing

  1. Write tests for critical data transformations
  2. Use descriptive test names that explain what is being tested
  3. Test edge cases and data quality constraints
  4. Integrate tests into CI/CD pipelines

Metadata Usage

  1. Add documentation to all public blocks
  2. Use query tags for monitoring and cost allocation
  3. Version your blocks using metadata
  4. Document parameters and return values

This reference provides comprehensive coverage of CoginitiScript features. For examples and tutorials, see the Getting Started Guide and How-to Guides.