Flow Interpreter System
The Flow Interpreter System is a central component of ZoopFlow that handles the loading, validation, and execution of flow definitions. It translates the declarative flow definitions into executable runtime instructions and orchestrates the execution of workflows.
Overview
The Flow Interpreter System serves as the bridge between flow definitions and their actual execution. It performs several critical functions:
- Flow Definition Loading: Loads flow definitions from JSON or object representations
- Flow Validation: Ensures flow definitions are well-formed and valid
- Flow Execution: Orchestrates the execution of nodes within a flow
- State Management: Maintains execution state throughout the workflow
- Error Handling: Manages errors and timeouts during flow execution
Architecture Components
The Flow Interpreter System consists of several key components that work together to interpret and execute flows:
1. Flow Interpreter
The FlowInterpreter is responsible for loading and validating flow definitions:
- Parses flow definitions from JSON strings or objects
- Validates the structure and integrity of flow graphs
- Checks for valid entry and exit nodes
- Ensures all node references are valid
- Detects and prevents cycles in the flow graph
- Identifies orphaned nodes
// Core functionality of the Flow Interpreter
public loadFlow<TInput = unknown, TOutput = unknown>(
flowDefinition: string | Record<string, unknown>,
): FlowGraphDefinition<TInput, TOutput> {
// Parse the flow definition if it's a string
const parsedFlow = typeof flowDefinition === 'string'
? this.parseFlowDefinition(flowDefinition)
: flowDefinition;
// Validate the flow definition
const validationResult = this.validateFlow(parsedFlow as unknown as FlowGraphDefinition);
if (!validationResult.valid) {
throw new Error(`Invalid flow definition: ${validationResult.errors?.join(', ')}`);
}
return parsedFlow as unknown as FlowGraphDefinition<TInput, TOutput>;
}2. Flow Execution Engine
The FlowExecutionEngine orchestrates the execution of flow definitions:
- Manages the execution context and state
- Handles node execution through appropriate handlers
- Processes execution paths through the flow graph
- Manages timeouts and execution limits
- Collects execution metrics
- Provides detailed error information
// Core functionality of the Flow Execution Engine
public async executeFlow<TInput = unknown, TOutput = unknown>(
flow: FlowGraphDefinition<TInput, TOutput>,
input: TInput,
options: FlowExecutionOptions = {},
): Promise<FlowExecutionResult<TOutput>> {
// Create execution context
const context = new FlowExecutionContext(
flow as FlowGraphDefinition<unknown, unknown>,
this.nodeHandlerRegistry,
);
try {
// Execute the flow
const result = await this.executeFlowInternal<TInput, TOutput>(
flow,
input,
context
);
// Return successful result with metrics
return {
success: true,
output: result,
metrics: options.collectMetrics ? {
executionTimeMs: context.executionTimeMs,
nodesExecuted: context.nodesExecuted,
executionPath: context.executionPath,
} : undefined,
};
} catch (error) {
// Return error result with metrics
return {
success: false,
error: error instanceof Error ? error : new Error(String(error)),
metrics: options.collectMetrics ? {
executionTimeMs: context.executionTimeMs,
nodesExecuted: context.nodesExecuted,
executionPath: context.executionPath,
} : undefined,
};
}
}3. Flow Execution Context
The FlowExecutionContext maintains the state of flow execution:
- Tracks the current node and execution path
- Stores node data and context data
- Provides logging capabilities
- Manages state access and persistence
- Collects execution metrics
- Maintains history of execution
// Key interfaces of the Flow Execution Context
interface IFlowExecutionContext extends StepContext {
readonly currentNodeId: string;
readonly flow: FlowGraphDefinition;
readonly nodeHandlerRegistry: INodeHandlerRegistry;
readonly executionPath: string[];
getCurrentNode(): unknown;
setNextNodeIds(nodeIds: string[]): void;
}4. Node Handler Registry
The NodeHandlerRegistry manages handlers for different node types:
- Registers handlers for specific node types
- Retrieves appropriate handlers during execution
- Manages handler lifecycle and discovery
// NodeHandlerRegistry implementation
export class NodeHandlerRegistry implements INodeHandlerRegistry {
private readonly handlers: Map<string, INodeHandler> = new Map();
public registerHandler(handler: INodeHandler): void {
if (this.handlers.has(handler.nodeType)) {
throw new Error(`Handler for node type '${handler.nodeType}' is already registered`);
}
this.handlers.set(handler.nodeType, handler);
}
public getHandler(nodeType: string): INodeHandler | undefined {
return this.handlers.get(nodeType);
}
}5. Node Handlers
Node handlers are specialized components that handle the execution of specific node types:
- Start Node Handler: Handles flow entry points
- End Node Handler: Handles flow exit points
- Task Node Handler: Executes steps from the step registry
- Condition Node Handler: Implements conditional branching
- Parallel Node Handler: Manages parallel execution paths
- Join Node Handler: Synchronizes parallel execution paths
// Example of a Task Node Handler
export class TaskNodeHandler extends BaseNodeHandler {
public readonly nodeType = 'task';
constructor(private readonly stepRegistry: StepRegistry) {
super();
}
protected async executeNodeType<TInput, TOutput>(
node: FlowNodeDefinition,
flow: FlowGraphDefinition,
input: TInput,
context: StepContext,
): Promise<TOutput> {
// Get the step type from the node configuration
const stepConfig = node.config?.step ?? {};
const stepType = stepConfig.type;
// Get the step from the registry
const step = this.stepRegistry.getStep(stepType);
if (!step) {
throw new Error(`Step type '${stepType}' not found in registry`);
}
// Execute the step and return the result
return await step.execute(input, context, stepConfig);
}
}6. Step Registry
The StepRegistry manages step implementations that can be referenced in flow definitions:
- Registers and retrieves steps with unique identifiers
- Supports versioning for backward compatibility
- Provides validation of steps referenced in flows
- Supports lazy-loading of steps from directories and modules
// Core functionality of the Step Registry
export class StepRegistry implements IStepRegistry {
private readonly steps: Map<string, StepDefinition<unknown, unknown>> = new Map();
private readonly versionedSteps: Map<string, Map<string, StepDefinition<unknown, unknown>>> = new Map();
public registerStep<TInput = unknown, TOutput = unknown>(
type: string,
step: StepDefinition<TInput, TOutput>,
version?: string,
): void {
// Use provided version or step's version or default to '1.0.0'
const stepVersion = version || step.version || '1.0.0';
const versionedKey = `${type}@${stepVersion}`;
// Store the step with its versioned key
this.steps.set(versionedKey, step as StepDefinition<unknown, unknown>);
// Also store in the main map if it's not already there or if this is a newer version
if (!this.steps.has(type) || this.isNewerVersion(stepVersion, this.getStepVersion(type))) {
this.steps.set(type, step as StepDefinition<unknown, unknown>);
}
// Update the versioned steps map
if (!this.versionedSteps.has(type)) {
this.versionedSteps.set(type, new Map());
}
const versions = this.versionedSteps.get(type);
if (versions) {
versions.set(stepVersion, step as StepDefinition<unknown, unknown>);
}
}
public getStep<TInput = unknown, TOutput = unknown>(
type: string,
version?: string,
): StepDefinition<TInput, TOutput> | undefined {
if (version) {
// If version is specified, try to get that specific version
const versionedKey = `${type}@${version}`;
return this.steps.get(versionedKey) as StepDefinition<TInput, TOutput> | undefined;
}
// Otherwise return the latest version
return this.steps.get(type) as StepDefinition<TInput, TOutput> | undefined;
}
}Flow Execution Process
The execution of a flow through the interpreter follows these steps:
1. Loading and Validation
- The flow definition is loaded from JSON or an object representation
- The structure is validated for integrity (entry/exit nodes, cycles, etc.)
- Node references are verified for consistency
- Schema validation ensures proper input/output compatibility
2. Execution Preparation
- The execution context is created with necessary metadata
- Input data is validated against the flow's input schema
- Initial state is established for the flow
3. Node Execution
- Starting with the entry node, the flow execution engine navigates the flow graph
- For each node, the appropriate handler is retrieved from the registry
- The handler executes the node's logic (e.g., running a step for task nodes)
- Node outputs are stored in the execution context
- The handler determines the next nodes to execute
4. Error Handling
- Errors during execution are caught and wrapped with execution context
- Depending on configuration, retries may be attempted
- Error edges in the flow graph may be followed for recovery paths
- If unrecoverable, the error is propagated with detailed context
5. Completion
- When an exit node is reached, the final result is validated against the output schema
- Execution metrics are collected (execution time, nodes executed, path taken)
- The result is returned to the caller
Timeouts and Limits
The Flow Interpreter System includes several safeguards to prevent runaway executions:
- Flow Timeouts: Overall time limits for flow execution
- Step Timeouts: Time limits for individual step executions
- Maximum Execution Depth: Limits on the number of nodes executed to prevent infinite loops
- Cycle Detection: Prevention of cycles in flow graphs
// Example of timeout configuration in flow execution
public async executeFlow<TInput, TOutput>(
flow: FlowGraphDefinition<TInput, TOutput>,
input: TInput,
options: FlowExecutionOptions = {},
): Promise<FlowExecutionResult<TOutput>> {
// Set up timeout handling
let timeoutId: NodeJS.Timeout | undefined;
let timeoutPromise: Promise<never> | undefined;
if (options.timeoutMs && options.timeoutMs > 0) {
timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new FlowTimeoutError(
`Flow execution timed out after ${options.timeoutMs}ms`,
options.timeoutMs,
{ /* context details */ }
),
);
}, options.timeoutMs);
});
}
// Execute the flow with optional timeout
const result = await Promise.race([
this.executeFlowInternal<TInput, TOutput>(flow, input, context),
...(timeoutPromise ? [timeoutPromise] : []),
]);
// Clear timeout if it was set
if (timeoutId) {
clearTimeout(timeoutId);
}
return result;
}Error Handling
The Flow Interpreter System provides comprehensive error handling:
-
Error Types:
- FlowExecutionError: General errors during flow execution
- FlowTimeoutError: Timeouts during execution
- FlowValidationError: Errors in flow definition validation
-
Error Context:
- Flow ID and version
- Execution ID
- Current node information
- Execution path at the time of error
- Original error cause (if available)
-
Error Recovery:
- Optional error edges in flow graphs
- Flow-level error handlers
- Step-level retry configurations
// Example of error handling in flow execution
try {
// Execute node
const nodeResult = await handler.executeNode<unknown, TOutput>(
currentNodeId,
flow as FlowGraphDefinition<unknown, unknown>,
currentInput,
context as StepContext,
);
// Store the node output
result = nodeResult.output;
context.setNodeData(currentNodeId, result);
// Set the next nodes to execute
context.setNextNodeIds(nodeResult.nextNodeIds);
} catch (error) {
// Create rich error with context
throw new FlowExecutionError(
`Error executing node '${currentNodeId}': ${(error as Error).message}`,
{
flowId: flow.id,
executionId: context.info.executionId,
executionPath: context.executionPath,
nodeId: currentNodeId,
nodeType: currentNode.type,
cause: error instanceof Error ? error : undefined,
timestamp: new Date(),
},
);
}Integration with Other Systems
The Flow Interpreter System integrates with several other components in ZoopFlow:
- Flow Definition System: Consumes flow definitions for execution
- Step Definition System: Uses steps registered in the Step Registry
- Validation System: Leverages schema validation for inputs and outputs
- Error Handling System: Uses unified error handling for consistent error reporting
- Temporal Bridge: Provides executed flows to the Temporal integration layer
Best Practices
When working with the Flow Interpreter System, consider these best practices:
-
Flow Design:
- Avoid cycles in flow graphs
- Keep flows focused and avoid excessive nesting
- Use proper error handling edges for recovery paths
- Set appropriate timeouts for flows and steps
-
Performance Optimization:
- Use parallel execution for independent operations
- Minimize state size to reduce serialization overhead
- Consider execution metrics for performance tuning
- Use appropriate node types for different operations
-
Testing:
- Test flows with different input combinations
- Verify error handling paths
- Check timeout behavior
- Test with realistic data volumes
-
Extensions:
- Create custom node handlers for specialized functionality
- Implement step factories for dynamic step creation
- Use hooks for cross-cutting concerns (when implemented)
API Reference
Key Interfaces
// Flow Interpreter Interface
interface IFlowInterpreter {
loadFlow<TInput = unknown, TOutput = unknown>(
flowDefinition: string | Record<string, unknown>,
): FlowGraphDefinition<TInput, TOutput>;
validateFlow(flowDefinition: FlowGraphDefinition): {
valid: boolean;
errors?: string[]
};
}
// Flow Execution Engine Interface
interface IFlowExecutionEngine {
executeFlow<TInput = unknown, TOutput = unknown>(
flow: FlowGraphDefinition<TInput, TOutput>,
input: TInput,
options?: FlowExecutionOptions,
): Promise<FlowExecutionResult<TOutput>>;
}
// Node Handler Interface
interface INodeHandler {
readonly nodeType: string;
executeNode<TInput = unknown, TOutput = unknown>(
nodeId: string,
flow: FlowGraphDefinition,
input: TInput,
context: StepContext,
): Promise<{
output: TOutput;
nextNodeIds: string[];
}>;
}
// Flow Execution Result
interface FlowExecutionResult<TOutput = unknown> {
success: boolean;
output?: TOutput;
error?: Error;
metrics?: {
executionTimeMs: number;
nodesExecuted: number;
executionPath: string[];
};
}Common Error Types
// Flow Execution Error
class FlowExecutionError extends Error {
constructor(
message: string,
public readonly context?: {
flowId?: string;
executionId?: string;
nodeId?: string;
nodeType?: string;
executionPath?: string[];
timestamp?: Date;
category?: ErrorCategory;
severity?: ErrorSeverity;
cause?: Error;
},
) {
super(message);
this.name = 'FlowExecutionError';
}
}
// Flow Timeout Error
class FlowTimeoutError extends FlowExecutionError {
constructor(
message: string,
public readonly timeoutMs: number,
context?: Record<string, unknown>,
) {
super(message, context);
this.name = 'FlowTimeoutError';
}
}Future Enhancements
The Flow Interpreter System has some planned enhancements:
- Execution Hooks: Before/after node execution hooks for cross-cutting concerns
- Better Logging: Replacement of console.log statements with proper logging system
- Advanced Metrics: More detailed execution metrics and performance tracking
- Enhanced Visualization: Tools for visualizing flow execution paths and state changes
- Distributed Execution: Support for distributed execution of flow nodes
Conclusion
The Flow Interpreter System is a core component of ZoopFlow that enables the execution of flow definitions. By separating the concerns of flow definition from execution, it provides a flexible, robust framework for orchestrating complex workflows with proper validation, error handling, and state management.