Core Concepts
Flow Definition System

Flow Definition System

Overview

The Flow Definition System is the core mechanism in ZoopFlow that enables developers to define, validate, and execute workflow processes. It provides a declarative approach to designing workflows by composing individual steps into cohesive processes with proper state management, error handling, and validation.

ZoopFlow supports two formats for defining workflows:

  1. Sequential Flow Definition: A linear sequence of steps executed in order
  2. Graph-based Flow Definition: A more flexible representation using nodes and edges for complex workflows with branching, conditions, and parallel execution paths

The Flow Definition System ensures that:

  • All workflow steps have compatible inputs and outputs
  • Proper validation occurs at both flow and step levels
  • Workflows can be versioned and evolved over time
  • Complex orchestration patterns are supported
  • Execution state is properly managed and accessible
  • Proper integration with Temporal.io occurs under the hood

Flow Structure and Components

Sequential Flow Structure

A sequential flow definition is composed of:

  1. Metadata: ID, version, description, category
  2. Schemas: Input and output validation schemas
  3. Steps Array: An ordered sequence of step definitions to execute
  4. Configuration: Timeout and other execution options
  5. Error Handling: Flow-level error handling logic

Graph-based Flow Structure

The graph-based flow definition consists of:

  1. Nodes: Individual steps or decision points in the workflow (Start, End, Task, Condition, Parallel, Join)
  2. Edges: Connections between nodes, defining the flow of execution (Default, Conditional, Error)
  3. Entry Node: The starting point of the workflow
  4. Exit Nodes: Possible ending points of the workflow
interface FlowGraphDefinition<TInput = unknown, TOutput = unknown> extends BaseDefinition {
  nodes: FlowNodeDefinition[];
  edges: FlowEdgeDefinition[];
  entryNodeId: string;
  exitNodeIds: string[];
  // ... other properties
}

Creating a Flow

Sequential Flow Definition

The primary way to create a flow is using the defineFlow function:

// src/flows/user-onboarding.flow.ts
import { defineFlow, schema } from '@zoopflow/core';
import { createUserAccount } from '../steps/create-user-account.step';
import { setupUserPreferences } from '../steps/setup-user-preferences.step';
import { sendWelcomeEmail } from '../steps/send-welcome-email.step';
import { conditionalStep } from '../steps/control-flow/conditional.step';
 
export const userOnboardingFlow = defineFlow({
  id: 'users.onboarding',
  version: '1.0.0',
  description: 'Onboards a new user into the system',
  category: 'User Management',
 
  // Define input schema
  input_schema: schema().object({
    email: schema().string({ format: 'email', required: true }),
    name: schema().string({ required: true }),
    preferences: schema().object({}).required(false),
  }),
 
  // Define output schema
  output_schema: schema().object({
    userId: schema().string({ required: true }),
    accountCreated: schema().boolean({ required: true }),
    welcomeEmailSent: schema().boolean({ required: true }),
  }),
 
  // Flow implementation as an array of steps
  steps: [
    // Create user account
    createUserAccount,
 
    // Conditionally set up preferences if provided
    conditionalStep({
      condition: state => !!state.preferences,
      ifTrue: setupUserPreferences,
      ifFalse: null,
    }),
 
    // Send welcome email
    sendWelcomeEmail,
  ],
  
  // Optional: Flow-level timeout
  timeoutSeconds: 300, // 5 minutes
  
  // Optional: Top-level error handler
  onError: async (error, context) => {
    context.log('Top-level onError handler caught', { error: error.message });
    // Perform cleanup actions, send notifications, etc.
  },
});

Graph-based Flow Definition

For more complex flows, the graph-based definition offers more flexibility:

const flowDefinition: FlowGraphDefinition = {
  id: 'payment-processing',
  version: '1.0.0',
  description: 'Process payment with validation',
  nodes: [
    {
      id: 'start',
      type: FlowNodeType.START,
      name: 'Start',
      config: {},
    },
    {
      id: 'validate-payment',
      type: FlowNodeType.TASK,
      name: 'Validate Payment',
      config: {
        stepId: 'payment-validator',
        input: {
          schema: {
            type: 'object',
            properties: {
              amount: { type: 'number' },
              currency: { type: 'string' },
            },
            required: ['amount', 'currency'],
          },
        },
      },
    },
    {
      id: 'process-payment',
      type: FlowNodeType.TASK,
      name: 'Process Payment',
      config: {
        stepId: 'payment-processor',
        input: {
          method: 'POST',
        },
      },
    },
    {
      id: 'end',
      type: FlowNodeType.END,
      name: 'End',
      config: {},
    },
  ],
  edges: [
    {
      id: 'start-to-validate',
      source: 'start',
      target: 'validate-payment',
      type: FlowEdgeType.DEFAULT,
    },
    {
      id: 'validate-to-process',
      source: 'validate-payment',
      target: 'process-payment',
      type: FlowEdgeType.DEFAULT,
    },
    {
      id: 'process-to-end',
      source: 'process-payment',
      target: 'end',
      type: FlowEdgeType.DEFAULT,
    },
  ],
  entryNodeId: 'start',
  exitNodeIds: ['end'],
  inputSchema: { type: 'object' },
  outputSchema: { type: 'object' },
};

Converting Between Formats

ZoopFlow provides adapters to convert between the two flow definition formats:

import { sequentialToGraph, graphToSequential } from '@zoopflow/core/adapters/flow-adapters';
 
// Convert sequential to graph
const graphFlow = sequentialToGraph(sequentialFlow);
 
// Check if conversion to sequential is possible
if (canConvertToSequential(graphFlow)) {
  // Convert graph to sequential
  const sequentialFlow = graphToSequential(graphFlow);
}

Flow Configuration Options

OptionTypeDescription
idstringUnique identifier for the flow (namespace.name format)
versionstringSemantic version of the flow (x.y.z)
descriptionstringHuman-readable description
categorystringOptional category for organization
input_schemaJSONSchema7Schema for validating inputs
output_schemaJSONSchema7Schema for validating outputs
stepsStepDefinition[]Array of step definitions to execute (sequential)
timeoutSecondsnumberMaximum execution time for entire flow
validateInputFunctionCustom input validation function
validateOutputFunctionCustom output validation function
onErrorFunctionFlow-level error handler function

Complex Flow Patterns

ZoopFlow supports complex flow patterns through specialized control flow steps:

Conditional Execution

import { conditionalStep } from '@zoopflow/core/steps/control-flow';
 
// In flow definition:
steps: [
  // First steps...
 
  // Conditional execution
  conditionalStep({
    condition: state => state.userType === 'premium',
    ifTrue: premiumUserStep,
    ifFalse: standardUserStep,
  }),
 
  // Subsequent steps...
];

Branching Logic

import { branchStep } from '@zoopflow/core/steps/control-flow';
 
// In flow definition:
steps: [
  // First steps...
 
  // Branch to different paths based on state
  branchStep({
    branches: {
      new: {
        condition: state => state.userStatus === 'new',
        steps: [onboardingStep, welcomeStep],
      },
      returning: {
        condition: state => state.userStatus === 'returning',
        steps: [welcomeBackStep],
      },
      inactive: {
        condition: state => state.userStatus === 'inactive',
        steps: [reactivationStep],
      },
    },
    default: [fallbackStep],
  }),
 
  // Subsequent steps...
];

Parallel Execution

import { parallelStep } from '@zoopflow/core/steps/control-flow';
 
// In flow definition:
steps: [
  // First steps...
 
  // Execute steps in parallel
  parallelStep({
    steps: [notificationStep, analyticsStep, auditLogStep],
    mergeStrategy: 'combine', // How to merge the results
  }),
 
  // Subsequent steps...
];

Type Safety and Validation

ZoopFlow provides a comprehensive validation system that ensures type compatibility between steps, verifies that flow definitions are well-formed, and validates inputs and outputs against JSON schemas.

Flow Validation

import { validateFlow } from '@zoopflow/core/validation';
 
// Validate a flow before registration
const validationResult = validateFlow(myFlow);
 
if (!validationResult.valid) {
  console.error('Flow validation failed:', validationResult.errors);
  // Handle validation errors
}

The validator checks:

  1. Input/output schema compatibility between consecutive steps
  2. Required step properties
  3. Step-specific validations
  4. Flow-level integrity

Schema Validation

ZoopFlow uses JSON Schema for validating inputs and outputs:

// Input schema validation
input_schema: schema().object({
  email: schema().string({ format: 'email', required: true }),
  name: schema().string({ required: true }),
  preferences: schema().object({}).required(false),
}),
 
// Output schema validation
output_schema: schema().object({
  userId: schema().string({ required: true }),
  accountCreated: schema().boolean({ required: true }),
  welcomeEmailSent: schema().boolean({ required: true }),
}),

Custom Validation

You can also provide custom validation functions:

validateInput: (input, context) => {
  if (input.amount <= 0) {
    return {
      valid: false,
      errors: [{ path: 'amount', message: 'Amount must be greater than zero' }],
    };
  }
  return { valid: true };
},
 
validateOutput: (output, context) => {
  // Custom output validation logic
  return { valid: true };
},

Flow Context and State Management

Each step in a flow can access flow-level context, which provides access to state, services, and utility functions.

Accessing Flow Context

// In a step's execute function
execute: async (input, context) => {
  // Access flow-level context
  const flowId = context.flow.id;
  const executionId = context.flow.executionId;
 
  // Get flow-level state
  const sharedState = await context.flow.getState('someKey');
 
  // Set flow-level state for other steps to access
  await context.flow.setState('newKey', someValue);
 
  // Return step output
  return {
    /* output */
  };
};

Flow Context Implementation

// Context passed to flows during execution
class FlowContextImpl implements FlowContext {
  constructor(
    private workflowContext: temporal.WorkflowContext,
    private services: ServiceRegistry,
  ) {}
 
  // State management
  async getState<T>(key: string): Promise<T | undefined> {
    return await this.services.stateManager.getState(
      this.workflowContext.info.workflowExecution.workflowId,
      key,
    );
  }
 
  async setState<T>(key: string, value: T): Promise<void> {
    await this.services.stateManager.setState(
      this.workflowContext.info.workflowExecution.workflowId,
      key,
      value,
    );
  }
 
  // Service access
  getService<T>(name: string): T {
    return this.services.getService(name);
  }
 
  // Logging
  log(message: string, metadata?: object): void {
    this.workflowContext.log.info(message, metadata);
  }
}

Flow Versioning

ZoopFlow supports versioning of flows, allowing for backward compatibility and controlled evolution of workflows.

// Flow versioning
export function defineFlowVersion<TInput, TOutput>(
  baseFlow: Flow<TInput, TOutput>,
  options: FlowVersionOptions<TInput, TOutput>,
): Flow<TInput, TOutput> {
  const newFlow = {
    ...baseFlow,
    id: baseFlow.id,
    version: options.version,
    description: options.description || baseFlow.description,
    steps: options.steps,
  };
 
  flowRegistry.registerFlowVersion(newFlow);
 
  return {
    id: newFlow.id,
    version: newFlow.version,
    start: async (input: TInput): Promise<FlowExecutionReference> => {
      // Start with version tag
      const handle = await temporalClient.workflow.start(newFlow.id, {
        args: [input],
        taskQueue: DEFAULT_TASK_QUEUE,
        workflowId: generateWorkflowId(newFlow.id),
        searchAttributes: {
          FlowVersion: [newFlow.version],
        },
      });
 
      return new FlowExecutionReferenceImpl(handle);
    },
  };
}

Key Versioning Principles

  1. Each flow has a unique ID that remains consistent across versions
  2. Semantic versioning (MAJOR.MINOR.PATCH) is used for flow versions
  3. Temporal's versioning capabilities are used to handle workflow code changes
  4. Search attributes are used to track and query flow versions

Integration with Temporal

ZoopFlow integrates with Temporal.io to provide durable, resilient workflow execution. The Flow Definition Adapter acts as a bridge between the declarative flow definitions and Temporal workflows.

Flow Definition Adapter

The Flow Definition Adapter translates flow definitions into executable Temporal workflows:

/**
 * Adapter for converting Flow definitions to Temporal workflows
 */
export class FlowDefinitionAdapter {
  /**
   * Convert a JSON flow definition to a Temporal workflow
   */
  convertToWorkflow(flowDefinition: FlowGraphDefinition): WorkflowDefinition {
    const nodes = this.extractNodes(flowDefinition);
    const edges = this.extractEdges(flowDefinition);
    
    const activities = this.convertNodesToActivities(nodes);
    const controlFlow = this.mapEdgesToControlFlow(edges);
    
    return {
      id: flowDefinition.id,
      version: flowDefinition.version,
      activities,
      controlFlow,
      metadata: flowDefinition.metadata,
    };
  }
  
  // ... implementation details omitted for brevity
}

Node and Edge Mapping

The adapter maps flow nodes to Temporal activities and flow edges to Temporal control flow:

private convertNodesToActivities(nodes: FlowNode[]): ActivityDefinition[] {
  return nodes.map(node => ({
    id: node.id,
    type: this.mapNodeTypeToActivityType(node.type),
    input: this.mapNodeInputToActivityInput(node.input),
    options: this.mapNodeOptionsToActivityOptions(node.options),
  }));
}
 
private mapEdgesToControlFlow(edges: FlowEdge[]): ControlFlow {
  const transitions = edges.map(edge => ({
    from: edge.source,
    to: edge.target,
    condition: this.convertCondition(edge.condition),
  }));
  
  return {
    transitions,
    entryPoints: this.determineEntryPoints(edges),
    exitPoints: this.determineExitPoints(edges),
  };
}

Best Practices

Flow Design

  1. Use Namespaced IDs: Follow the namespace.name format for flow IDs to ensure proper organization and avoid conflicts
  2. Semantic Versioning: Use the x.y.z format for flow versions
  3. Proper Error Handling: Implement proper error handling at both flow and step levels
  4. Comprehensive Schemas: Define thorough JSON Schemas for input and output validation
  5. State Management: Design flows with proper state management, keeping state minimal and relevant
  6. Reusable Steps: Create reusable, single-purpose steps for better maintainability
  7. Flow Boundaries: Design flows with clear boundaries and responsibilities

Performance

  1. Optimize Flow Size: Keep flows focused and avoid excessive nesting
  2. Limit Parallel Steps: Be mindful of resource usage when using parallel execution
  3. Timeout Management: Set appropriate timeouts at both flow and step levels
  4. Efficient Data Handling: Pass only necessary data between steps

Maintainability

  1. Documentation: Add clear descriptions to flows and steps
  2. Consistent Naming: Use consistent naming conventions for flows and steps
  3. Versioning Strategy: Develop a clear versioning strategy for evolving flows
  4. Testing: Write comprehensive tests for flows, including edge cases
  5. Organization: Organize flows by domain or function

API Reference

Core Functions

FunctionDescription
defineFlow()Creates a new flow definition with the specified configuration
defineFlowVersion()Creates a new version of an existing flow
validateFlow()Validates a flow definition for correctness
sequentialToGraph()Converts a sequential flow to a graph-based flow
graphToSequential()Converts a graph-based flow to a sequential flow

Core Interfaces

InterfaceDescription
Flow<TInput, TOutput>Represents a flow definition with execution methods
FlowContextProvides access to flow state, services, and utilities
FlowDefinitionDefines a sequential flow
FlowGraphDefinitionDefines a graph-based flow
FlowExecutionResultContains the result of a flow execution

Control Flow Functions

FunctionDescription
conditionalStep()Creates a conditional step for branching logic
branchStep()Creates a multi-branch decision step
parallelStep()Creates a step for parallel execution

Examples

Basic Flow

import { defineFlow, schema } from '@zoopflow/core';
import { validateEmail } from '../steps/validate-email.step';
import { createUser } from '../steps/create-user.step';
import { sendWelcomeEmail } from '../steps/send-welcome-email.step';
 
export const userRegistrationFlow = defineFlow({
  id: 'users.registration',
  version: '1.0.0',
  description: 'Registers a new user in the system',
  category: 'User Management',
 
  input_schema: schema().object({
    email: schema().string({ format: 'email', required: true }),
    name: schema().string({ required: true }),
    password: schema().string({ minLength: 8, required: true }),
  }),
 
  output_schema: schema().object({
    userId: schema().string({ required: true }),
    success: schema().boolean({ required: true }),
  }),
 
  steps: [
    validateEmail,
    createUser,
    sendWelcomeEmail,
  ],
});

Flow with Error Handling

import { defineFlow, schema } from '@zoopflow/core';
import { processPayment } from '../steps/process-payment.step';
import { createOrder } from '../steps/create-order.step';
import { sendReceipt } from '../steps/send-receipt.step';
 
export const paymentFlow = defineFlow({
  id: 'payments.process',
  version: '1.0.0',
  description: 'Processes a payment and creates an order',
 
  input_schema: schema().object({
    amount: schema().number({ minimum: 0.01, required: true }),
    paymentMethod: schema().string({ required: true }),
    customerId: schema().string({ required: true }),
  }),
 
  output_schema: schema().object({
    success: schema().boolean({ required: true }),
    orderId: schema().string({ required: false }),
    error: schema().string({ required: false }),
  }),
 
  steps: async (input, context) => {
    try {
      // Process payment
      const paymentResult = await processPayment.execute(input, context);
      
      if (!paymentResult.success) {
        return {
          success: false,
          error: paymentResult.errorMessage,
        };
      }
      
      // Create order
      const orderResult = await createOrder.execute({
        customerId: input.customerId,
        paymentId: paymentResult.paymentId,
        amount: input.amount,
      }, context);
      
      // Send receipt
      await sendReceipt.execute({
        email: orderResult.customerEmail,
        orderId: orderResult.orderId,
        amount: input.amount,
      }, context);
      
      return {
        success: true,
        orderId: orderResult.orderId,
      };
    } catch (error) {
      context.log('Error in payment flow', { error: (error as Error).message });
      return {
        success: false,
        error: (error as Error).message,
      };
    }
  },
  
  onError: async (error, context) => {
    context.log('Payment flow error', { error: error.message });
    // Notify customer service
    await context.getService('notifications').sendAlert({
      type: 'payment_error',
      message: `Payment processing error: ${error.message}`,
    });
  },
});

Complex Flow with Parallel Execution

import { defineFlow, schema } from '@zoopflow/core';
import { validateOrder } from '../steps/validate-order.step';
import { reserveInventory } from '../steps/reserve-inventory.step';
import { processPayment } from '../steps/process-payment.step';
import { createShippingLabel } from '../steps/create-shipping-label.step';
import { updateInventory } from '../steps/update-inventory.step';
import { notifyCustomer } from '../steps/notify-customer.step';
import { recordAnalytics } from '../steps/record-analytics.step';
import { parallelStep } from '@zoopflow/core/steps/control-flow';
 
export const orderFulfillmentFlow = defineFlow({
  id: 'orders.fulfillment',
  version: '1.0.0',
  description: 'Processes and fulfills a customer order',
  
  input_schema: schema().object({
    orderId: schema().string({ required: true }),
    customerId: schema().string({ required: true }),
    items: schema().array().items(
      schema().object({
        productId: schema().string({ required: true }),
        quantity: schema().number({ minimum: 1, required: true }),
      })
    ),
    shippingAddress: schema().object({
      // Address fields
    }),
    paymentDetails: schema().object({
      // Payment fields
    }),
  }),
  
  output_schema: schema().object({
    success: schema().boolean({ required: true }),
    trackingNumber: schema().string({ required: false }),
    estimatedDelivery: schema().string({ format: 'date', required: false }),
    error: schema().string({ required: false }),
  }),
  
  steps: [
    // Validate order
    validateOrder,
    
    // Reserve inventory
    reserveInventory,
    
    // Process payment
    processPayment,
    
    // Parallel post-processing steps
    parallelStep({
      steps: [
        createShippingLabel,
        updateInventory,
        notifyCustomer,
        recordAnalytics,
      ],
      mergeStrategy: 'combine',
    }),
  ],
  
  timeoutSeconds: 600, // 10 minutes
});