Step Context
The StepContext provides utilities, services, and state management during step execution. It supports error handling, checkpointing, and execution tracking, which makes workflows more resilient to failure.
What is the Step Context?
The StepContext is an execution environment that provides step implementations with:
- Access to workflow state
- Checkpointing and recovery capabilities
- Execution history tracking
- Service discovery and access
- Structured logging
- Lifecycle management
Every step receives a StepContext instance when executed, giving it access to these capabilities.
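As a rough illustration of how a step is handed its context, the sketch below uses a trimmed-down, hypothetical stand-in. `MiniStepContext`, `createMiniContext`, and `countItems` are invented for this example and are not part of ZoopFlow; a real runtime would construct the context itself and back it with durable storage.

```typescript
// Hypothetical, trimmed-down stand-in for StepContext (not the ZoopFlow type).
interface MiniStepContext {
  info: { stepId: string; executionId: string; attemptNumber: number };
  getState<T>(key: string): Promise<T | undefined>;
  setState<T>(key: string, value: T): Promise<void>;
  log(message: string, metadata?: Record<string, unknown>): void;
}

// Builds an in-memory context; log messages are collected in the given array.
function createMiniContext(stepId: string, logs: string[] = []): MiniStepContext {
  const state: Record<string, unknown> = {};
  return {
    info: { stepId, executionId: 'exec-1', attemptNumber: 1 },
    getState: async <T>(key: string) => state[key] as T | undefined,
    setState: async <T>(key: string, value: T) => {
      state[key] = value;
    },
    log: (message: string): void => {
      logs.push(message);
    },
  };
}

// A step body: it reaches state and logging only through the context it receives.
async function countItems(
  input: { items: string[] },
  context: MiniStepContext,
): Promise<number> {
  context.log(`Step ${context.info.stepId} started`);
  await context.setState('itemCount', input.items.length);
  const stored = await context.getState<number>('itemCount');
  return stored === undefined ? 0 : stored;
}
```

Because the step depends only on the context it is given, the same function can run against an in-memory context in a unit test.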
Core Features
The StepContext offers several key features to make step implementation more robust and maintainable.
Execution Information
Access information about the current execution:
// Access execution information
const stepContext = context;
const { stepId, flowId, executionId, attemptNumber } = context.info;
Step Metadata
Access metadata about the current step:
// Access step metadata
const { id, version, displayName, description } = context.stepMetadata;
Execution Status
Check the current status of step execution:
// Access execution metadata
const { status, startedAt, completedAt, succeeded } = context.executionMetadata;
// Check if step is in a specific state
if (context.executionMetadata.status === 'executing') {
  // Step is currently executing
}
State Management
StepContext provides comprehensive state management capabilities:
Basic State Operations
// Store a value
await context.setState('counter', 5);
// Retrieve a value
const counter = await context.getState<number>('counter');
if (counter !== undefined) {
  // Use the counter value
}
Advanced State Management
// Get the entire state
const stateObject = await context.getStateObject();
// Update multiple state values at once
await context.updateState({
  status: 'processing',
  counter: 10,
  lastUpdated: new Date()
});
Checkpointing and Recovery
StepContext can create checkpoints during execution and restore from them later.
Creating Checkpoints
Checkpoints save the current state, allowing you to resume execution from that point if failures occur:
// Create a checkpoint with metadata
const checkpointId = await context.createCheckpoint({
  reason: 'Before network operation',
  data: { step: 'data_processing', progress: 50 }
});
// Store checkpoint ID for potential recovery
await context.setState('lastCheckpointId', checkpointId);
Restoring from Checkpoints
When errors occur, you can restore to a previous checkpoint:
try {
  // Risky operation that might fail
  await performRiskyOperation();
} catch (error) {
  context.log('Operation failed, attempting recovery');
  // Get the last checkpoint ID
  const checkpointId = await context.getState<string>('lastCheckpointId');
  if (checkpointId) {
    // Restore from checkpoint
    await context.restoreFromCheckpoint(checkpointId);
    context.log('Restored from checkpoint', { checkpointId });
    // Try alternative approach
    await performBackupOperation();
  }
}
Checkpoint Management
StepContext provides utilities to manage checkpoints:
// Get information about a specific checkpoint
const checkpointInfo = await context.getCheckpointInfo(checkpointId);
if (checkpointInfo) {
  console.log(`Checkpoint created at: ${checkpointInfo.createdAt}`);
  // Serialize the state so an object value does not print as "[object Object]"
  console.log(`Checkpoint state: ${JSON.stringify(checkpointInfo.state)}`);
}
// Get all checkpoints for the current execution
const allCheckpoints = await context.getAllCheckpoints();
console.log(`Found ${allCheckpoints.length} checkpoints`);
// Find the most recent checkpoint
const sortedCheckpoints = [...allCheckpoints].sort(
  (a, b) => b.createdAt.getTime() - a.createdAt.getTime()
);
const latestCheckpoint = sortedCheckpoints[0];
Execution History
The StepContext tracks the execution history, providing visibility into the step's execution:
// Access all history entries
for (const entry of context.history) {
  console.log(`[${entry.timestamp}] ${entry.type}: ${JSON.stringify(entry.details)}`);
}
// Find specific history events
const checkpointEvents = context.history.filter(entry => entry.type === 'checkpoint');
console.log(`Created ${checkpointEvents.length} checkpoints`);
Custom History Entries
You can add custom entries to the history:
// Add a custom history entry
context.addHistoryEntry('custom', {
  operation: 'data_validation',
  recordsProcessed: 1000,
  validRecords: 990,
  invalidRecords: 10
});
Lifecycle Management
StepContext helps manage the step's lifecycle:
// Mark the step as completed
await context.complete({
  status: 'success',
  processedItems: 100,
  errors: 0
});
// Mark the step as failed
await context.fail(
  'Processing failed due to invalid data format',
  { errorCode: 'INVALID_FORMAT', details: 'Expected CSV, got JSON' }
);
Logging
The StepContext provides structured logging capabilities:
// Basic logging
context.log('Starting data processing');
// Logging with structured metadata
context.log('Item processed', {
  itemId: '12345',
  processingTime: 150,
  status: 'success'
});
Service Access
Get access to registered services:
// Get a service instance
const httpClient = context.getService<HttpClient>('httpClient');
// Use the service
const response = await httpClient.get('https://api.example.com/data');
Complete Example
The following example combines state updates, checkpoints, logging, and explicit completion in a single batch-processing step:
import { defineStep } from '@zoopflow/core';
const processDataWithRecoveryStep = defineStep({
  id: 'data.processing.withRecovery',
  version: '1.0.0',
  description: 'Processes data with checkpoint-based recovery',
  // Schema definitions here...
  execute: async (input, context) => {
    const { items, batchSize = 10 } = input;
    let processedCount = 0;
    let failedCount = 0;
    const results = [];
    // Initialize state
    await context.setState('status', 'initializing');
    await context.setState('progress', 0);
    context.log('Starting batch processing', {
      totalItems: items.length,
      batchSize
    });
    // Process items in batches
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      // Create checkpoint before processing batch
      const checkpointId = await context.createCheckpoint({
        batchIndex: i,
        progress: Math.floor((i / items.length) * 100)
      });
      // Update state with current progress
      await context.updateState({
        status: 'processing',
        batchIndex: i,
        progress: Math.floor((i / items.length) * 100),
        processedCount,
        failedCount
      });
      try {
        // Process each item in the batch
        for (const item of batch) {
          try {
            const result = await processItem(item);
            results.push({
              id: item.id,
              success: true,
              result
            });
            processedCount++;
          } catch (itemError) {
            results.push({
              id: item.id,
              success: false,
              error: itemError.message
            });
            failedCount++;
            context.log('Item processing failed', {
              itemId: item.id,
              error: itemError.message
            });
          }
        }
      } catch (batchError) {
        // Serious error processing batch
        context.log('Batch processing failed, restoring from checkpoint', {
          error: batchError.message,
          checkpointId
        });
        // Restore from checkpoint
        await context.restoreFromCheckpoint(checkpointId);
        // Try alternative processing approach
        // ... alternative implementation ...
      }
    }
    // Mark step as completed
    await context.complete({
      status: failedCount === 0 ? 'success' : 'partial',
      processedCount,
      failedCount
    });
    return {
      totalItems: items.length,
      processedCount,
      failedCount,
      results
    };
  }
});
// Helper function
async function processItem(item) {
  // Implementation...
}
Best Practices
Use Checkpoints Strategically
Create checkpoints at points where failures are likely or recovery would be valuable:
- Before external service calls
- Before processing large batches of data
- Before transitions between major processing phases
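The guidance above can be sketched as a small wrapper that takes a checkpoint before a risky operation and rolls back to it before running a fallback. `withCheckpoint` and `CheckpointingContext` are hypothetical names introduced for this illustration; only `createCheckpoint` and `restoreFromCheckpoint` come from the StepContext interface documented on this page.

```typescript
// Only the two checkpoint methods from the Interface Reference are needed here.
interface CheckpointingContext {
  createCheckpoint(metadata?: Record<string, unknown>): Promise<string>;
  restoreFromCheckpoint(checkpointId: string): Promise<void>;
}

// Hypothetical helper: checkpoint, try the primary operation, and on failure
// restore the checkpoint before running the fallback.
async function withCheckpoint<T>(
  context: CheckpointingContext,
  phase: string,
  primary: () => Promise<T>,
  fallback: () => Promise<T>,
): Promise<T> {
  const checkpointId = await context.createCheckpoint({ phase });
  try {
    return await primary();
  } catch {
    await context.restoreFromCheckpoint(checkpointId);
    return fallback();
  }
}
```

Inside a step this might be called as `await withCheckpoint(context, 'beforeApiCall', callApi, readFromCache)`, where `callApi` and `readFromCache` are placeholders for your own functions.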
// Create checkpoints at strategic points
const checkpointId = await context.createCheckpoint({
  phase: 'beforeApiCall',
  progress: 65
});
Track Execution Progress
Use state and history to track execution progress:
// Update progress regularly
await context.updateState({
  status: 'processing',
  progress: 75,
  itemsProcessed: 750,
  totalItems: 1000
});
// Add history entries for significant events
context.addHistoryEntry('custom', {
  phase: 'validation',
  valid: 980,
  invalid: 20
});
Handle Errors at Multiple Levels
Use appropriate error handling strategies at different levels:
// Batch-level error handling with checkpoint restoration
try {
  await processBatch(batch);
} catch (batchError) {
  await context.restoreFromCheckpoint(checkpointId);
  await alternativeProcessing(batch);
}
// Item-level error handling without disrupting the batch
for (const item of batch) {
  try {
    await processItem(item);
    successCount++;
  } catch (itemError) {
    failureCount++;
    errors.push({ itemId: item.id, error: itemError.message });
  }
}
Use Explicit Completion
Always mark steps as completed or failed:
// Mark successful completion
await context.complete({
  result: 'success',
  data: processedData
});
// Mark failure
await context.fail(
  'Processing failed due to data validation errors',
  { errorCount, validationErrors }
);
Interface Reference
The StepContext interface provides the following methods and properties:
interface StepContext {
  // Execution metadata
  info: ExecutionInfo;
  stepMetadata: StepMetadata;
  readonly executionMetadata: StepExecutionMetadata;
  readonly history: StepHistoryEntry[];
  // State management
  getState<T>(key: string): Promise<T | undefined>;
  setState<T>(key: string, value: T): Promise<void>;
  getStateObject(): Promise<Record<string, unknown>>;
  updateState(stateUpdates: Record<string, unknown>): Promise<void>;
  // Checkpointing and recovery
  createCheckpoint(metadata?: Record<string, unknown>): Promise<string>;
  restoreFromCheckpoint(checkpointId: string): Promise<void>;
  getCheckpointInfo(checkpointId: string): Promise<CheckpointInfo | undefined>;
  getAllCheckpoints(): Promise<CheckpointInfo[]>;
  // Logging and history
  log(message: string, metadata?: Record<string, unknown>): void;
  addHistoryEntry(type: 'custom', details: Record<string, unknown>): void;
  // Lifecycle management
  complete(details?: Record<string, unknown>): Promise<void>;
  fail(reason: string, details?: Record<string, unknown>): Promise<void>;
  // Service access
  getService<T>(name: string, options?: Record<string, unknown>): T;
}
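For unit-testing a step without a running workflow engine, the state and checkpoint methods of this interface could be backed by a simple in-memory test double. The sketch below is one possible approach, not ZoopFlow's implementation: the class name `InMemoryStepState` is hypothetical, and the `CheckpointInfo` shape is an assumption, since this page only shows its `createdAt` and `state` fields.

```typescript
// Assumed shape: only `createdAt` and `state` appear in the examples on this page.
interface CheckpointInfo {
  id: string;
  createdAt: Date;
  state: Record<string, unknown>;
  metadata?: Record<string, unknown>;
}

// Hypothetical in-memory test double for the state and checkpoint methods.
class InMemoryStepState {
  private state: Record<string, unknown> = {};
  private checkpoints: Record<string, CheckpointInfo> = {};

  async getState<T>(key: string): Promise<T | undefined> {
    return this.state[key] as T | undefined;
  }

  async setState<T>(key: string, value: T): Promise<void> {
    this.state[key] = value;
  }

  async updateState(stateUpdates: Record<string, unknown>): Promise<void> {
    this.state = { ...this.state, ...stateUpdates };
  }

  async createCheckpoint(metadata?: Record<string, unknown>): Promise<string> {
    const id = `cp-${Object.keys(this.checkpoints).length + 1}`;
    // Shallow copy, so later top-level state changes do not alter the snapshot.
    this.checkpoints[id] = { id, createdAt: new Date(), state: { ...this.state }, metadata };
    return id;
  }

  async restoreFromCheckpoint(checkpointId: string): Promise<void> {
    const checkpoint = this.checkpoints[checkpointId];
    if (!checkpoint) {
      throw new Error(`Unknown checkpoint: ${checkpointId}`);
    }
    this.state = { ...checkpoint.state };
  }

  async getAllCheckpoints(): Promise<CheckpointInfo[]> {
    return Object.keys(this.checkpoints).map((id) => this.checkpoints[id]);
  }
}
```

The snapshot is a shallow copy, so nested objects are shared between the live state and the checkpoint; a double used with nested state would need a deep copy instead.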