Core Concepts
Signal Handling

Signal Handling

Overview

The signal handling system provides a structured way to manage asynchronous events in workflows. Signals allow external systems to asynchronously send events to a running workflow, enabling it to respond to external events while maintaining deterministic execution. This is particularly useful for long-running workflows that need to react to external events without polling.

Key capabilities of the signal handling system include:

  • Type-safe signal definitions with JSON Schema validation
  • Middleware support for cross-cutting concerns
  • Versioning support for evolving signals over time
  • Dynamic signal handling for runtime signal registration
  • Integration with Temporal's workflow API

Key Concepts

What are Signals?

Signals provide a way for external systems to asynchronously communicate with running workflows. Unlike regular workflow inputs which are provided at the start, signals can be sent at any time during workflow execution, allowing workflows to react to external events without polling.

Core Components

  • Signal Definition: A contract that defines a signal's ID, payload schema, and handler. Created using the defineSignal function.
  • Signal Registry: A central registry that manages signal definitions and dispatches signals to the appropriate handlers.
  • Signal Manager: Coordinates signal processing with middleware support and queue management.
  • Signal Handler: A function that processes the signal payload and updates workflow state.
  • Signal Middleware: Processing components that can transform signals, add functionality, or handle cross-cutting concerns.
  • Signal Context: Execution environment providing state management and history tracking for signal handlers.

Signal Lifecycle

  1. Definition: Signals are defined using the defineSignal function
  2. Registration: Signals are registered with the SignalRegistry
  3. Integration: The SignalManager connects signals to Temporal
  4. Reception: External systems send signals to the workflow
  5. Processing: Signal payloads are validated and handlers are executed
  6. State Update: Workflow state is updated based on signal processing

Signal Definition and Registration

Defining a Signal

Signals are defined using the defineSignal function, which creates a type-safe signal definition:

import { defineSignal } from '@zoop/flow/core/signals/handlers';
import { JSONSchema7 } from 'json-schema';
 
// Define the payload schema
const cancelSchema: JSONSchema7 = {
  type: 'object',
  properties: {
    reason: { type: 'string' },
  },
};
 
// Define the signal with type-checking
const cancelSignal = defineSignal<{ reason?: string }>({
  id: 'workflow.cancel',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Signal to cancel the workflow',
  payloadSchema: cancelSchema,
  handler: async (payload, context) => {
    // Handle the signal
    const { reason } = payload;
 
    // Get current workflow state
    const state = context.getState();
 
    // Update workflow state
    await context.setState({
      ...state,
      status: 'cancelled',
      cancellationReason: reason || 'No reason provided',
    });
 
    // Create a checkpoint for this operation
    await context.createCheckpoint({
      reason: 'Workflow cancelled',
      details: { reason }
    });
 
    // Log the cancellation
    context.log('Workflow cancelled', { reason });
  },
});

Signal Registration

Our signal system provides two main approaches for signal registration:

1. Global Signal Registry

The simplest approach is to register signals with the global registry:

import { registerSignal } from '@zoop/flow/core/signals/handlers';
 
// Register a signal globally
registerSignal(cancelSignal);

This makes the signal available throughout the system.

2. Flow-Specific Signal Registration

For more granular control, you can use flow-specific registration:

import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager } from '@zoop/flow/core/signals/manager';
 
// Create a registry specific to your flow
const orderFlowRegistry = new SignalRegistryImpl();
 
// Register signals with this specific registry
orderFlowRegistry.registerSignal(cancelSignal);
orderFlowRegistry.registerSignal(updateAddressSignal);
 
// Create a manager for this registry
const orderFlowSignalManager = new SignalManager(orderFlowRegistry, {
  // Optional configuration
  queueSignalsUntilInitialized: true, 
  middleware: [loggingMiddleware]
});

This approach provides benefits like:

  • Isolation: Signals for one flow don't affect others
  • Context Relevance: Handlers receive context specific to their flow
  • Clearer Boundaries: Explicit association between flows and signals
  • Reduced Naming Conflicts: Same signal name can be used in different flows

Signal Integration with Temporal

For Temporal integration, our signal system provides adapters:

import { 
  registerTemporalFlowControlSignals 
} from '@zoop/flow/core/signals/temporal';
 
// Inside your workflow implementation:
function yourWorkflow() {
  // Setup Temporal workflow context
  const flowId = "your-flow-id";
  const executionId = "your-execution-id";
  
  // Register signals with Temporal
  registerTemporalFlowControlSignals(
    flowId,
    executionId,
    temporalContext
  );
  
  // Rest of your workflow logic...
}

This integration handles:

  • Registering signal handlers with Temporal's API
  • Creating the proper signal context
  • Validating payloads
  • Processing signals through middleware

Signal Context and State Management

The SignalContext provides a rich API for managing state and tracking execution:

interface SignalContext {
  // Readonly properties
  readonly signalId: string;
  readonly flowId: string;
  readonly executionId: string;
  readonly workflowId?: string;
  readonly runId?: string;
  readonly metadata: SignalExecutionMetadata;
  readonly history: SignalHistoryEntry[];
 
  // State management
  getState(): Record<string, unknown>;
  getStateValue<T = unknown>(key: string): T | undefined;
  setState(updates: Record<string, unknown>): Promise<void>;
  setStateValue(key: string, value: unknown): Promise<void>;
 
  // Execution tracking
  log(message: string, metadata?: Record<string, unknown>): void;
  addHistoryEntry(type: 'received' | 'started' | 'checkpoint' | 'completed' | 'failed', 
                  details?: Record<string, unknown>): void;
  createCheckpoint(metadata?: Record<string, unknown>): Promise<string>;
  complete(details?: Record<string, unknown>): Promise<void>;
  fail(reason: string, details?: Record<string, unknown>): Promise<void>;
}

Each signal handler receives this context, allowing it to:

  • Access and update workflow state
  • Track execution progress
  • Create checkpoints for important operations
  • Log relevant information
  • Mark signal processing as complete or failed

## Signal Handling Patterns

### 1. Command Pattern

The command pattern is used for signals that instruct the workflow to perform a specific action.

```tsx
const cancelOrderSignal = defineSignal<{ orderId: string; reason: string }>({
  id: 'order.cancel',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Signal to cancel an in-progress order',
  payloadSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      reason: { type: 'string' },
    },
    required: ['orderId', 'reason'],
  },
  handler: async (payload, context) => {
    // Get the current state
    const state = context.getState();
    
    // Update state with cancellation info
    await context.setState({
      ...state,
      status: 'cancelled',
      cancelledAt: new Date().toISOString(),
      cancellationReason: payload.reason
    });
    
    // Create a checkpoint
    await context.createCheckpoint({
      action: 'cancel',
      orderId: payload.orderId,
      reason: payload.reason
    });
    
    // Log the action
    context.log(`Order ${payload.orderId} cancelled: ${payload.reason}`);
  },
});

2. State Update Pattern

This pattern is used for signals that update a portion of the workflow state without changing execution path.

const updateAddressSignal = defineSignal<{
  orderId: string;
  address: {
    street: string;
    city: string;
    state: string;
    zip: string;
  };
}>({
  id: 'order.updateaddress',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Update the shipping address for an order',
  payloadSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      address: {
        type: 'object',
        properties: {
          street: { type: 'string' },
          city: { type: 'string' },
          state: { type: 'string' },
          zip: { type: 'string' }
        },
        required: ['street', 'city', 'state', 'zip']
      }
    },
    required: ['orderId', 'address']
  },
  handler: async (payload, context) => {
    // Get the current state
    const state = context.getState();
    
    // Update just the shipping address
    await context.setState({
      ...state,
      shippingAddress: payload.address,
      addressUpdatedAt: new Date().toISOString()
    });
    
    // Create a checkpoint for tracking updates
    await context.createCheckpoint({
      type: 'state_update',
      field: 'shippingAddress',
      orderId: payload.orderId
    });
    
    context.log(`Shipping address updated for order ${payload.orderId}`);
  },
});

3. Event Notification Pattern

This pattern is used for signals that inform the workflow of external events.

const paymentProcessedSignal = defineSignal<{
  orderId: string;
  paymentId: string;
  amount: number;
  status: 'succeeded' | 'failed';
  processorResponse?: string;
}>({
  id: 'payment.processed',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Notify workflow that payment processing completed',
  payloadSchema: {
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      paymentId: { type: 'string' },
      amount: { type: 'number' },
      status: { type: 'string', enum: ['succeeded', 'failed'] },
      processorResponse: { type: 'string' }
    },
    required: ['orderId', 'paymentId', 'amount', 'status']
  },
  handler: async (payload, context) => {
    // Get the current state
    const state = context.getState();
    
    // Create state updates based on payment status
    const stateUpdates = {
      ...state,
      paymentStatus: payload.status,
      paymentId: payload.paymentId,
      paymentAmount: payload.amount,
      paymentProcessedAt: new Date().toISOString()
    };
    
    // Handle different payment outcomes
    if (payload.status === 'succeeded') {
      stateUpdates.orderStatus = 'payment_received';
    } else {
      stateUpdates.orderStatus = 'payment_failed';
      stateUpdates.failureReason = payload.processorResponse;
    }
    
    // Update state in one operation
    await context.setState(stateUpdates);
 
    // Create a checkpoint for this important event
    await context.createCheckpoint({
      type: 'payment_notification',
      status: payload.status,
      paymentId: payload.paymentId
    });
 
    context.log(`Payment ${payload.status} for order ${payload.orderId}`);
  },
});

4. Signal Aggregation Pattern

For workflows that need to collect multiple signals before proceeding:

const voteSignal = defineSignal<{ voterId: string; approve: boolean }>({
  id: 'approval.vote',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Record an approval vote',
  payloadSchema: {
    type: 'object',
    properties: {
      voterId: { type: 'string' },
      approve: { type: 'boolean' }
    },
    required: ['voterId', 'approve']
  },
  handler: async (payload, context) => {
    // Get current state
    const state = context.getState();
    
    // Get current votes or initialize an empty object
    const votes = (state.votes as Record<string, boolean>) || {};
 
    // Add new vote
    votes[payload.voterId] = payload.approve;
 
    // Count approvals and rejections
    const results = Object.values(votes).reduce(
      (acc, vote) => {
        if (vote) acc.approvals++;
        else acc.rejections++;
        return acc;
      },
      { approvals: 0, rejections: 0 },
    );
 
    // Update state with all changes at once
    await context.setState({
      ...state,
      votes,
      approvalCount: results.approvals,
      rejectionCount: results.rejections,
      lastVoteAt: new Date().toISOString(),
      lastVoter: payload.voterId
    });
 
    // Create a checkpoint for this vote
    await context.createCheckpoint({
      type: 'vote_recorded',
      voterId: payload.voterId,
      vote: payload.approve,
      currentTally: results
    });
 
    context.log(`Vote received from ${payload.voterId}: ${payload.approve ? 'Approve' : 'Reject'}`);
  },
});

Best Practices

Signal Design

  • Use Correct ID Format: Signal IDs must follow the namespace.name format using only lowercase alphanumeric characters.
    • Two-part structure: namespace.name
    • Only lowercase letters and numbers allowed: [a-z0-9]
    • No hyphens, underscores, or special characters
    • Examples: workflow.cancel, order.update, user.notify
  • Be Descriptive: Use clear, descriptive names that indicate the event or action.
  • Use Past Tense for Events: For signals representing events that have occurred, use past tense (e.g., order.placed).
  • Use Imperative for Commands: For signals representing commands, use imperative form (e.g., workflow.cancel).
  • Keep Payloads Small: Signal payloads should contain just what's needed for the handler to function.
  • Include Identifiers: Always include relevant identifier fields (e.g., orderId, userId).

Signal Implementation

  • Keep Handlers Focused: Handlers should do one thing well.
  • Make Handlers Idempotent: Handlers should produce the same result if called multiple times with the same payload.
  • Handle Errors Gracefully: Catch and handle errors within the handler when possible.
  • Update Workflow State: Update the workflow state properly to reflect the signal's effect.
  • Avoid Long-Running Operations: Keep handlers short and efficient; move long operations to activities.

Error Handling

  • Validate Before Processing: Always validate payloads before processing.
  • Use Middleware for Error Handling: Implement error middleware for centralized error handling.
  • Distinguish Between Error Types: Handle different error types appropriately (e.g., validation errors vs. processing errors).
  • Log Detailed Error Information: Include relevant context in error logs for debugging.
// Example of proper error handling in a signal handler
handler: async (payload, context) => {
  try {
    // Process signal
    await context.setStateValue('someState', payload.someValue);
  } catch (error) {
    // Log the error but don't throw
    context.log(`Error processing signal: ${error.message}`, {
      error: true,
      signalId: 'domain.action',  // Note: In real code, this would follow the namespace.name pattern
      payload,
    });
 
    // Record error in workflow state
    await context.setState({
      lastSignalError: {
        signalId: 'domain.action',
        message: error.message,
        timestamp: new Date().toISOString(),
      }
    });
  }
};

Performance Considerations

  • Monitor Signal Handling Time: Use the performance middleware to track signal processing time.
  • Batch Related State Updates: If a signal requires multiple state updates, batch them when possible.
  • Control Queue Size: Be aware of the impact of queued signals on workflow memory usage.

Versioning and Compatibility

  • Plan for Evolution: Signals might need to evolve over time; plan for versioning.
  • Use the versionedSignal Helper: Apply version markers to signals when making breaking changes.
  • Maintain Backward Compatibility: When possible, design new versions to be backward compatible.
import { versionedSignal } from '@zoop/flow/core/signals/handlers';
 
// Create a new version of an existing signal
const updatedCancelSignal = versionedSignal(cancelSignal, 2, {
  changeId: 'add-cancel-code',
  // New handler that's backward compatible
  handler: async (payload, context) => {
    // Handle with new fields if present
    const { reason, code } = payload as { reason?: string; code?: string };
 
    // Get current state
    const state = context.getState();
 
    // Update state with new field if available
    await context.setState({
      ...state,
      status: 'cancelled',
      cancellationReason: reason || 'No reason provided',
      cancellationCode: code,
    });
 
    context.log('Workflow cancelled', { reason, code });
  },
});

API Reference

For detailed information on the Signal API, including core interfaces, helper functions, middleware, and standard signal handlers, please refer to the Signal API Reference documentation.

Examples

Basic Signal Registration and Usage

// Import necessary dependencies
import { defineSignal, registerSignal } from '@zoop/flow/core/signals/handlers';
import { SignalRegistryImpl } from '@zoop/flow/core/signals/core';
import { SignalManager, createLoggingMiddleware } from '@zoop/flow/core/signals/manager';
 
// Define your signals
const cancelSignal = defineSignal<{ reason?: string }>({
  id: 'order.cancel',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Signal to cancel an order',
  payloadSchema: {
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  },
  handler: async (payload, context) => {
    // Signal handling logic
    // ...
  }
});
 
const updateAddressSignal = defineSignal<{ orderId: string; address: ShippingAddress }>({
  id: 'order.updateaddress',  // Using the correct format: lowercase alphanumeric with namespace.name pattern
  description: 'Update the shipping address for an order',
  payloadSchema: {
    // Schema definition
    type: 'object',
    properties: {
      orderId: { type: 'string' },
      address: { type: 'object' }
    },
    required: ['orderId', 'address']
  },
  handler: async (payload, context) => {
    // Signal handling logic
    // ...
  }
});
 
// Option 1: Use the global registry
registerSignal(cancelSignal);
registerSignal(updateAddressSignal);
 
// Option 2: Create a flow-specific registry and manager
const orderFlowRegistry = new SignalRegistryImpl();
orderFlowRegistry.registerSignal(cancelSignal);
orderFlowRegistry.registerSignal(updateAddressSignal);
 
// Create a manager with middleware
const orderSignalManager = new SignalManager(orderFlowRegistry, {
  queueSignalsUntilInitialized: true,
  middleware: [createLoggingMiddleware()]
});
 
// Initialize the manager
orderSignalManager.markInitialized();

Sending Signals to a Workflow

import { Connection, WorkflowClient } from '@temporalio/client';
 
// Connect to Temporal server
const connection = await Connection.connect();
const client = new WorkflowClient({ connection });
 
// Get a handle to the running workflow
const handle = client.getHandle('workflow-id');
 
// Send a signal
await handle.signal('app:order.updateaddress', {  // Note: Using lowercase signal ID
  orderId: 'order-123',
  address: {
    street: '123 Main St',
    city: 'Anytown',
    state: 'CA',
    zip: '12345',
  },
});

Using Signal Timeouts

import { withMiddleware } from '@zoop/flow/core/signals/handlers';
 
// Create a timeout middleware
const timeoutMiddleware = {
  before: async (signalId, payload, context) => {
    // Start a timer
    const startTime = Date.now();
    
    // Store it in context for the after middleware
    await context.setStateValue('__signalStartTime', startTime);
    
    return payload;
  },
  after: async (signalId, payload, context) => {
    // Get the start time
    const startTime = context.getStateValue<number>('__signalStartTime');
    const duration = Date.now() - startTime;
    
    // Log the execution time
    context.log(`Signal ${signalId} took ${duration}ms to process`);
  }
};
 
// Apply the middleware to a signal
const enhancedCancelSignal = {
  ...cancelSignal,
  handler: withMiddleware(cancelSignal.handler, timeoutMiddleware)
};

Signal Filtering

import { createSignalHandler } from '@zoop/flow/core/signals/handlers';
 
// Create a filtered signal handler
const adminOnlyHandler = createSignalHandler<{ reason?: string }>({
  validator: async (payload, context) => {
    // Get current user from context
    const user = context.getStateValue('currentUser');
 
    // Only admins can cancel
    if (!user || user.role !== 'admin') {
      throw new Error('Only admins can cancel workflows');
    }
 
    // Return true to indicate validation passed
    return true;
  },
  handler: async (payload, context) => {
    // Handler implementation
    context.log('Admin cancelled workflow', { reason: payload.reason });
    await context.setState({ status: 'cancelled' });
  }
});

For more complete examples, see the example folder with full workflow implementations incorporating signals.