Flow Interpreter API
This page documents the API of the Flow Interpreter System, which is responsible for loading, validating, and executing flow definitions.
Core Interfaces
IFlowInterpreter
The primary interface for loading and validating flow definitions.
interface IFlowInterpreter {
  /**
   * Loads a flow definition supplied either as a JSON string or as an object.
   *
   * @param flowDefinition - The flow definition, as a JSON string or an object
   * @returns The loaded flow graph definition
   * @throws If the flow definition is invalid
   */
  loadFlow<TInput = unknown, TOutput = unknown>(
    flowDefinition: string | Record<string, unknown>,
  ): FlowGraphDefinition<TInput, TOutput>;

  /**
   * Validates a flow definition.
   *
   * @param flowDefinition - The flow definition to validate
   * @returns An object whose `valid` flag indicates whether the flow is valid, with
   *   any validation errors listed in the optional `errors` array
   */
  validateFlow(flowDefinition: FlowGraphDefinition): {
    valid: boolean;
    errors?: string[];
  };
}
IFlowExecutionEngine
Responsible for executing flow definitions.
interface IFlowExecutionEngine {
  /**
   * Executes a flow with the given input data.
   *
   * @param flow - The flow definition to execute
   * @param input - The input data for the flow
   * @param options - Optional execution options
   * @returns A promise that resolves to the flow execution result
   */
  executeFlow<TInput = unknown, TOutput = unknown>(
    flow: FlowGraphDefinition<TInput, TOutput>,
    input: TInput,
    options?: FlowExecutionOptions,
  ): Promise<FlowExecutionResult<TOutput>>;

  /**
   * Executes one specific node of a flow; typically used for testing or debugging.
   *
   * @param nodeId - The ID of the node to execute
   * @param flow - The flow definition that contains the node
   * @param input - The input data for the node
   * @param options - Optional execution options
   * @param parentContext - Optional parent context, supplied when called recursively
   * @returns A promise that resolves with the execution result
   */
  executeNode<TInput = unknown, TOutput = unknown>(
    nodeId: string,
    flow: FlowGraphDefinition<unknown, unknown>,
    input: TInput,
    options?: FlowExecutionOptions,
    parentContext?: StepContext,
  ): Promise<FlowExecutionResult<TOutput>>;
}
IFlowExecutionContext
Provides access to the current state of flow execution.
interface IFlowExecutionContext extends StepContext {
  /** ID of the node currently being executed. */
  readonly currentNodeId: string;

  /** The flow definition being executed. */
  readonly flow: FlowGraphDefinition;

  /** The node handler registry. */
  readonly nodeHandlerRegistry: INodeHandlerRegistry;

  /** The execution path: node IDs in order of execution. */
  readonly executionPath: string[];

  /**
   * Returns the definition of the current node.
   *
   * @returns The current node definition
   */
  getCurrentNode(): FlowNodeDefinition;

  /**
   * Sets the IDs of the nodes to execute next.
   *
   * @param nodeIds - The IDs of the next nodes to execute
   */
  setNextNodeIds(nodeIds: string[]): void;

  /**
   * Returns the IDs of the nodes to execute next.
   *
   * @returns The next node IDs
   */
  getNextNodeIds(): string[];

  /** Clears the next node IDs. */
  clearNextNodeIds(): void;

  /**
   * Stores data for a node.
   *
   * @param nodeId - The ID of the node
   * @param data - The data to store
   */
  setNodeData(nodeId: string, data: unknown): void;

  /**
   * Retrieves the data stored for a node.
   *
   * @param nodeId - The ID of the node
   * @returns The stored data, or `undefined` if not found
   */
  getNodeData<T>(nodeId: string): T | undefined;

  /**
   * Stores a context data value under a key.
   *
   * @param key - The key to store the data under
   * @param value - The data to store
   */
  setContextData(key: string, value: unknown): void;

  /**
   * Retrieves the context data stored under a key.
   *
   * @param key - The key to retrieve data for
   * @returns The stored data, or `undefined` if not found
   */
  getContextData<T>(key: string): T | undefined;
}
INodeHandler
Implements execution logic for a specific type of node.
interface INodeHandler {
  /** The type of node this handler can process. */
  readonly nodeType: string;

  /**
   * Executes a node.
   *
   * @param nodeId - The ID of the node to execute
   * @param flow - The flow definition
   * @param input - The input data for the node
   * @param context - The execution context
   * @returns A promise that resolves to the node execution result: the node's
   *   `output` together with its `nextNodeIds`
   */
  executeNode<TInput = unknown, TOutput = unknown>(
    nodeId: string,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<{
    output: TOutput;
    nextNodeIds: string[];
  }>;
}
INodeHandlerRegistry
Manages node handlers for different node types.
interface INodeHandlerRegistry {
  /**
   * Registers a node handler.
   *
   * @param handler - The node handler to register
   */
  registerHandler(handler: INodeHandler): void;

  /**
   * Looks up the node handler for a node type.
   *
   * @param nodeType - The type of node to get a handler for
   * @returns The node handler for the specified type, or `undefined` if not found
   */
  getHandler(nodeType: string): INodeHandler | undefined;

  /**
   * Reports whether a handler exists for a node type.
   *
   * @param nodeType - The type of node to check
   * @returns `true` if a handler exists, `false` otherwise
   */
  hasHandler(nodeType: string): boolean;

  /**
   * Returns all registered node handlers.
   *
   * @returns An array of all registered node handlers
   */
  getAllHandlers(): INodeHandler[];
}
IStepRegistry
Manages step implementations that can be referenced in flow definitions.
interface IStepRegistry {
  /**
   * Registers a step implementation.
   *
   * @param type - The type identifier for the step
   * @param step - The step implementation
   * @param version - Optional version for the step
   */
  registerStep<TInput = unknown, TOutput = unknown>(
    type: string,
    step: StepDefinition<TInput, TOutput>,
    version?: string,
  ): void;

  /**
   * Looks up a step implementation by type and, optionally, version.
   *
   * @param type - The type identifier for the step
   * @param version - Optional version for the step
   * @returns The step implementation, or `undefined` if not found
   */
  getStep<TInput = unknown, TOutput = unknown>(
    type: string,
    version?: string,
  ): StepDefinition<TInput, TOutput> | undefined;

  /**
   * Reports whether a step implementation exists for a type and, optionally, version.
   *
   * @param type - The type identifier to check
   * @param version - Optional version to check
   * @returns `true` if a step implementation exists, `false` otherwise
   */
  hasStep(type: string, version?: string): boolean;

  /**
   * Validates that every step referenced in a flow is available in the registry.
   *
   * @param flow - The flow definition to validate
   * @returns An object with the validation result and, if any, the missing steps
   */
  validateFlowSteps(flow: FlowGraphDefinition): {
    valid: boolean;
    missingSteps?: string[];
  };
}
Type Definitions
FlowExecutionOptions
Options for flow execution.
interface FlowExecutionOptions {
  /** Timeout, in milliseconds, for the entire flow execution. */
  timeoutMs?: number;

  /** Whether to collect execution metrics. */
  collectMetrics?: boolean;

  /** Custom context data to be passed to steps. */
  contextData?: Record<string, unknown>;
}
FlowExecutionResult
Result of flow execution.
interface FlowExecutionResult<TOutput = unknown> {
  /** Whether the flow execution was successful. */
  success: boolean;

  /** The output data produced by the flow execution. */
  output?: TOutput;

  /** Error information if the flow execution failed. */
  error?: Error;

  /** Execution metrics and statistics. */
  metrics?: {
    /** Total execution time in milliseconds. */
    executionTimeMs: number;

    /** Number of nodes executed. */
    nodesExecuted: number;

    /** Execution path: node IDs in order of execution. */
    executionPath: string[];
  };

  /** Next node IDs; only used for `executeNode` results. */
  nextNodeIds?: string[];
}
Error Types
FlowExecutionError
Base error type for flow execution errors.
class FlowExecutionError extends Error {
  /**
   * Creates a flow execution error.
   *
   * @param message - The error message, forwarded to the `Error` constructor
   * @param context - Optional details about where the failure happened; exposed as the
   *   public read-only `context` property. Every field is optional.
   */
  constructor(
    message: string,
    public readonly context?: {
      flowId?: string;
      executionId?: string;
      nodeId?: string;
      nodeType?: string;
      executionPath?: string[];
      timestamp?: Date;
      category?: ErrorCategory;
      severity?: ErrorSeverity;
      cause?: Error;
    },
  ) {
    super(message);
    // Replace the default name inherited from Error with this class's name.
    this.name = 'FlowExecutionError';
  }
}
FlowTimeoutError
Error thrown when a flow execution times out.
class FlowTimeoutError extends FlowExecutionError {
  /**
   * Creates a timeout error.
   *
   * @param message - The error message
   * @param timeoutMs - Exposed as the public read-only `timeoutMs` property
   *   (presumably the timeout that was exceeded, in milliseconds — confirm)
   * @param context - Optional context, forwarded unchanged to `FlowExecutionError`.
   *   NOTE(review): typed here as a loose `Record<string, unknown>`, whereas the base
   *   class declares specific context fields.
   */
  constructor(
    message: string,
    public readonly timeoutMs: number,
    context?: Record<string, unknown>,
  ) {
    super(message, context);
    // Overwrite the name assigned by the base constructor.
    this.name = 'FlowTimeoutError';
  }
}
Implementations
FlowInterpreter
Concrete implementation of the IFlowInterpreter interface.
// Signature listing only: method bodies are not shown on this page.
class FlowInterpreter implements IFlowInterpreter {
  /** Public entry point declared by `IFlowInterpreter.loadFlow`. */
  public loadFlow<TInput, TOutput>(
    flowDefinition: string | Record<string, unknown>,
  ): FlowGraphDefinition<TInput, TOutput>;

  /** Public entry point declared by `IFlowInterpreter.validateFlow`. */
  public validateFlow(
    flowDefinition: FlowGraphDefinition,
  ): { valid: boolean; errors?: string[] };

  // Private helpers. Their behavior is not shown here; the notes below are inferred
  // from names and signatures only — confirm against the implementation.

  /** Presumably turns a JSON string into a plain object — TODO confirm. */
  private parseFlowDefinition(
    flowDefinitionJson: string,
  ): Record<string, unknown>;

  /** Presumably runs the integrity checks and returns their messages — TODO confirm. */
  private validateFlowIntegrity(
    flowGraph: FlowGraphDefinition,
  ): string[];

  /** Presumably checks the entry node and appends problems to `errors` — TODO confirm. */
  private validateEntryNode(
    flowGraph: FlowGraphDefinition,
    errors: string[],
  ): void;

  /** Presumably checks the exit nodes and appends problems to `errors` — TODO confirm. */
  private validateExitNodes(
    flowGraph: FlowGraphDefinition,
    errors: string[],
  ): void;

  /** Presumably checks the nodes that edges reference — TODO confirm. */
  private validateEdgeReferences(
    flowGraph: FlowGraphDefinition,
    errors: string[],
  ): void;

  /** Presumably reports orphaned nodes — TODO confirm. */
  private validateOrphanedNodes(
    flowGraph: FlowGraphDefinition,
    errors: string[],
  ): void;

  /** Presumably reports cycles in the graph — TODO confirm. */
  private validateNoCycles(
    flowGraph: FlowGraphDefinition,
    errors: string[],
  ): void;

  /** Returns a `hasCycle` flag and a `cycle` array of strings (presumably node IDs). */
  private detectCycles(
    flowGraph: FlowGraphDefinition,
  ): { hasCycle: boolean; cycle: string[] };
}
FlowExecutionEngine
Concrete implementation of the IFlowExecutionEngine interface.
// Signature listing only: method bodies are not shown on this page.
class FlowExecutionEngine implements IFlowExecutionEngine {
  /**
   * @param nodeHandlerRegistry - The node handler registry, kept as a private
   *   read-only member
   */
  constructor(
    private readonly nodeHandlerRegistry: INodeHandlerRegistry,
  ) {}

  /**
   * Public entry point declared by `IFlowExecutionEngine.executeFlow`.
   * `options` defaults to an empty object when omitted.
   */
  public async executeFlow<TInput, TOutput>(
    flow: FlowGraphDefinition<TInput, TOutput>,
    input: TInput,
    options: FlowExecutionOptions = {},
  ): Promise<FlowExecutionResult<TOutput>>;

  /**
   * Internal counterpart of `executeFlow`: takes a `FlowExecutionContext` instead of
   * options and resolves to the bare output rather than a `FlowExecutionResult`.
   */
  private async executeFlowInternal<TInput, TOutput>(
    flow: FlowGraphDefinition<TInput, TOutput>,
    input: TInput,
    context: FlowExecutionContext,
  ): Promise<TOutput>;

  /** Public entry point declared by `IFlowExecutionEngine.executeNode`. */
  public async executeNode<TInput, TOutput>(
    nodeId: string,
    flow: FlowGraphDefinition<unknown, unknown>,
    input: TInput,
    options?: FlowExecutionOptions,
    parentContext?: StepContext,
  ): Promise<FlowExecutionResult<TOutput>>;

  /**
   * Internal counterpart of `executeNode`: takes a `FlowExecutionContext` and resolves
   * to the node's `output` together with its `nextNodeIds`.
   */
  private async executeNodeInternal<TInput, TOutput>(
    nodeId: string,
    flow: FlowGraphDefinition<unknown, unknown>,
    input: TInput,
    context: FlowExecutionContext,
  ): Promise<{ output: TOutput; nextNodeIds: string[] }>;

  /** Presumably runs hooks before a node executes — inferred from the name; confirm. */
  private async executeBeforeNodeHooks(
    nodeId: string,
    flow: FlowGraphDefinition<unknown, unknown>,
    input: unknown,
    context: FlowExecutionContext,
  ): Promise<void>;

  /**
   * Presumably runs hooks after a node executes — inferred from the name; confirm.
   * Unlike the "before" variant, it also receives the node's `output`.
   */
  private async executeAfterNodeHooks(
    nodeId: string,
    flow: FlowGraphDefinition<unknown, unknown>,
    input: unknown,
    output: unknown,
    context: FlowExecutionContext,
  ): Promise<void>;
}
NodeHandlerRegistry
Concrete implementation of the INodeHandlerRegistry interface.
// Signature listing only: method bodies are not shown on this page.
class NodeHandlerRegistry implements INodeHandlerRegistry {
  /** Handlers held in a map with string keys (presumably the node type — confirm). */
  private readonly handlers = new Map<string, INodeHandler>();

  /** Declared by `INodeHandlerRegistry.registerHandler`. */
  public registerHandler(handler: INodeHandler): void;

  /** Declared by `INodeHandlerRegistry.getHandler`. */
  public getHandler(nodeType: string): INodeHandler | undefined;

  /** Declared by `INodeHandlerRegistry.hasHandler`. */
  public hasHandler(nodeType: string): boolean;

  /** Declared by `INodeHandlerRegistry.getAllHandlers`. */
  public getAllHandlers(): INodeHandler[];

  /**
   * Not part of `INodeHandlerRegistry`; available on this concrete class only.
   * Presumably removes every registered handler — inferred from the name; confirm.
   */
  public clearHandlers(): void;
}
Node Handlers
BaseNodeHandler
Base class for node handlers.
// Signature listing only: method bodies are not shown on this page.
abstract class BaseNodeHandler implements INodeHandler {
  /** The node type handled; every concrete subclass must define it. */
  public abstract readonly nodeType: string;

  /**
   * Entry point declared by `INodeHandler.executeNode`, implemented by this base class.
   * NOTE(review): presumably delegates to `executeNodeType` — confirm.
   */
  public async executeNode<TInput, TOutput>(
    nodeId: string,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<{
    output: TOutput;
    nextNodeIds: string[];
  }>;

  /**
   * Node-type-specific execution logic that each subclass must implement.
   * Receives the node definition itself rather than a node ID.
   */
  protected abstract executeNodeType<TInput, TOutput>(
    node: FlowNodeDefinition,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<TOutput>;

  /**
   * Helper available to subclasses; returns edge definitions for the given flow and
   * node ID (presumably the edges leaving that node — inferred from the name; confirm).
   */
  protected getOutgoingEdges(
    flow: FlowGraphDefinition,
    nodeId: string,
  ): FlowEdgeDefinition[];
}
TaskNodeHandler
Handles execution of task nodes.
// Signature listing only: the body of executeNodeType is not shown on this page.
class TaskNodeHandler extends BaseNodeHandler {
  /** Handles nodes whose type is `'task'`. */
  public readonly nodeType = 'task';

  /**
   * @param stepRegistry - The step registry, kept as a private read-only member
   *   (presumably used to look up the step a task node refers to — confirm)
   */
  constructor(private readonly stepRegistry: StepRegistry) {
    super();
  }

  /** Implements the abstract `BaseNodeHandler.executeNodeType` for task nodes. */
  protected async executeNodeType<TInput, TOutput>(
    node: FlowNodeDefinition,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<TOutput>;
}
ConditionNodeHandler
Handles execution of condition nodes.
// Signature listing only: the body of executeNodeType is not shown on this page.
class ConditionNodeHandler extends BaseNodeHandler {
  /** Handles nodes whose type is `'condition'`. */
  public readonly nodeType = 'condition';

  /** Implements the abstract `BaseNodeHandler.executeNodeType` for condition nodes. */
  protected async executeNodeType<TInput, TOutput>(
    node: FlowNodeDefinition,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<TOutput>;
}
ParallelNodeHandler
Handles execution of parallel nodes.
// Signature listing only: the body of executeNodeType is not shown on this page.
class ParallelNodeHandler extends BaseNodeHandler {
  /** Handles nodes whose type is `'parallel'`. */
  public readonly nodeType = 'parallel';

  /**
   * @param flowExecutionEngine - The execution engine, kept as a private read-only
   *   member (presumably used to run the parallel branches — confirm)
   */
  constructor(private readonly flowExecutionEngine: IFlowExecutionEngine) {
    super();
  }

  /** Implements the abstract `BaseNodeHandler.executeNodeType` for parallel nodes. */
  protected async executeNodeType<TInput, TOutput>(
    node: FlowNodeDefinition,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<TOutput>;
}
Usage Examples
Loading and Validating a Flow
import { FlowInterpreter } from '@zoopflow/core/interpreter';

// Create a new interpreter
const interpreter = new FlowInterpreter();

// A flow definition as JSON: start -> task1 -> end
const flowJson = `{
"id": "example-flow",
"version": "1.0.0",
"nodes": [
{ "id": "start", "type": "start", "name": "Start" },
{ "id": "task1", "type": "task", "name": "Task 1", "config": { "step": { "type": "example-step" } } },
{ "id": "end", "type": "end", "name": "End" }
],
"edges": [
{ "id": "start-to-task1", "source": "start", "target": "task1", "type": "default" },
{ "id": "task1-to-end", "source": "task1", "target": "end", "type": "default" }
],
"entryNodeId": "start",
"exitNodeIds": ["end"]
}`;

try {
  // loadFlow throws if the flow definition is invalid
  const flowDefinition = interpreter.loadFlow(flowJson);
  console.log('Flow loaded successfully:', flowDefinition.id);
} catch (error) {
  // A caught value is not guaranteed to be an Error (and is typed `unknown` under
  // `strict`), so narrow it before reading `.message`.
  const reason = error instanceof Error ? error.message : String(error);
  console.error('Error loading flow:', reason);
}
Executing a Flow
import { FlowExecutionEngine, NodeHandlerRegistry, StepRegistry } from '@zoopflow/core/interpreter';
import { StartNodeHandler, EndNodeHandler, TaskNodeHandler } from '@zoopflow/core/interpreter/handlers';

// Create the node handler registry
const nodeHandlerRegistry = new NodeHandlerRegistry();

// Create the step registry
const stepRegistry = new StepRegistry();

// Register handlers for the node types used by the flow below
nodeHandlerRegistry.registerHandler(new StartNodeHandler());
nodeHandlerRegistry.registerHandler(new EndNodeHandler());
nodeHandlerRegistry.registerHandler(new TaskNodeHandler(stepRegistry));

// Create the execution engine
const executionEngine = new FlowExecutionEngine(nodeHandlerRegistry);

// The flow to execute: start -> task1 -> end
const flowDefinition = {
  id: 'example-flow',
  version: '1.0.0',
  nodes: [
    { id: 'start', type: 'start', name: 'Start' },
    { id: 'task1', type: 'task', name: 'Task 1', config: { step: { type: 'example-step' } } },
    { id: 'end', type: 'end', name: 'End' }
  ],
  edges: [
    { id: 'start-to-task1', source: 'start', target: 'task1', type: 'default' },
    { id: 'task1-to-end', source: 'task1', target: 'end', type: 'default' }
  ],
  entryNodeId: 'start',
  exitNodeIds: ['end']
};

const input = {
  value: 42
};

// Execute the flow with a 30-second timeout and metrics collection enabled
executionEngine.executeFlow(flowDefinition, input, {
  timeoutMs: 30000,
  collectMetrics: true
})
  .then(result => {
    // The result carries a `success` flag; failures are reported through `result.error`
    if (result.success) {
      console.log('Flow executed successfully:', result.output);
      console.log('Metrics:', result.metrics);
    } else {
      console.error('Flow execution failed:', result.error?.message);
    }
  })
  .catch(error => {
    // Reached when the returned promise itself rejects
    console.error('Error executing flow:', error);
  });
Creating a Custom Node Handler
import { BaseNodeHandler } from '@zoopflow/core/interpreter/handlers';
import { FlowGraphDefinition, FlowNodeDefinition } from '@zoopflow/core/interfaces/flow-graph';
import { StepContext } from '@zoopflow/core/interfaces/step-context';

/** Example handler for nodes whose type is `'custom'`. */
class CustomNodeHandler extends BaseNodeHandler {
  public readonly nodeType = 'custom';

  /**
   * Logs the node ID, then returns a copy of the input extended with a `customField`.
   *
   * @param node - The node being executed; only its `id` is used here
   * @param flow - The flow definition (unused in this example)
   * @param input - The input data; its properties are copied into the output
   * @param context - The execution context (unused in this example)
   * @returns The input's properties plus `customField: 'custom value'`
   */
  protected async executeNodeType<TInput, TOutput>(
    node: FlowNodeDefinition,
    flow: FlowGraphDefinition,
    input: TInput,
    context: StepContext,
  ): Promise<TOutput> {
    console.log(`Executing custom node: ${node.id}`);

    // TOutput is chosen by the caller, so the literal has to be cast to it.
    return {
      ...input,
      customField: 'custom value',
    } as unknown as TOutput;
  }
}

// Register the custom handler. `nodeHandlerRegistry` is the NodeHandlerRegistry
// instance created in the "Executing a Flow" example.
nodeHandlerRegistry.registerHandler(new CustomNodeHandler());
Browser Support
The Flow Interpreter System is designed to work in both Node.js and browser environments, with the following considerations:
- In Node.js, all features are fully supported.
- In browsers, features that depend on the file system, such as loading steps from directories, are not available.
- When running in a browser, register steps manually instead of loading them from directories.
See Also
- Flow Definition System - Learn how to define flows
- Flow Interpreter System - Comprehensive documentation of the Flow Interpreter
- Step API - API for defining steps used by the Flow Interpreter