Comprehensive Guide to Temporal Integration in ZoopFlow
This guide provides detailed information about ZoopFlow's integration with Temporal.io, a resilient workflow orchestration platform. It covers architecture, implementation details, and best practices.
See our Temporal Integration Examples to get started with working code.
We've released a new Temporal Workflow Adapter that provides an improved, type-safe interface for Temporal integration. Check out the Workflow Adapter Example to see it in action.
Table of Contents
- Introduction to Temporal
- ZoopFlow and Temporal Integration Architecture
- Mapping ZoopFlow Concepts to Temporal
- The Temporal Bridge Component
- Workflow Wrapper Implementation
- Worker Management and Deployment
- Step to Activity Mapping
- Flow to Workflow Mapping
- Signal Handling with Temporal
- Error Handling and Recovery
- State Persistence
- Performance Considerations
- Advanced Temporal Features Integration
- Configuration Options
- Examples of Temporal Integration
Introduction to Temporal
Temporal is a resilient, distributed workflow orchestration platform that enables developers to build reliable, scalable, and maintainable applications. It provides durable execution guarantees, automatic state persistence, and built-in error handling.
Key Temporal features leveraged by ZoopFlow include:
- Durable Execution: Workflows automatically resume after failures, power outages, or network issues
- Workflow History: Complete audit trail of every workflow execution
- Retries and Error Handling: Configurable retry policies and error handling strategies
- Timers and Scheduling: Built-in support for delays, timeouts, and cron schedules
- Signals and Queries: Dynamic control and inspection of running workflows
- Scalability: Horizontal scaling for both workflows and activities
- Multi-language Support: SDKs for multiple programming languages
Temporal serves as the execution engine for ZoopFlow, enabling reliable execution of complex business processes across distributed systems.
ZoopFlow and Temporal Integration Architecture
ZoopFlow's integration with Temporal follows a layered architecture designed to leverage Temporal's capabilities while maintaining flexibility and extensibility:
- ZoopFlow High-Level API: User-facing API for defining flows and steps
- ZoopFlow Core Layer: Internal representation of flows, steps, and their relationships
- Temporal Bridge Layer: Translation between ZoopFlow abstractions and Temporal primitives
- Temporal SDK: Direct interaction with Temporal services
- Temporal Cluster: The runtime environment for workflow execution
This architecture provides several benefits:
- Abstraction: ZoopFlow users don't need to understand Temporal details
- Flexibility: ZoopFlow can evolve independently from Temporal
- Performance: Direct mapping to Temporal primitives for efficient execution
- Extensibility: New features can be added without changing the core API
The integration is composed of several key components:
- Flow Definition Adapter: Converts JSON flow definitions to Temporal-compatible workflows
- Temporal Adapter: Manages the connection between the Flow system and Temporal
- Workflow Wrapper: Encapsulates flow execution within Temporal workflows
- State Management: Handles workflow state persistence and recovery
- Error Handling: Provides mechanisms for error detection, classification, and recovery
- Signal Handlers: Enables dynamic control over workflow execution
These components work together to provide a seamless integration between ZoopFlow's high-level flow abstractions and Temporal's execution capabilities.
Mapping ZoopFlow Concepts to Temporal
ZoopFlow maps its high-level abstractions to Temporal's core primitives:
| ZoopFlow Concept | Temporal Concept | Description |
|---|---|---|
| Step | Activity | A discrete unit of work |
| Flow | Workflow | An orchestration of steps |
| Step Context | Activity Context | Execution context for steps |
| Flow Context | Workflow Context | Orchestration context for flows |
| Retry Config | Retry Policy | Configuration for retrying failed operations |
| Signal | Workflow Signal | Asynchronous message to a running workflow |
| Flow State | Workflow State | Persisted state of a flow execution |
| Error Handler | Error Handler | Component for handling and recovering from errors |
This mapping ensures that ZoopFlow's concepts are efficiently translated to Temporal primitives while maintaining their semantic meaning and behavior.
The Temporal Integration Components
The Temporal integration consists of several components that work together to bridge between the Flow system and Temporal:
- TemporalAdapter: The primary connection point between ZoopFlow and Temporal's API
- SignalRegistry & SignalManager: Manages signal definitions and handlers
- Temporal-Specific Signal Context: Adapts the SignalContext to work with Temporal's workflow context
The ZoopFlow's Temporal integration provides functionality for:
- Registering steps as Temporal activities
- Registering flows as Temporal workflows
- Registering and processing signals
- Starting and managing Temporal workers
- Managing workflow state
- Handling errors consistently
For a concrete implementation, see our basic example which demonstrates the use of the Temporal integration components.
Key Integration Features
import { registerStepAsActivity } from '@zoop/flow/core/temporal/activity';
import { registerFlowAsWorkflow } from '@zoop/flow/core/temporal/workflow';
import { registerTemporalFlowControlSignals } from '@zoop/flow/core/signals/temporal';
// Example of registering a step as an activity
registerStepAsActivity(stringTransformerStep);
// Example of registering a flow as a workflow
registerFlowAsWorkflow(myFlow);
// Example of registering standard signals for a flow
registerTemporalFlowControlSignals(
'my-flow-id',
'execution-123',
temporalContext
);Step to Activity Mapping
ZoopFlow steps are mapped to Temporal activities, which are the fundamental unit of work in Temporal. Here's how the mapping is implemented:
// Define a step in ZoopFlow
const myStep = defineStep<InputType, OutputType>({
id: 'domain.resource.action',
version: '1.0.0',
// Input schema
inputSchema: {
type: 'object',
properties: { /* ... */ },
required: ['field1', 'field2']
},
// Output schema
outputSchema: {
type: 'object',
properties: { /* ... */ },
required: ['result']
},
// Retry configuration
retryConfig: {
maxAttempts: 3,
backoffCoefficient: 1.5,
initialIntervalSeconds: 1
},
// Step implementation
execute: async (input, context) => {
// Step logic here
return { result: 'success' };
}
});
// Register the step as a Temporal activity
temporalBridge.registerStepAsActivity(myStep);The step's retry configuration is converted to a Temporal retry policy, and the step's timeout configuration is mapped to Temporal activity timeouts.
Signal Handling with Temporal
ZoopFlow leverages Temporal's signal handling capabilities to enable dynamic control over workflow execution. The system supports several standard signal types:
Signal Registration
Signals are registered for specific flows:
import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager } from '@zoop/flow/core/signals/manager';
import { defineSignal } from '@zoop/flow/core/signals/handlers';
import { registerTemporalFlowControlSignals } from '@zoop/flow/core/signals/temporal';
// Define a signal
const pauseSignal = defineSignal<{ reason?: string }>({
namespace: 'flow',
name: 'pause',
description: 'Signal to pause the workflow',
payloadSchema: {
type: 'object',
properties: {
reason: { type: 'string' }
}
},
handler: async (payload, context) => {
// Signal handler implementation
await context.setState({
paused: true,
pausedAt: new Date().toISOString(),
pauseReason: payload.reason
});
// Create a checkpoint for this action
await context.createCheckpoint({
action: 'pause',
reason: payload.reason
});
// Log the action
context.log('Workflow paused', { reason: payload.reason });
}
});
// Create a registry and register the signal
const registry = new SignalRegistryImpl();
registry.registerSignal(pauseSignal);
// Create a signal manager
const signalManager = new SignalManager(registry);
// In your workflow function, register Temporal integration
function myWorkflow(flowId: string, executionId: string, temporalContext: any) {
// Register standard and custom signals
registerTemporalFlowControlSignals(flowId, executionId, temporalContext);
// Initialize the signal manager
signalManager.markInitialized();
}Sending Signals
Signals can be sent to running workflows:
// Send a signal to a workflow
await client.workflow.signal(
handle,
'flow.pause',
{ reason: 'Manual pause requested by user' }
);Error Handling and Recovery
ZoopFlow provides a standardized error handling interface that works consistently across both the core system and Temporal integration. This unified approach ensures consistent error handling regardless of whether flows are executed directly or through Temporal.
Unified Error Handling Interface
The system implements a standardized error handling interface that works across both environments:
/**
* Unified error context interface combining both core and temporal contexts
*/
export interface UnifiedErrorContext {
// Flow context
flowId?: string;
executionId?: string;
stepId?: string;
// Temporal context
nodeId?: string;
workflowId?: string;
runId?: string;
activityId?: string;
// State information
state?: Record<string, unknown>;
context?: Record<string, unknown>;
// Error information
originalError?: Error;
cause?: Error | unknown;
// Additional metadata
[key: string]: unknown;
}
/**
* Result of error handling
*/
export interface ErrorHandlingResult {
handled: boolean;
strategy: RecoveryStrategy;
continueExecution: boolean;
error?: Error;
context?: Record<string, unknown>;
}
/**
* Interface for unified error handler
*/
export interface UnifiedErrorHandler {
handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
}Adapter System
The error handling system includes adapters that convert between core and Temporal error types:
- CoreErrorHandlerAdapter: Adapts the core error handler registry to use the unified interface
- TemporalErrorHandlerAdapter: Adapts the Temporal error handler to use the unified interface
- TemporalErrorAdapter: Provides utilities for converting between error types
Example Usage
// Create a unified error handler - works with both core and Temporal
const errorHandler = UnifiedErrorHandlerFactory.createHandler();
// Handle errors consistently across systems
try {
// Execute operation
await executeOperation();
} catch (error) {
// Handle error using unified interface
const result = await errorHandler.handleError(error, {
flowId: 'order-processing-flow',
stepId: 'payment-processing',
state: getCurrentState()
});
if (result.handled) {
// Error was handled
console.log(`Error handled with strategy: ${result.strategy}`);
if (result.continueExecution) {
// Continue execution
await continueOperation(result.context);
} else {
// Stop execution
await cleanupResources(result.context);
}
} else {
// Error was not handled, propagate it
throw result.error || error;
}
}
// For Temporal workflows, use the Temporal-specific factory
const temporalErrorHandler = TemporalUnifiedErrorHandlerFactory.createHandler({
defaultStrategy: RecoveryStrategy.RETRY_WITH_BACKOFF,
continueOnError: true
});
// Handle Temporal-specific errors with the same interface
const result = await temporalErrorHandler.handleError(workflowError, {
workflowId: workflowInfo.workflowId,
runId: workflowInfo.runId,
nodeId: 'payment-processing'
});Examples of Temporal Integration
For detailed examples of ZoopFlow's Temporal integration, please refer to the following examples:
- Basic Integration Example: Shows fundamental integration between ZoopFlow and Temporal
- Error Handling Example: Demonstrates error recovery strategies
- Signal Handling Example: Shows dynamic workflow control
Full Source Code Reference
The full implementation of ZoopFlow's Temporal integration can be found in the following directories:
View Core Temporal Implementation View Temporal Examples