Process large datasets with LLM blocks
LLM blocks are scheduled to ship in the Coginiti 26.6 release. This tutorial is published as a preview so you can plan ahead — the feature is not yet available in earlier versions, and some details may change before general availability.
When using LLM blocks to analyze data, embedding an entire dataset into a single prompt with print.Csv() can exceed token limits or degrade result quality. This tutorial teaches four patterns that break large datasets into manageable pieces, each sending bounded prompts to the LLM.
Prerequisites
- A Coginiti workspace with LLM blocks enabled
- Familiarity with CoginitiScript blocks and imports
- Familiarity with LLM block basics
Project setup
Create a new project called llm_streaming in your catalog. The project.toml defines the project name used in import paths:
[general]
name = "llm_streaming"
description = "LLM streaming tutorial project"
version = "1.0.0"
Create the following scripts inside the project:
llm_streaming/
project.toml
seed -- Seed data generation and publication
row_level -- Pattern 1: Row-level mapping
fixed_batching -- Pattern 2: Fixed-size batching
dynamic_batching -- Pattern 3: Data-driven batching
incremental -- Pattern 4: Incremental publication
All scripts share the same package, so public blocks (names starting with uppercase) defined in any script are accessible from all other scripts without imports. For example, SupportTickets() defined in seed is directly available in row_level, fixed_batching, and so on.
The problem
A common approach to LLM-based analysis embeds all data into one prompt:
#+import "std/print"
#+src sql AllTickets()
#+begin
SELECT ticket_id, subject, body FROM {{ SupportTickets() }}
#+end
#+src llm ClassifyAll()
#+meta {
:schema {
:columns [{:name "ticket_id", :type "INT"},
{:name "category", :type "STRING"},
{:name "priority", :type "STRING"}]
}
}
#+begin
Classify each support ticket:
{{ print.Csv(AllTickets()) }}
#+end
This works for small datasets. When the source returns thousands of rows, the prompt exceeds token limits and the LLM call fails. Both print.Csv() and iterator() with #+for expand the entire resultset into prompt text before the LLM sees anything.
The four patterns below solve this by keeping each LLM prompt bounded.
Generate seed data
Use an LLM block to generate 50 realistic support tickets and publish them to a table. This gives the streaming patterns a meaningful dataset to work with.
Create the seed script inside your project:
#+import "std/publication"
#+src llm SupportTickets()
#+meta {
:schema {
:columns [
{:name "ticket_id", :type "INT", :description "Sequential ID starting from 1"},
{:name "subject", :type "STRING", :description "Short subject line, under 60 characters"},
{:name "body", :type "STRING", :description "Ticket body, 1-3 sentences"}
]
},
:publication {
:type "table",
:name "support_tickets"
}
}
#+begin
Generate 50 realistic customer support tickets for a SaaS analytics platform.
Include a realistic mix of:
- Bug reports (app crashes, errors, broken features)
- Feature requests (dark mode, integrations, export options)
- Billing issues (double charges, refund requests, plan changes)
- How-to questions (documentation, setup help, configuration)
- Positive feedback (praise, success stories)
Vary the tone, urgency, and detail level. Some tickets should be brief,
others more detailed with context.
#+end
Publish the seed data:
{{ publication.Run(blocks=[SupportTickets]) }}
Verify the result:
#+src sql TicketCount()
#+begin
SELECT COUNT(*) AS total_tickets FROM support_tickets
#+end
You now have a support_tickets table that all four patterns will process.
Step 1: Row-level mapping
The simplest pattern: a parameterized LLM block processes one ticket at a time. A SQL block iterates over tickets, calls the LLM block per row, and combines results with UNION ALL.
Create the row_level script:
#+src llm ClassifyOne(ticket_id, subject, body)
#+meta {
:schema {
:columns [
{:name "ticket_id", :type "INT"},
{:name "category", :type "STRING",
:description "bug, feature_request, billing, question, or praise"},
{:name "priority", :type "STRING",
:description "high, medium, or low"},
{:name "sentiment", :type "STRING",
:description "positive, neutral, or negative"}
]
}
}
#+begin
Classify this support ticket.
Ticket ID: {{ ticket_id }}
Subject: {{ subject }}
Body: {{ body }}
Return the ticket_id, category, priority, and sentiment.
#+end
#+src sql ClassifyRowLevel()
#+begin
#+for t : iterator(SupportTickets()) separator " UNION ALL " do
SELECT * FROM {{ ClassifyOne(t["ticket_id"], t["subject"], t["body"]) }}
#+end
#+end
Run ClassifyRowLevel() to see the results.
Each iteration generates a separate LLM call with a single ticket in the prompt. The #+for loop expands at preprocessing time, producing one SELECT * FROM ... per ticket joined by UNION ALL.
Each of the 50 tickets gets its own LLM call, producing a 50-way UNION ALL chain. In production datasets with thousands of rows, cost and SQL complexity grow accordingly.
When to use: Tasks where each row needs isolated, deep analysis — classification, extraction, translation. Prompt size per call is minimal.
Trade-offs: N rows means N LLM calls. Cost and latency scale linearly. The generated UNION ALL chain can hit SQL limits for very large datasets. Use the batching patterns below to reduce the number of calls.
Step 2: Fixed-size batching
Reduce the number of LLM calls by processing multiple rows per call. Each batch uses print.Csv() to embed a bounded slice of data.
Create the fixed_batching script:
#+import "std/print"
#+const
Offsets = [0, 10, 20, 30, 40];
BatchSize = 10;
#+end
#+src sql TicketBatch(batchSize, offset)
#+begin
SELECT ticket_id, subject, body
FROM {{ SupportTickets() }}
ORDER BY ticket_id
LIMIT {{ batchSize }} OFFSET {{ offset }}
#+end
#+src llm ClassifyBatch(batchSize, offset)
#+meta {
:schema {
:columns [
{:name "ticket_id", :type "INT"},
{:name "category", :type "STRING",
:description "bug, feature_request, billing, question, or praise"},
{:name "priority", :type "STRING",
:description "high, medium, or low"},
{:name "sentiment", :type "STRING",
:description "positive, neutral, or negative"}
]
}
}
#+begin
Classify each support ticket by category, priority, and sentiment.
Return one row per ticket.
{{ print.Csv(TicketBatch(batchSize, offset)) }}
#+end
#+src sql ClassifyBatched()
#+begin
#+for offset : Offsets separator " UNION ALL " do
SELECT * FROM {{ ClassifyBatch(BatchSize, offset) }}
#+end
#+end
Run ClassifyBatched() to see the results.
50 tickets processed in 5 LLM calls (10 per call) instead of 50. Each prompt contains at most 10 tickets worth of CSV data.
When to use: When you know the approximate dataset size and want a simple, predictable batching strategy.
Trade-offs: Batch offsets are hardcoded. If the dataset grows beyond 50 rows, you must update the Offsets constant. The next pattern eliminates this limitation.
Step 3: Data-driven batching
Eliminate hardcoded offsets by computing batch boundaries from the data itself.
Create the dynamic_batching script:
#+import "std/print"
#+src sql BatchRanges(batchSize)
#+begin
WITH numbered AS (
SELECT
ticket_id,
FLOOR((ROW_NUMBER() OVER (ORDER BY ticket_id) - 1) * 1.0 / {{ batchSize }}) AS batch_num
FROM {{ SupportTickets() }}
)
SELECT MIN(ticket_id) AS min_id, MAX(ticket_id) AS max_id
FROM numbered
GROUP BY batch_num
ORDER BY batch_num
#+end
#+src sql TicketRange(min_id, max_id)
#+begin
SELECT ticket_id, subject, body
FROM {{ SupportTickets() }}
WHERE ticket_id BETWEEN {{ min_id }} AND {{ max_id }}
#+end
#+src llm ClassifyRange(min_id, max_id)
#+meta {
:schema {
:columns [
{:name "ticket_id", :type "INT"},
{:name "category", :type "STRING",
:description "bug, feature_request, billing, question, or praise"},
{:name "priority", :type "STRING",
:description "high, medium, or low"},
{:name "sentiment", :type "STRING",
:description "positive, neutral, or negative"}
]
}
}
#+begin
Classify each support ticket by category, priority, and sentiment.
Return one row per ticket.
{{ print.Csv(TicketRange(min_id, max_id)) }}
#+end
#+src sql ClassifyDynamic()
#+begin
#+for b : iterator(BatchRanges(10)) separator " UNION ALL " do
SELECT * FROM {{ ClassifyRange(b["min_id"], b["max_id"]) }}
#+end
#+end
Run ClassifyDynamic() to see the results.
BatchRanges(10) computes the min and max ticket IDs for each batch of 10. The #+for loop iterates over these ranges, and each ClassifyRange() call processes only tickets within its range. If the table grows from 50 to 50,000 tickets, the batching adapts automatically — no constant updates needed. Change the batch size argument to control how many rows each LLM call handles.
When to use: Production pipelines where dataset size varies over time.
Trade-offs: Adds a query to compute batch boundaries. Gaps in IDs can create uneven batches — consider using ROW_NUMBER()-based pagination instead of ID ranges if your keys are sparse.
Step 4: Incremental publication
Process new data across separate runs rather than all at once. Each run handles a bounded batch and appends results to a target table.
Create the incremental script:
#+import "std/print"
#+import "std/publication"
#+src sql UnprocessedTickets()
#+begin
SELECT ticket_id, subject, body
FROM {{ SupportTickets() }}
#+if publication.Incremental() then
WHERE ticket_id > (
SELECT COALESCE(MAX(ticket_id), 0)
FROM {{ publication.Target() }}
)
#+end
ORDER BY ticket_id
LIMIT 10
#+end
#+src llm ClassifyIncremental()
#+meta {
:schema {
:columns [
{:name "ticket_id", :type "INT"},
{:name "category", :type "STRING",
:description "bug, feature_request, billing, question, or praise"},
{:name "priority", :type "STRING",
:description "high, medium, or low"},
{:name "sentiment", :type "STRING",
:description "positive, neutral, or negative"}
]
},
:publication {
:type "table",
:name "classified_tickets",
:incremental "append"
}
}
#+begin
Classify each support ticket by category, priority, and sentiment.
Return one row per ticket.
{{ print.Csv(UnprocessedTickets()) }}
#+end
Run publications with:
{{ publication.Run(blocks=[ClassifyIncremental]) }}
On the first run, publication.Incremental() returns false (the target table does not exist yet), so UnprocessedTickets() returns the first 10 tickets without a filter. Results are published to classified_tickets.
On subsequent runs, publication.Incremental() returns true. The WHERE clause skips already-processed tickets by comparing against the highest ticket_id in the target table. Each run processes the next 10 unprocessed tickets. After 5 runs, all 50 tickets are classified.
Schedule this as a Coginiti Action to continuously classify new tickets as they arrive.
When to use: Continuous pipelines where new data arrives over time — ticket classification, log analysis, daily report generation.
Trade-offs: Requires multiple runs and external orchestration. Results accumulate across runs, so the target table must be managed over time.
Choosing a pattern
| Row-level | Fixed batching | Data-driven batching | Incremental | |
|---|---|---|---|---|
| LLM calls | 1 per row | 1 per batch | 1 per batch | 1 per run |
| Prompt size | Single row | Bounded | Bounded | Bounded |
| Adapts to data growth | Yes | No | Yes | Yes |
| Cross-row context | None | Within batch | Within batch | Within batch |
| Requires multiple runs | No | No | No | Yes |
| Best for | Per-row classification | Known-size datasets | Variable-size datasets | Continuous pipelines |
These patterns can be combined. For example, use data-driven batching (step 3) within an incremental publication (step 4) to process large incoming batches in bounded chunks across multiple runs.
What you've learned
- Generating seed data with LLM blocks and publishing to a table
- Row-level mapping with
#+for,iterator(), and parameterized LLM blocks - Fixed-size and data-driven batching with
print.Csv()and bounded queries - Incremental publication for continuous processing across multiple runs
- How to choose a pattern based on dataset size, prompt limits, and pipeline requirements
Next steps
- LLM blocks reference for complete syntax
- Publication reference for incremental strategies
- Coginiti Actions for scheduling incremental runs
- LLM blocks tutorial for foundational LLM block patterns