Flow Definition System
Overview
The Flow Definition System is the core mechanism in ZoopFlow that enables developers to define, validate, and execute workflow processes. It provides a declarative approach to designing workflows by composing individual steps into cohesive processes with proper state management, error handling, and validation.
ZoopFlow supports two formats for defining workflows:
- Sequential Flow Definition: A linear sequence of steps executed in order
- Graph-based Flow Definition: A more flexible representation using nodes and edges for complex workflows with branching, conditions, and parallel execution paths
The Flow Definition System ensures that:
- All workflow steps have compatible inputs and outputs
- Proper validation occurs at both flow and step levels
- Workflows can be versioned and evolved over time
- Complex orchestration patterns are supported
- Execution state is properly managed and accessible
- Proper integration with Temporal.io occurs under the hood
Flow Structure and Components
Sequential Flow Structure
A sequential flow definition is composed of:
- Metadata: ID, version, description, category
- Schemas: Input and output validation schemas
- Steps Array: An ordered sequence of step definitions to execute
- Configuration: Timeout and other execution options
- Error Handling: Flow-level error handling logic
Graph-based Flow Structure
The graph-based flow definition consists of:
- Nodes: Individual steps or decision points in the workflow (Start, End, Task, Condition, Parallel, Join)
- Edges: Connections between nodes, defining the flow of execution (Default, Conditional, Error)
- Entry Node: The starting point of the workflow
- Exit Nodes: Possible ending points of the workflow
interface FlowGraphDefinition<TInput = unknown, TOutput = unknown> extends BaseDefinition {
nodes: FlowNodeDefinition[];
edges: FlowEdgeDefinition[];
entryNodeId: string;
exitNodeIds: string[];
// ... other properties
}Creating a Flow
Sequential Flow Definition
The primary way to create a flow is using the defineFlow function:
// src/flows/user-onboarding.flow.ts
import { defineFlow, schema } from '@zoopflow/core';
import { createUserAccount } from '../steps/create-user-account.step';
import { setupUserPreferences } from '../steps/setup-user-preferences.step';
import { sendWelcomeEmail } from '../steps/send-welcome-email.step';
import { conditionalStep } from '../steps/control-flow/conditional.step';
export const userOnboardingFlow = defineFlow({
id: 'users.onboarding',
version: '1.0.0',
description: 'Onboards a new user into the system',
category: 'User Management',
// Define input schema
input_schema: schema().object({
email: schema().string({ format: 'email', required: true }),
name: schema().string({ required: true }),
preferences: schema().object({}).required(false),
}),
// Define output schema
output_schema: schema().object({
userId: schema().string({ required: true }),
accountCreated: schema().boolean({ required: true }),
welcomeEmailSent: schema().boolean({ required: true }),
}),
// Flow implementation as an array of steps
steps: [
// Create user account
createUserAccount,
// Conditionally set up preferences if provided
conditionalStep({
condition: state => !!state.preferences,
ifTrue: setupUserPreferences,
ifFalse: null,
}),
// Send welcome email
sendWelcomeEmail,
],
// Optional: Flow-level timeout
timeoutSeconds: 300, // 5 minutes
// Optional: Top-level error handler
onError: async (error, context) => {
context.log('Top-level onError handler caught', { error: error.message });
// Perform cleanup actions, send notifications, etc.
},
});Graph-based Flow Definition
For more complex flows, the graph-based definition offers more flexibility:
const flowDefinition: FlowGraphDefinition = {
id: 'payment-processing',
version: '1.0.0',
description: 'Process payment with validation',
nodes: [
{
id: 'start',
type: FlowNodeType.START,
name: 'Start',
config: {},
},
{
id: 'validate-payment',
type: FlowNodeType.TASK,
name: 'Validate Payment',
config: {
stepId: 'payment-validator',
input: {
schema: {
type: 'object',
properties: {
amount: { type: 'number' },
currency: { type: 'string' },
},
required: ['amount', 'currency'],
},
},
},
},
{
id: 'process-payment',
type: FlowNodeType.TASK,
name: 'Process Payment',
config: {
stepId: 'payment-processor',
input: {
method: 'POST',
},
},
},
{
id: 'end',
type: FlowNodeType.END,
name: 'End',
config: {},
},
],
edges: [
{
id: 'start-to-validate',
source: 'start',
target: 'validate-payment',
type: FlowEdgeType.DEFAULT,
},
{
id: 'validate-to-process',
source: 'validate-payment',
target: 'process-payment',
type: FlowEdgeType.DEFAULT,
},
{
id: 'process-to-end',
source: 'process-payment',
target: 'end',
type: FlowEdgeType.DEFAULT,
},
],
entryNodeId: 'start',
exitNodeIds: ['end'],
inputSchema: { type: 'object' },
outputSchema: { type: 'object' },
};Converting Between Formats
ZoopFlow provides adapters to convert between the two flow definition formats:
import { sequentialToGraph, graphToSequential } from '@zoopflow/core/adapters/flow-adapters';
// Convert sequential to graph
const graphFlow = sequentialToGraph(sequentialFlow);
// Check if conversion to sequential is possible
if (canConvertToSequential(graphFlow)) {
// Convert graph to sequential
const sequentialFlow = graphToSequential(graphFlow);
}Flow Configuration Options
| Option | Type | Description |
|---|---|---|
id | string | Unique identifier for the flow (namespace.name format) |
version | string | Semantic version of the flow (x.y.z) |
description | string | Human-readable description |
category | string | Optional category for organization |
input_schema | JSONSchema7 | Schema for validating inputs |
output_schema | JSONSchema7 | Schema for validating outputs |
steps | StepDefinition[] | Array of step definitions to execute (sequential) |
timeoutSeconds | number | Maximum execution time for entire flow |
validateInput | Function | Custom input validation function |
validateOutput | Function | Custom output validation function |
onError | Function | Flow-level error handler function |
Complex Flow Patterns
ZoopFlow supports complex flow patterns through specialized control flow steps:
Conditional Execution
import { conditionalStep } from '@zoopflow/core/steps/control-flow';
// In flow definition:
steps: [
// First steps...
// Conditional execution
conditionalStep({
condition: state => state.userType === 'premium',
ifTrue: premiumUserStep,
ifFalse: standardUserStep,
}),
// Subsequent steps...
];Branching Logic
import { branchStep } from '@zoopflow/core/steps/control-flow';
// In flow definition:
steps: [
// First steps...
// Branch to different paths based on state
branchStep({
branches: {
new: {
condition: state => state.userStatus === 'new',
steps: [onboardingStep, welcomeStep],
},
returning: {
condition: state => state.userStatus === 'returning',
steps: [welcomeBackStep],
},
inactive: {
condition: state => state.userStatus === 'inactive',
steps: [reactivationStep],
},
},
default: [fallbackStep],
}),
// Subsequent steps...
];Parallel Execution
import { parallelStep } from '@zoopflow/core/steps/control-flow';
// In flow definition:
steps: [
// First steps...
// Execute steps in parallel
parallelStep({
steps: [notificationStep, analyticsStep, auditLogStep],
mergeStrategy: 'combine', // How to merge the results
}),
// Subsequent steps...
];Type Safety and Validation
ZoopFlow provides a comprehensive validation system that ensures type compatibility between steps, verifies that flow definitions are well-formed, and validates inputs and outputs against JSON schemas.
Flow Validation
import { validateFlow } from '@zoopflow/core/validation';
// Validate a flow before registration
const validationResult = validateFlow(myFlow);
if (!validationResult.valid) {
console.error('Flow validation failed:', validationResult.errors);
// Handle validation errors
}The validator checks:
- Input/output schema compatibility between consecutive steps
- Required step properties
- Step-specific validations
- Flow-level integrity
Schema Validation
ZoopFlow uses JSON Schema for validating inputs and outputs:
// Input schema validation
input_schema: schema().object({
email: schema().string({ format: 'email', required: true }),
name: schema().string({ required: true }),
preferences: schema().object({}).required(false),
}),
// Output schema validation
output_schema: schema().object({
userId: schema().string({ required: true }),
accountCreated: schema().boolean({ required: true }),
welcomeEmailSent: schema().boolean({ required: true }),
}),Custom Validation
You can also provide custom validation functions:
validateInput: (input, context) => {
if (input.amount <= 0) {
return {
valid: false,
errors: [{ path: 'amount', message: 'Amount must be greater than zero' }],
};
}
return { valid: true };
},
validateOutput: (output, context) => {
// Custom output validation logic
return { valid: true };
},Flow Context and State Management
Each step in a flow can access flow-level context, which provides access to state, services, and utility functions.
Accessing Flow Context
// In a step's execute function
execute: async (input, context) => {
// Access flow-level context
const flowId = context.flow.id;
const executionId = context.flow.executionId;
// Get flow-level state
const sharedState = await context.flow.getState('someKey');
// Set flow-level state for other steps to access
await context.flow.setState('newKey', someValue);
// Return step output
return {
/* output */
};
};Flow Context Implementation
// Context passed to flows during execution
class FlowContextImpl implements FlowContext {
constructor(
private workflowContext: temporal.WorkflowContext,
private services: ServiceRegistry,
) {}
// State management
async getState<T>(key: string): Promise<T | undefined> {
return await this.services.stateManager.getState(
this.workflowContext.info.workflowExecution.workflowId,
key,
);
}
async setState<T>(key: string, value: T): Promise<void> {
await this.services.stateManager.setState(
this.workflowContext.info.workflowExecution.workflowId,
key,
value,
);
}
// Service access
getService<T>(name: string): T {
return this.services.getService(name);
}
// Logging
log(message: string, metadata?: object): void {
this.workflowContext.log.info(message, metadata);
}
}Flow Versioning
ZoopFlow supports versioning of flows, allowing for backward compatibility and controlled evolution of workflows.
// Flow versioning
export function defineFlowVersion<TInput, TOutput>(
baseFlow: Flow<TInput, TOutput>,
options: FlowVersionOptions<TInput, TOutput>,
): Flow<TInput, TOutput> {
const newFlow = {
...baseFlow,
id: baseFlow.id,
version: options.version,
description: options.description || baseFlow.description,
steps: options.steps,
};
flowRegistry.registerFlowVersion(newFlow);
return {
id: newFlow.id,
version: newFlow.version,
start: async (input: TInput): Promise<FlowExecutionReference> => {
// Start with version tag
const handle = await temporalClient.workflow.start(newFlow.id, {
args: [input],
taskQueue: DEFAULT_TASK_QUEUE,
workflowId: generateWorkflowId(newFlow.id),
searchAttributes: {
FlowVersion: [newFlow.version],
},
});
return new FlowExecutionReferenceImpl(handle);
},
};
}Key Versioning Principles
- Each flow has a unique ID that remains consistent across versions
- Semantic versioning (MAJOR.MINOR.PATCH) is used for flow versions
- Temporal's versioning capabilities are used to handle workflow code changes
- Search attributes are used to track and query flow versions
Integration with Temporal
ZoopFlow integrates with Temporal.io to provide durable, resilient workflow execution. The Flow Definition Adapter acts as a bridge between the declarative flow definitions and Temporal workflows.
Flow Definition Adapter
The Flow Definition Adapter translates flow definitions into executable Temporal workflows:
/**
* Adapter for converting Flow definitions to Temporal workflows
*/
export class FlowDefinitionAdapter {
/**
* Convert a JSON flow definition to a Temporal workflow
*/
convertToWorkflow(flowDefinition: FlowGraphDefinition): WorkflowDefinition {
const nodes = this.extractNodes(flowDefinition);
const edges = this.extractEdges(flowDefinition);
const activities = this.convertNodesToActivities(nodes);
const controlFlow = this.mapEdgesToControlFlow(edges);
return {
id: flowDefinition.id,
version: flowDefinition.version,
activities,
controlFlow,
metadata: flowDefinition.metadata,
};
}
// ... implementation details omitted for brevity
}Node and Edge Mapping
The adapter maps flow nodes to Temporal activities and flow edges to Temporal control flow:
private convertNodesToActivities(nodes: FlowNode[]): ActivityDefinition[] {
return nodes.map(node => ({
id: node.id,
type: this.mapNodeTypeToActivityType(node.type),
input: this.mapNodeInputToActivityInput(node.input),
options: this.mapNodeOptionsToActivityOptions(node.options),
}));
}
private mapEdgesToControlFlow(edges: FlowEdge[]): ControlFlow {
const transitions = edges.map(edge => ({
from: edge.source,
to: edge.target,
condition: this.convertCondition(edge.condition),
}));
return {
transitions,
entryPoints: this.determineEntryPoints(edges),
exitPoints: this.determineExitPoints(edges),
};
}Best Practices
Flow Design
- Use Namespaced IDs: Follow the
namespace.nameformat for flow IDs to ensure proper organization and avoid conflicts - Semantic Versioning: Use the
x.y.zformat for flow versions - Proper Error Handling: Implement proper error handling at both flow and step levels
- Comprehensive Schemas: Define thorough JSON Schemas for input and output validation
- State Management: Design flows with proper state management, keeping state minimal and relevant
- Reusable Steps: Create reusable, single-purpose steps for better maintainability
- Flow Boundaries: Design flows with clear boundaries and responsibilities
Performance
- Optimize Flow Size: Keep flows focused and avoid excessive nesting
- Limit Parallel Steps: Be mindful of resource usage when using parallel execution
- Timeout Management: Set appropriate timeouts at both flow and step levels
- Efficient Data Handling: Pass only necessary data between steps
Maintainability
- Documentation: Add clear descriptions to flows and steps
- Consistent Naming: Use consistent naming conventions for flows and steps
- Versioning Strategy: Develop a clear versioning strategy for evolving flows
- Testing: Write comprehensive tests for flows, including edge cases
- Organization: Organize flows by domain or function
API Reference
Core Functions
| Function | Description |
|---|---|
defineFlow() | Creates a new flow definition with the specified configuration |
defineFlowVersion() | Creates a new version of an existing flow |
validateFlow() | Validates a flow definition for correctness |
sequentialToGraph() | Converts a sequential flow to a graph-based flow |
graphToSequential() | Converts a graph-based flow to a sequential flow |
Core Interfaces
| Interface | Description |
|---|---|
Flow<TInput, TOutput> | Represents a flow definition with execution methods |
FlowContext | Provides access to flow state, services, and utilities |
FlowDefinition | Defines a sequential flow |
FlowGraphDefinition | Defines a graph-based flow |
FlowExecutionResult | Contains the result of a flow execution |
Control Flow Functions
| Function | Description |
|---|---|
conditionalStep() | Creates a conditional step for branching logic |
branchStep() | Creates a multi-branch decision step |
parallelStep() | Creates a step for parallel execution |
Examples
Basic Flow
import { defineFlow, schema } from '@zoopflow/core';
import { validateEmail } from '../steps/validate-email.step';
import { createUser } from '../steps/create-user.step';
import { sendWelcomeEmail } from '../steps/send-welcome-email.step';
export const userRegistrationFlow = defineFlow({
id: 'users.registration',
version: '1.0.0',
description: 'Registers a new user in the system',
category: 'User Management',
input_schema: schema().object({
email: schema().string({ format: 'email', required: true }),
name: schema().string({ required: true }),
password: schema().string({ minLength: 8, required: true }),
}),
output_schema: schema().object({
userId: schema().string({ required: true }),
success: schema().boolean({ required: true }),
}),
steps: [
validateEmail,
createUser,
sendWelcomeEmail,
],
});Flow with Error Handling
import { defineFlow, schema } from '@zoopflow/core';
import { processPayment } from '../steps/process-payment.step';
import { createOrder } from '../steps/create-order.step';
import { sendReceipt } from '../steps/send-receipt.step';
export const paymentFlow = defineFlow({
id: 'payments.process',
version: '1.0.0',
description: 'Processes a payment and creates an order',
input_schema: schema().object({
amount: schema().number({ minimum: 0.01, required: true }),
paymentMethod: schema().string({ required: true }),
customerId: schema().string({ required: true }),
}),
output_schema: schema().object({
success: schema().boolean({ required: true }),
orderId: schema().string({ required: false }),
error: schema().string({ required: false }),
}),
steps: async (input, context) => {
try {
// Process payment
const paymentResult = await processPayment.execute(input, context);
if (!paymentResult.success) {
return {
success: false,
error: paymentResult.errorMessage,
};
}
// Create order
const orderResult = await createOrder.execute({
customerId: input.customerId,
paymentId: paymentResult.paymentId,
amount: input.amount,
}, context);
// Send receipt
await sendReceipt.execute({
email: orderResult.customerEmail,
orderId: orderResult.orderId,
amount: input.amount,
}, context);
return {
success: true,
orderId: orderResult.orderId,
};
} catch (error) {
context.log('Error in payment flow', { error: (error as Error).message });
return {
success: false,
error: (error as Error).message,
};
}
},
onError: async (error, context) => {
context.log('Payment flow error', { error: error.message });
// Notify customer service
await context.getService('notifications').sendAlert({
type: 'payment_error',
message: `Payment processing error: ${error.message}`,
});
},
});Complex Flow with Parallel Execution
import { defineFlow, schema } from '@zoopflow/core';
import { validateOrder } from '../steps/validate-order.step';
import { reserveInventory } from '../steps/reserve-inventory.step';
import { processPayment } from '../steps/process-payment.step';
import { createShippingLabel } from '../steps/create-shipping-label.step';
import { updateInventory } from '../steps/update-inventory.step';
import { notifyCustomer } from '../steps/notify-customer.step';
import { recordAnalytics } from '../steps/record-analytics.step';
import { parallelStep } from '@zoopflow/core/steps/control-flow';
export const orderFulfillmentFlow = defineFlow({
id: 'orders.fulfillment',
version: '1.0.0',
description: 'Processes and fulfills a customer order',
input_schema: schema().object({
orderId: schema().string({ required: true }),
customerId: schema().string({ required: true }),
items: schema().array().items(
schema().object({
productId: schema().string({ required: true }),
quantity: schema().number({ minimum: 1, required: true }),
})
),
shippingAddress: schema().object({
// Address fields
}),
paymentDetails: schema().object({
// Payment fields
}),
}),
output_schema: schema().object({
success: schema().boolean({ required: true }),
trackingNumber: schema().string({ required: false }),
estimatedDelivery: schema().string({ format: 'date', required: false }),
error: schema().string({ required: false }),
}),
steps: [
// Validate order
validateOrder,
// Reserve inventory
reserveInventory,
// Process payment
processPayment,
// Parallel post-processing steps
parallelStep({
steps: [
createShippingLabel,
updateInventory,
notifyCustomer,
recordAnalytics,
],
mergeStrategy: 'combine',
}),
],
timeoutSeconds: 600, // 10 minutes
});