State Management in ZoopFlow

Overview of State Management in ZoopFlow

State management is a fundamental aspect of workflow orchestration in ZoopFlow. It provides mechanisms for maintaining, accessing, and modifying data throughout a workflow's lifecycle. ZoopFlow leverages Temporal.io's durable execution model while adding its own abstraction layer to make state management more accessible and developer-friendly.

In ZoopFlow:

  • State represents the data and progress of workflows and their steps
  • State persists across system failures, worker restarts, and network issues
  • State can be scoped to different visibility levels (workflow, global)
  • State changes are automatically checkpointed at key execution points
  • External systems can read and modify workflow state through Temporal's messaging primitives (signals, queries, and updates)

State Management Architecture

ZoopFlow's state management architecture consists of several integrated components:

Core Components

  1. StateManager Interface: Defines the contract for state operations with implementations for different environments:

    • InMemoryStateManager (development)
    • RedisStateManager (production)
    • Other pluggable backends
  2. WorkflowStateManager: Manages the comprehensive state of a workflow execution, including:

    • Workflow metadata (IDs, timestamps)
    • Execution status
    • Node states
    • Checkpoints
  3. Context State Methods: context.getState() and context.setState() provide the primary API for state manipulation within workflows and steps

  4. Temporal Integration: Leverages Temporal's durable execution model for persistence and recovery
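To show how items 1 and 3 fit together, the sketch below binds a workflow ID into a small context object that forwards getState and setState to whichever StateManager backend is plugged in. The StateManager signatures mirror those of RedisStateManager on this page. MapStateManager and createContext are hypothetical names, and scope and TTL handling are omitted.

```typescript
// Options shape taken from the examples on this page
interface StateOptions {
  scope?: "workflow" | "global";
  ttl?: number; // seconds
}

// Backend contract, mirroring the RedisStateManager method signatures on this page
interface StateManager {
  setState<T>(workflowId: string, key: string, value: T, options?: StateOptions): Promise<void>;
  getState<T>(workflowId: string, key: string, options?: StateOptions): Promise<T | undefined>;
}

// Hypothetical development backend: one Map keyed by "workflowId:key".
// Scope and TTL are ignored to keep the sketch short.
class MapStateManager implements StateManager {
  private store = new Map<string, string>();

  async setState<T>(workflowId: string, key: string, value: T): Promise<void> {
    this.store.set(`${workflowId}:${key}`, JSON.stringify(value));
  }

  async getState<T>(workflowId: string, key: string): Promise<T | undefined> {
    const raw = this.store.get(`${workflowId}:${key}`);
    return raw === undefined ? undefined : (JSON.parse(raw) as T);
  }
}

// Hypothetical context factory: binds the workflow ID so flow code passes only key and value
function createContext(workflowId: string, manager: StateManager) {
  return {
    setState<T>(key: string, value: T, options?: StateOptions): Promise<void> {
      return manager.setState(workflowId, key, value, options);
    },
    getState<T>(key: string, options?: StateOptions): Promise<T | undefined> {
      return manager.getState<T>(workflowId, key, options);
    },
  };
}
```

Because the workflow ID is captured when the context is created, two executions sharing one backend never see each other's workflow-scoped keys.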

State Structure

The workflow state structure includes:

export interface WorkflowState {
  // Core identifiers
  flowId: string;
  executionId: string;
  
  // Execution status
  status: WorkflowExecutionStatusType;
  
  // Input/output data
  input: Record<string, unknown>;
  output?: Record<string, unknown>;
  error?: string;
  
  // Current execution point
  currentNodeId?: string;
  currentNodeState?: Record<string, unknown>;
  
  // Node execution history
  nodeStates: Record<string, Record<string, unknown>>;
  
  // Checkpoints for recovery
  checkpoints?: Checkpoint[];
  
  // Lifecycle timestamps and metadata
  createdAt: string;
  startedAt?: string;
  completedAt?: string;
  // Additional timestamps...
 
  // Extension point for additional properties
  [key: string]: unknown;
}
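A minimal sketch of building the state for a new execution. WorkflowExecutionStatusType and Checkpoint are not defined on this page, so stand-in definitions are used (the status values are assumptions), and createInitialState is a hypothetical helper. The timestamp is passed in rather than read from the clock so the caller decides where time comes from.

```typescript
// Stand-in types: the real WorkflowExecutionStatusType and Checkpoint are not shown on this page
type WorkflowExecutionStatusType = "pending" | "running" | "completed" | "failed";

interface Checkpoint {
  type: string;
  timestamp: string;
  metadata?: Record<string, unknown>;
}

interface WorkflowState {
  flowId: string;
  executionId: string;
  status: WorkflowExecutionStatusType;
  input: Record<string, unknown>;
  output?: Record<string, unknown>;
  error?: string;
  currentNodeId?: string;
  currentNodeState?: Record<string, unknown>;
  nodeStates: Record<string, Record<string, unknown>>;
  checkpoints?: Checkpoint[];
  createdAt: string;
  startedAt?: string;
  completedAt?: string;
  [key: string]: unknown;
}

// Hypothetical helper: sets only the required fields; optional ones stay absent until used
function createInitialState(
  flowId: string,
  executionId: string,
  input: Record<string, unknown>,
  now: string,
): WorkflowState {
  return {
    flowId,
    executionId,
    status: "pending",
    input,
    nodeStates: {},
    checkpoints: [],
    createdAt: now,
  };
}
```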

Flow State vs. Step State

ZoopFlow distinguishes between different levels of state granularity:

Flow State

Flow state represents the overall status and data of the entire workflow:

  • Managed by the WorkflowStateManager
  • Contains the global context of execution
  • Includes the comprehensive history of steps
  • Accessible throughout the workflow's lifecycle

Example of manipulating flow state:

// In defineFlow -> steps function:
async (input, context) => {
  // Initialize flow state
  await context.setState('orderStatus', 'PROCESSING');
  
  // Retrieve flow state later
  const status = await context.getState('orderStatus');
  
  // Flow execution logic based on state
  if (status === 'PROCESSING') {
    // Continue processing
  }
};

Step State

Step state represents the data and status of individual steps:

  • Tracked in the nodeStates property of the workflow state
  • Contains input, output, error, and execution metadata for each step
  • Updated automatically before and after step execution

Example of step state tracking:

// Inside the flow interpreter
private async executeNode(nodeId: string, input: Record<string, unknown>): Promise<Record<string, unknown>> {
  // Update node state before execution
  this.stateManager.setCurrentNode(nodeId, {
    status: 'executing',
    input,
    startedAt: new Date().toISOString(),
  });
  
  try {
    // Execute the node
    const output = await this.nodeHandler.executeNode(nodeId, input);
    
    // Update node state after execution
    this.stateManager.updateNodeState(nodeId, {
      status: 'completed',
      output,
      completedAt: new Date().toISOString(),
    });
    
    return output;
  } catch (error) {
    // Update node state on failure; in strict TypeScript the catch variable is unknown
    this.stateManager.updateNodeState(nodeId, {
      status: 'failed',
      error: error instanceof Error ? error.message : String(error),
      failedAt: new Date().toISOString(),
    });
    
    throw error;
  }
}
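The interpreter relies on two WorkflowStateManager methods that this page does not define. A minimal sketch, assuming updateNodeState merges new fields into the node's existing state rather than replacing it, so a completed node still carries its input and startedAt:

```typescript
type NodeState = Record<string, unknown>;

// Hypothetical minimal WorkflowStateManager covering the two calls used by executeNode
class WorkflowStateManager {
  currentNodeId?: string;
  nodeStates: Record<string, NodeState> = {};

  // Marks a node as the current execution point and records its initial state
  setCurrentNode(nodeId: string, state: NodeState): void {
    this.currentNodeId = nodeId;
    this.nodeStates = { ...this.nodeStates, [nodeId]: { ...state } };
  }

  // Merges the patch into the node's existing state; earlier fields are kept
  updateNodeState(nodeId: string, patch: NodeState): void {
    const previous = this.nodeStates[nodeId] ?? {};
    this.nodeStates = { ...this.nodeStates, [nodeId]: { ...previous, ...patch } };
  }
}
```

Replacing nodeStates with a new object on every change, instead of editing it in place, also matches the immutability guideline later on this page.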

State Scope and Visibility

ZoopFlow provides different scopes for state management, allowing developers to choose the appropriate visibility:

1. Workflow-Scoped State

Workflow-scoped state is isolated to a single workflow execution:

  • Default scope for most state operations
  • Managed by Temporal's built-in state persistence
  • Survives worker restarts and system failures
  • Isolated from other workflow executions

// Standard workflow-scoped state
await context.setState("orderStatus", "PROCESSING");
const status = await context.getState("orderStatus");

2. Global-Scoped State (Advanced)

Global state is shared across multiple workflow executions:

  • Requires explicit scope specification
  • Stored in external systems (e.g., Redis)
  • Enables cross-workflow communication
  • Useful for rate limiting, configuration sharing, etc.

// Explicit global state access
await context.setState("globalCounter", 10, { scope: "global" });
const counter = await context.getState("globalCounter", { scope: "global" });
 
// Helper methods
await context.setGlobalState("apiQuota", 100, { ttl: 3600 });
const quota = await context.getGlobalState("apiQuota");

Persistent State Management

ZoopFlow persists state through two complementary mechanisms: Temporal's event history and the pluggable StateManager backends.

Temporal's Event Sourcing

Temporal uses an event sourcing model to maintain workflow state:

  • Every state-changing event is recorded in an append-only log
  • Workflow state is reconstructed by replaying events during recovery
  • History events are recorded at deterministic points (activity completions, timer firings, received signals); these events act as the workflow's checkpoints
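The replay bullet can be illustrated with a generic event-sourcing fold: state is never stored directly, only derived by applying each logged event in order. This is a simplified sketch, not Temporal's internal mechanism (Temporal rebuilds state by re-running workflow code against its history), and the event type names are hypothetical.

```typescript
// Hypothetical event types; Temporal's real history uses its own event types
type HistoryEvent =
  | { type: "WorkflowStarted"; input: Record<string, unknown> }
  | { type: "ActivityCompleted"; activity: string; result: unknown }
  | { type: "SignalReceived"; name: string; payload: unknown };

interface DerivedState {
  input: Record<string, unknown>;
  results: Record<string, unknown>;
  signals: { name: string; payload: unknown }[];
}

// Applies one event; pure, so replaying the same log always yields the same state
function apply(state: DerivedState, event: HistoryEvent): DerivedState {
  switch (event.type) {
    case "WorkflowStarted":
      return { ...state, input: event.input };
    case "ActivityCompleted":
      return { ...state, results: { ...state.results, [event.activity]: event.result } };
    case "SignalReceived":
      return { ...state, signals: [...state.signals, { name: event.name, payload: event.payload }] };
  }
}

// Rebuilds state from scratch by folding over the append-only log
function rebuild(log: readonly HistoryEvent[]): DerivedState {
  const empty: DerivedState = { input: {}, results: {}, signals: [] };
  return log.reduce(apply, empty);
}
```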

State Manager Implementations

Different StateManager implementations provide appropriate persistence options:

InMemoryStateManager (Development)

class InMemoryStateManager implements StateManager {
  private workflowState = new Map<string, Map<string, any>>();
  private globalState = new Map<string, any>();
 
  // Implementation details...
}
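The class above elides its implementation. One plausible shape, assuming the setState/getState signatures and the scope and ttl options used elsewhere on this page, keeps the original's two maps and adds lazy TTL expiry. It is a sketch for local development, not ZoopFlow's actual code.

```typescript
// Options shape taken from the examples on this page
interface StateOptions {
  scope?: "workflow" | "global";
  ttl?: number; // seconds
}

// A stored value: the JSON string plus an optional expiry time
interface Entry {
  value: string;
  expiresAt?: number; // epoch milliseconds; undefined means the entry never expires
}

class InMemoryStateManager {
  // workflowId -> (key -> entry) for workflow scope; one shared map for global scope
  private workflowState = new Map<string, Map<string, Entry>>();
  private globalState = new Map<string, Entry>();
  private readonly now: () => number;

  // The clock is injectable so TTL behaviour can be tested without waiting
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  async setState<T>(workflowId: string, key: string, value: T, options?: StateOptions): Promise<void> {
    const ttl = options?.ttl;
    const entry: Entry = {
      value: JSON.stringify(value),
      expiresAt: ttl !== undefined ? this.now() + ttl * 1000 : undefined,
    };
    if (options?.scope === "global") {
      this.globalState.set(key, entry);
      return;
    }
    let bucket = this.workflowState.get(workflowId);
    if (!bucket) {
      bucket = new Map<string, Entry>();
      this.workflowState.set(workflowId, bucket);
    }
    bucket.set(key, entry);
  }

  async getState<T>(workflowId: string, key: string, options?: StateOptions): Promise<T | undefined> {
    const bucket = options?.scope === "global" ? this.globalState : this.workflowState.get(workflowId);
    const entry = bucket?.get(key);
    if (!entry) return undefined;
    // Expired entries are evicted lazily, on the first read after expiry
    if (entry.expiresAt !== undefined && this.now() >= entry.expiresAt) {
      bucket?.delete(key);
      return undefined;
    }
    return JSON.parse(entry.value) as T;
  }
}
```

Storing JSON strings rather than live objects keeps the development backend's behaviour close to a Redis-backed one: values that do not survive serialization fail the same way in both.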

RedisStateManager (Production)

class RedisStateManager implements StateManager {
  constructor(private redisClient: Redis) {}
  
  // Implementation that stores state in Redis
  // with appropriate key structures and TTL handling
}

State Serialization and Deserialization

All state in ZoopFlow must be serializable to ensure proper persistence and recovery:

Serialization Process

  • All state values are serialized to JSON before storage
  • Complex objects must be JSON-serializable
  • Non-serializable data should be handled via references or storage in external systems

Example serialization in RedisStateManager:

async setState<T>(
  workflowId: string,
  key: string,
  value: T,
  options?: StateOptions
): Promise<void> {
  const redisKey = this.getRedisKey(options?.scope || "workflow", workflowId, key);
  const serialized = JSON.stringify(value);
  
  if (options?.ttl) {
    await this.redisClient.setex(redisKey, options.ttl, serialized);
  } else {
    await this.redisClient.set(redisKey, serialized);
  }
}

Deserialization Process

  • State is automatically deserialized when retrieved
  • Generic type parameters give compile-time typing only: JSON.parse(data) as T is an unchecked cast, so a stored value with a different shape is not detected at runtime

async getState<T>(
  workflowId: string,
  key: string,
  options?: StateOptions
): Promise<T | undefined> {
  const redisKey = this.getRedisKey(options?.scope || "workflow", workflowId, key);
  const data = await this.redisClient.get(redisKey);
  // Redis returns null for a missing key
  return data === null ? undefined : (JSON.parse(data) as T);
}
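Since the cast in getState is not checked at runtime, a stored value with an unexpected shape flows through silently. One option, sketched here with a hypothetical parseState helper and a hand-written type guard, is to validate after parsing; a schema-validation library could fill the same role.

```typescript
// A type guard checks the parsed value at runtime; `as T` alone only informs the compiler
type Guard<T> = (value: unknown) => value is T;

// Hypothetical helper: parses a stored string and validates it before returning it as T
function parseState<T>(raw: string | null, guard: Guard<T>): T | undefined {
  if (raw === null) return undefined; // key not present
  const parsed: unknown = JSON.parse(raw);
  if (!guard(parsed)) {
    throw new Error(`Stored state does not match the expected shape: ${raw}`);
  }
  return parsed;
}

interface Progress {
  completed: number;
  total: number;
}

// Example guard for a progress object like the one used elsewhere on this page
const isProgress: Guard<Progress> = (value): value is Progress =>
  typeof value === "object" &&
  value !== null &&
  typeof (value as Progress).completed === "number" &&
  typeof (value as Progress).total === "number";
```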

Checkpointing and Recovery

Checkpoints capture the workflow state at key execution points to enable recovery:

Automatic Checkpointing

ZoopFlow automatically creates checkpoints at strategic execution points:

  • When workflow status changes
  • Before and after node execution
  • When signals are received
  • Before potentially long-running operations

// Creating a checkpoint
stateManager.createCheckpoint({
  type: 'node-execution-start',
  metadata: {
    nodeId: 'validate-payment',
    input: { orderId: '12345' },
  },
});
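The implementation of createCheckpoint is not shown. A minimal sketch, assuming checkpoints form an append-only list with a sequence number and timestamp; CheckpointLog and latest are hypothetical names. Metadata is deep-copied when recorded so that later changes to the caller's objects cannot rewrite history.

```typescript
interface Checkpoint {
  id: number;
  type: string;
  timestamp: string;
  metadata?: Record<string, unknown>;
}

// Hypothetical append-only checkpoint log
class CheckpointLog {
  private checkpoints: Checkpoint[] = [];
  private readonly clock: () => string;

  constructor(clock: () => string) {
    this.clock = clock;
  }

  createCheckpoint(input: { type: string; metadata?: Record<string, unknown> }): Checkpoint {
    const checkpoint: Checkpoint = {
      id: this.checkpoints.length + 1,
      type: input.type,
      timestamp: this.clock(),
      // Deep copy via JSON so later mutation of the caller's object cannot alter the record
      metadata: input.metadata ? JSON.parse(JSON.stringify(input.metadata)) : undefined,
    };
    this.checkpoints = [...this.checkpoints, checkpoint];
    return checkpoint;
  }

  // Most recent checkpoint of a given type, e.g. where a node last started
  latest(type: string): Checkpoint | undefined {
    for (let i = this.checkpoints.length - 1; i >= 0; i--) {
      if (this.checkpoints[i].type === type) return this.checkpoints[i];
    }
    return undefined;
  }
}
```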

State Recovery

State recovery in ZoopFlow is primarily handled by Temporal's replay mechanism:

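  • During replay, the worker re-executes the workflow code from the beginning against the recorded event history; completed activities are not run again, and their recorded results are returned instead
  • Because workflow code must be deterministic, replay produces the same state as the original execution
  • The WorkflowStateManager automatically maintains consistent state during replay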
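A toy model of the first bullet: the same workflow function runs twice, and on the second run (the replay) activity results are served from the recorded history instead of being executed again. ActivityRunner and orderWorkflow are hypothetical names; Temporal's real replay works from its event history and is considerably more involved.

```typescript
type Activity = (arg: string) => Promise<string>;

// Hypothetical runner: executes and records activities on the first run,
// and returns recorded results in order when constructed with a history
class ActivityRunner {
  private history: string[];
  private position = 0;
  calls = 0; // counts real activity executions

  constructor(history: string[] = []) {
    this.history = history;
  }

  async call(activity: Activity, arg: string): Promise<string> {
    if (this.position < this.history.length) {
      return this.history[this.position++]; // replay: reuse the recorded result
    }
    this.calls++;
    const result = await activity(arg);
    this.history.push(result);
    this.position++;
    return result;
  }

  recorded(): string[] {
    return [...this.history];
  }
}

// Deterministic workflow code: same inputs and activity results give the same outcome
async function orderWorkflow(runner: ActivityRunner, orderId: string): Promise<string> {
  const payment = await runner.call(async (id) => `paid:${id}`, orderId);
  const shipment = await runner.call(async (id) => `shipped:${id}`, orderId);
  return `${payment}|${shipment}`;
}
```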

State Access Patterns and Best Practices

Common State Access Patterns

  1. Direct Variable Access: Use standard variables for sequential data passing between steps.

    const resultA = await stepA.execute(input, context);
    const resultB = await stepB.execute({ data: resultA.output }, context);
  2. Context State for Persistence: Use context state for data that needs to be accessible from multiple points.

    await context.setState('intermediateResult', resultA.output);
    // Later or in another part of the workflow:
    const data = await context.getState('intermediateResult');
  3. State as Workflow Progress Indicator: Use state to track overall progress.

    await context.setState('progress', { 
      completed: 3,
      total: 10, 
      percentage: 30,
      stage: 'data-validation'
    });
  4. State for External Visibility: Use state to expose workflow progress to external systems.

    // Define query handler
    context.workflow.handleQuery('getProgress', async () => {
      return await context.getState('progress');
    });

Best Practices

  1. State Immutability:

    • Always create new state objects instead of mutating existing ones
    • Use immutable update patterns
    • Avoid direct references to mutable objects
  2. State Size Management:

    • Keep state size manageable (avoid storing large datasets in flow state)
    • Store large data in external systems
    • Use references instead of embedding large objects
  3. Deterministic State Updates:

    • Ensure all state updates are deterministic
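    • Use Temporal's deterministic APIs for timestamps and IDs (for example, uuid4() from @temporalio/workflow); in the TypeScript SDK, Date and Math.random are already replaced with deterministic versions inside workflow code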
    • Avoid non-deterministic operations in state updates
  4. State Scope Selection:

    • Use workflow-scoped state by default
    • Only use global state when necessary for cross-workflow communication
    • Set appropriate TTL for global state to prevent storage leaks
  5. Error Handling:

    • Implement error handling for state operations
    • Consider state consistency during compensating actions
    • Save error states to aid in debugging
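The immutability guideline can be enforced during development by freezing state objects so that accidental in-place mutation becomes visible (in strict mode, writing to a frozen object throws a TypeError). A minimal sketch; deepFreeze and withStatus are hypothetical helpers, and OrderState mirrors the order-history example later on this page.

```typescript
// Recursively freezes an object and everything reachable from it
function deepFreeze<T>(value: T): T {
  if (typeof value === "object" && value !== null && !Object.isFrozen(value)) {
    Object.freeze(value);
    for (const child of Object.values(value)) deepFreeze(child);
  }
  return value;
}

interface OrderState {
  status: string;
  history: { status: string; timestamp: string }[];
}

// Immutable update: returns a new object and a new history array; the input is untouched
function withStatus(state: OrderState, status: string, timestamp: string): OrderState {
  return { ...state, status, history: [...state.history, { status, timestamp }] };
}
```

Spreading a frozen object produces an ordinary, unfrozen copy, so immutable updates keep working on frozen state while direct assignments to it fail.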

State Management with Temporal

ZoopFlow builds on Temporal's state management capabilities:

Leveraging Temporal's Core Features

  1. Event Sourcing: Temporal records all events in an append-only history
  2. Deterministic Replay: Guarantees consistent state reconstruction
  3. Checkpointing: Progress is recorded automatically as history events at activity, timer, and signal boundaries
  4. Durable Execution: Ensures workflow progress survives failures

Integration with Temporal Primitives

ZoopFlow exposes Temporal's messaging primitives for state interaction:

Signals

Signals allow external systems to send asynchronous, one-way messages to update workflow state:

// In the workflow
context.workflow.handleSignal<string>('updateStatus', async (newStatus) => {
  await context.setState('status', newStatus);
});
 
// External client
await client.getHandle(workflowId).signal('updateStatus', 'APPROVED');

Queries

Queries enable read-only access to workflow state:

// In the workflow
context.workflow.handleQuery<[], { status: string, progress: number }>('getWorkflowStatus', async () => {
  const status = await context.getState<string>('status');
  const progress = await context.getState<number>('progress');
  return { status, progress };
});
 
// External client
const status = await client.getHandle(workflowId).query('getWorkflowStatus');

Updates

Updates let an external client change workflow state and wait for the handler's return value, whereas a signal returns nothing to its sender:

// In the workflow
context.workflow.handleUpdate<[string], string[]>('addItemToCart', {
  execute: async (item) => {
    const cart = (await context.getState<string[]>('cartItems')) || [];
    // Build a new array rather than mutating the stored one
    const updatedCart = [...cart, item];
    await context.setState('cartItems', updatedCart);
    return updatedCart;
  }
});
 
// External client
const updatedCart = await client.getHandle(workflowId).executeUpdate('addItemToCart', { args: ['productXYZ'] });

Advanced State Management Patterns

1. Saga Pattern Implementation

Using state to track saga execution and compensating actions:

async function orderSaga(input, context) {
  // Initialize saga state
  await context.setState('sagaState', {
    steps: [
      { name: 'reserveInventory', status: 'pending', compensate: true },
      { name: 'processPayment', status: 'pending', compensate: true },
      { name: 'scheduleShipping', status: 'pending', compensate: false }
    ],
    currentStep: 0,
    compensating: false
  });
 
  try {
    // Execute saga steps
    await executeStep('reserveInventory', reserveInventoryStep, input, context);
    await executeStep('processPayment', processPaymentStep, input, context);
    await executeStep('scheduleShipping', scheduleShippingStep, input, context);
    
    // Mark saga as complete
    const sagaState = await context.getState('sagaState');
    await context.setState('sagaState', { ...sagaState, status: 'completed' });
    
    return { status: 'success' };
  } catch (error) {
    // Compensation logic
    const sagaState = await context.getState('sagaState');
    await context.setState('sagaState', { ...sagaState, compensating: true });
    
    // Compensate completed steps in reverse order
    for (let i = sagaState.currentStep - 1; i >= 0; i--) {
      const step = sagaState.steps[i];
      if (step.status === 'completed' && step.compensate) {
        await executeCompensation(step.name, input, context);
      }
    }
    
    throw error;
  }
}
 
async function executeStep(stepName, step, input, context) {
  // Mark the step as executing, building new objects instead of mutating stored state
  const sagaState = await context.getState('sagaState');
  const stepIndex = sagaState.steps.findIndex(s => s.name === stepName);
  await context.setState('sagaState', {
    ...sagaState,
    currentStep: stepIndex,
    steps: sagaState.steps.map((s, i) => (i === stepIndex ? { ...s, status: 'executing' } : s))
  });
  
  // Execute step
  await step.execute(input, context);
  
  // Mark the step as completed
  const updatedState = await context.getState('sagaState');
  await context.setState('sagaState', {
    ...updatedState,
    steps: updatedState.steps.map((s, i) => (i === stepIndex ? { ...s, status: 'completed' } : s))
  });
}
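The same compensation logic, stripped of ZoopFlow's context so it can run on its own: steps run in order, and when one fails, the compensations of the already completed steps run in reverse. runSaga and the SagaStep shape are hypothetical; unlike orderSaga, this sketch returns the outcome instead of rethrowing, and it tracks completed steps in a local array rather than in workflow state.

```typescript
interface SagaStep {
  name: string;
  execute: () => Promise<void>;
  compensate?: () => Promise<void>; // omitted for steps that need no undo
}

// Runs steps in order; on failure, undoes completed steps in reverse order
async function runSaga(steps: SagaStep[]): Promise<{ status: "completed" | "compensated"; log: string[] }> {
  const completed: SagaStep[] = [];
  const log: string[] = [];
  for (const step of steps) {
    try {
      await step.execute();
      completed.push(step);
      log.push(`done:${step.name}`);
    } catch {
      log.push(`failed:${step.name}`);
      for (const done of [...completed].reverse()) {
        if (done.compensate) {
          await done.compensate();
          log.push(`compensated:${done.name}`);
        }
      }
      return { status: "compensated", log };
    }
  }
  return { status: "completed", log };
}
```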

2. State-Based Correlation

Using state to correlate related events and workflows:

// Store correlation identifier
await context.setGlobalState(`correlation:${correlationId}`, {
  workflows: [workflowId],
  status: 'active',
  createdAt: new Date().toISOString()
});
 
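// Add another workflow to the correlation. This read-modify-write is not atomic:
// two workflows updating the same key at the same time can overwrite each other.
const correlation = await context.getGlobalState(`correlation:${correlationId}`);
await context.setGlobalState(`correlation:${correlationId}`, {
  ...correlation,
  workflows: [...(correlation?.workflows ?? []), newWorkflowId]
});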
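Because getGlobalState and setGlobalState are separate calls, two workflows joining the same correlation at once can lose an update. A common remedy is optimistic concurrency: each value carries a version, and a write succeeds only if the version has not changed since it was read. The in-memory VersionedStore below is a hypothetical sketch of that idea; with Redis, the same pattern is typically built on WATCH/MULTI/EXEC or a Lua script.

```typescript
interface Versioned<T> {
  version: number;
  value: T;
}

// Hypothetical store with compare-and-set semantics
class VersionedStore {
  private data = new Map<string, Versioned<unknown>>();

  get<T>(key: string): Versioned<T> | undefined {
    return this.data.get(key) as Versioned<T> | undefined;
  }

  // Writes only if the stored version still equals the one the caller read (0 = absent)
  compareAndSet<T>(key: string, expectedVersion: number, value: T): boolean {
    const current = this.data.get(key)?.version ?? 0;
    if (current !== expectedVersion) return false;
    this.data.set(key, { version: current + 1, value });
    return true;
  }
}

// Retries the read-modify-write until no other writer got in between
function updateWithRetry<T>(
  store: VersionedStore,
  key: string,
  initial: T,
  change: (value: T) => T,
  maxAttempts = 5,
): T {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = store.get<T>(key);
    const next = change(current ? current.value : initial);
    if (store.compareAndSet(key, current ? current.version : 0, next)) return next;
  }
  throw new Error(`Gave up updating ${key} after ${maxAttempts} attempts`);
}
```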

3. State Versioning

Managing state schema evolution:

// Check state version
const state = await context.getState('myData');
const stateVersion = state?.version || 1;
 
// Handle different versions (skip migration when nothing has been stored yet)
if (state && stateVersion === 1) {
  // Handle v1 schema
  const migratedState = migrateV1ToV2(state);
  await context.setState('myData', {
    ...migratedState,
    version: 2
  });
}
 
// Current code works with v2 schema
const currentState = await context.getState('myData');
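The single migrateV1ToV2 call generalizes to a chain of migrations, so data several versions old is upgraded one step at a time. The schema versions, field names, and the migrate helper below are hypothetical.

```typescript
// Hypothetical schema history for 'myData':
//   v1 { name }  ->  v2 { firstName, lastName }  ->  v3 adds tags
type StoredData = { version?: number; [key: string]: unknown };
type MyDataV3 = { version: 3; firstName: string; lastName: string; tags: string[] };

// migrations[n] upgrades version n to version n + 1
const migrations: Record<number, (state: StoredData) => StoredData> = {
  1: (s) => {
    const [firstName = "", lastName = ""] = String(s["name"] ?? "").split(" ");
    return { firstName, lastName };
  },
  2: (s) => ({ ...s, tags: [] }),
};

const CURRENT_VERSION = 3;

// Applies migrations in order until the data reaches the current version
function migrate(state: StoredData): MyDataV3 {
  let current = state;
  let version = state.version ?? 1; // unversioned data counts as v1, as in the example above
  while (version < CURRENT_VERSION) {
    const step = migrations[version];
    if (!step) throw new Error(`No migration from version ${version}`);
    version += 1;
    current = { ...step(current), version }; // stamp the new version number
  }
  return current as MyDataV3;
}
```

Stamping the version in one place, rather than inside each migration, means an individual migration cannot forget to bump it and leave the loop stuck.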

Examples

Basic State Management Example

import { defineFlow, defineStep } from 'zoopflow';
 
// Define a payment processing step
const processPaymentStep = defineStep({
  name: 'processPayment',
  execute: async (input, context) => {
    // Use input to process payment
    const result = await paymentService.process(input.orderId, input.amount);
    
    // Store result in workflow state for later use
    await context.setState('paymentResult', result);
    
    return {
      success: result.status === 'approved',
      transactionId: result.transactionId
    };
  }
});
 
// Define an order fulfillment flow
const orderFlow = defineFlow({
  name: 'orderFulfillment',
  steps: async (input, context) => {
    // Initialize workflow state
    await context.setState('orderStatus', 'PROCESSING');
    await context.setState('orderHistory', [
      { status: 'RECEIVED', timestamp: new Date().toISOString() }
    ]);
    
    // Process payment
    const paymentResult = await processPaymentStep.execute(
      { orderId: input.orderId, amount: input.total },
      context
    );
    
    // Update order status based on payment result
    if (paymentResult.success) {
      await context.setState('orderStatus', 'PAYMENT_APPROVED');
      
      // Append to order history
      const history = await context.getState<any[]>('orderHistory');
      await context.setState('orderHistory', [
        ...history,
        { status: 'PAYMENT_APPROVED', timestamp: new Date().toISOString() }
      ]);
      
      // Continue with order processing...
      return { status: 'success', orderId: input.orderId };
    } else {
      await context.setState('orderStatus', 'PAYMENT_FAILED');
      
      // Append to order history
      const history = await context.getState<any[]>('orderHistory');
      await context.setState('orderHistory', [
        ...history,
        { status: 'PAYMENT_FAILED', timestamp: new Date().toISOString() }
      ]);
      
      return { status: 'failed', orderId: input.orderId, reason: 'payment_declined' };
    }
  }
});

Human Interaction with State Management

const approvalWorkflow = defineFlow({
  name: 'documentApproval',
  steps: async (input, context) => {
    // Initialize approval state
    await context.setState('approvalState', {
      status: 'PENDING',
      document: input.documentId,
      assignedTo: input.reviewerId,
      createdAt: new Date().toISOString(),
      comments: []
    });
    
    // Set up signal handler for document approval
    context.workflow.handleSignal<{ approved: boolean, comment: string }>('approvalDecision', async (decision) => {
      const approvalState = await context.getState<any>('approvalState');
      
      const newState = {
        ...approvalState,
        status: decision.approved ? 'APPROVED' : 'REJECTED',
        decidedAt: new Date().toISOString(),
        comments: [
          ...approvalState.comments,
          { text: decision.comment, timestamp: new Date().toISOString() }
        ]
      };
      
      await context.setState('approvalState', newState);
    });
    
    // Set up query for current approval state
    context.workflow.handleQuery<[], any>('getApprovalState', async () => {
      return await context.getState('approvalState');
    });
    
    // Wait for approval decision
    context.log('Waiting for approval decision...');
    await context.waitForCondition(async () => {
      const state = await context.getState<any>('approvalState');
      return state.status !== 'PENDING';
    });
    
    // Get final state
    const finalState = await context.getState<any>('approvalState');
    context.log(`Document ${finalState.status}`);
    
    return {
      documentId: input.documentId,
      approved: finalState.status === 'APPROVED',
      comments: finalState.comments
    };
  }
});

Distributed State Management

const distributedProcessingFlow = defineFlow({
  name: 'batchDataProcessing',
  steps: async (input, context) => {
    // Initialize processing state
    await context.setState('processingState', {
      batchId: input.batchId,
      totalItems: input.items.length,
      processedItems: 0,
      results: [],
      errors: [],
      startTime: new Date().toISOString()
    });
    
    // Process items in chunks of 10; items within a chunk run in parallel.
    // chunkArray is a helper not defined on this page.
    const chunks = chunkArray(input.items, 10);
    
    for (const [chunkIndex, chunk] of chunks.entries()) {
      context.log(`Processing chunk ${chunkIndex + 1}/${chunks.length}`);
      
      // Process the chunk in parallel; each item resolves to a success or error record
      const chunkResults = await Promise.all(
        chunk.map(async (item) => {
          try {
            const result = await processItemStep.execute({ item }, context);
            return { item, result, success: true };
          } catch (error) {
            const message = error instanceof Error ? error.message : String(error);
            return { item, error: message, success: false };
          }
        })
      );
      
      // Merge the chunk into shared state with one read and one write, after all
      // parallel branches have finished, so no two branches race on the same key
      const state = await context.getState<any>('processingState');
      const updatedState = {
        ...state,
        processedItems: state.processedItems + chunkResults.length,
        results: [...state.results, ...chunkResults.filter((r) => r.success)],
        errors: [...state.errors, ...chunkResults.filter((r) => !r.success)]
      };
      await context.setState('processingState', updatedState);
      
      // Update progress for external visibility
      await context.setState('progress', {
        processed: updatedState.processedItems,
        total: updatedState.totalItems,
        percentage: Math.round((updatedState.processedItems / updatedState.totalItems) * 100)
      });
    }
    }
    
    // Finalize state, computing the end time and duration once so the stored
    // state and the return value agree
    const finalState = await context.getState<any>('processingState');
    const endTime = new Date();
    const duration = (endTime.getTime() - new Date(finalState.startTime).getTime()) / 1000;
    await context.setState('processingState', {
      ...finalState,
      status: 'COMPLETED',
      endTime: endTime.toISOString(),
      duration
    });
    
    return {
      batchId: input.batchId,
      processed: finalState.processedItems,
      successful: finalState.results.length,
      failed: finalState.errors.length,
      duration
    };
  }
});
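The batch flow calls a chunkArray helper that is not defined on this page. A minimal version, assuming it splits the items into consecutive groups of at most the given size:

```typescript
// Splits an array into consecutive chunks of at most `size` elements; the last chunk may be shorter
function chunkArray<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```

Returning a plain array keeps the flow's chunks.entries() loop and chunks.length usage working unchanged.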

By following these state management practices and patterns, ZoopFlow developers can build robust, stateful workflows that maintain data integrity across long-running processes, system failures, and complex execution paths.