Signal Handling
Overview
The signal handling system provides a structured way to manage asynchronous events in workflows. Signals allow external systems to asynchronously send events to a running workflow, enabling it to respond to external events while maintaining deterministic execution. This is particularly useful for long-running workflows that need to react to external events without polling.
Key capabilities of the signal handling system include:
- Type-safe signal definitions with JSON Schema validation
- Middleware support for cross-cutting concerns
- Versioning support for evolving signals over time
- Dynamic signal handling for runtime signal registration
- Integration with Temporal's workflow API
Key Concepts
What are Signals?
Signals provide a way for external systems to asynchronously communicate with running workflows. Unlike regular workflow inputs which are provided at the start, signals can be sent at any time during workflow execution, allowing workflows to react to external events without polling.
Core Components
- Signal Definition: A contract that defines a signal's ID, payload schema, and handler. Created using the
defineSignalfunction. - Signal Registry: A central registry that manages signal definitions and dispatches signals to the appropriate handlers.
- Signal Manager: Coordinates signal processing with middleware support and queue management.
- Signal Handler: A function that processes the signal payload and updates workflow state.
- Signal Middleware: Processing components that can transform signals, add functionality, or handle cross-cutting concerns.
- Signal Context: Execution environment providing state management and history tracking for signal handlers.
Signal Lifecycle
- Definition: Signals are defined using the
defineSignalfunction - Registration: Signals are registered with the SignalRegistry
- Integration: The SignalManager connects signals to Temporal
- Reception: External systems send signals to the workflow
- Processing: Signal payloads are validated and handlers are executed
- State Update: Workflow state is updated based on signal processing
Signal Definition and Registration
Defining a Signal
Signals are defined using the defineSignal function, which creates a type-safe signal definition:
import { defineSignal } from '@zoop/flow/core/signals/handlers';
import { JSONSchema7 } from 'json-schema';
// Define the payload schema
const cancelSchema: JSONSchema7 = {
type: 'object',
properties: {
reason: { type: 'string' },
},
};
// Define the signal with type-checking
const cancelSignal = defineSignal<{ reason?: string }>({
id: 'workflow.cancel', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Signal to cancel the workflow',
payloadSchema: cancelSchema,
handler: async (payload, context) => {
// Handle the signal
const { reason } = payload;
// Get current workflow state
const state = context.getState();
// Update workflow state
await context.setState({
...state,
status: 'cancelled',
cancellationReason: reason || 'No reason provided',
});
// Create a checkpoint for this operation
await context.createCheckpoint({
reason: 'Workflow cancelled',
details: { reason }
});
// Log the cancellation
context.log('Workflow cancelled', { reason });
},
});Signal Registration
Our signal system provides two main approaches for signal registration:
1. Global Signal Registry
The simplest approach is to register signals with the global registry:
import { registerSignal } from '@zoop/flow/core/signals/handlers';
// Register a signal globally
registerSignal(cancelSignal);This makes the signal available throughout the system.
2. Flow-Specific Signal Registration
For more granular control, you can use flow-specific registration:
import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager } from '@zoop/flow/core/signals/manager';
// Create a registry specific to your flow
const orderFlowRegistry = new SignalRegistryImpl();
// Register signals with this specific registry
orderFlowRegistry.registerSignal(cancelSignal);
orderFlowRegistry.registerSignal(updateAddressSignal);
// Create a manager for this registry
const orderFlowSignalManager = new SignalManager(orderFlowRegistry, {
// Optional configuration
queueSignalsUntilInitialized: true,
middleware: [loggingMiddleware]
});This approach provides benefits like:
- Isolation: Signals for one flow don't affect others
- Context Relevance: Handlers receive context specific to their flow
- Clearer Boundaries: Explicit association between flows and signals
- Reduced Naming Conflicts: Same signal name can be used in different flows
Signal Integration with Temporal
For Temporal integration, our signal system provides adapters:
import {
registerTemporalFlowControlSignals
} from '@zoop/flow/core/signals/temporal';
// Inside your workflow implementation:
function yourWorkflow() {
// Setup Temporal workflow context
const flowId = "your-flow-id";
const executionId = "your-execution-id";
// Register signals with Temporal
registerTemporalFlowControlSignals(
flowId,
executionId,
temporalContext
);
// Rest of your workflow logic...
}This integration handles:
- Registering signal handlers with Temporal's API
- Creating the proper signal context
- Validating payloads
- Processing signals through middleware
Signal Context and State Management
The SignalContext provides a rich API for managing state and tracking execution:
interface SignalContext {
// Readonly properties
readonly signalId: string;
readonly flowId: string;
readonly executionId: string;
readonly workflowId?: string;
readonly runId?: string;
readonly metadata: SignalExecutionMetadata;
readonly history: SignalHistoryEntry[];
// State management
getState(): Record<string, unknown>;
getStateValue<T = unknown>(key: string): T | undefined;
setState(updates: Record<string, unknown>): Promise<void>;
setStateValue(key: string, value: unknown): Promise<void>;
// Execution tracking
log(message: string, metadata?: Record<string, unknown>): void;
addHistoryEntry(type: 'received' | 'started' | 'checkpoint' | 'completed' | 'failed',
details?: Record<string, unknown>): void;
createCheckpoint(metadata?: Record<string, unknown>): Promise<string>;
complete(details?: Record<string, unknown>): Promise<void>;
fail(reason: string, details?: Record<string, unknown>): Promise<void>;
}Each signal handler receives this context, allowing it to:
- Access and update workflow state
- Track execution progress
- Create checkpoints for important operations
- Log relevant information
- Mark signal processing as complete or failed
## Signal Handling Patterns
### 1. Command Pattern
The command pattern is used for signals that instruct the workflow to perform a specific action.
```tsx
const cancelOrderSignal = defineSignal<{ orderId: string; reason: string }>({
id: 'order.cancel', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Signal to cancel an in-progress order',
payloadSchema: {
type: 'object',
properties: {
orderId: { type: 'string' },
reason: { type: 'string' },
},
required: ['orderId', 'reason'],
},
handler: async (payload, context) => {
// Get the current state
const state = context.getState();
// Update state with cancellation info
await context.setState({
...state,
status: 'cancelled',
cancelledAt: new Date().toISOString(),
cancellationReason: payload.reason
});
// Create a checkpoint
await context.createCheckpoint({
action: 'cancel',
orderId: payload.orderId,
reason: payload.reason
});
// Log the action
context.log(`Order ${payload.orderId} cancelled: ${payload.reason}`);
},
});2. State Update Pattern
This pattern is used for signals that update a portion of the workflow state without changing execution path.
const updateAddressSignal = defineSignal<{
orderId: string;
address: {
street: string;
city: string;
state: string;
zip: string;
};
}>({
id: 'order.updateaddress', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Update the shipping address for an order',
payloadSchema: {
type: 'object',
properties: {
orderId: { type: 'string' },
address: {
type: 'object',
properties: {
street: { type: 'string' },
city: { type: 'string' },
state: { type: 'string' },
zip: { type: 'string' }
},
required: ['street', 'city', 'state', 'zip']
}
},
required: ['orderId', 'address']
},
handler: async (payload, context) => {
// Get the current state
const state = context.getState();
// Update just the shipping address
await context.setState({
...state,
shippingAddress: payload.address,
addressUpdatedAt: new Date().toISOString()
});
// Create a checkpoint for tracking updates
await context.createCheckpoint({
type: 'state_update',
field: 'shippingAddress',
orderId: payload.orderId
});
context.log(`Shipping address updated for order ${payload.orderId}`);
},
});3. Event Notification Pattern
This pattern is used for signals that inform the workflow of external events.
const paymentProcessedSignal = defineSignal<{
orderId: string;
paymentId: string;
amount: number;
status: 'succeeded' | 'failed';
processorResponse?: string;
}>({
id: 'payment.processed', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Notify workflow that payment processing completed',
payloadSchema: {
type: 'object',
properties: {
orderId: { type: 'string' },
paymentId: { type: 'string' },
amount: { type: 'number' },
status: { type: 'string', enum: ['succeeded', 'failed'] },
processorResponse: { type: 'string' }
},
required: ['orderId', 'paymentId', 'amount', 'status']
},
handler: async (payload, context) => {
// Get the current state
const state = context.getState();
// Create state updates based on payment status
const stateUpdates = {
...state,
paymentStatus: payload.status,
paymentId: payload.paymentId,
paymentAmount: payload.amount,
paymentProcessedAt: new Date().toISOString()
};
// Handle different payment outcomes
if (payload.status === 'succeeded') {
stateUpdates.orderStatus = 'payment_received';
} else {
stateUpdates.orderStatus = 'payment_failed';
stateUpdates.failureReason = payload.processorResponse;
}
// Update state in one operation
await context.setState(stateUpdates);
// Create a checkpoint for this important event
await context.createCheckpoint({
type: 'payment_notification',
status: payload.status,
paymentId: payload.paymentId
});
context.log(`Payment ${payload.status} for order ${payload.orderId}`);
},
});4. Signal Aggregation Pattern
For workflows that need to collect multiple signals before proceeding:
const voteSignal = defineSignal<{ voterId: string; approve: boolean }>({
id: 'approval.vote', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Record an approval vote',
payloadSchema: {
type: 'object',
properties: {
voterId: { type: 'string' },
approve: { type: 'boolean' }
},
required: ['voterId', 'approve']
},
handler: async (payload, context) => {
// Get current state
const state = context.getState();
// Get current votes or initialize an empty object
const votes = (state.votes as Record<string, boolean>) || {};
// Add new vote
votes[payload.voterId] = payload.approve;
// Count approvals and rejections
const results = Object.values(votes).reduce(
(acc, vote) => {
if (vote) acc.approvals++;
else acc.rejections++;
return acc;
},
{ approvals: 0, rejections: 0 },
);
// Update state with all changes at once
await context.setState({
...state,
votes,
approvalCount: results.approvals,
rejectionCount: results.rejections,
lastVoteAt: new Date().toISOString(),
lastVoter: payload.voterId
});
// Create a checkpoint for this vote
await context.createCheckpoint({
type: 'vote_recorded',
voterId: payload.voterId,
vote: payload.approve,
currentTally: results
});
context.log(`Vote received from ${payload.voterId}: ${payload.approve ? 'Approve' : 'Reject'}`);
},
});Best Practices
Signal Design
- Use Correct ID Format: Signal IDs must follow the
namespace.nameformat using only lowercase alphanumeric characters.- Two-part structure:
namespace.name - Only lowercase letters and numbers allowed:
[a-z0-9] - No hyphens, underscores, or special characters
- Examples:
workflow.cancel,order.update,user.notify
- Two-part structure:
- Be Descriptive: Use clear, descriptive names that indicate the event or action.
- Use Past Tense for Events: For signals representing events that have occurred, use past tense (e.g.,
order.placed). - Use Imperative for Commands: For signals representing commands, use imperative form (e.g.,
workflow.cancel). - Keep Payloads Small: Signal payloads should contain just what's needed for the handler to function.
- Include Identifiers: Always include relevant identifier fields (e.g.,
orderId,userId).
Signal Implementation
- Keep Handlers Focused: Handlers should do one thing well.
- Make Handlers Idempotent: Handlers should produce the same result if called multiple times with the same payload.
- Handle Errors Gracefully: Catch and handle errors within the handler when possible.
- Update Workflow State: Update the workflow state properly to reflect the signal's effect.
- Avoid Long-Running Operations: Keep handlers short and efficient; move long operations to activities.
Error Handling
- Validate Before Processing: Always validate payloads before processing.
- Use Middleware for Error Handling: Implement error middleware for centralized error handling.
- Distinguish Between Error Types: Handle different error types appropriately (e.g., validation errors vs. processing errors).
- Log Detailed Error Information: Include relevant context in error logs for debugging.
// Example of proper error handling in a signal handler
handler: async (payload, context) => {
try {
// Process signal
await context.setStateValue('someState', payload.someValue);
} catch (error) {
// Log the error but don't throw
context.log(`Error processing signal: ${error.message}`, {
error: true,
signalId: 'domain.action', // Note: In real code, this would follow the namespace.name pattern
payload,
});
// Record error in workflow state
await context.setState({
lastSignalError: {
signalId: 'domain.action',
message: error.message,
timestamp: new Date().toISOString(),
}
});
}
};Performance Considerations
- Monitor Signal Handling Time: Use the performance middleware to track signal processing time.
- Batch Related State Updates: If a signal requires multiple state updates, batch them when possible.
- Control Queue Size: Be aware of the impact of queued signals on workflow memory usage.
Versioning and Compatibility
- Plan for Evolution: Signals might need to evolve over time; plan for versioning.
- Use the
versionedSignalHelper: Apply version markers to signals when making breaking changes. - Maintain Backward Compatibility: When possible, design new versions to be backward compatible.
import { versionedSignal } from '@zoop/flow/core/signals/handlers';
// Create a new version of an existing signal
const updatedCancelSignal = versionedSignal(cancelSignal, 2, {
changeId: 'add-cancel-code',
// New handler that's backward compatible
handler: async (payload, context) => {
// Handle with new fields if present
const { reason, code } = payload as { reason?: string; code?: string };
// Get current state
const state = context.getState();
// Update state with new field if available
await context.setState({
...state,
status: 'cancelled',
cancellationReason: reason || 'No reason provided',
cancellationCode: code,
});
context.log('Workflow cancelled', { reason, code });
},
});API Reference
For detailed information on the Signal API, including core interfaces, helper functions, middleware, and standard signal handlers, please refer to the Signal API Reference documentation.
Examples
Basic Signal Registration and Usage
// Import necessary dependencies
import { defineSignal, registerSignal } from '@zoop/flow/core/signals/handlers';
import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager, createLoggingMiddleware } from '@zoop/flow/core/signals/manager';
// Define your signals
const cancelSignal = defineSignal<{ reason?: string }>({
id: 'order.cancel', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Signal to cancel an order',
payloadSchema: {
type: 'object',
properties: {
reason: { type: 'string' }
}
},
handler: async (payload, context) => {
// Signal handling logic
// ...
}
});
const updateAddressSignal = defineSignal<{ orderId: string; address: ShippingAddress }>({
id: 'order.updateaddress', // Using the correct format: lowercase alphanumeric with namespace.name pattern
description: 'Update the shipping address for an order',
payloadSchema: {
// Schema definition
type: 'object',
properties: {
orderId: { type: 'string' },
address: { type: 'object' }
},
required: ['orderId', 'address']
},
handler: async (payload, context) => {
// Signal handling logic
// ...
}
});
// Option 1: Use the global registry
registerSignal(cancelSignal);
registerSignal(updateAddressSignal);
// Option 2: Create a flow-specific registry and manager
const orderFlowRegistry = new SignalRegistryImpl();
orderFlowRegistry.registerSignal(cancelSignal);
orderFlowRegistry.registerSignal(updateAddressSignal);
// Create a manager with middleware
const orderSignalManager = new SignalManager(orderFlowRegistry, {
queueSignalsUntilInitialized: true,
middleware: [createLoggingMiddleware()]
});
// Initialize the manager
orderSignalManager.markInitialized();Sending Signals to a Workflow
import { Connection, WorkflowClient } from '@temporalio/client';
// Connect to Temporal server
const connection = await Connection.connect();
const client = new WorkflowClient({ connection });
// Get a handle to the running workflow
const handle = client.getHandle('workflow-id');
// Send a signal
await handle.signal('app:order.updateaddress', { // Note: Using lowercase signal ID
orderId: 'order-123',
address: {
street: '123 Main St',
city: 'Anytown',
state: 'CA',
zip: '12345',
},
});Using Signal Timeouts
import { withMiddleware } from '@zoop/flow/core/signals/handlers';
// Create a timeout middleware
const timeoutMiddleware = {
before: async (signalId, payload, context) => {
// Start a timer
const startTime = Date.now();
// Store it in context for the after middleware
await context.setStateValue('__signalStartTime', startTime);
return payload;
},
after: async (signalId, payload, context) => {
// Get the start time
const startTime = context.getStateValue<number>('__signalStartTime');
const duration = Date.now() - startTime;
// Log the execution time
context.log(`Signal ${signalId} took ${duration}ms to process`);
}
};
// Apply the middleware to a signal
const enhancedCancelSignal = {
...cancelSignal,
handler: withMiddleware(cancelSignal.handler, timeoutMiddleware)
};Signal Filtering
import { createSignalHandler } from '@zoop/flow/core/signals/handlers';
// Create a filtered signal handler
const adminOnlyHandler = createSignalHandler<{ reason?: string }>({
validator: async (payload, context) => {
// Get current user from context
const user = context.getStateValue('currentUser');
// Only admins can cancel
if (!user || user.role !== 'admin') {
throw new Error('Only admins can cancel workflows');
}
// Return true to indicate validation passed
return true;
},
handler: async (payload, context) => {
// Handler implementation
context.log('Admin cancelled workflow', { reason: payload.reason });
await context.setState({ status: 'cancelled' });
}
});For more complete examples, see the example folder with full workflow implementations incorporating signals.