Error Handling in ZoopFlow
Overview
Error handling in ZoopFlow covers error detection, classification, recovery, propagation, and compensation for failed workflows. This guide describes the error handling framework: its architecture, its components, and recommended practices.
The error handling system is designed to:
- Provide a standardized classification system for consistent error handling
- Enable proper error propagation through the flow execution chain
- Support customizable recovery strategies based on error types
- Facilitate compensation logic for undoing previous operations
- Integrate with observability systems for monitoring and debugging
- Empower developers to implement domain-specific error handling
- Ensure consistent error handling across both core and Temporal environments through a unified interface
Core Error Types
ZoopFlow provides a rich hierarchy of error types extending from the base FlowError class:
export class FlowError extends Error {
public readonly errorCode: string;
public readonly severity: ErrorSeverity;
public readonly category: ErrorCategory;
public readonly context: ErrorContext;
public readonly cause?: Error;
public readonly recoverable: boolean;
// Determines if retry is appropriate
get isRetryable(): boolean {
return this.category === ErrorCategory.TRANSIENT;
}
}
Specialized Error Types
ZoopFlow provides several specialized error types for common scenarios:
| Error Type | Purpose | Common Usage |
|---|---|---|
| FlowValidationError | Data and schema validation failures | Input validation, schema compliance |
| FlowExecutionError | Runtime execution failures | Step execution errors |
| FlowTimeoutError | Timeout-related failures | Long-running operations |
| FlowCancellationError | Deliberately cancelled operations | User-initiated cancellations |
| FlowStateError | Invalid state transitions | Workflow state management |
| FlowDependencyError | External service failures | 3rd-party API errors |
| FlowPermissionError | Authorization failures | Permission checks |
| ValidationError | Validation-specific issues | Input validation |
| AuthorizationError | Permission-specific issues | Access control |
| ThrottlingError | Rate limiting issues | API throttling |
| BusinessRuleError | Business rule violations | Domain logic enforcement |
| DependencyError | External dependency failures | Service dependencies |
Example Implementation
// Validation errors
class ValidationError extends FlowError {
constructor(message: string, validationIssues: any[]) {
super(message, "VALIDATION_ERROR", ErrorCategory.VALIDATION, {
issues: validationIssues,
});
this.name = "ValidationError";
}
}
// Permission errors
class AuthorizationError extends FlowError {
constructor(message: string, resource?: string, action?: string) {
super(message, "AUTHORIZATION_ERROR", ErrorCategory.PERMISSION, {
resource,
action,
});
this.name = "AuthorizationError";
}
}
// Rate limit/throttling errors
class ThrottlingError extends FlowError {
constructor(message: string, retryAfterSeconds: number) {
super(message, "THROTTLING_ERROR", ErrorCategory.TRANSIENT, { retryAfterSeconds });
this.name = "ThrottlingError";
}
}
Error Classification
Errors in ZoopFlow are classified along two dimensions: category and severity.
Error Categories
Categories help determine the appropriate handling strategy:
export enum ErrorCategory {
VALIDATION = 'validation', // Input/output validation failures
BUSINESS_RULE = 'business_rule', // Business rule violations
SYSTEM = 'system', // Internal system errors
TIMEOUT = 'timeout', // Operations that exceeded time limits
CANCELLATION = 'cancellation', // Deliberately cancelled operations
STATE = 'state', // Invalid state transitions
DEPENDENCY = 'dependency', // External service failures
PERMISSION = 'permission', // Authorization/access issues
IO = 'io', // Input/output errors
TRANSIENT = 'transient', // Temporary failures that may succeed on retry
PERMANENT = 'permanent', // Permanent failures that won't succeed on retry
OTHER = 'other', // Unclassified errors
}
Severity Levels
Severity enables prioritization of errors based on their impact:
export enum ErrorSeverity {
INFO = 'info', // Informational, not critical
WARNING = 'warning', // Potential issue, but not fatal
ERROR = 'error', // Significant error requiring attention
CRITICAL = 'critical', // Critical error affecting system operation
}
Recovery Types
Different recovery strategies are available based on error characteristics:
export enum RecoveryStrategy {
RETRY = 'retry', // Retry the failed operation
SKIP = 'skip', // Skip the failed operation and continue
ALTERNATIVE_PATH = 'alternative_path', // Execute an alternative path
COMPENSATE = 'compensate', // Execute compensation logic
FAIL = 'fail', // Fail the workflow
}
Error Context
All errors carry contextual information through the ErrorContext interface, allowing for detailed debugging and tracing:
export interface ErrorContext {
flowId?: string; // Identifier for the flow
executionId?: string; // Execution identifier
stepId?: string; // Step where error occurred
timestamp?: Date; // When the error occurred
handled?: boolean; // Whether error has been handled
flowMetadata?: Record<string, unknown>; // Flow-specific metadata
input?: Record<string, unknown>; // Input that caused the error
retryCount?: number; // Number of retry attempts
[key: string]: unknown; // Extensible properties
}
This context provides:
- Traceability to specific flows, executions, and steps
- Relevant metadata for debugging
- Status indicators for error handling
- Extensibility for domain-specific context
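As a rough illustration of how context can accumulate while an error moves from a step up to a flow, the sketch below merges extra fields into an existing context. `SketchFlowError` and its copy-on-merge `withContext` are assumptions made for this example; they are simplified stand-ins and not ZoopFlow's actual classes.

```typescript
// Hypothetical, simplified stand-ins for the types described above.
interface ErrorContext {
  flowId?: string;
  executionId?: string;
  stepId?: string;
  timestamp?: Date;
  retryCount?: number;
  [key: string]: unknown; // extensible, domain-specific properties
}

class SketchFlowError extends Error {
  constructor(
    message: string,
    public readonly errorCode: string,
    public readonly context: ErrorContext = {},
  ) {
    super(message);
    this.name = "SketchFlowError";
  }

  // Assumption: returns a new error with the extra context merged in.
  // Keys in `extra` override keys already present; the original is untouched.
  withContext(extra: Partial<ErrorContext>): SketchFlowError {
    return new SketchFlowError(this.message, this.errorCode, {
      ...this.context,
      ...extra,
    });
  }
}

// A step reports what it knows; the flow adds what only it knows.
const stepError = new SketchFlowError("Card declined", "PAYMENT_DECLINED", {
  stepId: "process-payment",
});
const flowError = stepError.withContext({
  flowId: "order-processing",
  executionId: "exec-123",
  retryCount: 0,
});

console.log(flowError.context);
```

Returning a copy rather than mutating keeps the step-level error unchanged, which can make it easier to see which layer contributed which fields.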
Error Propagation
Error propagation is the process by which errors move through the system, providing opportunities for handling at different levels.
From Steps to Flows
When a step fails, the error propagates to the flow:
// Inside a flow
try {
const result = await processPaymentStep.execute(input);
// Continue with success path
} catch (error) {
// Handle error at flow level
context.log("Payment processing failed", { error });
// Options for handling the error (alternatives; choose one, they do not run in sequence):
// 1. Rethrow (fail the workflow)
throw error;
// 2. Return a failure result
// return { status: "failed", error: error.message };
// 3. Run compensating actions
// await refundStep.execute({ transactionId: input.transactionId });
// 4. Retry with different parameters
// return await fallbackPaymentStep.execute({
//   ...input,
//   useBackupProvider: true,
// });
}
Error Enrichment
Errors can be enriched with context as they propagate:
try {
return await paymentStep.execute(input);
} catch (error) {
// Transform generic errors to domain-specific ones
if (error.name === "ApiError" && error.status === 429) {
throw new ThrottlingError(
"Payment API rate limit exceeded",
parseInt(error.headers["retry-after"] || "60", 10)
);
}
// Enrich with context
throw new FlowError(
`Payment processing failed for order ${input.orderId}`,
"PAYMENT_ERROR",
ErrorCategory.BUSINESS_RULE,
{ orderId: input.orderId },
error // Original error as cause
);
}
Error Handler Registry
ZoopFlow provides a central registry for error handlers:
- Global handlers for all errors
- Flow-specific handlers
- Error type-specific handlers
Handlers are organized by specificity, with the most specific handlers invoked first.
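The specificity ordering described above could be sketched as follows. This is a minimal illustration under stated assumptions: `SketchRegistry`, `Registration`, and the scoring rule (type match outranks flow match, which outranks global) are hypothetical and do not describe ZoopFlow's actual registry.

```typescript
// Hypothetical sketch; the names and the scoring rule are assumptions.
type Handler = (error: Error) => string;

interface Registration {
  flowId?: string; // set for flow-specific handlers
  errorType?: new (...args: any[]) => Error; // set for type-specific handlers
  handler: Handler;
}

class SketchRegistry {
  private registrations: Registration[] = [];

  register(registration: Registration): void {
    this.registrations.push(registration);
  }

  // A type match scores 2, a flow match scores 1, a global handler scores 0.
  private static score(r: Registration): number {
    return (r.errorType ? 2 : 0) + (r.flowId ? 1 : 0);
  }

  // Returns every matching handler, most specific first.
  resolve(error: Error, flowId?: string): Handler[] {
    return this.registrations
      .filter(
        (r) =>
          (!r.errorType || error instanceof r.errorType) &&
          (!r.flowId || r.flowId === flowId),
      )
      .sort((a, b) => SketchRegistry.score(b) - SketchRegistry.score(a))
      .map((r) => r.handler);
  }
}

class TimeoutLikeError extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working when the code is compiled to an ES5 target.
    Object.setPrototypeOf(this, TimeoutLikeError.prototype);
  }
}

const registry = new SketchRegistry();
registry.register({ handler: () => "global" });
registry.register({ flowId: "orders", handler: () => "flow" });
registry.register({ errorType: TimeoutLikeError, handler: () => "type" });

const err = new TimeoutLikeError("slow");
const order = registry.resolve(err, "orders").map((h) => h(err));
console.log(order.join(", ")); // type, flow, global
```

A plain error raised outside the "orders" flow would match only the global handler in this sketch.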
Error Handling Patterns
ZoopFlow supports several error handling patterns to address different requirements.
1. Retry Pattern
For handling transient errors that may resolve on subsequent attempts:
const processPayment = defineStep({
// ... other configuration
retry_config: {
max_attempts: 3,
backoff_strategy: "exponential",
initial_delay_seconds: 2,
// Custom retry predicate
should_retry: (error) => {
// Only retry transient errors
if (error instanceof FlowError) {
return error.isRetryable;
}
// Default behavior for other errors - retry network errors
if (error.name === "NetworkError") {
return true;
}
// Don't retry other errors
return false;
},
},
});
2. Saga Pattern (Compensation)
For multi-step processes that need compensation:
const orderProcessingFlow = defineFlow({
// ...
steps: async (input, context) => {
// Track completed steps for compensation
const completed = [];
try {
// Step 1: Reserve inventory
const inventory = await reserveInventoryStep.execute(input);
completed.push({ step: "inventory", data: inventory });
// Step 2: Process payment
const payment = await processPaymentStep.execute({
...input,
inventoryId: inventory.reservationId,
});
completed.push({ step: "payment", data: payment });
// Step 3: Create shipment
const shipment = await createShipmentStep.execute({
...input,
paymentId: payment.transactionId,
});
return { status: "success", shipmentId: shipment.id };
} catch (error) {
// Compensation actions in reverse order
for (const step of completed.reverse()) {
try {
if (step.step === "payment") {
await refundPaymentStep.execute({
transactionId: step.data.transactionId,
});
} else if (step.step === "inventory") {
await releaseInventoryStep.execute({
reservationId: step.data.reservationId,
});
}
} catch (compensationError) {
// Log compensation error but continue with other compensations
context.log("Compensation failed", {
step: step.step,
error: compensationError,
});
}
}
// Return failure result
return {
status: "failed",
reason: error instanceof FlowError ? error.errorCode : "UNKNOWN_ERROR",
message: error.message,
};
}
},
});
3. Circuit Breaker Pattern
For preventing cascading failures:
class CircuitBreaker {
private failures = 0;
private lastFailure: number = 0;
private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
constructor(
private threshold: number = 5,
private resetTimeoutMs: number = 30000
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === "OPEN") {
// Check if enough time has passed to try again
if (Date.now() - this.lastFailure > this.resetTimeoutMs) {
this.state = "HALF_OPEN";
} else {
throw new FlowError(
"Circuit breaker is open",
"CIRCUIT_OPEN",
ErrorCategory.TRANSIENT
);
}
}
try {
const result = await fn();
// Reset on success if half-open
if (this.state === "HALF_OPEN") {
this.reset();
}
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
private recordFailure() {
this.failures++;
this.lastFailure = Date.now();
if (this.failures >= this.threshold) {
this.state = "OPEN";
}
}
private reset() {
this.failures = 0;
this.state = "CLOSED";
}
}
4. Fallback Pattern
For graceful degradation:
const getProductDetails = defineStep({
// ...
execute: async (input, context) => {
try {
// Try primary data source
return await primaryDataSource.getProduct(input.productId);
} catch (error) {
// Log the failure
context.log("Primary data source failed, using fallback", { error });
// Use fallback data source
try {
return await fallbackDataSource.getProduct(input.productId);
} catch (fallbackError) {
// Both primary and fallback failed
throw new DependencyError(
"Failed to retrieve product details from all sources",
"product-data-service",
false // Not retryable
);
}
}
},
});
5. Flow-Try-Catch System
Inspired by traditional try-catch but adapted for asynchronous flows:
// Example usage
const result = await flowTryCatch(async () => {
// Flow operations that might fail
return processData(input);
})
.catch(FlowValidationError, async error => {
// Handle validation errors
return defaultValue;
})
.catch(FlowDependencyError, async error => {
// Handle external service failures
return cachedResult;
})
.finally(async () => {
// Cleanup operations
});
6. Alternative Path Strategy
For executing an alternative flow when the primary path fails:
// Define flow with alternative path
const flowWithAlternativePath = {
id: 'order-processing',
version: '1.0.0',
nodes: [
{
id: 'process-credit-card',
type: 'payment-processor',
input: {
method: 'credit-card',
},
options: {
errorHandling: {
strategy: RecoveryStrategy.ALTERNATIVE_PATH,
alternativeNodeId: 'process-paypal',
},
},
},
{
id: 'process-paypal',
type: 'payment-processor',
input: {
method: 'paypal',
},
},
{
id: 'fulfill-order',
type: 'order-fulfillment',
},
],
edges: [
{
source: 'process-credit-card',
target: 'fulfill-order',
},
{
source: 'process-paypal',
target: 'fulfill-order',
},
],
};
Unified Error Handling Interface
ZoopFlow provides a standardized error handling interface that behaves the same whether flows are executed directly by the core system or through the Temporal integration.
Core Components
The unified error handling system includes:
/**
* Unified error context interface combining both core and temporal contexts
*/
export interface UnifiedErrorContext {
// Flow context
flowId?: string;
executionId?: string;
stepId?: string;
// Temporal context
nodeId?: string;
workflowId?: string;
runId?: string;
activityId?: string;
// State information
state?: Record<string, unknown>;
context?: Record<string, unknown>;
// Error information
originalError?: Error;
cause?: Error | unknown;
// Additional metadata
[key: string]: unknown;
}
/**
* Result of error handling
*/
export interface ErrorHandlingResult {
handled: boolean;
strategy: RecoveryStrategy;
continueExecution: boolean;
error?: Error;
context?: Record<string, unknown>;
}
/**
* Interface for unified error handler
*/
export interface UnifiedErrorHandler {
handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
}
Adapter System
The error handling system includes adapters that convert between core and Temporal error types:
- CoreErrorHandlerAdapter: Adapts the core error handler registry to use the unified interface
- TemporalErrorHandlerAdapter: Adapts the Temporal error handler to use the unified interface
- TemporalErrorAdapter: Provides utilities for converting between error types
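To make the adapter idea concrete, here is a minimal sketch of wrapping two differently shaped handlers behind one interface. Everything in it is an assumption for illustration: `SketchUnifiedHandler`, `adaptCore`, `adaptTemporal`, and the two handler shapes are hypothetical and far simpler than the adapters listed above.

```typescript
// Hypothetical sketch of the adapter idea; all names are illustrative.
interface SketchResult {
  handled: boolean;
  continueExecution: boolean;
}

interface SketchUnifiedHandler {
  handleError(error: unknown, context?: Record<string, unknown>): Promise<SketchResult>;
}

// Assumed core-style handler: synchronous, reports whether it handled the error.
type CoreStyleHandler = (error: Error) => boolean;

// Assumed Temporal-style handler: asynchronous, returns an action to take.
type TemporalStyleHandler = (error: Error) => Promise<"retry" | "fail">;

// Normalizes thrown values, which may not be Error instances.
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

function adaptCore(handler: CoreStyleHandler): SketchUnifiedHandler {
  return {
    async handleError(error) {
      const handled = handler(toError(error));
      return { handled, continueExecution: handled };
    },
  };
}

function adaptTemporal(handler: TemporalStyleHandler): SketchUnifiedHandler {
  return {
    async handleError(error) {
      const action = await handler(toError(error));
      return { handled: true, continueExecution: action === "retry" };
    },
  };
}

// Callers see one interface regardless of which side produced the handler.
const handlers: SketchUnifiedHandler[] = [
  adaptCore((e) => e.message.includes("timeout")),
  adaptTemporal(async () => "fail"),
];

async function demo(): Promise<SketchResult[]> {
  return Promise.all(handlers.map((h) => h.handleError("timeout while calling API")));
}

demo().then((results) => console.log(results));
```

The calling code never branches on where a handler came from, which is the property the unified interface is meant to provide.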
Example Usage
// Create a unified error handler - works with both core and Temporal
const errorHandler = UnifiedErrorHandlerFactory.createHandler();
try {
// Execute operation
await executeOperation();
} catch (error) {
// Handle error using unified interface
const result = await errorHandler.handleError(error, {
flowId: 'order-processing-flow',
stepId: 'payment-processing',
state: getCurrentState()
});
if (result.handled) {
// Error was handled
if (result.continueExecution) {
// Continue execution
await continueOperation(result.context);
} else {
// Stop execution
await cleanupResources(result.context);
}
} else {
// Error was not handled, propagate it
throw result.error || error;
}
}
Error Metrics System
ZoopFlow includes a metrics system for tracking and analyzing errors.
Key Components
- ErrorMetricsManager: Central class for collecting and analyzing error metrics
- ErrorMetric: Structured representation of an error occurrence
- ErrorOccurrence: Internal representation with full error information
- CategoryMetrics: Aggregated metrics for a specific error category
- ErrorMetricsSnapshot: Point-in-time snapshot of all metrics
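The relationship between stored occurrences and aggregated metrics might look like the sketch below. `SketchMetrics` is a hypothetical, much reduced stand-in for a metrics manager; the eviction rule (drop the oldest occurrence once a cap is exceeded) is an assumption, not documented ZoopFlow behavior.

```typescript
// Hypothetical sketch; SketchMetrics is illustrative only.
interface Occurrence {
  code: string;
  category: string;
  handled: boolean;
  timestamp: Date;
}

class SketchMetrics {
  private occurrences: Occurrence[] = [];

  constructor(private readonly maxOccurrences: number = 1000) {}

  record(code: string, category: string, handled: boolean): void {
    this.occurrences.push({ code, category, handled, timestamp: new Date() });
    // Assumption: drop the oldest entries once the cap is exceeded.
    if (this.occurrences.length > this.maxOccurrences) {
      this.occurrences.splice(0, this.occurrences.length - this.maxOccurrences);
    }
  }

  // Aggregates the stored occurrences into per-category counts.
  countByCategory(): Record<string, number> {
    const counts: Record<string, number> = {};
    for (const o of this.occurrences) {
      counts[o.category] = (counts[o.category] ?? 0) + 1;
    }
    return counts;
  }

  handledVsUnhandled(): { handled: number; unhandled: number } {
    const handled = this.occurrences.filter((o) => o.handled).length;
    return { handled, unhandled: this.occurrences.length - handled };
  }
}

const metrics = new SketchMetrics(2); // keep at most two occurrences
metrics.record("THROTTLING_ERROR", "transient", true);
metrics.record("VALIDATION_ERROR", "validation", false);
metrics.record("THROTTLING_ERROR", "transient", true); // evicts the oldest entry

console.log(metrics.countByCategory()); // validation: 1, transient: 1
console.log(metrics.handledVsUnhandled()); // handled: 1, unhandled: 1
```

Capping stored occurrences bounds memory use, at the cost of aggregates that only reflect the most recent window.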
Configuration Options
export interface ErrorMetricsOptions {
enabled?: boolean; // Enable/disable metrics collection
logMetrics?: boolean; // Automatically log metrics
maxOccurrences?: number; // Max stored occurrences
includeFlowMetadata?: boolean; // Include flow metadata in metrics
customHandler?: (metric: ErrorMetric) => void | Promise<void>; // Custom handler
customHandlers?: ErrorMetricHandler[]; // Multiple custom handlers
}
Usage Example
// Create a metrics manager with custom options
const metricsManager = new ErrorMetricsManager({
logMetrics: true,
includeFlowMetadata: true,
});
try {
// Flow execution...
} catch (error) {
// Record the error with context
await metricsManager.recordError(
error instanceof FlowError ? error : new FlowExecutionError(String(error)),
flowContext,
false, // unhandled
);
}
// Get metrics
const metrics = metricsManager.getMetrics();
const categoryStats = metricsManager.getErrorCountByCategory();
const report = metricsManager.generateReport();
Best Practices
1. Use Specific Error Types
Choose the most specific error type that describes your failure scenario:
// ❌ Too generic
throw new Error("Payment failed");
// ✅ Better - provides context and classification
throw new FlowDependencyError(
"Payment processor service unavailable",
"payment-service",
true // retryable
);
2. Enrich with Context
Always provide relevant context that will help with debugging:
// ❌ Limited context
throw new FlowError("Validation failed");
// ✅ Better - includes validation details
throw new FlowValidationError(
"Order validation failed",
validationErrors,
{ orderId: input.orderId }
);
3. Design for Recoverability
Set the recoverable flag appropriately to indicate whether an error can be handled:
// Recoverable error - system will attempt recovery
throw new FlowDependencyError(
"Temporary database connection issue",
"database",
true, // recoverable
{ dbInstance: config.dbInstance }
);
// Non-recoverable error - requires manual intervention
throw new FlowDependencyError(
"Database corruption detected",
"database",
false, // not recoverable
{ dbInstance: config.dbInstance, corruptedTables: ['users', 'orders'] }
);
4. Use the Unified Error Interface
Leverage the unified error handling interface for consistent error handling across environments:
// ❌ Inconsistent error handling
try {
await executeOperation();
} catch (error) {
if (error instanceof FlowError) {
// Core-specific logic
} else if (error instanceof WorkflowError) {
// Temporal-specific logic
}
}
// ✅ Better - consistent error handling using unified interface
const errorHandler = UnifiedErrorHandlerFactory.createHandler();
try {
await executeOperation();
} catch (error) {
const result = await errorHandler.handleError(error, context);
// Handle based on result, not error type
}
5. Implement Appropriate Retry Logic
Use retry policies based on error categories:
// Configure retries for steps that might experience transient failures
const processPayment = defineStep({
// ... other configuration
retry_config: {
max_attempts: 3,
backoff_strategy: "exponential",
initial_delay_seconds: 2,
should_retry: (error) => error instanceof FlowError && error.isRetryable,
},
});
6. Use Compensation for Distributed Transactions
Implement compensating actions for multi-step processes:
// Define flow with compensation
const flowWithCompensation = {
id: 'order-processing',
version: '1.0.0',
nodes: [
{
id: 'reserve-inventory',
type: 'inventory-service',
},
{
id: 'process-payment',
type: 'payment-processor',
options: {
errorHandling: {
strategy: RecoveryStrategy.COMPENSATE,
compensationNodeId: 'release-inventory',
},
},
},
{
id: 'release-inventory',
type: 'inventory-service',
input: {
action: 'release',
},
},
{
id: 'ship-order',
type: 'shipping-service',
},
],
edges: [
{
source: 'reserve-inventory',
target: 'process-payment',
},
{
source: 'process-payment',
target: 'ship-order',
},
],
};
7. Preserve Error Context during Transitions
Ensure that error context is preserved when transitioning between systems:
// ❌ Context lost during error conversion
try {
await workflowClient.execute(workflowId, input);
} catch (error) {
// Temporal-specific error with context lost
throw new FlowError("Workflow execution failed");
}
// ✅ Better - context preserved during conversion
try {
await workflowClient.execute(workflowId, input);
} catch (error) {
// Use adapter to convert error while preserving context
throw TemporalErrorAdapter.toFlowError(error);
}
8. Monitor Error Metrics
Regularly review error metrics to identify recurring issues:
// Generate a daily error report
const dailyErrorReport = metricsManager.generateReport();
logger.info('Daily error summary', { report: dailyErrorReport });
// Check for critical errors
const criticalCount = metricsManager.getErrorCountBySeverity()[ErrorSeverity.CRITICAL] ?? 0;
if (criticalCount > 0) {
alertSystem.notify('Critical errors detected', { count: criticalCount });
}
9. Test Error Paths
Include specific tests for error paths to ensure proper handling:
// Example test for error handling
it('should handle payment failure with compensation', async () => {
// Setup: Mock payment to fail
paymentServiceMock.processPayment.mockRejectedValue(
new DependencyError('Payment service unavailable', 'payment-api')
);
// Execute flow
const result = await orderProcessingFlow.execute(testInput);
// Verify: Check compensation was triggered
expect(inventoryServiceMock.releaseReservation).toHaveBeenCalledWith({
reservationId: expect.any(String)
});
// Verify: Check result indicates failure
expect(result.status).toBe('failed');
expect(result.reason).toBe('DEPENDENCY_ERROR');
});
10. Document Error Codes
Maintain documentation of all error codes used in the system to aid in troubleshooting:
/**
* @errorCode PAYMENT_PROCESSOR_UNAVAILABLE
* @errorCategory DEPENDENCY
* @severity ERROR
* @description The payment processing service is temporarily unavailable
* @recovery Automatically retried up to 3 times with exponential backoff
* @recommendation Check payment processor status and validate your API credentials
*/
API Reference
Core Error Classes
FlowError
class FlowError extends Error {
constructor(
message: string,
public readonly errorCode: string,
public readonly category: ErrorCategory,
public readonly context?: ErrorContext,
public readonly cause?: Error,
public readonly recoverable: boolean = false,
public readonly severity: ErrorSeverity = ErrorSeverity.ERROR
);
// Helper to determine if retry is appropriate
get isRetryable(): boolean;
// Add additional context information
withContext(additionalContext: Partial<ErrorContext>): this;
// Compare with another error
equals(other: FlowError): boolean;
}
Error Handler
interface ErrorHandler {
// Handle a specific error
handleError(error: FlowError, context: ErrorContext): Promise<ErrorHandlingResult>;
// Check if handler can handle this error
canHandle(error: FlowError): boolean;
}
// Error handler registration
errorHandlerRegistry.register(
FlowDependencyError,
async (error, context) => {
// Custom handling logic
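// Shape follows ErrorHandlingResult; the chosen strategy is illustrative
return {
handled: true,
strategy: RecoveryStrategy.ALTERNATIVE_PATH,
continueExecution: true,
context: { result: fallbackValue },
};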
}
);
Unified Error Handler
// The unified error handler interface
interface UnifiedErrorHandler {
// Handle any type of error with a unified context
handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
}
// Core error handler adapter
class CoreErrorHandlerAdapter implements UnifiedErrorHandler {
constructor(registry?: ErrorHandlerRegistry);
handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
}
// Temporal error handler adapter
class TemporalErrorHandlerAdapter implements UnifiedErrorHandler {
constructor(config?: Partial<ErrorHandlerConfig>);
handleError(error: unknown, context?: UnifiedErrorContext): Promise<ErrorHandlingResult>;
registerCompensation(nodeId: string, compensate: () => Promise<void>, priority?: number): void;
}
// Factory for creating handlers
class UnifiedErrorHandlerFactory {
static createHandler(registry?: ErrorHandlerRegistry): UnifiedErrorHandler;
}
// Factory for Temporal handlers
class TemporalUnifiedErrorHandlerFactory {
static createHandler(config?: Partial<ErrorHandlerConfig>): UnifiedErrorHandler;
}
// Error adapter for converting between systems
class TemporalErrorAdapter {
static toFlowError(error: WorkflowError): FlowError;
static toWorkflowError(error: FlowError): WorkflowError;
static errorToWorkflowError(error: unknown, context?: TemporalErrorContext): WorkflowError;
static mapErrorTypeToCategory(type: ErrorType): ErrorCategory;
static mapCategoryToErrorType(category: ErrorCategory): ErrorType;
static mapTemporalToCoreClassification(classification: TemporalClassification): CoreClassification;
static mapCoreToTemporalClassification(classification: CoreClassification): TemporalClassification;
static mapCoreToTemporalStrategy(strategy: CoreStrategy): TemporalStrategy;
static mapTemporalToCoreStrategy(strategy: TemporalStrategy): CoreStrategy;
static convertTemporalErrorContext(context: TemporalErrorContext): UnifiedErrorContext;
static convertFlowErrorContext(context: UnifiedErrorContext): TemporalErrorContext;
static convertToTemporalResult(result: ErrorHandlingResult): ErrorHandlingResult;
}
Retry Configuration
interface RetryPolicy {
// Maximum number of retry attempts
maximumAttempts?: number;
// Initial interval between retries
initialInterval?: string;
// How fast the retry interval increases
backoffCoefficient?: number;
// Maximum interval between retries
maximumInterval?: string;
// Total maximum time for all retries
maximumTime?: string;
// Errors that should not be retried
nonRetryableErrors?: string[];
// Error types that should not be retried
nonRetryableErrorTypes?: ErrorType[];
}
Error Metrics Manager
class ErrorMetricsManager {
constructor(options?: ErrorMetricsOptions);
// Record an error occurrence
recordError(
error: FlowError,
context: ErrorContext | any,
handled?: boolean
): Promise<void>;
// Get all collected metrics
getMetrics(): ErrorMetric[];
// Get metrics for a specific category
getCategoryMetrics(category: string): CategoryMetrics;
// Get error counts by category
getErrorCountByCategory(): Record<string, number>;
// Get error counts by severity
getErrorCountBySeverity(): Record<string, number>;
// Get error counts by flow
getErrorCountByFlow(): Record<string, number>;
// Get handled vs unhandled counts
getHandledVsUnhandledCounts(): { handled: number, unhandled: number };
// Get occurrences by error code
getOccurrencesByCode(code: string): ErrorOccurrence[];
// Get occurrences by category
getOccurrencesByCategory(category: string): ErrorOccurrence[];
// Create a snapshot of all metrics
getSnapshot(): ErrorMetricsSnapshot;
// Generate a human-readable report
generateReport(): string;
// Clear all metrics
clear(): void;
}
Flow Try-Catch
function flowTryCatch<T>(fn: () => Promise<T>): FlowTryCatchChain<T>;
interface FlowTryCatchChain<T> {
// Catch a specific error type
catch<E extends FlowError>(
errorType: Constructor<E>,
handler: (error: E) => Promise<T>
): FlowTryCatchChain<T>;
// Finally block that always executes
finally(fn: () => Promise<void>): Promise<T>;
}
This guide consolidates information from multiple sources into a structured approach to managing errors in ZoopFlow. By following these patterns and best practices, developers can build workflows that handle failure scenarios gracefully.