Examples
Workflow Adapter Example

Temporal Workflow Adapter Example

This example demonstrates how to use the new Temporal Workflow Adapter to integrate ZoopFlow with Temporal.io.

The source code for this example is available in the ZoopFlow repository.

Overview

The Temporal Workflow Adapter provides a clean, type-safe interface for integrating ZoopFlow with Temporal. This example shows how to:

  1. Create a Temporal adapter
  2. Define and register steps and flows
  3. Execute workflows with progress tracking
  4. Handle signals
  5. Work with error handling

Prerequisites

  • Temporal server running locally or accessible
  • ZoopFlow core library installed

Step 1: Create a Temporal Connection and Adapter

First, we need to connect to the Temporal server and create an adapter:

import { NativeConnection } from '@temporalio/worker';
import { createTemporalAdapter } from '@zoopflow/core/temporal';
 
async function runExample() {
  console.log('Starting Temporal adapter example...');
  
  try {
    // Connect to Temporal
    console.log('Connecting to Temporal...');
    const connection = await NativeConnection.connect({
      address: 'localhost:7233'
    });
    
    // Create a Temporal adapter
    console.log('Creating Temporal adapter...');
    const adapter = createTemporalAdapter(connection, 'string-processing');
    
    // Continue with the rest of the example...
  } catch (error) {
    console.error('Error in example:', error instanceof Error ? error.message : error);
  }
}

Step 2: Define Steps and Flows

Next, we'll define some steps and flows to use with our adapter:

import { defineStep, defineFlow } from '@zoopflow/core';
 
// Define a step that reverses a string
const stringReverser = defineStep({
  id: 'example.string.reverse',
  version: '1.0.0',
  description: 'Reverses a string',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  
  // Execute function
  execute: async (input) => {
    console.log(`Reversing string: ${input.text}`);
    
    // Reverse the string
    const result = input.text.split('').reverse().join('');
    
    return { result };
  }
});
 
// Define a step that transforms a string to uppercase
const stringUppercaser = defineStep({
  id: 'example.string.uppercase',
  version: '1.0.0',
  description: 'Transforms a string to uppercase',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  
  // Execute function
  execute: async (input) => {
    console.log(`Transforming string to uppercase: ${input.text}`);
    
    // Transform to uppercase
    const result = input.text.toUpperCase();
    
    return { result };
  }
});
 
// Define a signal for cancelling the workflow
const cancelSignal = {
  id: 'workflow.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    console.log(`Cancelling workflow: ${payload.reason}`);
    
    // Set the cancelled flag
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    
    return true;
  }
};
 
// Define a flow that combines the steps
const stringProcessingFlow = defineFlow({
  id: 'example.string.process',
  version: '1.0.0',
  description: 'Process a string by reversing and transforming to uppercase',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      original: { type: 'string' },
      reversed: { type: 'string' },
      uppercase: { type: 'string' },
      cancelled: { type: 'boolean' }
    }
  },
  
  // Steps
  steps: [stringReverser, stringUppercaser],
  
  // Signals
  signals: [cancelSignal]
});

Step 3: Register Steps and Flow

Now, we'll register the steps and flow with the adapter:

// Register steps and flow
console.log('Registering steps and flow...');
adapter.registerStepAsActivity(stringReverser);
adapter.registerStepAsActivity(stringUppercaser);
adapter.registerFlowAsWorkflow(stringProcessingFlow);

Step 4: Start the Worker and Execute the Flow

Next, we'll start the worker and execute the flow:

// Start the worker
console.log('Starting worker...');
await adapter.startWorker();
 
// Execute the flow
console.log('Executing flow...');
const result = await adapter.executeFlow(
  'example.string.process',
  { text: 'Hello, Temporal!' },
  {
    workflowId: 'string-process-' + Date.now(),
    onProgress: (progress) => {
      console.log('Progress:', JSON.stringify(progress, null, 2));
    }
  }
);
 
// Show the result
console.log('Flow execution completed!');
console.log('Result:', JSON.stringify(result, null, 2));
 
// Stop the worker
console.log('Stopping worker...');
await adapter.stopWorker();

Step 5: Sending Signals to a Running Workflow

We can also demonstrate how to send signals to a running workflow:

// Start the workflow without waiting for completion
const handle = await adapter.executeFlow(
  'example.string.process',
  { text: 'Hello, Temporal!' },
  {
    workflowId: 'string-process-' + Date.now(),
    waitForCompletion: false  // Don't wait for completion
  }
);
 
// Send a signal to the workflow
await adapter.signalWorkflow(
  handle.workflowId,
  'workflow.cancel',
  { reason: 'User requested cancellation' }
);
 
// Wait for the result
const result = await handle.result();
console.log('Result after signal:', JSON.stringify(result, null, 2));

Step 6: Error Handling

Finally, let's demonstrate error handling:

try {
  const result = await adapter.executeFlow(
    'non-existent-workflow',
    { text: 'Hello, Temporal!' }
  );
} catch (error) {
  console.error('Error executing workflow:', error.message);
  
  if (error instanceof FlowError) {
    console.error('Error type:', error.errorType);
    console.error('Context:', error.context);
  }
}

Complete Example

Here's the complete example:

import { NativeConnection } from '@temporalio/worker';
import { defineStep, defineFlow } from '@zoopflow/core';
import { createTemporalAdapter } from '@zoopflow/core/temporal';
 
/**
 * Define a step that reverses a string
 */
const stringReverser = defineStep({
  id: 'example.string.reverse',
  version: '1.0.0',
  description: 'Reverses a string',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  
  // Execute function
  execute: async (input) => {
    console.log(`Reversing string: ${input.text}`);
    
    // Reverse the string
    const result = input.text.split('').reverse().join('');
    
    return { result };
  }
});
 
/**
 * Define a step that transforms a string to uppercase
 */
const stringUppercaser = defineStep({
  id: 'example.string.uppercase',
  version: '1.0.0',
  description: 'Transforms a string to uppercase',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  
  // Execute function
  execute: async (input) => {
    console.log(`Transforming string to uppercase: ${input.text}`);
    
    // Transform to uppercase
    const result = input.text.toUpperCase();
    
    return { result };
  }
});
 
/**
 * Define a signal for cancelling the workflow
 */
const cancelSignal = {
  id: 'workflow.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    console.log(`Cancelling workflow: ${payload.reason}`);
    
    // Set the cancelled flag
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    
    return true;
  }
};
 
/**
 * Define a flow that combines the steps
 */
const stringProcessingFlow = defineFlow({
  id: 'example.string.process',
  version: '1.0.0',
  description: 'Process a string by reversing and transforming to uppercase',
  
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      original: { type: 'string' },
      reversed: { type: 'string' },
      uppercase: { type: 'string' },
      cancelled: { type: 'boolean' }
    }
  },
  
  // Steps
  steps: [stringReverser, stringUppercaser],
  
  // Signals
  signals: [cancelSignal]
});
 
/**
 * Run the example
 */
async function runExample() {
  console.log('Starting Temporal adapter example...');
  
  try {
    // Connect to Temporal
    console.log('Connecting to Temporal...');
    const connection = await NativeConnection.connect({
      address: 'localhost:7233'
    });
    
    // Create a Temporal adapter
    console.log('Creating Temporal adapter...');
    const adapter = createTemporalAdapter(connection, 'string-processing');
    
    // Register steps and flow
    console.log('Registering steps and flow...');
    adapter.registerStepAsActivity(stringReverser);
    adapter.registerStepAsActivity(stringUppercaser);
    adapter.registerFlowAsWorkflow(stringProcessingFlow);
    
    // Start the worker
    console.log('Starting worker...');
    await adapter.startWorker();
    
    // Execute the flow
    console.log('Executing flow...');
    const result = await adapter.executeFlow(
      'example.string.process',
      { text: 'Hello, Temporal!' },
      {
        workflowId: 'string-process-' + Date.now(),
        onProgress: (progress) => {
          console.log('Progress:', JSON.stringify(progress, null, 2));
        }
      }
    );
    
    // Show the result
    console.log('Flow execution completed!');
    console.log('Result:', JSON.stringify(result, null, 2));
    
    // Stop the worker
    console.log('Stopping worker...');
    await adapter.stopWorker();
    
    console.log('Example completed successfully!');
  } catch (error) {
    console.error('Error in example:', error instanceof Error ? error.message : error);
    console.error('Stack:', error instanceof Error ? error.stack : 'No stack trace');
  }
}
 
// Run the example if this file is executed directly
if (require.main === module) {
  runExample().catch(error => {
    console.error('Unhandled error:', error);
    process.exit(1);
  });
}
 
// Export for use in other examples
export { stringReverser, stringUppercaser, stringProcessingFlow, runExample };

Running the Example

To run this example:

  1. Start Temporal server:

    temporal server start-dev
  2. Run the example:

    npx ts-node src/examples/temporal-adapter-example.ts

Expected Output

Starting Temporal adapter example...
Connecting to Temporal...
Creating Temporal adapter...
Registering steps and flow...
Starting worker...
Executing flow...
Progress: {
  "status": "in-progress",
  "currentStep": "Initializing workflow",
  "stepIndex": 0,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!"
  }
}
Reversing string: Hello, Temporal!
Progress: {
  "status": "in-progress",
  "currentStep": "example.string.reverse",
  "stepIndex": 1,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!laropmeT ,olleH"
  }
}
Transforming string to uppercase: !laropmeT ,olleH
Progress: {
  "status": "in-progress",
  "currentStep": "example.string.uppercase",
  "stepIndex": 2,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!LAROPMET ,OLLEH"
  }
}
Progress: {
  "status": "completed",
  "stepIndex": 2,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!LAROPMET ,OLLEH",
    "original": "Hello, Temporal!",
    "reversed": "!laropmeT ,olleH",
    "uppercase": "!LAROPMET ,OLLEH"
  }
}
Flow execution completed!
Result: {
  "text": "Hello, Temporal!",
  "result": "!LAROPMET ,OLLEH",
  "original": "Hello, Temporal!",
  "reversed": "!laropmeT ,olleH",
  "uppercase": "!LAROPMET ,OLLEH"
}
Stopping worker...
Example completed successfully!

Key Adapter Benefits

This example demonstrates key benefits of the new Temporal Workflow Adapter:

  1. Type Safety: Fully typed interfaces for steps, flows, and signals
  2. Progress Tracking: Built-in progress tracking with detailed state information
  3. Signal Handling: Clean signal registration and handling
  4. Error Handling: Consistent error propagation and handling
  5. Worker Management: Easy worker lifecycle management
  6. Simplified API: Intuitive API that follows a consistent pattern

Conclusion

This example demonstrates how to use the new Temporal Workflow Adapter to integrate ZoopFlow with Temporal.io. The adapter provides a clean, type-safe interface for registering steps and flows, executing workflows, handling signals, and tracking progress.

View Complete Example View Adapter Interface

For more information, see the Temporal Workflow Adapter Guide.