Examples
Step Context

Step Context Example

This example demonstrates the enhanced StepContext with checkpoint-based recovery, state management, execution status tracking, and history tracking.

Working with Checkpoints

The following example shows how to create checkpoints during step execution and restore state from them. This is especially useful for handling errors and recovering from failures.

import { StepContextImpl } from '@zoopflow/core';
import { ExecutionInfo } from '@zoopflow/core';
 
/**
 * Builds a fresh `StepContextImpl` for the examples on this page.
 *
 * The execution id embeds the current `Date.now()` value; the step id,
 * flow id and attempt number are fixed example values.
 *
 * @returns A new `StepContextImpl` constructed from the example execution info.
 */
function createTestContext() {
  // Resolve the time-based id first so the literal below stays declarative.
  const executionId = `execution-${Date.now()}`;

  const info: ExecutionInfo = {
    stepId: 'example.step',
    flowId: 'example.flow',
    executionId,
    attemptNumber: 1,
  };

  const testContext = new StepContextImpl(info);
  return testContext;
}
 
/**
 * Example function demonstrating checkpoint-based recovery.
 *
 * Flow of the example:
 *  1. Initialize state and create a checkpoint.
 *  2. Perform a first operation and create a second checkpoint.
 *  3. Throw a simulated failure roughly 70% of the time
 *     (`Math.random() < 0.7`); otherwise finish and complete the step.
 *  4. On failure, restore from the newest checkpoint, write recovery state
 *     and complete the step; if recovery itself throws, or no checkpoint
 *     exists, mark the step as failed.
 *  5. Print the final status and the execution history.
 */
async function executeWithCheckpoints(): Promise<void> {
  // Catch variables are `unknown` under strict TypeScript, so narrow before
  // reading `.message`; non-Error throwables are stringified instead.
  const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : String(err);

  const context = createTestContext();

  try {
    console.log('Starting step execution...');

    // Initialize state
    await context.setState('status', 'initializing');
    await context.setState('counter', 0);

    // Create a checkpoint after initialization
    const initCheckpoint = await context.createCheckpoint({ phase: 'initialization' });
    console.log(`Created initial checkpoint: ${initCheckpoint}`);

    // Perform first operation
    await context.setState('status', 'processing');
    await context.setState('counter', 1);
    await context.setState('data', { items: ['item1'] });

    // Create checkpoint after first operation
    const firstOpCheckpoint = await context.createCheckpoint({ phase: 'operation-1' });
    console.log(`Created checkpoint after first operation: ${firstOpCheckpoint}`);

    // Simulate failure in second operation
    if (Math.random() < 0.7) {
      throw new Error('Simulated failure during second operation');
    }

    // Perform second operation
    await context.setState('status', 'finalizing');
    await context.setState('counter', 2);
    await context.setState('data', {
      items: ['item1', 'item2'],
      completed: true
    });

    // Mark step as completed
    await context.complete({ finalState: 'success' });
    console.log('Step completed successfully');

  } catch (error) {
    const originalMessage = describeError(error);
    console.error('Error during step execution:', originalMessage);

    // Get all checkpoints to find the latest one
    const checkpoints = await context.getAllCheckpoints();

    if (checkpoints.length > 0) {
      // Single pass for the newest checkpoint. The strict `>` keeps the
      // earlier entry on equal timestamps, which is what a stable
      // newest-first sort would have placed first.
      const latestCheckpoint = checkpoints.reduce((newest, candidate) =>
        candidate.createdAt.getTime() > newest.createdAt.getTime() ? candidate : newest
      );
      console.log(`Attempting recovery using checkpoint: ${latestCheckpoint.id}`);

      try {
        // Restore from the latest checkpoint
        await context.restoreFromCheckpoint(latestCheckpoint.id);
        console.log('Restored state from checkpoint:', await context.getStateObject());

        // Retry the operation that failed with a safer approach
        await context.setState('status', 'recovery');
        await context.setState('counter', 999);
        await context.setState('data', {
          items: ['item1', 'recovered-item'],
          recovered: true
        });

        // Mark as completed with recovery info
        await context.complete({ finalState: 'recovered', recoveredFrom: latestCheckpoint.id });
        console.log('Step completed after recovery');

      } catch (recoveryError) {
        // Recovery failed, mark step as failed and keep both error messages
        await context.fail(
          'Recovery failed',
          {
            originalError: originalMessage,
            recoveryError: describeError(recoveryError)
          }
        );
      }
    } else {
      // No checkpoints available, mark as failed
      await context.fail(originalMessage);
    }
  }

  // Print final status and history
  console.log('Final step status:', context.executionMetadata.status);
  console.log('Execution history:');
  context.history.forEach((entry, index) => {
    // `??` only substitutes the placeholder when details are null/undefined,
    // so falsy-but-present details (0, false, '') are still printed.
    console.log(`${index + 1}. [${entry.timestamp.toISOString()}] ${entry.type}`, entry.details ?? '');
  });
}

Execution Status Tracking

The following example demonstrates how to track step execution status:

/**
 * Demonstrates execution status tracking on a step context.
 *
 * Walks through three stages, recording custom history entries and a
 * checkpoint along the way, then completes the step. Any thrown error marks
 * the step as failed instead. Finally prints the execution metadata and the
 * recorded history entries.
 */
async function trackExecutionStatus(): Promise<void> {
  const context = createTestContext();

  console.log('Initial status:', context.executionMetadata.status); // 'pending'

  // Simulate a multi-stage step execution
  try {
    // Stage 1
    await context.setState('stage', 'data-validation');

    // Add custom history entry
    context.addHistoryEntry('custom', {
      stage: 'data-validation',
      result: 'passed'
    });

    // Stage 2
    await context.setState('stage', 'data-processing');

    // Create checkpoint between stages; the returned value is not needed here
    await context.createCheckpoint({ stage: 'after-processing' });

    // Stage 3 (final)
    await context.setState('stage', 'result-generation');

    // Add final history entry
    context.addHistoryEntry('custom', {
      stage: 'result-generation',
      result: 'success'
    });

    // Mark step as completed
    await context.complete({ result: 'success' });

  } catch (error) {
    // Catch variables are `unknown` under strict TypeScript: narrow before
    // reading `.message`, and stringify anything that is not an Error.
    const message = error instanceof Error ? error.message : String(error);

    // Mark step as failed
    await context.fail(message);
  }

  // Print final execution metadata
  console.log('Final status:', context.executionMetadata.status);
  console.log('Started:', context.executionMetadata.startedAt);
  console.log('Completed:', context.executionMetadata.completedAt);
  console.log('Succeeded:', context.executionMetadata.succeeded);

  // Print history entries
  console.log('History entries:', context.history.length);
  context.history.forEach((entry, index) => {
    console.log(`${index + 1}. [${entry.timestamp.toISOString()}] ${entry.type}`);
  });
}

Implementing a Robust Step with Checkpoints

Here's a complete example of implementing a step with checkpoint-based error handling:

import { defineStep } from '@zoopflow/core';
 
/**
 * Example step that processes `input.items` in batches of `input.batchSize`
 * (default 10) and creates a checkpoint before every batch.
 *
 * On a retry (`context.info.attemptNumber > 1`) the step restores the newest
 * checkpoint, reloads its counters and results from state, and resumes at the
 * batch index stored in that checkpoint.
 *
 * Item-level failures are recorded as `failed` results and do not stop the
 * batch. A batch-level failure marks the items of that batch that have no
 * result yet as `skipped` and moves on to the next batch.
 *
 * Returns `{ processed, failed, skipped, results }`.
 * Throws an `Error` when `batchSize` is not a positive, finite number.
 */
const processDataWithRecovery = defineStep({
  id: 'data.batch.process',
  version: '1.0.0',
  description: 'Processes data in batches with checkpoint-based recovery',

  inputSchema: {
    type: 'object',
    properties: {
      items: {
        type: 'array',
        items: { type: 'object' }
      },
      batchSize: {
        type: 'number',
        default: 10
      }
    },
    required: ['items']
  },

  outputSchema: {
    type: 'object',
    properties: {
      processed: { type: 'number' },
      failed: { type: 'number' },
      skipped: { type: 'number' },
      results: { type: 'array' }
    },
    required: ['processed', 'failed', 'results']
  },

  execute: async (input, context) => {
    const { items, batchSize = 10 } = input;

    // A batch size of 0 makes the batch count Infinity (endless loop of empty
    // batches); negative, NaN or infinite sizes yield zero batches and would
    // silently process nothing. Reject all of them up front.
    if (!Number.isFinite(batchSize) || batchSize <= 0) {
      throw new Error(`batchSize must be a positive number, received ${batchSize}`);
    }

    // Catch variables are `unknown` under strict TypeScript: narrow before
    // reading `.message`, and stringify anything that is not an Error.
    const describeError = (err: unknown): string =>
      err instanceof Error ? err.message : String(err);

    let processedCount = 0;
    let failedCount = 0;
    let skippedCount = 0;
    let resumeBatchIndex = 0;
    const results = [];

    // Initialize state
    await context.setState('processedCount', 0);
    await context.setState('failedCount', 0);
    await context.setState('skippedCount', 0);
    await context.setState('currentBatchIndex', 0);
    await context.setState('results', []);

    // Get last checkpoint if this is a retry
    if (context.info.attemptNumber > 1) {
      const checkpoints = await context.getAllCheckpoints();
      if (checkpoints.length > 0) {
        // Single pass for the newest checkpoint. The strict `>` keeps the
        // earlier entry on equal timestamps, which is what a stable
        // newest-first sort would have placed first.
        const latestCheckpoint = checkpoints.reduce((newest, candidate) =>
          candidate.createdAt.getTime() > newest.createdAt.getTime() ? candidate : newest
        );

        // Restore from latest checkpoint
        await context.restoreFromCheckpoint(latestCheckpoint.id);
        context.log('Restored from checkpoint', {
          checkpointId: latestCheckpoint.id,
          state: await context.getStateObject()
        });

        // Retrieve state; `??` falls back only when a value is missing
        processedCount = (await context.getState('processedCount')) ?? 0;
        failedCount = (await context.getState('failedCount')) ?? 0;
        skippedCount = (await context.getState('skippedCount')) ?? 0;
        resumeBatchIndex = (await context.getState('currentBatchIndex')) ?? 0;
        results.push(...((await context.getState('results')) ?? []));

        // Batches before the resume index are skipped
        const alreadyProcessed = resumeBatchIndex * batchSize;
        context.log('Resuming processing', {
          totalItems: items.length,
          itemsAlreadyProcessed: alreadyProcessed,
          itemsToProcess: items.slice(alreadyProcessed).length
        });
      }
    }

    // Process items in batches, starting at the resume index (0 on a fresh run)
    const batchCount = Math.ceil(items.length / batchSize);
    for (let i = resumeBatchIndex; i < batchCount; i++) {
      // Update current batch index
      await context.setState('currentBatchIndex', i);

      // Get current batch
      const start = i * batchSize;
      const end = Math.min(start + batchSize, items.length);
      const batch = items.slice(start, end);

      context.log('Processing batch', {
        batchIndex: i,
        batchSize: batch.length,
        progress: `${start}/${items.length}`
      });

      // Create checkpoint before processing batch, so a retry resumes here
      await context.createCheckpoint({
        batchIndex: i,
        progress: Math.floor((start / items.length) * 100)
      });

      try {
        // Process each item in the batch
        for (const item of batch) {
          try {
            // Process the item (replace with actual processing logic)
            const result = await processItem(item);

            // Update state
            processedCount++;
            await context.setState('processedCount', processedCount);

            // Store result
            results.push({
              id: item.id,
              status: 'success',
              result
            });
          } catch (itemError) {
            // Handle item-level failure
            const message = describeError(itemError);
            failedCount++;
            await context.setState('failedCount', failedCount);

            results.push({
              id: item.id,
              status: 'failed',
              error: message
            });

            context.log('Item processing failed', {
              itemId: item.id,
              error: message
            });
          }
        }

        // Update results after each batch
        await context.setState('results', results);

      } catch (batchError) {
        // Serious batch-level error
        context.log('Batch processing failed', {
          batchIndex: i,
          error: describeError(batchError)
        });

        // Mark all items in this batch that have no result yet as skipped.
        // The id set replaces a linear scan of `results` per item.
        const recordedIds = new Set(results.map(r => r.id));
        for (const item of batch) {
          if (!recordedIds.has(item.id)) {
            results.push({
              id: item.id,
              status: 'skipped',
              reason: 'batch_failure'
            });
            recordedIds.add(item.id);
            skippedCount++;
          }
        }

        await context.setState('skippedCount', skippedCount);
        await context.setState('results', results);

        // Fall through to the next batch instead of failing the entire step
      }
    }

    // Mark step as completed
    await context.complete({
      processed: processedCount,
      failed: failedCount,
      skipped: skippedCount
    });

    return {
      processed: processedCount,
      failed: failedCount,
      skipped: skippedCount,
      results
    };
  }
});
 
/**
 * Helper that simulates processing a single item.
 *
 * Draws one random number and rejects when it is below 0.1; otherwise waits
 * 10 ms and resolves with a `processed` flag and an ISO-8601 timestamp.
 *
 * @param item - The item to process; its `id` appears in the failure message.
 * @returns An object `{ processed: true, timestamp }`.
 * @throws Error with message `Failed to process item <id>` on a simulated failure.
 */
async function processItem(item) {
  // Decide up front whether this call is one of the simulated failures.
  const simulatedFailure = Math.random() < 0.1;
  if (simulatedFailure) {
    throw new Error(`Failed to process item ${item.id}`);
  }

  // Stand-in for real asynchronous work.
  await new Promise(done => {
    setTimeout(done, 10);
  });

  const timestamp = new Date().toISOString();
  return { processed: true, timestamp };
}

Key Takeaways

When implementing steps with the enhanced StepContext:

  1. Create Checkpoints Strategically

    • Before risky operations that might fail
    • After completing significant processing stages
    • Before making external service calls
  2. Use State Management

    • Store progress information using setState/getState
    • Retrieve the complete state object when needed
    • Update multiple values at once with updateState
  3. Track Execution Status

    • Use addHistoryEntry to record significant events
    • Check executionMetadata for current status
    • Explicitly complete or fail the step
  4. Handle Recovery Elegantly

    • Check for existing checkpoints on retry attempts
    • Restore from the most appropriate checkpoint
    • Skip already processed items
    • Provide detailed logs during recovery