Step Context Example
This example demonstrates the enhanced StepContext with checkpoint-based recovery, state management, execution status tracking, and history tracking.
Working with Checkpoints
The following example shows how to create and restore from checkpoints during step execution. This is especially useful for handling errors and recovering from failures.
import { StepContextImpl } from '@zoopflow/core';
import { ExecutionInfo } from '@zoopflow/core';
// Create a test context
function createTestContext() {
const executionInfo: ExecutionInfo = {
stepId: 'example.step',
flowId: 'example.flow',
executionId: `execution-${Date.now()}`,
attemptNumber: 1,
};
return new StepContextImpl(executionInfo);
}
// Example function demonstrating checkpoint-based recovery
async function executeWithCheckpoints() {
const context = createTestContext();
try {
console.log('Starting step execution...');
// Initialize state
await context.setState('status', 'initializing');
await context.setState('counter', 0);
// Create a checkpoint after initialization
const initCheckpoint = await context.createCheckpoint({ phase: 'initialization' });
console.log(`Created initial checkpoint: ${initCheckpoint}`);
// Perform first operation
await context.setState('status', 'processing');
await context.setState('counter', 1);
await context.setState('data', { items: ['item1'] });
// Create checkpoint after first operation
const firstOpCheckpoint = await context.createCheckpoint({ phase: 'operation-1' });
console.log(`Created checkpoint after first operation: ${firstOpCheckpoint}`);
// Simulate failure in second operation
if (Math.random() < 0.7) {
throw new Error('Simulated failure during second operation');
}
// Perform second operation
await context.setState('status', 'finalizing');
await context.setState('counter', 2);
await context.setState('data', {
items: ['item1', 'item2'],
completed: true
});
// Mark step as completed
await context.complete({ finalState: 'success' });
console.log('Step completed successfully');
} catch (error) {
console.error('Error during step execution:', error.message);
// Get all checkpoints to find the latest one
const checkpoints = await context.getAllCheckpoints();
if (checkpoints.length > 0) {
// Sort checkpoints by creation time (newest first)
const sortedCheckpoints = [...checkpoints].sort(
(a, b) => b.createdAt.getTime() - a.createdAt.getTime()
);
const latestCheckpoint = sortedCheckpoints[0];
console.log(`Attempting recovery using checkpoint: ${latestCheckpoint.id}`);
try {
// Restore from the latest checkpoint
await context.restoreFromCheckpoint(latestCheckpoint.id);
console.log('Restored state from checkpoint:', await context.getStateObject());
// Retry the operation that failed with a safer approach
await context.setState('status', 'recovery');
await context.setState('counter', 999);
await context.setState('data', {
items: ['item1', 'recovered-item'],
recovered: true
});
// Mark as completed with recovery info
await context.complete({ finalState: 'recovered', recoveredFrom: latestCheckpoint.id });
console.log('Step completed after recovery');
} catch (recoveryError) {
// Recovery failed, mark step as failed
await context.fail(
'Recovery failed',
{
originalError: error.message,
recoveryError: recoveryError.message
}
);
}
} else {
// No checkpoints available, mark as failed
await context.fail(error.message);
}
}
// Print final status and history
console.log('Final step status:', context.executionMetadata.status);
console.log('Execution history:');
context.history.forEach((entry, index) => {
console.log(`${index + 1}. [${entry.timestamp.toISOString()}] ${entry.type}`, entry.details || '');
});
}Execution Status Tracking
The following example demonstrates how to track step execution status:
async function trackExecutionStatus() {
const context = createTestContext();
console.log('Initial status:', context.executionMetadata.status); // 'pending'
// Simulate a multi-stage step execution
try {
// Stage 1
await context.setState('stage', 'data-validation');
// Add custom history entry
context.addHistoryEntry('custom', {
stage: 'data-validation',
result: 'passed'
});
// Stage 2
await context.setState('stage', 'data-processing');
// Create checkpoint between stages
const checkpoint = await context.createCheckpoint({ stage: 'after-processing' });
// Stage 3 (final)
await context.setState('stage', 'result-generation');
// Add final history entry
context.addHistoryEntry('custom', {
stage: 'result-generation',
result: 'success'
});
// Mark step as completed
await context.complete({ result: 'success' });
} catch (error) {
// Mark step as failed
await context.fail(error.message);
}
// Print final execution metadata
console.log('Final status:', context.executionMetadata.status);
console.log('Started:', context.executionMetadata.startedAt);
console.log('Completed:', context.executionMetadata.completedAt);
console.log('Succeeded:', context.executionMetadata.succeeded);
// Print history entries
console.log('History entries:', context.history.length);
context.history.forEach((entry, index) => {
console.log(`${index + 1}. [${entry.timestamp.toISOString()}] ${entry.type}`);
});
}Implementing a Robust Step with Checkpoints
Here's a complete example of implementing a step with checkpoint-based error handling:
import { defineStep } from '@zoopflow/core';
const processDataWithRecovery = defineStep({
id: 'data.batch.process',
version: '1.0.0',
description: 'Processes data in batches with checkpoint-based recovery',
inputSchema: {
type: 'object',
properties: {
items: {
type: 'array',
items: { type: 'object' }
},
batchSize: {
type: 'number',
default: 10
}
},
required: ['items']
},
outputSchema: {
type: 'object',
properties: {
processed: { type: 'number' },
failed: { type: 'number' },
skipped: { type: 'number' },
results: { type: 'array' }
},
required: ['processed', 'failed', 'results']
},
execute: async (input, context) => {
const { items, batchSize = 10 } = input;
let processedCount = 0;
let failedCount = 0;
let skippedCount = 0;
const results = [];
// Initialize state
await context.setState('processedCount', 0);
await context.setState('failedCount', 0);
await context.setState('skippedCount', 0);
await context.setState('currentBatchIndex', 0);
await context.setState('results', []);
// Get last checkpoint if this is a retry
if (context.info.attemptNumber > 1) {
const checkpoints = await context.getAllCheckpoints();
if (checkpoints.length > 0) {
// Sort by creation time (newest first)
const sortedCheckpoints = [...checkpoints].sort(
(a, b) => b.createdAt.getTime() - a.createdAt.getTime()
);
// Restore from latest checkpoint
await context.restoreFromCheckpoint(sortedCheckpoints[0].id);
context.log('Restored from checkpoint', {
checkpointId: sortedCheckpoints[0].id,
state: await context.getStateObject()
});
// Retrieve state
processedCount = await context.getState('processedCount') || 0;
failedCount = await context.getState('failedCount') || 0;
skippedCount = await context.getState('skippedCount') || 0;
const currentBatchIndex = await context.getState('currentBatchIndex') || 0;
results.push(...(await context.getState('results') || []));
// Skip already processed batches
const itemsToProcess = items.slice(currentBatchIndex * batchSize);
context.log('Resuming processing', {
totalItems: items.length,
itemsAlreadyProcessed: currentBatchIndex * batchSize,
itemsToProcess: itemsToProcess.length
});
}
}
// Process items in batches
for (let i = await context.getState('currentBatchIndex') || 0; i < Math.ceil(items.length / batchSize); i++) {
// Update current batch index
await context.setState('currentBatchIndex', i);
// Get current batch
const start = i * batchSize;
const end = Math.min(start + batchSize, items.length);
const batch = items.slice(start, end);
context.log('Processing batch', {
batchIndex: i,
batchSize: batch.length,
progress: `${start}/${items.length}`
});
// Create checkpoint before processing batch
const checkpointId = await context.createCheckpoint({
batchIndex: i,
progress: Math.floor((start / items.length) * 100)
});
try {
// Process each item in the batch
for (const item of batch) {
try {
// Process the item (replace with actual processing logic)
const result = await processItem(item);
// Update state
processedCount++;
await context.setState('processedCount', processedCount);
// Store result
results.push({
id: item.id,
status: 'success',
result
});
} catch (itemError) {
// Handle item-level failure
failedCount++;
await context.setState('failedCount', failedCount);
results.push({
id: item.id,
status: 'failed',
error: itemError.message
});
context.log('Item processing failed', {
itemId: item.id,
error: itemError.message
});
}
}
// Update results after each batch
await context.setState('results', results);
} catch (batchError) {
// Serious batch-level error
context.log('Batch processing failed', {
batchIndex: i,
error: batchError.message
});
// Mark all unprocessed items in this batch as skipped
for (const item of batch) {
if (!results.some(r => r.id === item.id)) {
results.push({
id: item.id,
status: 'skipped',
reason: 'batch_failure'
});
skippedCount++;
}
}
await context.setState('skippedCount', skippedCount);
await context.setState('results', results);
// Continue to next batch instead of failing the entire step
continue;
}
}
// Mark step as completed
await context.complete({
processed: processedCount,
failed: failedCount,
skipped: skippedCount
});
return {
processed: processedCount,
failed: failedCount,
skipped: skippedCount,
results
};
}
});
// Helper function to process a single item
async function processItem(item) {
// Simulate processing with occasional random failures
if (Math.random() < 0.1) {
throw new Error(`Failed to process item ${item.id}`);
}
// Simulate async processing
await new Promise(resolve => setTimeout(resolve, 10));
return {
processed: true,
timestamp: new Date().toISOString()
};
}Key Takeaways
When implementing steps with the enhanced StepContext:
-
Create Checkpoints Strategically
- Before risky operations that might fail
- After completing significant processing stages
- Before making external service calls
-
Use the State Management
- Store progress information using setState/getState
- Retrieve the complete state object when needed
- Update multiple values at once with updateState
-
Track Execution Status
- Use addHistoryEntry to record significant events
- Check executionMetadata for current status
- Explicitly complete or fail the step
-
Handle Recovery Elegantly
- Check for existing checkpoints on retry attempts
- Restore from the most appropriate checkpoint
- Skip already processed items
- Provide detailed logs during recovery