Temporal Workflow Adapter Guide

This guide explains ZoopFlow's new adapter pattern implementation for Temporal.io integration, its benefits, and how to use it in your applications.

Introduction

The Temporal Workflow Adapter is a core component of ZoopFlow. It integrates ZoopFlow with Temporal.io to provide durable workflow execution, including automatic retries, error handling, signal processing, and state management.

This new adapter pattern implementation is designed to provide a cleaner, more type-safe interface between ZoopFlow and Temporal, with improved error handling and a more consistent developer experience.

Key Features

  • Clear Separation of Concerns: Maintains a clean boundary between ZoopFlow's domain model and Temporal's execution model
  • Type Safety: Provides fully typed interfaces that eliminate unnecessary type assertions and improve the developer experience
  • Improved Error Handling: Consistent error propagation between ZoopFlow and Temporal with proper error conversion
  • Signal Integration: Clean signal registration and validation with proper TypeScript typing
  • Progress Tracking: Built-in support for monitoring workflow progress through Temporal queries
  • Simplified API: Intuitive interface for registering and executing flows with Temporal

Architecture Overview

The Temporal Workflow Adapter follows a layered architecture:

  1. Adapter Interface Layer (TemporalAdapter): Defines the contract between ZoopFlow and Temporal
  2. Bridge Implementation Layer (TemporalBridgeImpl): Implements the adapter interface and handles communication with Temporal
  3. Factory Layer (ModernWorkflowFactory, ActivityFactory): Creates Temporal-compatible workflow and activity functions
  4. Conversion Layer (FlowDefinitionAdapter): Converts between ZoopFlow flow definitions and Temporal workflow definitions
  5. Error Handling Layer (TemporalErrorHandler): Handles error conversion and propagation
  6. Types Layer (TemporalWorkflowDefinition, TemporalOptions): Defines the types used throughout the adapter

This layered approach ensures a clear separation of concerns and makes the code more maintainable and testable.
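
To make the layering concrete, here is a minimal in-memory sketch of how an interface layer, a conversion layer, an error handling layer, and a bridge layer can fit together. Every name in it (MiniAdapter, MiniBridge, convertFlow, MiniFlowError) is hypothetical and chosen for illustration; it does not reproduce ZoopFlow's actual classes and does not talk to Temporal.

```typescript
// Hypothetical in-memory stand-ins; these are not ZoopFlow's real classes.
type Step = { id: string; execute: (input: number) => number };
type Flow = { id: string; steps: Step[] };
type WorkflowFn = (input: number) => number;

// Interface layer: the contract that callers depend on.
interface MiniAdapter {
  registerFlow(flow: Flow): void;
  executeFlow(flowId: string, input: number): number;
}

// Conversion layer: turns a flow definition into one callable "workflow".
function convertFlow(flow: Flow): WorkflowFn {
  return (input) => flow.steps.reduce((value, step) => step.execute(value), input);
}

// Error handling layer: wraps anything thrown in a single error shape.
class MiniFlowError extends Error {
  readonly flowId: string;

  constructor(message: string, flowId: string) {
    super(message);
    Object.setPrototypeOf(this, MiniFlowError.prototype);
    this.name = 'MiniFlowError';
    this.flowId = flowId;
  }
}

// Bridge layer: implements the contract by composing the layers above.
class MiniBridge implements MiniAdapter {
  private readonly workflows = new Map<string, WorkflowFn>();

  registerFlow(flow: Flow): void {
    this.workflows.set(flow.id, convertFlow(flow));
  }

  executeFlow(flowId: string, input: number): number {
    const workflow = this.workflows.get(flowId);
    if (!workflow) {
      throw new MiniFlowError(`Unknown flow: ${flowId}`, flowId);
    }
    try {
      return workflow(input);
    } catch (cause) {
      const message = cause instanceof Error ? cause.message : String(cause);
      throw new MiniFlowError(message, flowId);
    }
  }
}

const miniBridge = new MiniBridge();
miniBridge.registerFlow({
  id: 'math.flow',
  steps: [
    { id: 'double', execute: (n) => n * 2 },
    { id: 'increment', execute: (n) => n + 1 },
  ],
});
const mathFlowOutput = miniBridge.executeFlow('math.flow', 5); // (5 * 2) + 1
```

Because callers only see MiniAdapter, the bridge can be swapped for a test double without touching calling code, which is the testability benefit the layering is meant to provide.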

Getting Started

Installing Dependencies

Ensure you have the required Temporal dependencies:

npm install @temporalio/client @temporalio/worker @temporalio/workflow

Creating a Temporal Adapter

import { NativeConnection } from '@temporalio/worker';
import { createTemporalAdapter } from '@zoopflow/core/temporal';
 
// Connect to Temporal
const connection = await NativeConnection.connect({
  address: 'localhost:7233'
});
 
// Create a Temporal adapter
const adapter = createTemporalAdapter(connection, 'your-task-queue');

Registering Steps and Flows

import { defineStep, defineFlow } from '@zoopflow/core';
 
// Define a step
const processOrderStep = defineStep({
  id: 'order.process',
  version: '1.0.0',
  description: 'Process an order',
  inputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      items: { type: 'array' }
    },
    required: ['orderId', 'items']
  },
  outputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      status: { type: 'string' }
    },
    required: ['orderId', 'status']
  },
  execute: async (input, context) => {
    // Process order logic
    return {
      orderId: input.orderId,
      status: 'processed'
    };
  }
});
 
// Define a flow
const orderFlow = defineFlow({
  id: 'order.flow',
  version: '1.0.0',
  description: 'Order processing flow',
  inputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      items: { type: 'array' }
    },
    required: ['orderId', 'items']
  },
  outputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      status: { type: 'string' }
    },
    required: ['orderId', 'status']
  },
  steps: [processOrderStep]
});
 
// Register the step and flow with Temporal
adapter.registerStepAsActivity(processOrderStep);
adapter.registerFlowAsWorkflow(orderFlow);

Executing a Flow

// Start the Temporal worker
await adapter.startWorker();
 
// Execute the flow
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  workflowId: 'order-12345',
  onProgress: (progress) => {
    console.log('Progress:', progress);
  }
});
 
console.log('Flow execution result:', result);
 
// Stop the worker when done
await adapter.stopWorker();
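
The example above passes the workflow ID 'order-12345' for order 12345. Temporal allows at most one open execution per workflow ID within a namespace, so deriving the ID from a business key is a common way to avoid starting the same order twice. The helper below is a small sketch of that convention; workflowIdFor is a hypothetical name, not a ZoopFlow API, and the "prefix-key" format is inferred from the example rather than documented.

```typescript
// Hypothetical helper, not part of ZoopFlow: builds "<flow prefix>-<business key>".
function workflowIdFor(flowId: string, businessKey: string): string {
  const prefix = flowId.split('.')[0] ?? ''; // 'order.flow' -> 'order'
  const key = businessKey.trim();
  if (prefix.length === 0 || key.length === 0) {
    throw new Error('flowId and businessKey must be non-empty');
  }
  return `${prefix}-${key}`;
}

const derivedWorkflowId = workflowIdFor('order.flow', '12345');
```

The result can be passed as the workflowId execution option shown above.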

Working with Signals

The adapter makes it easy to work with Temporal signals:

// Define a signal
const cancelOrderSignal = {
  id: 'order.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    },
    required: ['reason']
  },
  handler: async (payload, context) => {
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    return true;
  }
};
 
// Register the signal for a flow
adapter.registerSignalForFlow('order.flow', cancelOrderSignal);
 
// Send a signal to a running workflow
const client = await adapter.getWorkflowClient();
const handle = client.getHandle('order-12345');
await handle.signal('order.cancel', { reason: 'Customer requested cancellation' });
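
To illustrate what validating a signal payload against its payloadSchema can involve, here is a deliberately small sketch that checks required properties and primitive types. The names PayloadSchemaSketch and validatePayload are hypothetical, and this is not how the adapter validates internally; a real implementation would more likely rely on a full JSON Schema validator.

```typescript
// Assumed subset of the payloadSchema shape used in the signal example.
type PrimitiveType = 'string' | 'number' | 'boolean';

interface PayloadSchemaSketch {
  type: 'object';
  properties: Record<string, { type: PrimitiveType }>;
  required?: string[];
}

// Returns a list of problems; an empty list means the payload passed.
function validatePayload(schema: PayloadSchemaSketch, payload: unknown): string[] {
  if (typeof payload !== 'object' || payload === null) {
    return ['payload must be an object'];
  }
  const record = payload as Record<string, unknown>;
  const problems: string[] = [];

  // Every required property must be present.
  for (const key of schema.required ?? []) {
    if (!(key in record)) {
      problems.push(`missing required property "${key}"`);
    }
  }

  // Every declared property that is present must have the declared type.
  for (const key of Object.keys(schema.properties)) {
    const rule = schema.properties[key];
    if (rule && key in record && typeof record[key] !== rule.type) {
      problems.push(`property "${key}" must be a ${rule.type}`);
    }
  }
  return problems;
}

const cancelPayloadSchema: PayloadSchemaSketch = {
  type: 'object',
  properties: { reason: { type: 'string' } },
  required: ['reason'],
};

const validProblems = validatePayload(cancelPayloadSchema, {
  reason: 'Customer requested cancellation',
});
const missingProblems = validatePayload(cancelPayloadSchema, {});
```

Returning a list of problems instead of throwing on the first one lets the caller report every issue in a rejected signal at once.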

Progress Tracking

The adapter provides built-in support for tracking workflow progress:

// Execute flow with progress tracking
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  onProgress: (progress) => {
    console.log(`Status: ${progress.status}`);
    console.log(`Current step: ${progress.currentStep}`);
    console.log(`Progress: ${progress.stepIndex}/${progress.totalSteps}`);
    
    if (progress.error) {
      console.error(`Error: ${progress.error}`);
    }
  }
});
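
The callback above reads status, currentStep, stepIndex, totalSteps, and error from the progress object. The sketch below turns those fields into a single log line with a percentage. The ProgressSketch interface is an assumption inferred from that callback, not the library's WorkflowProgress type, and it treats stepIndex as the number of steps completed so far.

```typescript
// Assumed shape, inferred from the fields used in the onProgress callback.
interface ProgressSketch {
  status: string;
  currentStep?: string;
  stepIndex: number;
  totalSteps: number;
  error?: string;
}

// Formats one progress update as a single log line.
function formatProgress(progress: ProgressSketch): string {
  // Guard against division by zero for flows that report no steps.
  const percent =
    progress.totalSteps > 0
      ? Math.round((progress.stepIndex / progress.totalSteps) * 100)
      : 0;
  const step = progress.currentStep ?? 'n/a';
  const suffix = progress.error ? ` (error: ${progress.error})` : '';
  return `[${progress.status}] ${step}: ${progress.stepIndex}/${progress.totalSteps} (${percent}%)${suffix}`;
}

const progressLine = formatProgress({
  status: 'running',
  currentStep: 'order.process',
  stepIndex: 1,
  totalSteps: 4,
});
// progressLine is '[running] order.process: 1/4 (25%)'
```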

Error Handling

The adapter provides robust error handling between ZoopFlow and Temporal:

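// Assumption: FlowError is exported from '@zoopflow/core'; adjust the path to match your installation
import { FlowError } from '@zoopflow/core';
 
try {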
  const result = await adapter.executeFlow('order.flow', {
    orderId: '12345',
    items: [{ id: 'item1', quantity: 2 }]
  });
  console.log('Success:', result);
} catch (error) {
  if (error instanceof FlowError) {
    console.error('Flow Error:', error.message);
    console.error('Error Type:', error.errorType);
    console.error('Context:', error.context);
    
    if (error.nonRetryable) {
      console.error('This error is not retryable');
    }
  } else {
    console.error('Unexpected error:', error);
  }
}
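
The catch block above inspects errorType and nonRetryable. One way to act on those fields is a small retry decision function, sketched below. SketchFlowError is a local stand-in that mirrors only the fields read in the example; it is not ZoopFlow's FlowError class, and shouldRetry is a hypothetical helper.

```typescript
// Local stand-in mirroring the fields read in the catch block;
// it is not ZoopFlow's FlowError class.
class SketchFlowError extends Error {
  readonly errorType: string;
  readonly nonRetryable: boolean;

  constructor(message: string, errorType: string, nonRetryable: boolean = false) {
    super(message);
    Object.setPrototypeOf(this, SketchFlowError.prototype);
    this.name = 'SketchFlowError';
    this.errorType = errorType;
    this.nonRetryable = nonRetryable;
  }
}

// Decides whether a failed execution is worth retrying.
function shouldRetry(error: unknown, retryableTypes: string[]): boolean {
  // Errors of unknown origin are not retried.
  if (!(error instanceof SketchFlowError)) return false;
  // An explicit nonRetryable flag always wins over the type list.
  if (error.nonRetryable) return false;
  return retryableTypes.indexOf(error.errorType) !== -1;
}

const retryableTypes = ['TRANSIENT', 'NETWORK'];
const retryNetwork = shouldRetry(new SketchFlowError('timeout', 'NETWORK'), retryableTypes);
const retryFlagged = shouldRetry(new SketchFlowError('bad input', 'NETWORK', true), retryableTypes);
```

Here retryNetwork is true and retryFlagged is false, because the nonRetryable flag takes precedence over the type list.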

Advanced Configuration

The adapter supports advanced configuration options for both workflow registration and execution:

// Register flow with advanced options
adapter.registerFlowAsWorkflow(orderFlow, {
  signalOptions: {
    validateSignals: true
  },
  errorHandling: {
    retryableErrors: ['TRANSIENT', 'NETWORK'],
    recoveryStrategy: 'retry-with-backoff'
  }
});
 
// Execute flow with advanced options
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  taskQueue: 'high-priority-queue',
  waitForCompletion: true,
  timeoutSeconds: 300,
  workflowOptions: {
    retryPolicy: {
      initialInterval: '1s',
      maximumInterval: '10s',
      backoffCoefficient: 2,
      maximumAttempts: 5
    }
  }
});
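
To see what the retry policy above implies in practice, the sketch below computes the wait before each retry. It follows Temporal's retry semantics: each interval is the initial interval multiplied by the backoff coefficient once per previous retry, capped at the maximum interval, and maximumAttempts counts the first attempt. It assumes the retryPolicy in workflowOptions is handed to Temporal unchanged, which this guide does not state. RetryPolicySketch and retryDelaysMs are hypothetical names, and intervals are plain milliseconds here rather than strings such as '1s'.

```typescript
// Hypothetical numeric mirror of the retryPolicy fields used in the example.
interface RetryPolicySketch {
  initialIntervalMs: number;
  maximumIntervalMs: number;
  backoffCoefficient: number;
  maximumAttempts: number; // includes the first attempt
}

// Returns the delay before each retry, in order.
// Does not model maximumAttempts = 0, which Temporal treats as unlimited.
function retryDelaysMs(policy: RetryPolicySketch): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < policy.maximumAttempts - 1; retry++) {
    const uncapped = policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry);
    delays.push(Math.min(uncapped, policy.maximumIntervalMs));
  }
  return delays;
}

const examplePolicyDelays = retryDelaysMs({
  initialIntervalMs: 1000,
  maximumIntervalMs: 10000,
  backoffCoefficient: 2,
  maximumAttempts: 5,
});
// examplePolicyDelays is [1000, 2000, 4000, 8000]
```

With five attempts there are four retries, so the 10 second cap is never reached; a sixth attempt would wait 10 seconds instead of 16.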

API Reference

TemporalAdapter Interface

The core interface for integrating ZoopFlow with Temporal:

export interface TemporalAdapter {
  // Convert a flow to a Temporal workflow
  convertToTemporalWorkflow<TInput, TOutput>(
    flow: FlowDefinition<TInput, TOutput>,
    options?: TemporalFlowOptions
  ): TemporalWorkflowDefinition<TInput, TOutput>;
 
  // Convert a flow graph to a Temporal workflow
  convertFlowGraphToTemporalWorkflow(
    flowGraph: FlowGraphDefinition,
    options?: TemporalFlowOptions
  ): TemporalWorkflowDefinition<unknown, unknown>;
  
  // Register a step as a Temporal activity
  registerStepAsActivity<TInput, TOutput>(
    step: StepDefinition<TInput, TOutput>
  ): void;
  
  // Register a flow as a Temporal workflow
  registerFlowAsWorkflow<TInput, TOutput>(
    flow: FlowDefinition<TInput, TOutput>,
    options?: TemporalRegistrationOptions
  ): void;
  
  // Execute a flow using Temporal
  executeFlow<TInput, TOutput>(
    flowId: string,
    input: TInput,
    options?: TemporalExecutionOptions
  ): Promise<TOutput>;
  
  // Register a signal for a flow
  registerSignalForFlow<T>(
    flowId: string, 
    signal: SignalDefinition<T>
  ): void;
 
  // Start the Temporal worker
  startWorker(): Promise<void>;
 
  // Stop the Temporal worker
  stopWorker(): Promise<void>;
 
  // Get the Temporal connection
  getConnection(): NativeConnection;
 
  // Get the Temporal task queue
  getTaskQueue(): string;
}

TemporalRegistrationOptions

Options for registering a flow with Temporal:

export interface TemporalRegistrationOptions {
  // Whether to validate input and output
  validateData?: boolean;
 
  // Signal handling options
  signalOptions?: {
    // Whether to validate signal payloads
    validateSignals?: boolean;
  };
 
  // Error handling configuration
  errorHandling?: {
    // List of error types that should be considered retryable
    retryableErrors?: string[];
 
    // Recovery strategy for handling errors
    recoveryStrategy?: 'skip-failed-step' | 'retry-with-backoff' | 'use-default-values' | 'custom';
  };
 
  // Temporal-specific workflow options
  workflowOptions?: Record<string, unknown>;
}

TemporalExecutionOptions

Options for executing a flow with Temporal:

export interface TemporalExecutionOptions {
  // The Temporal workflow ID to use
  workflowId?: string;
 
  // Whether to wait for workflow completion
  waitForCompletion?: boolean;
 
  // Workflow execution timeout in seconds
  timeoutSeconds?: number;
 
  // Progress callback
  onProgress?: (progress: WorkflowProgress) => void;
 
  // Custom workflow options
  workflowOptions?: Record<string, unknown>;
}

Comparing with Legacy Integration

The new Temporal Workflow Adapter offers several improvements over the legacy integration:

| Feature | Legacy Integration | New Adapter |
|---|---|---|
| Type Safety | Uses any types and type assertions | Fully typed interfaces with no type assertions |
| Error Handling | Basic error propagation | Comprehensive error handling with proper conversion |
| Signal Integration | Limited signal support | First-class signal support with validation |
| Progress Tracking | Not available | Built-in progress tracking through queries |
| API Design | Complex, low-level API | Simple, high-level, intuitive API |
| Testability | Difficult to test | Easy to test with clear interfaces |
| Documentation | Limited documentation | Comprehensive documentation and examples |

Migration Guide

The new adapter is designed to work alongside the legacy integration, allowing for gradual migration:

Legacy Integration

import { TemporalBridge, createStepActivity } from '@zoopflow/core/temporal';
 
// Create a bridge
const bridge = new TemporalBridge(connection, 'task-queue');
 
// Register step and flow
bridge.registerStepAsActivity(step);
bridge.registerFlowAsWorkflow(flow);
 
// Execute flow
await bridge.executeFlow(flowId, input);

New Adapter

import { createTemporalAdapter } from '@zoopflow/core/temporal';
 
// Create an adapter
const adapter = createTemporalAdapter(connection, 'task-queue');
 
// Register step and flow
adapter.registerStepAsActivity(step);
adapter.registerFlowAsWorkflow(flow);
 
// Execute flow
await adapter.executeFlow(flowId, input);

The new adapter provides a more intuitive API with better type safety and error handling, while maintaining similar method names to make migration easier.
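
Because both snippets call executeFlow with the same arguments, a gradual migration can route individual flows to one integration or the other. The sketch below shows one way to do that with a small router. FlowExecutorSketch and createMigrationRouter are hypothetical names, and the fake executors stand in for a real bridge and adapter so that the example runs without Temporal.

```typescript
// Minimal structural type covering the call both integrations share.
interface FlowExecutorSketch {
  executeFlow(flowId: string, input: unknown): Promise<unknown>;
}

// Hypothetical router: flows listed in migratedFlowIds go to the new adapter,
// everything else stays on the legacy bridge.
function createMigrationRouter(
  legacy: FlowExecutorSketch,
  adapter: FlowExecutorSketch,
  migratedFlowIds: string[]
): FlowExecutorSketch {
  return {
    executeFlow(flowId: string, input: unknown): Promise<unknown> {
      const target = migratedFlowIds.indexOf(flowId) !== -1 ? adapter : legacy;
      return target.executeFlow(flowId, input);
    },
  };
}

// Fakes standing in for a real bridge and adapter; each tags its result.
function fakeExecutor(tag: string): FlowExecutorSketch {
  return {
    executeFlow: (flowId: string) => Promise.resolve(`${tag}:${flowId}`),
  };
}

const migrationRouter = createMigrationRouter(
  fakeExecutor('legacy'),
  fakeExecutor('adapter'),
  ['order.flow']
);
```

Calling migrationRouter.executeFlow('order.flow', input) reaches the adapter, while any flow not in the list still reaches the legacy bridge, so flows can be moved over one at a time.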

Conclusion

The Temporal Workflow Adapter provides a powerful, type-safe, and easy-to-use interface for integrating ZoopFlow with Temporal. It simplifies workflow registration, execution, and monitoring while providing advanced features like signal handling, progress tracking, and robust error handling.

By following the examples in this guide, you can leverage the full power of Temporal while maintaining a clean and intuitive API for your applications.
