Guides
Temporal Integration

Comprehensive Guide to Temporal Integration in ZoopFlow

This guide provides detailed information about ZoopFlow's integration with Temporal.io, a resilient workflow orchestration platform. It covers architecture, implementation details, and best practices.

See our Temporal Integration Examples to get started with working code.

We've released a new Temporal Workflow Adapter that provides an improved, type-safe interface for Temporal integration. Check out the Workflow Adapter Example to see it in action.

Table of Contents

Introduction to Temporal

Temporal is a resilient, distributed workflow orchestration platform that enables developers to build reliable, scalable, and maintainable applications. It provides durable execution guarantees, automatic state persistence, and built-in error handling.

Key Temporal features leveraged by ZoopFlow include:

  • Durable Execution: Workflows automatically resume after failures, power outages, or network issues
  • Workflow History: Complete audit trail of every workflow execution
  • Retries and Error Handling: Configurable retry policies and error handling strategies
  • Timers and Scheduling: Built-in support for delays, timeouts, and cron schedules
  • Signals and Queries: Dynamic control and inspection of running workflows
  • Scalability: Horizontal scaling for both workflows and activities
  • Multi-language Support: SDKs for multiple programming languages

Temporal serves as the execution engine for ZoopFlow, enabling reliable execution of complex business processes across distributed systems.

ZoopFlow and Temporal Integration Architecture

ZoopFlow's integration with Temporal follows a layered architecture designed to leverage Temporal's capabilities while maintaining flexibility and extensibility:

  1. ZoopFlow High-Level API: User-facing API for defining flows and steps
  2. ZoopFlow Core Layer: Internal representation of flows, steps, and their relationships
  3. Temporal Bridge Layer: Translation between ZoopFlow abstractions and Temporal primitives
  4. Temporal SDK: Direct interaction with Temporal services
  5. Temporal Cluster: The runtime environment for workflow execution

This architecture provides several benefits:

  • Abstraction: ZoopFlow users don't need to understand Temporal details
  • Flexibility: ZoopFlow can evolve independently from Temporal
  • Performance: Direct mapping to Temporal primitives for efficient execution
  • Extensibility: New features can be added without changing the core API

The integration is composed of several key components:

  1. Flow Definition Adapter: Converts JSON flow definitions to Temporal-compatible workflows
  2. Temporal Adapter: Manages the connection between the Flow system and Temporal
  3. Workflow Wrapper: Encapsulates flow execution within Temporal workflows
  4. State Management: Handles workflow state persistence and recovery
  5. Error Handling: Provides mechanisms for error detection, classification, and recovery
  6. Signal Handlers: Enables dynamic control over workflow execution

These components work together to provide a seamless integration between ZoopFlow's high-level flow abstractions and Temporal's execution capabilities.

Mapping ZoopFlow Concepts to Temporal

ZoopFlow maps its high-level abstractions to Temporal's core primitives:

ZoopFlow ConceptTemporal ConceptDescription
StepActivityA discrete unit of work
FlowWorkflowAn orchestration of steps
Step ContextActivity ContextExecution context for steps
Flow ContextWorkflow ContextOrchestration context for flows
Retry ConfigRetry PolicyConfiguration for retrying failed operations
SignalWorkflow SignalAsynchronous message to a running workflow
Flow StateWorkflow StatePersisted state of a flow execution
Error HandlerError HandlerComponent for handling and recovering from errors

This mapping ensures that ZoopFlow's concepts are efficiently translated to Temporal primitives while maintaining their semantic meaning and behavior.

The Temporal Integration Components

The Temporal integration consists of several components that work together to bridge between the Flow system and Temporal:

  1. TemporalAdapter: The primary connection point between ZoopFlow and Temporal's API
  2. SignalRegistry & SignalManager: Manages signal definitions and handlers
  3. Temporal-Specific Signal Context: Adapts the SignalContext to work with Temporal's workflow context

The ZoopFlow's Temporal integration provides functionality for:

  • Registering steps as Temporal activities
  • Registering flows as Temporal workflows
  • Registering and processing signals
  • Starting and managing Temporal workers
  • Managing workflow state
  • Handling errors consistently

For a concrete implementation, see our basic example which demonstrates the use of the Temporal integration components.

Key Integration Features

import { registerStepAsActivity } from '@zoop/flow/core/temporal/activity';
import { registerFlowAsWorkflow } from '@zoop/flow/core/temporal/workflow';
import { registerTemporalFlowControlSignals } from '@zoop/flow/core/signals/temporal';
 
// Example of registering a step as an activity
registerStepAsActivity(stringTransformerStep);
 
// Example of registering a flow as a workflow
registerFlowAsWorkflow(myFlow);
 
// Example of registering standard signals for a flow
registerTemporalFlowControlSignals(
  'my-flow-id',
  'execution-123',
  temporalContext
);

Step to Activity Mapping

ZoopFlow steps are mapped to Temporal activities, which are the fundamental unit of work in Temporal. Here's how the mapping is implemented:

// Define a step in ZoopFlow
const myStep = defineStep<InputType, OutputType>({
  id: 'domain.resource.action',
  version: '1.0.0',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: { /* ... */ },
    required: ['field1', 'field2']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: { /* ... */ },
    required: ['result']
  },
  
  // Retry configuration
  retryConfig: {
    maxAttempts: 3,
    backoffCoefficient: 1.5,
    initialIntervalSeconds: 1
  },
  
  // Step implementation
  execute: async (input, context) => {
    // Step logic here
    return { result: 'success' };
  }
});
 
// Register the step as a Temporal activity
temporalBridge.registerStepAsActivity(myStep);

The step's retry configuration is converted to a Temporal retry policy, and the step's timeout configuration is mapped to Temporal activity timeouts.

Signal Handling with Temporal

ZoopFlow leverages Temporal's signal handling capabilities to enable dynamic control over workflow execution. The system supports several standard signal types:

Signal Registration

Signals are registered for specific flows:

import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager } from '@zoop/flow/core/signals/manager';
import { defineSignal } from '@zoop/flow/core/signals/handlers';
import { registerTemporalFlowControlSignals } from '@zoop/flow/core/signals/temporal';
 
// Define a signal
const pauseSignal = defineSignal<{ reason?: string }>({
  namespace: 'flow',
  name: 'pause',
  description: 'Signal to pause the workflow',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    // Signal handler implementation
    await context.setState({ 
      paused: true, 
      pausedAt: new Date().toISOString(),
      pauseReason: payload.reason
    });
    
    // Create a checkpoint for this action
    await context.createCheckpoint({
      action: 'pause',
      reason: payload.reason
    });
    
    // Log the action
    context.log('Workflow paused', { reason: payload.reason });
  }
});
 
// Create a registry and register the signal
const registry = new SignalRegistryImpl();
registry.registerSignal(pauseSignal);
 
// Create a signal manager
const signalManager = new SignalManager(registry);
 
// In your workflow function, register Temporal integration
function myWorkflow(flowId: string, executionId: string, temporalContext: any) {
  // Register standard and custom signals
  registerTemporalFlowControlSignals(flowId, executionId, temporalContext);
  
  // Initialize the signal manager
  signalManager.markInitialized();
}

Sending Signals

Signals can be sent to running workflows:

// Send a signal to a workflow
await client.workflow.signal(
  handle, 
  'flow.pause',
  { reason: 'Manual pause requested by user' }
);

Error Handling and Recovery

ZoopFlow provides a standardized error handling interface that works consistently across both the core system and Temporal integration. This unified approach ensures consistent error handling regardless of whether flows are executed directly or through Temporal.

Unified Error Handling Interface

The system implements a standardized error handling interface that works across both environments:

/**
 * Unified error context interface combining both core and temporal contexts
 */
export interface UnifiedErrorContext {
  // Flow context
  flowId?: string;
  executionId?: string;
  stepId?: string;
  
  // Temporal context
  nodeId?: string;
  workflowId?: string;
  runId?: string;
  activityId?: string;
  
  // State information
  state?: Record<string, unknown>;
  context?: Record<string, unknown>;
  
  // Error information
  originalError?: Error;
  cause?: Error | unknown;
  
  // Additional metadata
  [key: string]: unknown;
}
 
/**
 * Result of error handling
 */
export interface ErrorHandlingResult {
  handled: boolean;
  strategy: RecoveryStrategy;
  continueExecution: boolean;
  error?: Error;
  context?: Record<string, unknown>;
}
 
/**
 * Interface for unified error handler
 */
export interface UnifiedErrorHandler {
  handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
}

Adapter System

The error handling system includes adapters that convert between core and Temporal error types:

  1. CoreErrorHandlerAdapter: Adapts the core error handler registry to use the unified interface
  2. TemporalErrorHandlerAdapter: Adapts the Temporal error handler to use the unified interface
  3. TemporalErrorAdapter: Provides utilities for converting between error types

Example Usage

// Create a unified error handler - works with both core and Temporal
const errorHandler = UnifiedErrorHandlerFactory.createHandler();
 
// Handle errors consistently across systems
try {
  // Execute operation
  await executeOperation();
} catch (error) {
  // Handle error using unified interface
  const result = await errorHandler.handleError(error, {
    flowId: 'order-processing-flow',
    stepId: 'payment-processing',
    state: getCurrentState()
  });
  
  if (result.handled) {
    // Error was handled
    console.log(`Error handled with strategy: ${result.strategy}`);
    
    if (result.continueExecution) {
      // Continue execution
      await continueOperation(result.context);
    } else {
      // Stop execution
      await cleanupResources(result.context);
    }
  } else {
    // Error was not handled, propagate it
    throw result.error || error;
  }
}
 
// For Temporal workflows, use the Temporal-specific factory
const temporalErrorHandler = TemporalUnifiedErrorHandlerFactory.createHandler({
  defaultStrategy: RecoveryStrategy.RETRY_WITH_BACKOFF,
  continueOnError: true
});
 
// Handle Temporal-specific errors with the same interface
const result = await temporalErrorHandler.handleError(workflowError, {
  workflowId: workflowInfo.workflowId,
  runId: workflowInfo.runId,
  nodeId: 'payment-processing'
});

Examples of Temporal Integration

For detailed examples of ZoopFlow's Temporal integration, please refer to the following examples:

  1. Basic Integration Example: Shows fundamental integration between ZoopFlow and Temporal
  2. Error Handling Example: Demonstrates error recovery strategies
  3. Signal Handling Example: Shows dynamic workflow control
View All Temporal Examples

Full Source Code Reference

The full implementation of ZoopFlow's Temporal integration can be found in the following directories:

View Core Temporal Implementation View Temporal Examples