State Management in ZoopFlow
Overview of State Management in ZoopFlow
State management is a fundamental aspect of workflow orchestration in ZoopFlow. It provides mechanisms for maintaining, accessing, and modifying data throughout a workflow's lifecycle. ZoopFlow leverages Temporal.io's durable execution model while adding its own abstraction layer to make state management more accessible and developer-friendly.
In ZoopFlow:
- State represents the data and progress of workflows and their steps
- State persists across system failures, worker restarts, and network issues
- State can be scoped to different visibility levels (workflow, global)
- State changes are automatically checkpointed at key execution points
- External systems can interact with workflow state through Temporal's messaging primitives
State Management Architecture
ZoopFlow's state management architecture consists of several integrated components:
Core Components
-
StateManager Interface: Defines the contract for state operations with implementations for different environments:
- InMemoryStateManager (development)
- RedisStateManager (production)
- Other pluggable backends
-
WorkflowStateManager: Manages the comprehensive state of a workflow execution, including:
- Workflow metadata (IDs, timestamps)
- Execution status
- Node states
- Checkpoints
-
Context State Methods:
context.getState()andcontext.setState()provide the primary API for state manipulation within workflows and steps -
Temporal Integration: Leverages Temporal's durable execution model for persistence and recovery
State Structure
The workflow state structure includes:
export interface WorkflowState {
// Core identifiers
flowId: string;
executionId: string;
// Execution status
status: WorkflowExecutionStatusType;
// Input/output data
input: Record<string, unknown>;
output?: Record<string, unknown>;
error?: string;
// Current execution point
currentNodeId?: string;
currentNodeState?: Record<string, unknown>;
// Node execution history
nodeStates: Record<string, Record<string, unknown>>;
// Checkpoints for recovery
checkpoints?: Checkpoint[];
// Lifecycle timestamps and metadata
createdAt: string;
startedAt?: string;
completedAt?: string;
// Additional timestamps...
// Extension point for additional properties
[key: string]: unknown;
}Flow State vs. Step State
ZoopFlow distinguishes between different levels of state granularity:
Flow State
Flow state represents the overall status and data of the entire workflow:
- Managed by the WorkflowStateManager
- Contains the global context of execution
- Includes the comprehensive history of steps
- Accessible throughout the workflow's lifecycle
Example of manipulating flow state:
// In defineFlow -> steps function:
async (input, context) => {
// Initialize flow state
await context.setState('orderStatus', 'PROCESSING');
// Retrieve flow state later
const status = await context.getState('orderStatus');
// Flow execution logic based on state
if (status === 'PROCESSING') {
// Continue processing
}
};Step State
Step state represents the data and status of individual steps:
- Tracked in the
nodeStatesproperty of the workflow state - Contains input, output, error, and execution metadata for each step
- Updated automatically before and after step execution
Example of step state tracking:
// Inside the flow interpreter
private async executeNode(nodeId: string, input: Record<string, unknown>): Promise<Record<string, unknown>> {
// Update node state before execution
this.stateManager.setCurrentNode(nodeId, {
status: 'executing',
input,
startedAt: new Date().toISOString(),
});
try {
// Execute the node
const output = await this.nodeHandler.executeNode(nodeId, input);
// Update node state after execution
this.stateManager.updateNodeState(nodeId, {
status: 'completed',
output,
completedAt: new Date().toISOString(),
});
return output;
} catch (error) {
// Update node state on failure
this.stateManager.updateNodeState(nodeId, {
status: 'failed',
error: error.message,
failedAt: new Date().toISOString(),
});
throw error;
}
}State Scope and Visibility
ZoopFlow provides different scopes for state management, allowing developers to choose the appropriate visibility:
1. Workflow-Scoped State
Workflow-scoped state is isolated to a single workflow execution:
- Default scope for most state operations
- Managed by Temporal's built-in state persistence
- Survives worker restarts and system failures
- Isolated from other workflow executions
// Standard workflow-scoped state
await context.setState("orderStatus", "PROCESSING");
const status = await context.getState("orderStatus");2. Global-Scoped State (Advanced)
Global state is shared across multiple workflow executions:
- Requires explicit scope specification
- Stored in external systems (e.g., Redis)
- Enables cross-workflow communication
- Useful for rate limiting, configuration sharing, etc.
// Explicit global state access
await context.setState("globalCounter", 10, { scope: "global" });
const counter = await context.getState("globalCounter", { scope: "global" });
// Helper methods
await context.setGlobalState("apiQuota", 100, { ttl: 3600 });
const quota = await context.getGlobalState("apiQuota");Persistent State Management
ZoopFlow ensures state persistence through various mechanisms:
Temporal's Event Sourcing
Temporal uses an event sourcing model to maintain workflow state:
- Every state-changing event is recorded in an append-only log
- Workflow state is reconstructed by replaying events during recovery
- Checkpoints occur at deterministic points (after activity calls, timers, signals)
State Manager Implementations
Different StateManager implementations provide appropriate persistence options:
InMemoryStateManager (Development)
class InMemoryStateManager implements StateManager {
private workflowState = new Map<string, Map<string, any>>();
private globalState = new Map<string, any>();
// Implementation details...
}RedisStateManager (Production)
class RedisStateManager implements StateManager {
constructor(private redisClient: Redis) {}
// Implementation that stores state in Redis
// with appropriate key structures and TTL handling
}State Serialization and Deserialization
All state in ZoopFlow must be serializable to ensure proper persistence and recovery:
Serialization Process
- All state values are serialized to JSON before storage
- Complex objects must be JSON-serializable
- Non-serializable data should be handled via references or storage in external systems
Example serialization in RedisStateManager:
async setState<T>(
workflowId: string,
key: string,
value: T,
options?: StateOptions
): Promise<void> {
const redisKey = this.getRedisKey(options?.scope || "workflow", workflowId, key);
const serialized = JSON.stringify(value);
if (options?.ttl) {
await this.redisClient.setex(redisKey, options.ttl, serialized);
} else {
await this.redisClient.set(redisKey, serialized);
}
}Deserialization Process
- State is automatically deserialized when retrieved
- Type safety is maintained through generic type parameters
async getState<T>(
workflowId: string,
key: string,
options?: StateOptions
): Promise<T | undefined> {
const redisKey = this.getRedisKey(options?.scope || "workflow", workflowId, key);
const data = await this.redisClient.get(redisKey);
return data ? JSON.parse(data) as T : undefined;
}Checkpointing and Recovery
Checkpoints capture the workflow state at key execution points to enable recovery:
Automatic Checkpointing
ZoopFlow automatically creates checkpoints at strategic execution points:
- When workflow status changes
- Before and after node execution
- When signals are received
- Before potentially long-running operations
// Creating a checkpoint
stateManager.createCheckpoint({
type: 'node-execution-start',
metadata: {
nodeId: 'validate-payment',
input: { orderId: '12345' },
},
});State Recovery
State recovery in ZoopFlow is primarily handled by Temporal's replay mechanism:
- During replay, Temporal reconstructs the workflow state by re-executing the workflow code
- Deterministic execution ensures the state is identical to the original execution
- The WorkflowStateManager automatically maintains consistent state during replay
State Access Patterns and Best Practices
Common State Access Patterns
-
Direct Variable Access: Use standard variables for sequential data passing between steps.
const resultA = await stepA.execute(input, context); const resultB = await stepB.execute({ data: resultA.output }, context); -
Context State for Persistence: Use context state for data that needs to be accessible from multiple points.
await context.setState('intermediateResult', resultA.output); // Later or in another part of the workflow: const data = await context.getState('intermediateResult'); -
State as Workflow Progress Indicator: Use state to track overall progress.
await context.setState('progress', { completed: 3, total: 10, percentage: 30, stage: 'data-validation' }); -
State for External Visibility: Use state to expose workflow progress to external systems.
// Define query handler context.workflow.handleQuery('getProgress', async () => { return await context.getState('progress'); });
Best Practices
-
State Immutability:
- Always create new state objects instead of mutating existing ones
- Use immutable update patterns
- Avoid direct references to mutable objects
-
State Size Management:
- Keep state size manageable (avoid storing large datasets in flow state)
- Store large data in external systems
- Use references instead of embedding large objects
-
Deterministic State Updates:
- Ensure all state updates are deterministic
- Use Temporal's deterministic functions for timestamps and IDs
- Avoid non-deterministic operations in state updates
-
State Scope Selection:
- Use workflow-scoped state by default
- Only use global state when necessary for cross-workflow communication
- Set appropriate TTL for global state to prevent storage leaks
-
Error Handling:
- Implement error handling for state operations
- Consider state consistency during compensating actions
- Save error states to aid in debugging
State Management with Temporal
ZoopFlow builds on Temporal's state management capabilities:
Leveraging Temporal's Core Features
- Event Sourcing: Temporal records all events in an append-only history
- Deterministic Replay: Guarantees consistent state reconstruction
- Checkpointing: Automatic state capture at activity boundaries
- Durable Execution: Ensures workflow progress survives failures
Integration with Temporal Primitives
ZoopFlow exposes Temporal's messaging primitives for state interaction:
Signals
Signals allow external systems to send asynchronous, one-way messages to update workflow state:
// In the workflow
context.workflow.handleSignal<string>('updateStatus', async (newStatus) => {
await context.setState('status', newStatus);
});
// External client
await client.getHandle(workflowId).signal('updateStatus', 'APPROVED');Queries
Queries enable read-only access to workflow state:
// In the workflow
context.workflow.handleQuery<[], { status: string, progress: number }>('getWorkflowStatus', async () => {
const status = await context.getState<string>('status');
const progress = await context.getState<number>('progress');
return { status, progress };
});
// External client
const status = await client.getHandle(workflowId).query('getWorkflowStatus');Updates
Updates provide synchronous, transactional state modifications:
// In the workflow
context.workflow.handleUpdate<[string], string[]>('addItemToCart', {
execute: async (item) => {
const cart = (await context.getState<string[]>('cartItems')) || [];
cart.push(item);
await context.setState('cartItems', cart);
return cart;
}
});
// External client
const updatedCart = await client.getHandle(workflowId).update('addItemToCart', 'productXYZ');Advanced State Management Patterns
1. Saga Pattern Implementation
Using state to track saga execution and compensating actions:
async function orderSaga(input, context) {
// Initialize saga state
await context.setState('sagaState', {
steps: [
{ name: 'reserveInventory', status: 'pending', compensate: true },
{ name: 'processPayment', status: 'pending', compensate: true },
{ name: 'scheduleShipping', status: 'pending', compensate: false }
],
currentStep: 0,
compensating: false
});
try {
// Execute saga steps
await executeStep('reserveInventory', reserveInventoryStep, input, context);
await executeStep('processPayment', processPaymentStep, input, context);
await executeStep('scheduleShipping', scheduleShippingStep, input, context);
// Mark saga as complete
const sagaState = await context.getState('sagaState');
await context.setState('sagaState', { ...sagaState, status: 'completed' });
return { status: 'success' };
} catch (error) {
// Compensation logic
const sagaState = await context.getState('sagaState');
await context.setState('sagaState', { ...sagaState, compensating: true });
// Compensate completed steps in reverse order
for (let i = sagaState.currentStep - 1; i >= 0; i--) {
const step = sagaState.steps[i];
if (step.status === 'completed' && step.compensate) {
await executeCompensation(step.name, input, context);
}
}
throw error;
}
}
async function executeStep(stepName, step, input, context) {
// Get saga state
const sagaState = await context.getState('sagaState');
const stepIndex = sagaState.steps.findIndex(s => s.name === stepName);
// Update saga state
sagaState.steps[stepIndex].status = 'executing';
sagaState.currentStep = stepIndex;
await context.setState('sagaState', sagaState);
// Execute step
await step.execute(input, context);
// Update saga state after completion
const updatedState = await context.getState('sagaState');
updatedState.steps[stepIndex].status = 'completed';
await context.setState('sagaState', updatedState);
}2. State-Based Correlation
Using state to correlate related events and workflows:
// Store correlation identifier
await context.setGlobalState(`correlation:${correlationId}`, {
workflows: [workflowId],
status: 'active',
createdAt: new Date().toISOString()
});
// Add another workflow to correlation
const correlation = await context.getGlobalState(`correlation:${correlationId}`);
correlation.workflows.push(newWorkflowId);
await context.setGlobalState(`correlation:${correlationId}`, correlation);3. State Versioning
Managing state schema evolution:
// Check state version
const state = await context.getState('myData');
const stateVersion = state?.version || 1;
// Handle different versions
if (stateVersion === 1) {
// Handle v1 schema
const migratedState = migrateV1ToV2(state);
await context.setState('myData', {
...migratedState,
version: 2
});
}
// Current code works with v2 schema
const currentState = await context.getState('myData');Examples
Basic State Management Example
import { defineFlow, defineStep } from 'zoopflow';
// Define a payment processing step
const processPaymentStep = defineStep({
name: 'processPayment',
execute: async (input, context) => {
// Use input to process payment
const result = await paymentService.process(input.orderId, input.amount);
// Store result in workflow state for later use
await context.setState('paymentResult', result);
return {
success: result.status === 'approved',
transactionId: result.transactionId
};
}
});
// Define an order fulfillment flow
const orderFlow = defineFlow({
name: 'orderFulfillment',
steps: async (input, context) => {
// Initialize workflow state
await context.setState('orderStatus', 'PROCESSING');
await context.setState('orderHistory', [
{ status: 'RECEIVED', timestamp: new Date().toISOString() }
]);
// Process payment
const paymentResult = await processPaymentStep.execute(
{ orderId: input.orderId, amount: input.total },
context
);
// Update order status based on payment result
if (paymentResult.success) {
await context.setState('orderStatus', 'PAYMENT_APPROVED');
// Append to order history
const history = await context.getState<any[]>('orderHistory');
await context.setState('orderHistory', [
...history,
{ status: 'PAYMENT_APPROVED', timestamp: new Date().toISOString() }
]);
// Continue with order processing...
return { status: 'success', orderId: input.orderId };
} else {
await context.setState('orderStatus', 'PAYMENT_FAILED');
// Append to order history
const history = await context.getState<any[]>('orderHistory');
await context.setState('orderHistory', [
...history,
{ status: 'PAYMENT_FAILED', timestamp: new Date().toISOString() }
]);
return { status: 'failed', orderId: input.orderId, reason: 'payment_declined' };
}
}
});Human Interaction with State Management
const approvalWorkflow = defineFlow({
name: 'documentApproval',
steps: async (input, context) => {
// Initialize approval state
await context.setState('approvalState', {
status: 'PENDING',
document: input.documentId,
assignedTo: input.reviewerId,
createdAt: new Date().toISOString(),
comments: []
});
// Set up signal handler for document approval
context.workflow.handleSignal<{ approved: boolean, comment: string }>('approvalDecision', async (decision) => {
const approvalState = await context.getState<any>('approvalState');
const newState = {
...approvalState,
status: decision.approved ? 'APPROVED' : 'REJECTED',
decidedAt: new Date().toISOString(),
comments: [
...approvalState.comments,
{ text: decision.comment, timestamp: new Date().toISOString() }
]
};
await context.setState('approvalState', newState);
});
// Set up query for current approval state
context.workflow.handleQuery<[], any>('getApprovalState', async () => {
return await context.getState('approvalState');
});
// Wait for approval decision
context.log('Waiting for approval decision...');
await context.waitForCondition(async () => {
const state = await context.getState<any>('approvalState');
return state.status !== 'PENDING';
});
// Get final state
const finalState = await context.getState<any>('approvalState');
context.log(`Document ${finalState.status}`);
return {
documentId: input.documentId,
approved: finalState.status === 'APPROVED',
comments: finalState.comments
};
}
});Distributed State Management
const distributedProcessingFlow = defineFlow({
name: 'batchDataProcessing',
steps: async (input, context) => {
// Initialize processing state
await context.setState('processingState', {
batchId: input.batchId,
totalItems: input.items.length,
processedItems: 0,
results: [],
errors: [],
startTime: new Date().toISOString()
});
// Process items in parallel with state updates
const chunks = chunkArray(input.items, 10); // Process in chunks of 10
for (const [chunkIndex, chunk] of chunks.entries()) {
context.log(`Processing chunk ${chunkIndex + 1}/${chunks.length}`);
// Process chunk in parallel
const results = await Promise.all(
chunk.map(async (item) => {
try {
const result = await processItemStep.execute({ item }, context);
// Update shared state atomically
await context.workflow.updateState(async () => {
const state = await context.getState<any>('processingState');
return {
...state,
processedItems: state.processedItems + 1,
results: [...state.results, { item, result, success: true }]
};
});
return { item, result, success: true };
} catch (error) {
// Update shared state with error
await context.workflow.updateState(async () => {
const state = await context.getState<any>('processingState');
return {
...state,
processedItems: state.processedItems + 1,
errors: [...state.errors, { item, error: error.message }]
};
});
return { item, error: error.message, success: false };
}
})
);
// Update progress for external visibility
const currentState = await context.getState<any>('processingState');
await context.setState('progress', {
processed: currentState.processedItems,
total: currentState.totalItems,
percentage: Math.round((currentState.processedItems / currentState.totalItems) * 100)
});
}
// Finalize state
const finalState = await context.getState<any>('processingState');
await context.setState('processingState', {
...finalState,
status: 'COMPLETED',
endTime: new Date().toISOString(),
duration: (new Date().getTime() - new Date(finalState.startTime).getTime()) / 1000
});
return {
batchId: input.batchId,
processed: finalState.processedItems,
successful: finalState.results.length,
failed: finalState.errors.length,
duration: (new Date().getTime() - new Date(finalState.startTime).getTime()) / 1000
};
}
});By following these state management practices and patterns, ZoopFlow developers can build robust, stateful workflows that maintain data integrity across long-running processes, system failures, and complex execution paths.