Temporal Workflow Adapter Example
This example demonstrates how to use the new Temporal Workflow Adapter to integrate ZoopFlow with Temporal.io.
The source code for this example is available in the ZoopFlow repository.
Overview
The Temporal Workflow Adapter provides a clean, type-safe interface for integrating ZoopFlow with Temporal. This example shows how to:
- Create a Temporal adapter
- Define and register steps and flows
- Execute workflows with progress tracking
- Handle signals
- Handle errors
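The flow used throughout this example chains two steps, feeding the first step's `result` into the second step's `text`. The following is a minimal, library-independent sketch of that chaining idea. The `Step` type and `runSequentially` helper are hypothetical names introduced for illustration; they are not part of ZoopFlow or Temporal, and the adapter's real execution model may differ.

```typescript
// Hypothetical step shape: takes { text } and resolves to { result }.
type Step = (input: { text: string }) => Promise<{ result: string }>;

// Plain-function versions of the two steps defined later in this example.
const reverse: Step = async ({ text }) => ({
  result: text.split('').reverse().join('')
});
const uppercase: Step = async ({ text }) => ({
  result: text.toUpperCase()
});

// Runs the steps in order, passing each step's result to the next as `text`.
async function runSequentially(steps: Step[], text: string): Promise<string> {
  let current = text;
  for (const step of steps) {
    const { result } = await step({ text: current });
    current = result;
  }
  return current;
}
```

Calling `runSequentially([reverse, uppercase], 'abc')` resolves to `'CBA'`: the string is reversed first, then uppercased.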
Prerequisites
- A Temporal server running locally or otherwise reachable (the example connects to localhost:7233)
- ZoopFlow core library installed
Step 1: Create a Temporal Connection and Adapter
First, we need to connect to the Temporal server and create an adapter:
import { NativeConnection } from '@temporalio/worker';
import { createTemporalAdapter } from '@zoopflow/core/temporal';

async function runExample() {
  console.log('Starting Temporal adapter example...');
  try {
    // Connect to Temporal
    console.log('Connecting to Temporal...');
    const connection = await NativeConnection.connect({
      address: 'localhost:7233'
    });
    // Create a Temporal adapter
    console.log('Creating Temporal adapter...');
    const adapter = createTemporalAdapter(connection, 'string-processing');
    // Continue with the rest of the example...
  } catch (error) {
    console.error('Error in example:', error instanceof Error ? error.message : error);
  }
}

Step 2: Define Steps and Flows
Next, we'll define some steps and flows to use with our adapter:
import { defineStep, defineFlow } from '@zoopflow/core';

// Define a step that reverses a string
const stringReverser = defineStep({
  id: 'example.string.reverse',
  version: '1.0.0',
  description: 'Reverses a string',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  // Execute function
  execute: async (input) => {
    console.log(`Reversing string: ${input.text}`);
    // Reverse the string
    const result = input.text.split('').reverse().join('');
    return { result };
  }
});

// Define a step that transforms a string to uppercase
const stringUppercaser = defineStep({
  id: 'example.string.uppercase',
  version: '1.0.0',
  description: 'Transforms a string to uppercase',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  // Execute function
  execute: async (input) => {
    console.log(`Transforming string to uppercase: ${input.text}`);
    // Transform to uppercase
    const result = input.text.toUpperCase();
    return { result };
  }
});

// Define a signal for cancelling the workflow
const cancelSignal = {
  id: 'workflow.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    console.log(`Cancelling workflow: ${payload.reason}`);
    // Set the cancelled flag
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    return true;
  }
};

// Define a flow that combines the steps
const stringProcessingFlow = defineFlow({
  id: 'example.string.process',
  version: '1.0.0',
  description: 'Process a string by reversing and transforming to uppercase',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      original: { type: 'string' },
      reversed: { type: 'string' },
      uppercase: { type: 'string' },
      cancelled: { type: 'boolean' }
    }
  },
  // Steps
  steps: [stringReverser, stringUppercaser],
  // Signals
  signals: [cancelSignal]
});

Step 3: Register Steps and Flow
Now, we'll register the steps and flow with the adapter:
// Register steps and flow
console.log('Registering steps and flow...');
adapter.registerStepAsActivity(stringReverser);
adapter.registerStepAsActivity(stringUppercaser);
adapter.registerFlowAsWorkflow(stringProcessingFlow);

Step 4: Start the Worker and Execute the Flow
Next, we'll start the worker and execute the flow:
// Start the worker
console.log('Starting worker...');
await adapter.startWorker();

// Execute the flow
console.log('Executing flow...');
const result = await adapter.executeFlow(
  'example.string.process',
  { text: 'Hello, Temporal!' },
  {
    workflowId: 'string-process-' + Date.now(),
    onProgress: (progress) => {
      console.log('Progress:', JSON.stringify(progress, null, 2));
    }
  }
);

// Show the result
console.log('Flow execution completed!');
console.log('Result:', JSON.stringify(result, null, 2));

// Stop the worker
console.log('Stopping worker...');
await adapter.stopWorker();

Step 5: Sending Signals to a Running Workflow
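Before sending a signal through the adapter, it may help to see what a handler such as `cancelSignal` does with its payload. The following is a minimal, library-independent sketch. `InMemoryContext`, `CancelPayload`, and `handleCancel` are hypothetical stand-ins, not ZoopFlow APIs; only the `setState(key, value)` call shape is taken from the example, and the `'unspecified'` fallback is an assumption added here.

```typescript
// Hypothetical in-memory stand-in for the context passed to a signal handler.
class InMemoryContext {
  private state: Record<string, unknown> = {};

  async setState(key: string, value: unknown): Promise<void> {
    this.state[key] = value;
  }

  getState(key: string): unknown {
    return this.state[key];
  }
}

// The example's payloadSchema does not list `reason` as required,
// so the sketch treats it as optional.
interface CancelPayload {
  reason?: string;
}

// Mirrors the example's handler: record the cancellation and its reason.
async function handleCancel(
  payload: CancelPayload,
  context: InMemoryContext
): Promise<boolean> {
  await context.setState('cancelled', true);
  await context.setState('cancelReason', payload.reason ?? 'unspecified');
  return true;
}
```

Because `reason` is optional in the payload schema, a handler that logs or stores it may want a fallback like the one above rather than persisting `undefined`.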
We can also demonstrate how to send signals to a running workflow:
// Start the workflow without waiting for completion
const handle = await adapter.executeFlow(
  'example.string.process',
  { text: 'Hello, Temporal!' },
  {
    workflowId: 'string-process-' + Date.now(),
    waitForCompletion: false // Don't wait for completion
  }
);

// Send a signal to the workflow
await adapter.signalWorkflow(
  handle.workflowId,
  'workflow.cancel',
  { reason: 'User requested cancellation' }
);

// Wait for the result
const result = await handle.result();
console.log('Result after signal:', JSON.stringify(result, null, 2));

Step 6: Error Handling
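The snippet in this step reads properties from a caught error. Under strict TypeScript a caught value is typed `unknown`, so it has to be narrowed before its fields can be read. The following is a self-contained sketch of that narrowing. The `FlowError` class here is a hypothetical stand-in whose `errorType` and `context` fields mirror the ones the example reads; the real ZoopFlow class may differ, and `'NOT_FOUND'` is an invented value used only for illustration.

```typescript
// Hypothetical stand-in for the adapter's FlowError; not the library's class.
class FlowError extends Error {
  errorType: string;
  context: Record<string, unknown>;

  constructor(message: string, errorType: string, context: Record<string, unknown> = {}) {
    super(message);
    this.name = 'FlowError';
    this.errorType = errorType;
    this.context = context;
    // Keeps `instanceof` working when compiling to older JavaScript targets.
    Object.setPrototypeOf(this, FlowError.prototype);
  }
}

// Narrows an unknown caught value to a readable message.
function describeError(error: unknown): string {
  if (error instanceof FlowError) {
    return `${error.errorType}: ${error.message}`;
  }
  if (error instanceof Error) {
    return error.message;
  }
  return String(error);
}
```

Checking the most specific class first matters: a `FlowError` is also an `Error`, so reversing the two `instanceof` checks would hide `errorType`.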
Finally, let's demonstrate error handling:
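// Note: FlowError must be imported from ZoopFlow; its export path is not shown in this example.
try {
  // No flow is registered under this ID, so the call is expected to fail
  await adapter.executeFlow(
    'non-existent-workflow',
    { text: 'Hello, Temporal!' }
  );
} catch (error) {
  // A caught value is `unknown` in strict TypeScript, so narrow it before reading properties
  console.error('Error executing workflow:', error instanceof Error ? error.message : error);
  if (error instanceof FlowError) {
    console.error('Error type:', error.errorType);
    console.error('Context:', error.context);
  }
}

Complete Example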
Here's the complete example:
import { NativeConnection } from '@temporalio/worker';
import { defineStep, defineFlow } from '@zoopflow/core';
import { createTemporalAdapter } from '@zoopflow/core/temporal';

/**
 * Define a step that reverses a string
 */
const stringReverser = defineStep({
  id: 'example.string.reverse',
  version: '1.0.0',
  description: 'Reverses a string',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  // Execute function
  execute: async (input) => {
    console.log(`Reversing string: ${input.text}`);
    // Reverse the string
    const result = input.text.split('').reverse().join('');
    return { result };
  }
});

/**
 * Define a step that transforms a string to uppercase
 */
const stringUppercaser = defineStep({
  id: 'example.string.uppercase',
  version: '1.0.0',
  description: 'Transforms a string to uppercase',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      result: { type: 'string' }
    },
    required: ['result']
  },
  // Execute function
  execute: async (input) => {
    console.log(`Transforming string to uppercase: ${input.text}`);
    // Transform to uppercase
    const result = input.text.toUpperCase();
    return { result };
  }
});

/**
 * Define a signal for cancelling the workflow
 */
const cancelSignal = {
  id: 'workflow.cancel',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    console.log(`Cancelling workflow: ${payload.reason}`);
    // Set the cancelled flag
    await context.setState('cancelled', true);
    await context.setState('cancelReason', payload.reason);
    return true;
  }
};

/**
 * Define a flow that combines the steps
 */
const stringProcessingFlow = defineFlow({
  id: 'example.string.process',
  version: '1.0.0',
  description: 'Process a string by reversing and transforming to uppercase',
  // Input schema
  inputSchema: {
    type: 'object',
    properties: {
      text: { type: 'string' }
    },
    required: ['text']
  },
  // Output schema
  outputSchema: {
    type: 'object',
    properties: {
      original: { type: 'string' },
      reversed: { type: 'string' },
      uppercase: { type: 'string' },
      cancelled: { type: 'boolean' }
    }
  },
  // Steps
  steps: [stringReverser, stringUppercaser],
  // Signals
  signals: [cancelSignal]
});

/**
 * Run the example
 */
async function runExample() {
  console.log('Starting Temporal adapter example...');
  try {
    // Connect to Temporal
    console.log('Connecting to Temporal...');
    const connection = await NativeConnection.connect({
      address: 'localhost:7233'
    });

    // Create a Temporal adapter
    console.log('Creating Temporal adapter...');
    const adapter = createTemporalAdapter(connection, 'string-processing');

    // Register steps and flow
    console.log('Registering steps and flow...');
    adapter.registerStepAsActivity(stringReverser);
    adapter.registerStepAsActivity(stringUppercaser);
    adapter.registerFlowAsWorkflow(stringProcessingFlow);

    // Start the worker
    console.log('Starting worker...');
    await adapter.startWorker();

    // Execute the flow
    console.log('Executing flow...');
    const result = await adapter.executeFlow(
      'example.string.process',
      { text: 'Hello, Temporal!' },
      {
        workflowId: 'string-process-' + Date.now(),
        onProgress: (progress) => {
          console.log('Progress:', JSON.stringify(progress, null, 2));
        }
      }
    );

    // Show the result
    console.log('Flow execution completed!');
    console.log('Result:', JSON.stringify(result, null, 2));

    // Stop the worker
    console.log('Stopping worker...');
    await adapter.stopWorker();
    console.log('Example completed successfully!');
  } catch (error) {
    console.error('Error in example:', error instanceof Error ? error.message : error);
    console.error('Stack:', error instanceof Error ? error.stack : 'No stack trace');
  }
}

// Run the example if this file is executed directly
if (require.main === module) {
  runExample().catch(error => {
    console.error('Unhandled error:', error);
    process.exit(1);
  });
}

// Export for use in other examples
export { stringReverser, stringUppercaser, stringProcessingFlow, runExample };

Running the Example
To run this example:
- Start Temporal server:
  temporal server start-dev
- Run the example:
  npx ts-node src/examples/temporal-adapter-example.ts
Expected Output
Starting Temporal adapter example...
Connecting to Temporal...
Creating Temporal adapter...
Registering steps and flow...
Starting worker...
Executing flow...
Progress: {
  "status": "in-progress",
  "currentStep": "Initializing workflow",
  "stepIndex": 0,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!"
  }
}
Reversing string: Hello, Temporal!
Progress: {
  "status": "in-progress",
  "currentStep": "example.string.reverse",
  "stepIndex": 1,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!laropmeT ,olleH"
  }
}
Transforming string to uppercase: !laropmeT ,olleH
Progress: {
  "status": "in-progress",
  "currentStep": "example.string.uppercase",
  "stepIndex": 2,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!LAROPMET ,OLLEH"
  }
}
Progress: {
  "status": "completed",
  "stepIndex": 2,
  "totalSteps": 2,
  "state": {
    "text": "Hello, Temporal!",
    "result": "!LAROPMET ,OLLEH",
    "original": "Hello, Temporal!",
    "reversed": "!laropmeT ,olleH",
    "uppercase": "!LAROPMET ,OLLEH"
  }
}
Flow execution completed!
Result: {
  "text": "Hello, Temporal!",
  "result": "!LAROPMET ,OLLEH",
  "original": "Hello, Temporal!",
  "reversed": "!laropmeT ,olleH",
  "uppercase": "!LAROPMET ,OLLEH"
}
Stopping worker...
Example completed successfully!

Key Adapter Benefits
This example demonstrates key benefits of the new Temporal Workflow Adapter:
- Type Safety: Fully typed interfaces for steps, flows, and signals
- Progress Tracking: Built-in progress tracking with detailed state information
- Signal Handling: Clean signal registration and handling
- Error Handling: Consistent error propagation and handling
- Worker Management: Easy worker lifecycle management
- Simplified API: Intuitive API that follows a consistent pattern
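To make the progress-tracking benefit concrete, here is a small sketch of a handler that could be passed as `onProgress`. The `FlowProgress` interface is inferred from the fields that appear in the expected output above (`status`, `currentStep`, `stepIndex`, `totalSteps`, `state`); it is an assumption, not the adapter's published type, and `formatProgress` is a hypothetical helper.

```typescript
// Shape inferred from the expected output; an assumption, not the adapter's type.
interface FlowProgress {
  status: 'in-progress' | 'completed';
  currentStep?: string; // absent in the final "completed" update
  stepIndex: number;
  totalSteps: number;
  state: Record<string, unknown>;
}

// Turns a progress update into a one-line summary such as "[50%] example.string.reverse".
function formatProgress(progress: FlowProgress): string {
  const percent = progress.totalSteps > 0
    ? Math.round((progress.stepIndex / progress.totalSteps) * 100)
    : 0;
  // Fall back to the status when no step name is reported.
  const label = progress.currentStep ?? progress.status;
  return `[${percent}%] ${label}`;
}
```

Under these assumptions, the second progress update in the expected output would be summarized as `[50%] example.string.reverse`, and the final one as `[100%] completed`.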
Conclusion
This example demonstrates how to use the new Temporal Workflow Adapter to integrate ZoopFlow with Temporal.io. The adapter provides a clean, type-safe interface for registering steps and flows, executing workflows, handling signals, and tracking progress.
For more information, see the Temporal Workflow Adapter Guide.