Temporal Workflow Adapter Guide
This guide explains ZoopFlow's new adapter pattern implementation for Temporal.io integration, its benefits, and how to use it in your applications.
For practical examples, see our Temporal Workflow Adapter Example.
Introduction
The Temporal Workflow Adapter is a core component of ZoopFlow that enables seamless integration with Temporal.io, providing durable, reliable workflow execution with powerful features like automatic retries, error handling, signal processing, and state management.
This new adapter pattern implementation is designed to provide a cleaner, more type-safe interface between ZoopFlow and Temporal, with improved error handling and a more consistent developer experience.
Key Features
- Clear Separation of Concerns: Maintains a clean boundary between ZoopFlow's domain model and Temporal's execution model
- Type Safety: Provides fully typed interfaces that eliminate unnecessary type assertions and improve the developer experience
- Improved Error Handling: Consistent error propagation between ZoopFlow and Temporal with proper error conversion
- Signal Integration: Clean signal registration and validation with proper TypeScript typing
- Progress Tracking: Built-in support for monitoring workflow progress through Temporal queries
- Simplified API: Intuitive interface for registering and executing flows with Temporal
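To make the error-conversion idea from the list above concrete, here is a minimal sketch of how an arbitrary thrown value could be normalized into a single error shape. `SketchFlowError`, `toFlowError`, and the convention of reading a `code` property are assumptions made for illustration; ZoopFlow's actual `FlowError` and `TemporalErrorHandler` may work differently.

```typescript
// Hypothetical stand-in for ZoopFlow's FlowError; the real class may differ.
class SketchFlowError extends Error {
  readonly errorType: string;
  readonly nonRetryable: boolean;
  readonly context: Record<string, unknown>;

  constructor(
    message: string,
    errorType: string,
    nonRetryable: boolean,
    context: Record<string, unknown> = {}
  ) {
    super(message);
    // Keep `instanceof` working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = 'SketchFlowError';
    this.errorType = errorType;
    this.nonRetryable = nonRetryable;
    this.context = context;
  }
}

// Error types treated as retryable in this sketch (assumed labels).
const RETRYABLE_TYPES = ['TRANSIENT', 'NETWORK'];

// Convert any thrown value into a SketchFlowError so callers see one shape.
function toFlowError(thrown: unknown, stepId: string): SketchFlowError {
  if (thrown instanceof SketchFlowError) {
    return thrown; // already converted, pass through unchanged
  }
  const message = thrown instanceof Error ? thrown.message : String(thrown);
  // Assumption: a `code` property on the thrown value names the error type.
  const code =
    typeof thrown === 'object' && thrown !== null && 'code' in thrown
      ? String((thrown as { code: unknown }).code)
      : 'UNKNOWN';
  const nonRetryable = RETRYABLE_TYPES.indexOf(code) === -1;
  return new SketchFlowError(message, code, nonRetryable, { stepId });
}
```

In this sketch, anything without a recognized retryable type is marked non-retryable, which is a conservative default rather than a documented ZoopFlow behavior.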
Architecture Overview
The Temporal Workflow Adapter follows a layered architecture:
- Adapter Interface Layer (TemporalAdapter): Defines the contract between ZoopFlow and Temporal
- Bridge Implementation Layer (TemporalBridgeImpl): Implements the adapter interface and handles communication with Temporal
- Factory Layer (ModernWorkflowFactory, ActivityFactory): Creates Temporal-compatible workflow and activity functions
- Conversion Layer (FlowDefinitionAdapter): Converts between ZoopFlow flow definitions and Temporal workflow definitions
- Error Handling Layer (TemporalErrorHandler): Handles error conversion and propagation
- Types Layer (TemporalWorkflowDefinition, TemporalOptions): Defines the types used throughout the adapter
This layered approach ensures a clear separation of concerns and makes the code more maintainable and testable.
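The relationship between the interface, bridge, and factory layers can be pictured with a heavily simplified sketch. Every name below (`SketchStep`, `SketchAdapter`, `SketchBridge`, `createActivity`) is hypothetical and involves no Temporal code; the point is only to show callers depending on an interface while an implementation class owns the registry and delegates function creation to a factory.

```typescript
// All names here are hypothetical simplifications, not ZoopFlow's real types.
interface SketchStep<I, O> {
  id: string;
  execute: (input: I) => Promise<O>;
}

type ActivityFn = (input: unknown) => Promise<unknown>;

// Factory layer: turns a step into a plain activity function.
function createActivity<I, O>(step: SketchStep<I, O>): ActivityFn {
  return (input) => step.execute(input as I);
}

// Adapter interface layer: the contract callers depend on.
interface SketchAdapter {
  registerStepAsActivity<I, O>(step: SketchStep<I, O>): void;
  listActivities(): string[];
  runActivity(stepId: string, input: unknown): Promise<unknown>;
}

// Bridge implementation layer: owns the registry and delegates to the factory.
class SketchBridge implements SketchAdapter {
  private activities: { [id: string]: ActivityFn } = {};

  registerStepAsActivity<I, O>(step: SketchStep<I, O>): void {
    this.activities[step.id] = createActivity(step);
  }

  listActivities(): string[] {
    return Object.keys(this.activities);
  }

  runActivity(stepId: string, input: unknown): Promise<unknown> {
    const activity = this.activities[stepId];
    if (!activity) {
      return Promise.reject(new Error(`No activity registered for step "${stepId}"`));
    }
    return activity(input);
  }
}
```

Because application code would only see `SketchAdapter`, the bridge could be replaced or mocked without touching callers, which is the maintainability benefit the layering is meant to provide.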
Getting Started
Installing Dependencies
Ensure you have the required Temporal dependencies:
npm install @temporalio/client @temporalio/worker @temporalio/workflow
Creating a Temporal Adapter
import { NativeConnection } from '@temporalio/worker';
import { createTemporalAdapter } from '@zoopflow/core/temporal';
// Connect to Temporal
const connection = await NativeConnection.connect({
  address: 'localhost:7233'
});
// Create a Temporal adapter
const adapter = createTemporalAdapter(connection, 'your-task-queue');
Registering Steps and Flows
import { defineStep, defineFlow } from '@zoopflow/core';
// Define a step
const processOrderStep = defineStep({
  id: 'order.process',
  version: '1.0.0',
  description: 'Process an order',
  inputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      items: { type: 'array' }
    },
    required: ['orderId', 'items']
  },
  outputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      status: { type: 'string' }
    },
    required: ['orderId', 'status']
  },
  execute: async (input, context) => {
    // Process order logic
    return {
      orderId: input.orderId,
      status: 'processed'
    };
  }
});
// Define a flow
const orderFlow = defineFlow({
  id: 'order.flow',
  version: '1.0.0',
  description: 'Order processing flow',
  inputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      items: { type: 'array' }
    },
    required: ['orderId', 'items']
  },
  outputSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      status: { type: 'string' }
    },
    required: ['orderId', 'status']
  },
  steps: [processOrderStep]
});
// Register the step and flow with Temporal
adapter.registerStepAsActivity(processOrderStep);
adapter.registerFlowAsWorkflow(orderFlow);
Executing a Flow
// Start the Temporal worker
await adapter.startWorker();
// Execute the flow
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  workflowId: 'order-12345',
  onProgress: (progress) => {
    console.log('Progress:', progress);
  }
});
console.log('Flow execution result:', result);
// Stop the worker when done
await adapter.stopWorker();
Working with Signals
The adapter makes it easy to work with Temporal signals:
// Define a signal
const cancelOrderSignal = {
  id: 'order.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    },
    required: ['reason']
  },
  handler: async (payload, context) => {
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    return true;
  }
};
// Register the signal for a flow
adapter.registerSignalForFlow('order.flow', cancelOrderSignal);
// Send a signal to a running workflow
const client = await adapter.getWorkflowClient();
const handle = client.getHandle('order-12345');
await handle.signal('order.cancel', { reason: 'Customer requested cancellation' });
Progress Tracking
The adapter provides built-in support for tracking workflow progress:
// Execute flow with progress tracking
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  onProgress: (progress) => {
    console.log(`Status: ${progress.status}`);
    console.log(`Current step: ${progress.currentStep}`);
    console.log(`Progress: ${progress.stepIndex}/${progress.totalSteps}`);
    if (progress.error) {
      console.error(`Error: ${progress.error}`);
    }
  }
});
Error Handling
The adapter provides robust error handling between ZoopFlow and Temporal:
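// Assumption: FlowError is exported from '@zoopflow/core'; adjust the path to match your installation
import { FlowError } from '@zoopflow/core';
try {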
  const result = await adapter.executeFlow('order.flow', {
    orderId: '12345',
    items: [{ id: 'item1', quantity: 2 }]
  });
  console.log('Success:', result);
} catch (error) {
  if (error instanceof FlowError) {
    console.error('Flow Error:', error.message);
    console.error('Error Type:', error.errorType);
    console.error('Context:', error.context);
    if (error.nonRetryable) {
      console.error('This error is not retryable');
    }
  } else {
    console.error('Unexpected error:', error);
  }
}
Advanced Configuration
The adapter supports advanced configuration options for both workflow registration and execution:
// Register flow with advanced options
adapter.registerFlowAsWorkflow(orderFlow, {
  signalOptions: {
    validateSignals: true
  },
  errorHandling: {
    retryableErrors: ['TRANSIENT', 'NETWORK'],
    recoveryStrategy: 'retry-with-backoff'
  }
});
// Execute flow with advanced options
const result = await adapter.executeFlow('order.flow', {
  orderId: '12345',
  items: [{ id: 'item1', quantity: 2 }]
}, {
  taskQueue: 'high-priority-queue',
  waitForCompletion: true,
  timeoutSeconds: 300,
  workflowOptions: {
    retryPolicy: {
      initialInterval: '1s',
      maximumInterval: '10s',
      backoffCoefficient: 2,
      maximumAttempts: 5
    }
  }
});
API Reference
TemporalAdapter Interface
The core interface for integrating ZoopFlow with Temporal:
export interface TemporalAdapter {
  // Convert a flow to a Temporal workflow
  convertToTemporalWorkflow<TInput, TOutput>(
    flow: FlowDefinition<TInput, TOutput>,
    options?: TemporalFlowOptions
  ): TemporalWorkflowDefinition<TInput, TOutput>;
  // Convert a flow graph to a Temporal workflow
  convertFlowGraphToTemporalWorkflow(
    flowGraph: FlowGraphDefinition,
    options?: TemporalFlowOptions
  ): TemporalWorkflowDefinition<unknown, unknown>;
  // Register a step as a Temporal activity
  registerStepAsActivity<TInput, TOutput>(
    step: StepDefinition<TInput, TOutput>
  ): void;
  // Register a flow as a Temporal workflow
  registerFlowAsWorkflow<TInput, TOutput>(
    flow: FlowDefinition<TInput, TOutput>,
    options?: TemporalRegistrationOptions
  ): void;
  // Execute a flow using Temporal
  executeFlow<TInput, TOutput>(
    flowId: string,
    input: TInput,
    options?: TemporalExecutionOptions
  ): Promise<TOutput>;
  // Register a signal for a flow
  registerSignalForFlow<T>(
    flowId: string,
    signal: SignalDefinition<T>
  ): void;
  // Start the Temporal worker
  startWorker(): Promise<void>;
  // Stop the Temporal worker
  stopWorker(): Promise<void>;
  // Get the Temporal connection
  getConnection(): NativeConnection;
  // Get the Temporal task queue
  getTaskQueue(): string;
}
TemporalRegistrationOptions
Options for registering a flow with Temporal:
export interface TemporalRegistrationOptions {
  // Whether to validate input and output
  validateData?: boolean;
  // Signal handling options
  signalOptions?: {
    // Whether to validate signal payloads
    validateSignals?: boolean;
  };
  // Error handling configuration
  errorHandling?: {
    // List of error types that should be considered retryable
    retryableErrors?: string[];
    // Recovery strategy for handling errors
    recoveryStrategy?: 'skip-failed-step' | 'retry-with-backoff' | 'use-default-values' | 'custom';
  };
  // Temporal-specific workflow options
  workflowOptions?: Record<string, unknown>;
}
TemporalExecutionOptions
Options for executing a flow with Temporal:
export interface TemporalExecutionOptions {
  // The Temporal workflow ID to use
  workflowId?: string;
  // Whether to wait for workflow completion
  waitForCompletion?: boolean;
  // Workflow execution timeout in seconds
  timeoutSeconds?: number;
  // Progress callback
  onProgress?: (progress: WorkflowProgress) => void;
  // Custom workflow options
  workflowOptions?: Record<string, unknown>;
}
Comparing with Legacy Integration
The new Temporal Workflow Adapter offers several improvements over the legacy integration:
| Feature | Legacy Integration | New Adapter |
|---|---|---|
| Type Safety | Uses any types and type assertions | Fully typed interfaces with no type assertions |
| Error Handling | Basic error propagation | Comprehensive error handling with proper conversion |
| Signal Integration | Limited signal support | First-class signal support with validation |
| Progress Tracking | Not available | Built-in progress tracking through queries |
| API Design | Complex, low-level API | Simple, high-level, intuitive API |
| Testability | Difficult to test | Easy to test with clear interfaces |
| Documentation | Limited documentation | Comprehensive documentation and examples |
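The testability row can be illustrated with a small in-memory test double. The sketch below assumes application code is written against a narrow `FlowExecutor` interface whose `executeFlow` mirrors the adapter's signature without the options argument; `FlowExecutor`, `FakeFlowExecutor`, and `submitOrder` are hypothetical names and not part of ZoopFlow.

```typescript
// Narrow, hypothetical slice of the adapter surface that application code needs.
interface FlowExecutor {
  executeFlow<TInput, TOutput>(flowId: string, input: TInput): Promise<TOutput>;
}

// In-memory test double: records calls and returns canned results.
class FakeFlowExecutor implements FlowExecutor {
  readonly calls: Array<{ flowId: string; input: unknown }> = [];
  private results: { [flowId: string]: unknown } = {};

  // Configure the value returned for a given flow ID.
  stubResult(flowId: string, result: unknown): void {
    this.results[flowId] = result;
  }

  executeFlow<TInput, TOutput>(flowId: string, input: TInput): Promise<TOutput> {
    this.calls.push({ flowId, input });
    if (!(flowId in this.results)) {
      return Promise.reject(new Error(`No stubbed result for flow "${flowId}"`));
    }
    return Promise.resolve(this.results[flowId] as TOutput);
  }
}

// Application code under test depends only on the narrow interface.
async function submitOrder(executor: FlowExecutor, orderId: string): Promise<string> {
  const result = await executor.executeFlow<
    { orderId: string; items: unknown[] },
    { status: string }
  >('order.flow', { orderId, items: [] });
  return result.status;
}
```

A unit test can then construct `FakeFlowExecutor`, stub a result for `'order.flow'`, and assert on both the returned status and the recorded calls without a running Temporal server.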
Migration Guide
The new adapter is designed to work alongside the legacy integration, allowing for gradual migration:
Legacy Integration
import { TemporalBridge, createStepActivity } from '@zoopflow/core/temporal';
// Create a bridge
const bridge = new TemporalBridge(connection, 'task-queue');
// Register step and flow
bridge.registerStepAsActivity(step);
bridge.registerFlowAsWorkflow(flow);
// Execute flow
await bridge.executeFlow(flowId, input);
New Adapter
import { createTemporalAdapter } from '@zoopflow/core/temporal';
// Create an adapter
const adapter = createTemporalAdapter(connection, 'task-queue');
// Register step and flow
adapter.registerStepAsActivity(step);
adapter.registerFlowAsWorkflow(flow);
// Execute flow
await adapter.executeFlow(flowId, input);
The new adapter provides a more intuitive API with better type safety and error handling, while maintaining similar method names to make migration easier.
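One way to organize a gradual migration is to route each flow to either the legacy bridge or the new adapter according to a list of flow IDs that have already been moved. The following is a sketch under that assumption; `FlowRunner` and `createMigrationRouter` are hypothetical names, and the only thing borrowed from the snippets above is that both objects expose an `executeFlow(flowId, input)` method.

```typescript
// Structural type covering the one method used here from both integrations.
interface FlowRunner {
  executeFlow(flowId: string, input: unknown): Promise<unknown>;
}

// Route each flow to the legacy bridge or the new adapter,
// depending on whether its ID appears in the migrated list.
function createMigrationRouter(
  legacy: FlowRunner,
  modern: FlowRunner,
  migratedFlowIds: string[]
): FlowRunner {
  return {
    executeFlow(flowId: string, input: unknown): Promise<unknown> {
      const target = migratedFlowIds.indexOf(flowId) !== -1 ? modern : legacy;
      return target.executeFlow(flowId, input);
    },
  };
}
```

With this shape, moving a flow to the new adapter amounts to adding its ID to the list, and removing the ID rolls it back.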
Conclusion
The Temporal Workflow Adapter provides a powerful, type-safe, and easy-to-use interface for integrating ZoopFlow with Temporal. It simplifies workflow registration, execution, and monitoring while providing advanced features like signal handling, progress tracking, and robust error handling.
By following the examples in this guide, you can leverage the full power of Temporal while maintaining a clean and intuitive API for your applications.