Executor Service Architecture
The Executor Service is the central component of ZoopFlow's distributed architecture, responsible for managing flow and step definitions, handling registry operations, controlling client access to components, and managing configurations.
System Architecture
The Executor Service implements a modular architecture designed to scale with the needs of the platform while maintaining clear service boundaries and responsibilities.
Component Descriptions
Client Interaction Layer
The Client Interaction Layer handles all external communication with the Executor Service:
- REST API: Exposes RESTful endpoints for client SDKs and programmatic interactions
- GraphQL API: Provides GraphQL schema and resolver implementations for flexible queries
- Authentication & Authorization: Handles JWT/API key authentication and enforces access control
- Request Validation: Validates incoming requests against schemas before processing
// REST API endpoint example
app.post('/api/v1/flows', authenticateRequest, validateRequest, async (req, res) => {
try {
const flowData = req.body;
const organizationId = req.auth.organizationId;
const definitionService = serviceRegistry.getService<DefinitionManagementService>(
'definitionManagement'
);
const flow = await definitionService.createFlow(organizationId, flowData);
res.status(201).json(flow);
} catch (error) {
handleApiError(error, res);
}
});Core Services
The Core Services layer implements the primary business logic of the Executor Service:
Definition Management Service
interface DefinitionManagementService {
// Flow operations
createFlow(organizationId: string, flowData: FlowCreationInput): Promise<Flow>;
updateFlow(flowId: string, flowData: FlowUpdateInput): Promise<Flow>;
getFlow(flowId: string): Promise<Flow>;
listFlows(filters: FlowFilters, pagination: PaginationInput): Promise<PaginatedFlows>;
publishFlow(flowId: string): Promise<Flow>;
deprecateFlow(flowId: string): Promise<Flow>;
// Step operations
createStep(organizationId: string, stepData: StepCreationInput): Promise<Step>;
updateStep(stepId: string, stepData: StepUpdateInput): Promise<Step>;
getStep(stepId: string): Promise<Step>;
listSteps(filters: StepFilters, pagination: PaginationInput): Promise<PaginatedSteps>;
publishStep(stepId: string): Promise<Step>;
deprecateStep(stepId: string): Promise<Step>;
// Definition resolution
resolveFlowDefinition(flowId: string, version?: string): Promise<FlowDefinition>;
resolveStepDefinition(stepId: string, version?: string): Promise<StepDefinition>;
}Execution Control Service
interface ExecutionControlService {
// Flow execution
executeFlow(
flowId: string,
organizationId: string,
inputData: any,
options?: ExecutionOptions,
): Promise<FlowExecution>;
cancelExecution(executionId: string): Promise<FlowExecution>;
getExecution(executionId: string): Promise<FlowExecution>;
listExecutions(
filters: ExecutionFilters,
pagination: PaginationInput,
): Promise<PaginatedExecutions>;
// Step execution monitoring
getStepExecutions(executionId: string): Promise<StepExecution[]>;
getExecutionLogs(executionId: string): Promise<ExecutionLog[]>;
// Human tasks
createHumanTask(stepExecutionId: string, taskDefinition: HumanTaskDefinition): Promise<HumanTask>;
getHumanTask(taskId: string): Promise<HumanTask>;
listHumanTasks(
filters: HumanTaskFilters,
pagination: PaginationInput,
): Promise<PaginatedHumanTasks>;
completeHumanTask(taskId: string, resultData: any): Promise<HumanTask>;
cancelHumanTask(taskId: string): Promise<HumanTask>;
}Store Configuration Service
interface StoreConfigurationService {
// Store configuration
createStoreConfiguration(
organizationId: string,
storeData: StoreConfigInput,
): Promise<StoreConfig>;
updateStoreConfiguration(
storeId: string,
storeData: StoreConfigUpdateInput,
): Promise<StoreConfig>;
getStoreConfiguration(storeId: string): Promise<StoreConfig>;
listStoreConfigurations(organizationId: string, storeType?: string): Promise<StoreConfig[]>;
deleteStoreConfiguration(storeId: string): Promise<void>;
// Connection testing
testStoreConnection(storeId: string): Promise<ConnectionTestResult>;
// Store clients
getStoreClient(storeId: string): Promise<StoreClient>;
}Caching Service
interface CachingService {
// Basic cache operations
get<T>(key: string): Promise<T | null>;
set<T>(key: string, value: T, ttlSeconds?: number): Promise<void>;
delete(key: string): Promise<boolean>;
// Batch operations
mget<T>(keys: string[]): Promise<(T | null)[]>;
mset<T>(entries: Array<{key: string, value: T, ttlSeconds?: number}>): Promise<void>;
// Cache control
flush(): Promise<void>;
invalidatePattern(pattern: string): Promise<number>;
}Temporal Integration
The Temporal Integration layer connects the Executor Service with the Temporal workflow engine:
Temporal Bridge
The Temporal Bridge serves as the primary integration point with Temporal:
interface TemporalBridge {
// Workflow operations
startWorkflow(
workflowType: string,
args: any[],
options: WorkflowOptions,
): Promise<StartWorkflowResult>;
signalWorkflow(
workflowId: string,
runId: string,
signalName: string,
signalArgs: any[],
): Promise<void>;
queryWorkflow(
workflowId: string,
runId: string,
queryType: string,
queryArgs: any[],
): Promise<any>;
cancelWorkflow(workflowId: string, runId: string): Promise<void>;
// Activity operations
completeActivity(taskToken: string, result: any): Promise<void>;
failActivity(taskToken: string, error: Error): Promise<void>;
heartbeatActivity(taskToken: string, details?: any): Promise<void>;
}Data Access Layer
The Data Access Layer provides abstraction for database operations:
Definition Repository
interface DefinitionRepository {
// Flow operations
createFlow(flow: Flow): Promise<Flow>;
updateFlow(flowId: string, updates: Partial<Flow>): Promise<Flow>;
getFlow(flowId: string): Promise<Flow | null>;
getFlowsByFilters(filters: FlowFilters, pagination: PaginationInput): Promise<PaginatedFlows>;
deleteFlow(flowId: string): Promise<boolean>;
// Step operations
createStep(step: Step): Promise<Step>;
updateStep(stepId: string, updates: Partial<Step>): Promise<Step>;
getStep(stepId: string): Promise<Step | null>;
getStepsByFilters(filters: StepFilters, pagination: PaginationInput): Promise<PaginatedSteps>;
deleteStep(stepId: string): Promise<boolean>;
}Execution Repository
interface ExecutionRepository {
// Flow execution operations
createExecution(execution: FlowExecution): Promise<FlowExecution>;
updateExecution(executionId: string, updates: Partial<FlowExecution>): Promise<FlowExecution>;
getExecution(executionId: string): Promise<FlowExecution | null>;
getExecutionsByFilters(filters: ExecutionFilters, pagination: PaginationInput): Promise<PaginatedExecutions>;
// Step execution operations
createStepExecution(stepExecution: StepExecution): Promise<StepExecution>;
updateStepExecution(stepExecutionId: string, updates: Partial<StepExecution>): Promise<StepExecution>;
getStepExecution(stepExecutionId: string): Promise<StepExecution | null>;
getStepExecutionsByFlowExecution(flowExecutionId: string): Promise<StepExecution[]>;
// Execution state operations
updateExecutionState(executionId: string, state: any): Promise<void>;
getExecutionState(executionId: string): Promise<any>;
// Execution status operations
updateExecutionStatus(executionId: string, status: ExecutionStatus): Promise<void>;
recordStepExecution(
executionId: string,
stepId: string,
status: StepExecutionStatus,
metadata?: any
): Promise<void>;
}Execution Runtime
The Execution Runtime handles the actual execution of flows and steps:
Flow Workflow
The primary flow execution workflow pattern:
async function executeFlowWorkflow(
flowId: string,
organizationId: string,
inputData: any,
): Promise<any> {
// 1. Resolve the flow definition
const flowDefinition = await activities.resolveFlowDefinition(flowId, organizationId);
// 2. Initialize the execution context
const context = {
executionId: workflowInfo().workflowId,
flowId,
organizationId,
input: inputData,
state: {},
output: {},
};
// 3. Execute each step in the flow
for (const step of flowDefinition.steps) {
try {
// 4. Resolve step definition
const stepDefinition = await activities.resolveStepDefinition(
step.stepId,
step.version,
organizationId,
);
// 5. Determine execution type
if (stepDefinition.stepType === 'human-task') {
// 6a. Execute human task
const taskResult = await activities.executeHumanTask(
step.id,
context,
stepDefinition,
step.configuration,
);
// Update context with task result
context.state[step.id] = taskResult;
} else {
// 6b. Execute standard step
const stepResult = await activities.executeStep(
step.id,
context,
stepDefinition,
step.configuration,
);
// Update context with step result
context.state[step.id] = stepResult;
}
// 7. Process step output mapping
if (step.outputMapping) {
applyOutputMapping(context, step.outputMapping);
}
} catch (error) {
// 8. Handle step error based on error handling configuration
if (step.errorHandling?.continueOnError) {
context.state[step.id] = { error: error.message };
continue;
}
throw error;
}
}
// 9. Return flow execution result
return context.output;
}Database Schema
The Executor Service uses a relational database schema to store definitions, configurations, and execution metadata:
Core Tables
Organizations
CREATE TABLE Organizations (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
tier VARCHAR(50) NOT NULL, -- 'free', 'standard', 'enterprise'
status VARCHAR(50) NOT NULL, -- 'active', 'suspended', 'trial'
createdAt TIMESTAMP NOT NULL,
updatedAt TIMESTAMP NOT NULL
);Users
CREATE TABLE Users (
id UUID PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
firstName VARCHAR(100),
lastName VARCHAR(100),
role VARCHAR(50) NOT NULL, -- 'admin', 'developer', 'viewer'
organizationId UUID REFERENCES Organizations(id),
status VARCHAR(50) NOT NULL, -- 'active', 'inactive', 'invited'
createdAt TIMESTAMP NOT NULL,
updatedAt TIMESTAMP NOT NULL
);Definition Tables
Flows
CREATE TABLE Flows (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
version VARCHAR(50) NOT NULL,
description TEXT,
definition JSONB NOT NULL, -- The actual flow definition
schemaVersion VARCHAR(50) NOT NULL,
publisherId UUID NOT NULL, -- Organization or system
isPublic BOOLEAN NOT NULL DEFAULT false,
status VARCHAR(50) NOT NULL, -- 'draft', 'published', 'deprecated'
tags JSONB,
createdAt TIMESTAMP NOT NULL,
updatedAt TIMESTAMP NOT NULL,
UNIQUE(name, version, publisherId)
);Steps
CREATE TABLE Steps (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
version VARCHAR(50) NOT NULL,
description TEXT,
definition JSONB NOT NULL, -- The actual step definition
schemaVersion VARCHAR(50) NOT NULL,
publisherId UUID NOT NULL, -- Organization or system
stepType VARCHAR(50) NOT NULL, -- 'standard', 'human-task', 'integration'
isPublic BOOLEAN NOT NULL DEFAULT false,
status VARCHAR(50) NOT NULL, -- 'draft', 'published', 'deprecated'
tags JSONB,
createdAt TIMESTAMP NOT NULL,
updatedAt TIMESTAMP NOT NULL,
UNIQUE(name, version, publisherId)
);Execution Metadata Tables
FlowExecutions
CREATE TABLE FlowExecutions (
id UUID PRIMARY KEY,
flowId UUID NOT NULL REFERENCES Flows(id),
organizationId UUID NOT NULL REFERENCES Organizations(id),
status VARCHAR(50) NOT NULL, -- 'running', 'completed', 'failed', 'canceled'
temporalWorkflowId VARCHAR(255),
temporalRunId VARCHAR(255),
startedAt TIMESTAMP NOT NULL,
completedAt TIMESTAMP,
initiatedBy UUID REFERENCES Users(id),
tags JSONB,
metadata JSONB -- Additional execution metadata
);StepExecutions
CREATE TABLE StepExecutions (
id UUID PRIMARY KEY,
flowExecutionId UUID NOT NULL REFERENCES FlowExecutions(id),
stepId UUID NOT NULL REFERENCES Steps(id),
status VARCHAR(50) NOT NULL, -- 'pending', 'running', 'completed', 'failed'
startedAt TIMESTAMP,
completedAt TIMESTAMP,
retryCount INTEGER NOT NULL DEFAULT 0,
metadata JSONB -- Step-specific execution metadata
);API Endpoints
The Executor Service exposes a comprehensive RESTful API:
Flow Management
GET /api/v1/flows # List flows with filtering
POST /api/v1/flows # Create a new flow
GET /api/v1/flows/:id # Get flow by ID
PUT /api/v1/flows/:id # Update flow
DELETE /api/v1/flows/:id # Delete flow
POST /api/v1/flows/:id/publish # Publish flow version
POST /api/v1/flows/:id/deprecate # Deprecate flow version
POST /api/v1/flows/:id/execute # Execute a flowStep Management
GET /api/v1/steps # List steps with filtering
POST /api/v1/steps # Create a new step
GET /api/v1/steps/:id # Get step by ID
PUT /api/v1/steps/:id # Update step
DELETE /api/v1/steps/:id # Delete step
POST /api/v1/steps/:id/publish # Publish step version
POST /api/v1/steps/:id/deprecate # Deprecate step versionExecution Management
GET /api/v1/executions # List flow executions
GET /api/v1/executions/:id # Get execution details
POST /api/v1/executions/:id/cancel # Cancel execution
GET /api/v1/executions/:id/steps # Get execution step details
GET /api/v1/executions/:id/logs # Get execution logsSecurity Implementation
The Executor Service implements comprehensive security measures:
Authentication
The service implements multiple authentication methods:
-
JWT Authentication
- Used for client SDKs and API access
- Role-based claims with scoped permissions
- Short-lived tokens with refresh capability
-
API Key Authentication
- Used for programmatic access
- Scoped to specific operations
- Rate-limited based on plan tier
Authorization Model
The authorization system uses a role-based access control model with these key roles:
-
Organization Admin
- Manage organization settings
- Control user access
- Configure private registries
- Enable/disable marketplace components
-
Developer
- Create and edit flows and steps
- Configure store connections
- View execution details
- Debug workflow issues
-
Operator
- Monitor executions
- Restart failed flows
- Handle human tasks
- View operational metrics
-
Viewer
- View flows and steps
- View execution status
- View analytics and reports
Data Protection
For sensitive data handling, the service implements:
-
Data Encryption
- All credentials and secrets encrypted at rest
- Encryption key rotation capabilities
- Field-level encryption for sensitive data
-
Isolation
- Strict multi-tenant data isolation
- Request context validation
- Resource ownership verification
Deployment Architecture
Docker-Based Deployment
The Executor Service is containerized for deployment flexibility:
├── API Service
│ ├── REST API Container
│ └── GraphQL API Container
├── Core Services
│ ├── Definition Management Container
│ ├── Registry Management Container
│ ├── Execution Control Container
│ └── Store Configuration Container
├── Databases
│ ├── Primary Database Container
│ └── Redis Cache Container
└── Supporting Services
├── Authentication Service Container
├── Notification Service Container
└── Scheduler ContainerKubernetes Deployment
For production Kubernetes deployment, the service uses:
-
Horizontal Pod Autoscaling
- Scale API and service pods based on CPU/memory
- Traffic-based scaling for API layer
- Queue-based scaling for workers
-
Service Mesh
- Istio-based service mesh for traffic management
- Circuit breaking and retry policies
- Distributed tracing integration
-
Secret Management
- Kubernetes secrets for credentials
- External secret store integration (HashiCorp Vault)
- Automated secret rotation
Monitoring and Observability
Metrics Collection
The service exports metrics for:
-
Service Health
- API request rates and latencies
- Error rates and types
- Resource utilization
-
Business Metrics
- Flow execution counts
- Completion rates and times
- Step performance statistics
- Human task completion time
-
Resource Utilization
- Database connection pool usage
- Cache hit/miss rates
- Worker utilization
Implementation Roadmap
The Executor Service will be implemented in phases:
Phase 1: Foundation
- Core API development
- Database schema implementation
- Basic CRUD operations for definitions
- Authentication and authorization framework
Phase 2: Execution
- Temporal integration
- Flow execution engine
- Step execution handling
- Error handling and retries
Phase 3: Features
- Human task integration
- Registry management
- Monitoring and observability
- Performance optimizations
Phase 4: Enterprise
- Advanced security features
- Multi-region deployment
- High availability configurations
- Enterprise integrations
Conclusion
The Executor Service is the central component in ZoopFlow's distributed execution architecture, managing definitions, orchestrating executions, and coordinating with client environments. This technical specification provides the foundation for implementing a robust, scalable service that meets the needs of diverse enterprise clients while maintaining security, performance, and reliability.