Batch Iterator
Process one page at a time and call Continue-as-New with the next offset after each page so the Workflow's event history never grows without bound. With this method you can process infinite pages. Use this when your record set is arbitrarily large, you need a durable checkpoint after every page, and sequential page-by-page throughput is acceptable.
Overview
The Batch Iterator pattern processes a large record set one page at a time. Each Workflow run processes a single page and then calls Continue-as-New with the next offset, producing a chain of short-lived runs that together cover the entire record set without accumulating unbounded event history.
Problem
A single Workflow run is limited to 50,000 history events (aim for 2,000) and 2,000 in-flight Activities. Processing millions of records in one run is not possible within these bounds.
You need a way to process an arbitrarily large record set reliably, with the ability to resume from a checkpoint if the Workflow is interrupted, and without overwhelming downstream systems with a burst of concurrent requests.
Solution
Each Workflow run fetches one page of records using a persistent offset parameter, processes each record sequentially, and then calls continueAsNew with the incremented offset. The next run picks up exactly where the previous one left off.
Because each run processes only a bounded number of records, history stays well within limits. The offset acts as a durable checkpoint: if the Workflow is interrupted mid-page, the next run replays only from the start of the current page.
The following describes each step in the diagram:
- The Workflow starts with
offset=0and callsfetchPage(offset, pageSize)to retrieve the first page of records. - It processes each record in the page by executing the
processRecordActivity. - After the page is fully processed, it calls
continueAsNewwithoffset + pageSize, passing the updated offset to the next run. - The next run begins with a clean history and repeats the same steps for the next page.
- When
fetchPagereturns fewer records thanpageSize, the Workflow knows it has reached the last page and returns normally.
Implementation
The following examples show how each SDK implements the Batch Iterator pattern.
- TypeScript
- Python
- Go
- Java
// workflows.ts
import { continueAsNew, log, proxyActivities } from "@temporalio/workflow";
import type * as activities from "./activities";
import { PAGE_SIZE } from "./shared";
const { fetchPage, processRecord } = proxyActivities<typeof activities>({
startToCloseTimeout: "10 seconds",
});
export async function batchIteratorWorkflow(
offset: number = 0,
totalProcessed: number = 0
): Promise<number> {
const page = await fetchPage(offset, PAGE_SIZE);
for (const record of page) {
await processRecord(record);
totalProcessed++;
}
log.info(`Processed page at offset ${offset} (${page.length} records, running total: ${totalProcessed})`);
if (page.length === PAGE_SIZE) {
await continueAsNew<typeof batchIteratorWorkflow>(offset + PAGE_SIZE, totalProcessed);
}
return totalProcessed;
}
# workflows.py
from temporalio import workflow
from temporalio.workflow import continue_as_new
from datetime import timedelta
from activities import fetch_page, process_record
from shared import PAGE_SIZE
@workflow.defn
class BatchIteratorWorkflow:
@workflow.run
async def run(self, offset: int = 0, total_processed: int = 0) -> int:
page = await workflow.execute_activity(
fetch_page,
args=[offset, PAGE_SIZE],
start_to_close_timeout=timedelta(seconds=10),
)
for record in page:
await workflow.execute_activity(
process_record,
record,
start_to_close_timeout=timedelta(seconds=10),
)
total_processed += 1
workflow.logger.info(
f"Processed page at offset {offset} ({len(page)} records, running total: {total_processed})"
)
if len(page) == PAGE_SIZE:
continue_as_new(offset + PAGE_SIZE, total_processed)
return total_processed
// workflows.go
package main
import (
"go.temporal.io/sdk/workflow"
)
func BatchIteratorWorkflow(ctx workflow.Context, offset int, totalProcessed int) (int, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var page []Record
if err := workflow.ExecuteActivity(ctx, FetchPage, offset, PageSize).Get(ctx, &page); err != nil {
return totalProcessed, err
}
for _, record := range page {
if err := workflow.ExecuteActivity(ctx, ProcessRecord, record).Get(ctx, nil); err != nil {
return totalProcessed, err
}
totalProcessed++
}
workflow.GetLogger(ctx).Info("Processed page",
"offset", offset,
"pageSize", len(page),
"totalProcessed", totalProcessed)
if len(page) == PageSize {
return totalProcessed, workflow.NewContinueAsNewError(ctx, BatchIteratorWorkflow, offset+PageSize, totalProcessed)
}
return totalProcessed, nil
}
// BatchIteratorWorkflow.java
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.*;
import java.time.Duration;
import java.util.List;
@WorkflowInterface
public interface BatchIteratorWorkflow {
@WorkflowMethod
int run(int offset, int totalProcessed);
}
// BatchIteratorWorkflowImpl.java
public class BatchIteratorWorkflowImpl implements BatchIteratorWorkflow {
private final Activities activities = Workflow.newActivityStub(
Activities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.build()
);
@Override
public int run(int offset, int totalProcessed) {
List<Record> page = activities.fetchPage(offset, Shared.PAGE_SIZE);
for (Record record : page) {
activities.processRecord(record);
totalProcessed++;
}
Workflow.getLogger(BatchIteratorWorkflowImpl.class).info(
"Processed page at offset " + offset + " (" + page.size() + " records, total: " + totalProcessed + ")"
);
if (page.size() == Shared.PAGE_SIZE) {
throw Workflow.newContinueAsNewStub(BatchIteratorWorkflow.class)
.run(offset + Shared.PAGE_SIZE, totalProcessed);
}
return totalProcessed;
}
}
Best Practices
- Choose a page size that keeps history under 2,000 events. Each page produces roughly
3 × pageSizehistory events (ActivityTaskScheduled+ActivityTaskStarted+ActivityTaskCompleted). A page size of 500–800 records is a safe target. - Include
totalProcessed(or a similar counter) in thecontinueAsNewargs. This lets you observe overall progress via the Workflow input visible in the UI without querying internal state. - Fetch inside an Activity, not the Workflow. The
fetchPagecall must be an Activity — not inline Workflow code — so it can interact with external systems and be retried independently. - Make
processRecordidempotent. Activities have at-least-once execution semantics. If a worker crashes after an Activity completes externally but before the completion is recorded in history, Temporal will retry it. Your downstream system must tolerate receiving the same record more than once. - Avoid accumulating large local state between pages.
continueAsNewdoes not carry over in-memory state; only the arguments you pass are available in the next run.
Common Pitfalls
- Forgetting
continueAsNewon the last page. If you callcontinueAsNewunconditionally, the Workflow loops forever even when the data source is exhausted. Check whether the returned page is shorter thanpageSizebefore continuing. - Passing unnecessary state into
continueAsNew. All arguments are serialized and stored in history. Pass only the minimal state needed (offset, counters) — not accumulated result lists or large collections that grow with each page. - Sequential processing bottlenecks. The default implementation processes one record at a time per page. You can fan out Activities concurrently within a page using the SDK's async primitives for higher per-page throughput — note this increases per-page event count accordingly. If record-set-wide throughput matters more than rate limiting, consider Sliding Window or MapReduce Tree.
Related Resources
- Continue-as-New pattern — core concepts for history management via
continueAsNew - Sliding Window — bounded concurrency that progresses at the rate of the fastest processor
- MapReduce Tree — fully parallel processing for maximum speed
- Temporal limits reference
- Batch samples (Java)