Guides
Step Context

Step Context

The StepContext is a powerful component that provides utilities, services, and state management during step execution. It enables robust error handling, checkpointing, and execution tracking for more resilient workflows.

What is the Step Context?

The StepContext is an execution environment that provides step implementations with:

  • Access to workflow state
  • Checkpointing and recovery capabilities
  • Execution history tracking
  • Service discovery and access
  • Structured logging
  • Lifecycle management

Every step receives a StepContext instance when executed, giving it access to these capabilities.

Core Features

The StepContext offers several key features to make step implementation more robust and maintainable.

Execution Information

Access information about the current execution:

// Access execution information
const stepContext = context;
const { stepId, flowId, executionId, attemptNumber } = context.info;

Step Metadata

Access metadata about the current step:

// Access step metadata
const { id, version, displayName, description } = context.stepMetadata;

Execution Status

Check the current status of step execution:

// Access execution metadata
const { status, startedAt, completedAt, succeeded } = context.executionMetadata;
 
// Check if step is in a specific state
if (context.executionMetadata.status === 'executing') {
  // Step is currently executing
}

State Management

StepContext provides comprehensive state management capabilities:

Basic State Operations

// Store a value
await context.setState('counter', 5);
 
// Retrieve a value
const counter = await context.getState<number>('counter');
if (counter !== undefined) {
  // Use the counter value
}

Advanced State Management

// Get the entire state
const stateObject = await context.getStateObject();
 
// Update multiple state values at once
await context.updateState({
  status: 'processing',
  counter: 10,
  lastUpdated: new Date()
});

Checkpointing and Recovery

One of the most powerful features of StepContext is its ability to create checkpoints and restore from them.

Creating Checkpoints

Checkpoints save the current state, allowing you to resume execution from that point if failures occur:

// Create a checkpoint with metadata
const checkpointId = await context.createCheckpoint({
  reason: 'Before network operation',
  data: { step: 'data_processing', progress: 50 }
});
 
// Store checkpoint ID for potential recovery
await context.setState('lastCheckpointId', checkpointId);

Restoring from Checkpoints

When errors occur, you can restore to a previous checkpoint:

try {
  // Risky operation that might fail
  await performRiskyOperation();
} catch (error) {
  context.log('Operation failed, attempting recovery');
  
  // Get the last checkpoint ID
  const checkpointId = await context.getState<string>('lastCheckpointId');
  
  if (checkpointId) {
    // Restore from checkpoint
    await context.restoreFromCheckpoint(checkpointId);
    context.log('Restored from checkpoint', { checkpointId });
    
    // Try alternative approach
    await performBackupOperation();
  }
}

Checkpoint Management

StepContext provides utilities to manage checkpoints:

// Get information about a specific checkpoint
const checkpointInfo = await context.getCheckpointInfo(checkpointId);
if (checkpointInfo) {
  console.log(`Checkpoint created at: ${checkpointInfo.createdAt}`);
  console.log(`Checkpoint state: ${checkpointInfo.state}`);
}
 
// Get all checkpoints for the current execution
const allCheckpoints = await context.getAllCheckpoints();
console.log(`Found ${allCheckpoints.length} checkpoints`);
 
// Find the most recent checkpoint
const sortedCheckpoints = [...allCheckpoints].sort(
  (a, b) => b.createdAt.getTime() - a.createdAt.getTime()
);
const latestCheckpoint = sortedCheckpoints[0];

Execution History

The StepContext tracks the execution history, providing visibility into the step's execution:

// Access all history entries
for (const entry of context.history) {
  console.log(`[${entry.timestamp}] ${entry.type}: ${JSON.stringify(entry.details)}`);
}
 
// Find specific history events
const checkpointEvents = context.history.filter(entry => entry.type === 'checkpoint');
console.log(`Created ${checkpointEvents.length} checkpoints`);

Custom History Entries

You can add custom entries to the history:

// Add a custom history entry
context.addHistoryEntry('custom', {
  operation: 'data_validation',
  recordsProcessed: 1000,
  validRecords: 990,
  invalidRecords: 10
});

Lifecycle Management

StepContext helps manage the step's lifecycle:

// Mark the step as completed
await context.complete({
  status: 'success',
  processedItems: 100,
  errors: 0
});
 
// Mark the step as failed
await context.fail(
  'Processing failed due to invalid data format',
  { errorCode: 'INVALID_FORMAT', details: 'Expected CSV, got JSON' }
);

Logging

The StepContext provides structured logging capabilities:

// Basic logging
context.log('Starting data processing');
 
// Logging with structured metadata
context.log('Item processed', {
  itemId: '12345',
  processingTime: 150,
  status: 'success'
});

Service Access

Get access to registered services:

// Get a service instance
const httpClient = context.getService<HttpClient>('httpClient');
 
// Use the service
const response = await httpClient.get('https://api.example.com/data');

Complete Example

Here's a comprehensive example showing how to use StepContext:

import { defineStep } from '@zoopflow/core';
 
const processDataWithRecoveryStep = defineStep({
  id: 'data.processing.withRecovery',
  version: '1.0.0',
  description: 'Processes data with checkpoint-based recovery',
  
  // Schema definitions here...
  
  execute: async (input, context) => {
    const { items, batchSize = 10 } = input;
    let processedCount = 0;
    let failedCount = 0;
    const results = [];
    
    // Initialize state
    await context.setState('status', 'initializing');
    await context.setState('progress', 0);
    
    context.log('Starting batch processing', { 
      totalItems: items.length, 
      batchSize 
    });
    
    // Process items in batches
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      
      // Create checkpoint before processing batch
      const checkpointId = await context.createCheckpoint({
        batchIndex: i,
        progress: Math.floor((i / items.length) * 100)
      });
      
      // Update state with current progress
      await context.updateState({
        status: 'processing',
        batchIndex: i,
        progress: Math.floor((i / items.length) * 100),
        processedCount,
        failedCount
      });
      
      try {
        // Process each item in the batch
        for (const item of batch) {
          try {
            const result = await processItem(item);
            results.push({
              id: item.id,
              success: true,
              result
            });
            processedCount++;
          } catch (itemError) {
            results.push({
              id: item.id,
              success: false,
              error: itemError.message
            });
            failedCount++;
            context.log('Item processing failed', {
              itemId: item.id,
              error: itemError.message
            });
          }
        }
      } catch (batchError) {
        // Serious error processing batch
        context.log('Batch processing failed, restoring from checkpoint', {
          error: batchError.message,
          checkpointId
        });
        
        // Restore from checkpoint
        await context.restoreFromCheckpoint(checkpointId);
        
        // Try alternative processing approach
        // ... alternative implementation ...
      }
    }
    
    // Mark step as completed
    await context.complete({
      status: failedCount === 0 ? 'success' : 'partial',
      processedCount,
      failedCount
    });
    
    return {
      totalItems: items.length,
      processedCount,
      failedCount,
      results
    };
  }
});
 
// Helper function
async function processItem(item) {
  // Implementation...
}

Best Practices

Use Checkpoints Strategically

Create checkpoints at points where failures are likely or recovery would be valuable:

  • Before external service calls
  • Before processing large batches of data
  • Before transitions between major processing phases
// Create checkpoints at strategic points
const checkpointId = await context.createCheckpoint({
  phase: 'beforeApiCall',
  progress: 65
});

Track Execution Progress

Use state and history to track execution progress:

// Update progress regularly
await context.updateState({
  status: 'processing',
  progress: 75,
  itemsProcessed: 750,
  totalItems: 1000
});
 
// Add history entries for significant events
context.addHistoryEntry('custom', {
  phase: 'validation',
  valid: 980,
  invalid: 20
});

Handle Errors at Multiple Levels

Use appropriate error handling strategies at different levels:

// Batch-level error handling with checkpoint restoration
try {
  await processBatch(batch);
} catch (batchError) {
  await context.restoreFromCheckpoint(checkpointId);
  await alternativeProcessing(batch);
}
 
// Item-level error handling without disrupting the batch
for (const item of batch) {
  try {
    await processItem(item);
    successCount++;
  } catch (itemError) {
    failureCount++;
    errors.push({ itemId: item.id, error: itemError.message });
  }
}

Use Explicit Completion

Always mark steps as completed or failed:

// Mark successful completion
await context.complete({
  result: 'success',
  data: processedData
});
 
// Mark failure
await context.fail(
  'Processing failed due to data validation errors',
  { errorCount, validationErrors }
);

Interface Reference

The StepContext interface provides the following methods and properties:

interface StepContext {
  // Execution metadata
  info: ExecutionInfo;
  stepMetadata: StepMetadata;
  readonly executionMetadata: StepExecutionMetadata;
  readonly history: StepHistoryEntry[];
  
  // State management
  getState<T>(key: string): Promise<T | undefined>;
  setState<T>(key: string, value: T): Promise<void>;
  getStateObject(): Promise<Record<string, unknown>>;
  updateState(stateUpdates: Record<string, unknown>): Promise<void>;
  
  // Checkpointing and recovery
  createCheckpoint(metadata?: Record<string, unknown>): Promise<string>;
  restoreFromCheckpoint(checkpointId: string): Promise<void>;
  getCheckpointInfo(checkpointId: string): Promise<CheckpointInfo | undefined>;
  getAllCheckpoints(): Promise<CheckpointInfo[]>;
  
  // Logging and history
  log(message: string, metadata?: Record<string, unknown>): void;
  addHistoryEntry(type: 'custom', details: Record<string, unknown>): void;
  
  // Lifecycle management
  complete(details?: Record<string, unknown>): Promise<void>;
  fail(reason: string, details?: Record<string, unknown>): Promise<void>;
  
  // Service access
  getService<T>(name: string, options?: Record<string, unknown>): T;
}