Executor Service Architecture

The Executor Service is the central component of ZoopFlow's distributed architecture, responsible for managing flow and step definitions, handling registry operations, controlling client access to components, and managing configurations.

System Architecture

The Executor Service is organized into layers with distinct responsibilities: a client interaction layer, core services, Temporal integration, a data access layer, and the execution runtime. Each layer can be scaled independently of the others.

Component Descriptions

Client Interaction Layer

The Client Interaction Layer handles all external communication with the Executor Service:

  • REST API: Exposes RESTful endpoints for client SDKs and programmatic interactions
  • GraphQL API: Provides GraphQL schema and resolver implementations for flexible queries
  • Authentication & Authorization: Handles JWT/API key authentication and enforces access control
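  • Request Validation: Validates incoming requests against schemas before processing

The following endpoint shows authentication and request validation applied as middleware ahead of the handler:

// REST API endpoint example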
app.post('/api/v1/flows', authenticateRequest, validateRequest, async (req, res) => {
  try {
    const flowData = req.body;
    const organizationId = req.auth.organizationId;
    
    const definitionService = serviceRegistry.getService<DefinitionManagementService>(
      'definitionManagement'
    );
    
    const flow = await definitionService.createFlow(organizationId, flowData);
    
    res.status(201).json(flow);
  } catch (error) {
    handleApiError(error, res);
  }
});
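
As a rough illustration of the request validation step, the sketch below checks an untrusted body before it reaches a handler. The `FlowCreationInput` fields, the `ValidationResult` type, and the `validateFlowCreationInput` function are assumptions made for this example; the document does not define the actual payload schema or the validation library in use.

```typescript
// Hypothetical payload shape; the real FlowCreationInput is not defined in this document.
interface FlowCreationInput {
  name: string;
  version: string;
  description?: string;
  steps: unknown[];
}

type ValidationResult =
  | { ok: true; value: FlowCreationInput }
  | { ok: false; errors: string[] };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Collects every problem found so a client can fix them in one round trip.
function validateFlowCreationInput(body: unknown): ValidationResult {
  if (!isRecord(body)) {
    return { ok: false, errors: ['body must be a JSON object'] };
  }
  const errors: string[] = [];
  const { name, version, description, steps } = body;

  if (typeof name !== 'string' || name.trim() === '') {
    errors.push('name must be a non-empty string');
  }
  if (typeof version !== 'string' || version.trim() === '') {
    errors.push('version must be a non-empty string');
  }
  if (description !== undefined && typeof description !== 'string') {
    errors.push('description must be a string when present');
  }
  if (!Array.isArray(steps)) {
    errors.push('steps must be an array');
  }
  if (errors.length > 0) {
    return { ok: false, errors };
  }

  // All checks passed, so the casts below are safe.
  return {
    ok: true,
    value: {
      name: name as string,
      version: version as string,
      steps: steps as unknown[],
      ...(typeof description === 'string' ? { description } : {}),
    },
  };
}
```

A middleware such as `validateRequest` could call a function like this and respond with a 400 status and the collected errors when `ok` is false.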

Core Services

The Core Services layer implements the primary business logic of the Executor Service:

Definition Management Service

interface DefinitionManagementService {
  // Flow operations
  createFlow(organizationId: string, flowData: FlowCreationInput): Promise<Flow>;
  updateFlow(flowId: string, flowData: FlowUpdateInput): Promise<Flow>;
  getFlow(flowId: string): Promise<Flow>;
  listFlows(filters: FlowFilters, pagination: PaginationInput): Promise<PaginatedFlows>;
  publishFlow(flowId: string): Promise<Flow>;
  deprecateFlow(flowId: string): Promise<Flow>;
  
  // Step operations
  createStep(organizationId: string, stepData: StepCreationInput): Promise<Step>;
  updateStep(stepId: string, stepData: StepUpdateInput): Promise<Step>;
  getStep(stepId: string): Promise<Step>;
  listSteps(filters: StepFilters, pagination: PaginationInput): Promise<PaginatedSteps>;
  publishStep(stepId: string): Promise<Step>;
  deprecateStep(stepId: string): Promise<Step>;
  
  // Definition resolution
  resolveFlowDefinition(flowId: string, version?: string): Promise<FlowDefinition>;
  resolveStepDefinition(stepId: string, version?: string): Promise<StepDefinition>;
}
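
The `publishFlow`, `deprecateFlow`, `publishStep`, and `deprecateStep` operations imply a status lifecycle. The sketch below is one possible guard for it, using the status values listed in the Flows and Steps tables ('draft', 'published', 'deprecated'). The one-way transition order and the helper names are assumptions; the document does not state which transitions are legal.

```typescript
// Status values come from the `status` column comments in the schema.
type DefinitionStatus = 'draft' | 'published' | 'deprecated';

// Assumed lifecycle: draft -> published -> deprecated, with no way back.
const allowedTransitions: Record<DefinitionStatus, readonly DefinitionStatus[]> = {
  draft: ['published'],
  published: ['deprecated'],
  deprecated: [],
};

function canTransition(from: DefinitionStatus, to: DefinitionStatus): boolean {
  return allowedTransitions[from].includes(to);
}

// Returns the new status, or throws when the transition is not allowed.
function transition(from: DefinitionStatus, to: DefinitionStatus): DefinitionStatus {
  if (!canTransition(from, to)) {
    throw new Error(`cannot move a definition from '${from}' to '${to}'`);
  }
  return to;
}
```

Under these assumptions, `transition('draft', 'published')` returns `'published'`, while `transition('deprecated', 'published')` throws.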

Execution Control Service

interface ExecutionControlService {
  // Flow execution
  executeFlow(
    flowId: string,
    organizationId: string,
    inputData: any,
    options?: ExecutionOptions,
  ): Promise<FlowExecution>;
 
  cancelExecution(executionId: string): Promise<FlowExecution>;
  getExecution(executionId: string): Promise<FlowExecution>;
  listExecutions(
    filters: ExecutionFilters,
    pagination: PaginationInput,
  ): Promise<PaginatedExecutions>;
 
  // Step execution monitoring
  getStepExecutions(executionId: string): Promise<StepExecution[]>;
  getExecutionLogs(executionId: string): Promise<ExecutionLog[]>;
 
  // Human tasks
  createHumanTask(stepExecutionId: string, taskDefinition: HumanTaskDefinition): Promise<HumanTask>;
  getHumanTask(taskId: string): Promise<HumanTask>;
  listHumanTasks(
    filters: HumanTaskFilters,
    pagination: PaginationInput,
  ): Promise<PaginatedHumanTasks>;
  
  completeHumanTask(taskId: string, resultData: any): Promise<HumanTask>;
  cancelHumanTask(taskId: string): Promise<HumanTask>;
}
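
The list operations above take a `PaginationInput` and return a paginated result. Neither type is defined in this document, so the following is a minimal sketch that assumes page-number pagination; the field names (`page`, `pageSize`, `hasNextPage`) and the `paginate` helper are hypothetical.

```typescript
// Hypothetical shapes for PaginationInput and the Paginated* result types.
interface PaginationInput {
  page: number; // 1-based page index
  pageSize: number;
}

interface Paginated<T> {
  items: T[];
  total: number;
  page: number;
  pageSize: number;
  hasNextPage: boolean;
}

// Slices an in-memory list; a real repository would push this down to the database.
function paginate<T>(all: readonly T[], pagination: PaginationInput): Paginated<T> {
  const { page, pageSize } = pagination;
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError('page must be an integer >= 1');
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be an integer >= 1');
  }
  const start = (page - 1) * pageSize;
  const items = all.slice(start, start + pageSize);
  return {
    items,
    total: all.length,
    page,
    pageSize,
    hasNextPage: start + items.length < all.length,
  };
}
```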

Store Configuration Service

interface StoreConfigurationService {
  // Store configuration
  createStoreConfiguration(
    organizationId: string,
    storeData: StoreConfigInput,
  ): Promise<StoreConfig>;
  
  updateStoreConfiguration(
    storeId: string,
    storeData: StoreConfigUpdateInput,
  ): Promise<StoreConfig>;
  
  getStoreConfiguration(storeId: string): Promise<StoreConfig>;
  
  listStoreConfigurations(organizationId: string, storeType?: string): Promise<StoreConfig[]>;
  
  deleteStoreConfiguration(storeId: string): Promise<void>;
  
  // Connection testing
  testStoreConnection(storeId: string): Promise<ConnectionTestResult>;
  
  // Store clients
  getStoreClient(storeId: string): Promise<StoreClient>;
}

Caching Service

interface CachingService {
  // Basic cache operations
  get<T>(key: string): Promise<T | null>;
  set<T>(key: string, value: T, ttlSeconds?: number): Promise<void>;
  delete(key: string): Promise<boolean>;
  
  // Batch operations
  mget<T>(keys: string[]): Promise<(T | null)[]>;
  mset<T>(entries: Array<{key: string, value: T, ttlSeconds?: number}>): Promise<void>;
  
  // Cache control
  flush(): Promise<void>;
  invalidatePattern(pattern: string): Promise<number>;
}
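
To make the contract concrete, here is a minimal in-memory sketch with the same method names as `CachingService`. It is not the service's implementation (the deployment section lists a Redis cache container); the `InMemoryCache` class, the injectable clock, and the rule that `*` is the only wildcard in `invalidatePattern` are assumptions for illustration.

```typescript
interface CacheEntry {
  value: unknown;
  expiresAt: number | null; // epoch milliseconds, or null for no expiry
}

class InMemoryCache {
  private readonly entries = new Map<string, CacheEntry>();
  private readonly now: () => number;

  // The clock is injectable so expiry can be tested without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  async get<T>(key: string): Promise<T | null> {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (entry.expiresAt !== null && entry.expiresAt <= this.now()) {
      this.entries.delete(key); // lazily evict expired entries
      return null;
    }
    return entry.value as T;
  }

  async set<T>(key: string, value: T, ttlSeconds?: number): Promise<void> {
    const expiresAt = ttlSeconds === undefined ? null : this.now() + ttlSeconds * 1000;
    this.entries.set(key, { value, expiresAt });
  }

  async delete(key: string): Promise<boolean> {
    return this.entries.delete(key);
  }

  async mget<T>(keys: string[]): Promise<(T | null)[]> {
    return Promise.all(keys.map((key) => this.get<T>(key)));
  }

  async mset<T>(entries: Array<{ key: string; value: T; ttlSeconds?: number }>): Promise<void> {
    for (const { key, value, ttlSeconds } of entries) {
      await this.set(key, value, ttlSeconds);
    }
  }

  async flush(): Promise<void> {
    this.entries.clear();
  }

  // Assumes `*` is the only wildcard; every other character matches literally.
  async invalidatePattern(pattern: string): Promise<number> {
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
    const regex = new RegExp(`^${escaped}$`);
    let removed = 0;
    for (const key of [...this.entries.keys()]) {
      if (regex.test(key)) {
        this.entries.delete(key);
        removed += 1;
      }
    }
    return removed;
  }
}
```

An in-memory variant like this can stand in for the real cache in unit tests of services that depend on `CachingService`.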

Temporal Integration

The Temporal Integration layer connects the Executor Service with the Temporal workflow engine:

Temporal Bridge

The Temporal Bridge serves as the primary integration point with Temporal:

interface TemporalBridge {
  // Workflow operations
  startWorkflow(
    workflowType: string,
    args: any[],
    options: WorkflowOptions,
  ): Promise<StartWorkflowResult>;
 
  signalWorkflow(
    workflowId: string,
    runId: string,
    signalName: string,
    signalArgs: any[],
  ): Promise<void>;
 
  queryWorkflow(
    workflowId: string,
    runId: string,
    queryType: string,
    queryArgs: any[],
  ): Promise<any>;
 
  cancelWorkflow(workflowId: string, runId: string): Promise<void>;
 
  // Activity operations
  completeActivity(taskToken: string, result: any): Promise<void>;
  failActivity(taskToken: string, error: Error): Promise<void>;
  heartbeatActivity(taskToken: string, details?: any): Promise<void>;
}

Data Access Layer

The Data Access Layer provides abstraction for database operations:

Definition Repository

interface DefinitionRepository {
  // Flow operations
  createFlow(flow: Flow): Promise<Flow>;
  updateFlow(flowId: string, updates: Partial<Flow>): Promise<Flow>;
  getFlow(flowId: string): Promise<Flow | null>;
  getFlowsByFilters(filters: FlowFilters, pagination: PaginationInput): Promise<PaginatedFlows>;
  deleteFlow(flowId: string): Promise<boolean>;
  
  // Step operations
  createStep(step: Step): Promise<Step>;
  updateStep(stepId: string, updates: Partial<Step>): Promise<Step>;
  getStep(stepId: string): Promise<Step | null>;
  getStepsByFilters(filters: StepFilters, pagination: PaginationInput): Promise<PaginatedSteps>;
  deleteStep(stepId: string): Promise<boolean>;
}

Execution Repository

interface ExecutionRepository {
  // Flow execution operations
  createExecution(execution: FlowExecution): Promise<FlowExecution>;
  updateExecution(executionId: string, updates: Partial<FlowExecution>): Promise<FlowExecution>;
  getExecution(executionId: string): Promise<FlowExecution | null>;
  getExecutionsByFilters(filters: ExecutionFilters, pagination: PaginationInput): Promise<PaginatedExecutions>;
  
  // Step execution operations
  createStepExecution(stepExecution: StepExecution): Promise<StepExecution>;
  updateStepExecution(stepExecutionId: string, updates: Partial<StepExecution>): Promise<StepExecution>;
  getStepExecution(stepExecutionId: string): Promise<StepExecution | null>;
  getStepExecutionsByFlowExecution(flowExecutionId: string): Promise<StepExecution[]>;
  
  // Execution state operations
  updateExecutionState(executionId: string, state: any): Promise<void>;
  getExecutionState(executionId: string): Promise<any>;
  
  // Execution status operations
  updateExecutionStatus(executionId: string, status: ExecutionStatus): Promise<void>;
  recordStepExecution(
    executionId: string, 
    stepId: string, 
    status: StepExecutionStatus, 
    metadata?: any
  ): Promise<void>;
}

Execution Runtime

The Execution Runtime handles the actual execution of flows and steps:

Flow Workflow

The primary flow execution workflow resolves the flow definition, then runs each step in order and collects the results:

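import { proxyActivities, workflowInfo } from '@temporalio/workflow';
// Assumed module path; the activity implementations are not shown in this document.
import type * as flowActivities from './activities';

// Activities run outside the workflow sandbox; the timeout is an illustrative value.
const activities = proxyActivities<typeof flowActivities>({
  startToCloseTimeout: '5 minutes',
});

export async function executeFlowWorkflow(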
  flowId: string,
  organizationId: string,
  inputData: any,
): Promise<any> {
  // 1. Resolve the flow definition
  const flowDefinition = await activities.resolveFlowDefinition(flowId, organizationId);
 
  // 2. Initialize the execution context
  const context = {
    executionId: workflowInfo().workflowId,
    flowId,
    organizationId,
    input: inputData,
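    // Typed as records so step results can be stored under dynamic keys
    state: {} as Record<string, any>,
    output: {} as Record<string, any>,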
  };
 
  // 3. Execute each step in the flow
  for (const step of flowDefinition.steps) {
    try {
      // 4. Resolve step definition
      const stepDefinition = await activities.resolveStepDefinition(
        step.stepId,
        step.version,
        organizationId,
      );
 
      // 5. Determine execution type
      if (stepDefinition.stepType === 'human-task') {
        // 6a. Execute human task
        const taskResult = await activities.executeHumanTask(
          step.id,
          context,
          stepDefinition,
          step.configuration,
        );
 
        // Update context with task result
        context.state[step.id] = taskResult;
      } else {
        // 6b. Execute standard step
        const stepResult = await activities.executeStep(
          step.id,
          context,
          stepDefinition,
          step.configuration,
        );
 
        // Update context with step result
        context.state[step.id] = stepResult;
      }
 
      // 7. Process step output mapping
      if (step.outputMapping) {
        applyOutputMapping(context, step.outputMapping);
      }
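    } catch (error) {
      // 8. Handle step error based on error handling configuration
      if (step.errorHandling?.continueOnError) {
        // `error` is `unknown` under strict TypeScript, so narrow before reading `.message`
        const message = error instanceof Error ? error.message : String(error);
        context.state[step.id] = { error: message };
        continue;
      }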
 
      throw error;
    }
  }
 
  // 9. Return flow execution result
  return context.output;
}
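
The workflow calls `applyOutputMapping(context, step.outputMapping)` without showing what it does. One plausible reading is sketched below: the mapping copies values from dot-separated paths in the context into `context.output`. The `OutputMapping` shape, the `ExecutionContext` interface, and the `readPath` helper are all assumptions, not the documented behavior.

```typescript
// Hypothetical context shape, modelled on the object built in the workflow.
interface ExecutionContext {
  input: unknown;
  state: Record<string, unknown>;
  output: Record<string, unknown>;
}

// Assumed mapping format: output key -> dot-separated path inside the context,
// for example { customerId: 'state.lookup.id' }.
type OutputMapping = Record<string, string>;

// Walks a dot-separated path and returns undefined if any segment is missing.
function readPath(root: unknown, path: string): unknown {
  let current: unknown = root;
  for (const segment of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

// Copies each resolvable source value into context.output; unresolved paths are skipped.
function applyOutputMapping(context: ExecutionContext, mapping: OutputMapping): void {
  for (const [outputKey, sourcePath] of Object.entries(mapping)) {
    const value = readPath(context, sourcePath);
    if (value !== undefined) {
      context.output[outputKey] = value;
    }
  }
}
```

Skipping unresolved paths keeps a partially failed step (for example one recorded under `continueOnError`) from writing `undefined` into the flow output; a stricter design could throw instead.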

Database Schema

The Executor Service uses a relational database schema to store definitions, configurations, and execution metadata:

Core Tables

Organizations

CREATE TABLE Organizations (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    tier VARCHAR(50) NOT NULL, -- 'free', 'standard', 'enterprise'
    status VARCHAR(50) NOT NULL, -- 'active', 'suspended', 'trial'
    createdAt TIMESTAMP NOT NULL,
    updatedAt TIMESTAMP NOT NULL
);

Users

CREATE TABLE Users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    firstName VARCHAR(100),
    lastName VARCHAR(100),
    role VARCHAR(50) NOT NULL, -- 'admin', 'developer', 'operator', 'viewer'
    organizationId UUID REFERENCES Organizations(id),
    status VARCHAR(50) NOT NULL, -- 'active', 'inactive', 'invited'
    createdAt TIMESTAMP NOT NULL,
    updatedAt TIMESTAMP NOT NULL
);

Definition Tables

Flows

CREATE TABLE Flows (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    version VARCHAR(50) NOT NULL,
    description TEXT,
    definition JSONB NOT NULL, -- The actual flow definition
    schemaVersion VARCHAR(50) NOT NULL,
    publisherId UUID NOT NULL, -- Organization or system
    isPublic BOOLEAN NOT NULL DEFAULT false,
    status VARCHAR(50) NOT NULL, -- 'draft', 'published', 'deprecated'
    tags JSONB,
    createdAt TIMESTAMP NOT NULL,
    updatedAt TIMESTAMP NOT NULL,
    UNIQUE(name, version, publisherId)
);

Steps

CREATE TABLE Steps (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    version VARCHAR(50) NOT NULL,
    description TEXT,
    definition JSONB NOT NULL, -- The actual step definition
    schemaVersion VARCHAR(50) NOT NULL,
    publisherId UUID NOT NULL, -- Organization or system
    stepType VARCHAR(50) NOT NULL, -- 'standard', 'human-task', 'integration'
    isPublic BOOLEAN NOT NULL DEFAULT false,
    status VARCHAR(50) NOT NULL, -- 'draft', 'published', 'deprecated'
    tags JSONB,
    createdAt TIMESTAMP NOT NULL,
    updatedAt TIMESTAMP NOT NULL,
    UNIQUE(name, version, publisherId)
);

Execution Metadata Tables

FlowExecutions

CREATE TABLE FlowExecutions (
    id UUID PRIMARY KEY,
    flowId UUID NOT NULL REFERENCES Flows(id),
    organizationId UUID NOT NULL REFERENCES Organizations(id),
    status VARCHAR(50) NOT NULL, -- 'running', 'completed', 'failed', 'canceled'
    temporalWorkflowId VARCHAR(255),
    temporalRunId VARCHAR(255),
    startedAt TIMESTAMP NOT NULL,
    completedAt TIMESTAMP,
    initiatedBy UUID REFERENCES Users(id),
    tags JSONB,
    metadata JSONB -- Additional execution metadata
);

StepExecutions

CREATE TABLE StepExecutions (
    id UUID PRIMARY KEY,
    flowExecutionId UUID NOT NULL REFERENCES FlowExecutions(id),
    stepId UUID NOT NULL REFERENCES Steps(id),
    status VARCHAR(50) NOT NULL, -- 'pending', 'running', 'completed', 'failed'
    startedAt TIMESTAMP,
    completedAt TIMESTAMP,
    retryCount INTEGER NOT NULL DEFAULT 0,
    metadata JSONB -- Step-specific execution metadata
);

API Endpoints

The Executor Service exposes the following REST endpoints under /api/v1:

Flow Management

GET    /api/v1/flows                  # List flows with filtering
POST   /api/v1/flows                  # Create a new flow
GET    /api/v1/flows/:id              # Get flow by ID
PUT    /api/v1/flows/:id              # Update flow
DELETE /api/v1/flows/:id              # Delete flow
POST   /api/v1/flows/:id/publish      # Publish flow version
POST   /api/v1/flows/:id/deprecate    # Deprecate flow version
POST   /api/v1/flows/:id/execute      # Execute a flow

Step Management

GET    /api/v1/steps                  # List steps with filtering
POST   /api/v1/steps                  # Create a new step
GET    /api/v1/steps/:id              # Get step by ID
PUT    /api/v1/steps/:id              # Update step
DELETE /api/v1/steps/:id              # Delete step
POST   /api/v1/steps/:id/publish      # Publish step version
POST   /api/v1/steps/:id/deprecate    # Deprecate step version

Execution Management

GET    /api/v1/executions                  # List flow executions
GET    /api/v1/executions/:id              # Get execution details
POST   /api/v1/executions/:id/cancel       # Cancel execution
GET    /api/v1/executions/:id/steps        # Get execution step details
GET    /api/v1/executions/:id/logs         # Get execution logs
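
A client SDK might wrap the endpoints above in small request builders so that paths are assembled in one place. The sketch below covers the execution endpoints and the flow execute endpoint; the `RequestSpec` type and the `executionEndpoints` object are hypothetical names, and no HTTP call is made.

```typescript
type HttpMethod = 'GET' | 'POST' | 'PUT' | 'DELETE';

interface RequestSpec {
  method: HttpMethod;
  path: string;
}

const API_PREFIX = '/api/v1';

// Encodes an ID for use as a single path segment and rejects empty values.
function segment(id: string): string {
  if (id === '') {
    throw new Error('id must not be empty');
  }
  return encodeURIComponent(id);
}

// Each entry mirrors one line of the endpoint listings above.
const executionEndpoints = {
  executeFlow: (flowId: string): RequestSpec => ({
    method: 'POST',
    path: `${API_PREFIX}/flows/${segment(flowId)}/execute`,
  }),
  listExecutions: (): RequestSpec => ({
    method: 'GET',
    path: `${API_PREFIX}/executions`,
  }),
  getExecution: (id: string): RequestSpec => ({
    method: 'GET',
    path: `${API_PREFIX}/executions/${segment(id)}`,
  }),
  cancelExecution: (id: string): RequestSpec => ({
    method: 'POST',
    path: `${API_PREFIX}/executions/${segment(id)}/cancel`,
  }),
  getExecutionSteps: (id: string): RequestSpec => ({
    method: 'GET',
    path: `${API_PREFIX}/executions/${segment(id)}/steps`,
  }),
  getExecutionLogs: (id: string): RequestSpec => ({
    method: 'GET',
    path: `${API_PREFIX}/executions/${segment(id)}/logs`,
  }),
};
```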

Security Implementation

Security in the Executor Service covers authentication, authorization, and data protection:

Authentication

The service supports two authentication methods:

  1. JWT Authentication

    • Used for client SDKs and API access
    • Role-based claims with scoped permissions
    • Short-lived tokens with refresh capability
  2. API Key Authentication

    • Used for programmatic access
    • Scoped to specific operations
    • Rate-limited based on plan tier
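
Rate limiting by plan tier could look like the fixed-window sketch below. The tier names come from the Organizations table; the per-minute quotas, the one-minute window, and the `TierRateLimiter` class are assumptions for illustration, since the document does not state actual limits or the algorithm used.

```typescript
type PlanTier = 'free' | 'standard' | 'enterprise';

// Illustrative quotas only; the document does not specify real limits.
const requestsPerMinute: Record<PlanTier, number> = {
  free: 60,
  standard: 600,
  enterprise: 6000,
};

const WINDOW_MS = 60000;

interface WindowState {
  windowStart: number;
  count: number;
}

class TierRateLimiter {
  private readonly windows = new Map<string, WindowState>();
  private readonly now: () => number;

  // The clock is injectable so window expiry can be tested deterministically.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Returns true when the request is allowed, false when the key has used up
  // its quota for the current one-minute window.
  allow(apiKeyId: string, tier: PlanTier): boolean {
    const current = this.now();
    const state = this.windows.get(apiKeyId);
    if (!state || current - state.windowStart >= WINDOW_MS) {
      this.windows.set(apiKeyId, { windowStart: current, count: 1 });
      return true;
    }
    if (state.count >= requestsPerMinute[tier]) {
      return false;
    }
    state.count += 1;
    return true;
  }
}
```

A fixed window is the simplest scheme to reason about; it allows short bursts at window boundaries, which a sliding window or token bucket would smooth out. With several API replicas the counters would need to live in a shared store rather than in process memory.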

Authorization Model

The authorization system uses a role-based access control model with these key roles:

  1. Organization Admin

    • Manage organization settings
    • Control user access
    • Configure private registries
    • Enable/disable marketplace components
  2. Developer

    • Create and edit flows and steps
    • Configure store connections
    • View execution details
    • Debug workflow issues
  3. Operator

    • Monitor executions
    • Restart failed flows
    • Handle human tasks
    • View operational metrics
  4. Viewer

    • View flows and steps
    • View execution status
    • View analytics and reports
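
The role list above can be expressed as a permission table. In the sketch below the four roles come from the list, while the permission strings and the `hasPermission` helper are hypothetical names chosen for this example. Each role carries only the capabilities listed for it; whether roles inherit from one another is not stated in the document, so no inheritance is assumed.

```typescript
type Role = 'admin' | 'developer' | 'operator' | 'viewer';

// Hypothetical permission identifiers derived from the bullets above.
type Permission =
  | 'organization:manage'
  | 'users:manage'
  | 'registries:configure'
  | 'marketplace:toggle'
  | 'definitions:write'
  | 'definitions:view'
  | 'stores:configure'
  | 'executions:view'
  | 'executions:debug'
  | 'executions:restart'
  | 'human-tasks:handle'
  | 'metrics:view'
  | 'reports:view';

const rolePermissions: Record<Role, ReadonlySet<Permission>> = {
  admin: new Set<Permission>([
    'organization:manage',
    'users:manage',
    'registries:configure',
    'marketplace:toggle',
  ]),
  developer: new Set<Permission>([
    'definitions:write',
    'stores:configure',
    'executions:view',
    'executions:debug',
  ]),
  operator: new Set<Permission>([
    'executions:view',
    'executions:restart',
    'human-tasks:handle',
    'metrics:view',
  ]),
  viewer: new Set<Permission>(['definitions:view', 'executions:view', 'reports:view']),
};

// Returns true when the role's permission set contains the requested permission.
function hasPermission(role: Role, permission: Permission): boolean {
  return rolePermissions[role].has(permission);
}
```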

Data Protection

For sensitive data handling, the service implements:

  1. Data Encryption

    • All credentials and secrets encrypted at rest
    • Encryption key rotation capabilities
    • Field-level encryption for sensitive data
  2. Isolation

    • Strict multi-tenant data isolation
    • Request context validation
    • Resource ownership verification
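
Resource ownership verification can be reduced to a single guard that every read path goes through. The sketch below assumes each request carries an `organizationId` (as `req.auth.organizationId` does in the REST example) and that resources expose an `organizationId` field, as FlowExecutions does; the `assertOwnership` helper and the interface names are hypothetical.

```typescript
// Hypothetical request context, populated by the authentication layer.
interface RequestContext {
  organizationId: string;
}

// Any tenant-scoped record, for example a row from FlowExecutions.
interface OwnedResource {
  organizationId: string;
}

// Returns the resource when it belongs to the caller's organization.
// A missing resource and a resource owned by another tenant produce the same
// error, so callers cannot probe for IDs that exist in other organizations.
function assertOwnership<T extends OwnedResource>(
  context: RequestContext,
  resource: T | null,
): T {
  if (resource === null || resource.organizationId !== context.organizationId) {
    throw new Error('resource not found');
  }
  return resource;
}
```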

Deployment Architecture

Docker-Based Deployment

The Executor Service is containerized for deployment flexibility:

├── API Service
│   ├── REST API Container
│   └── GraphQL API Container
├── Core Services
│   ├── Definition Management Container
│   ├── Registry Management Container
│   ├── Execution Control Container
│   └── Store Configuration Container
├── Databases
│   ├── Primary Database Container
│   └── Redis Cache Container
└── Supporting Services
    ├── Authentication Service Container
    ├── Notification Service Container
    └── Scheduler Container

Kubernetes Deployment

For production Kubernetes deployment, the service uses:

  1. Horizontal Pod Autoscaling

    • Scale API and service pods based on CPU/memory
    • Traffic-based scaling for API layer
    • Queue-based scaling for workers
  2. Service Mesh

    • Istio-based service mesh for traffic management
    • Circuit breaking and retry policies
    • Distributed tracing integration
  3. Secret Management

    • Kubernetes secrets for credentials
    • External secret store integration (HashiCorp Vault)
    • Automated secret rotation
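
For capacity planning it helps to see how Horizontal Pod Autoscaling arrives at a replica count. Kubernetes documents the core calculation as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue); the sketch below applies that formula and clamps the result to configured bounds. The `HpaInput` shape and the function name are assumptions, and details such as the controller's tolerance band and stabilization windows are left out.

```typescript
interface HpaInput {
  currentReplicas: number;
  currentMetricValue: number; // e.g. average CPU utilization in percent
  targetMetricValue: number; // e.g. target CPU utilization in percent
  minReplicas: number;
  maxReplicas: number;
}

// Applies the documented HPA ratio formula, then clamps to [minReplicas, maxReplicas].
function desiredReplicas(input: HpaInput): number {
  const { currentReplicas, currentMetricValue, targetMetricValue, minReplicas, maxReplicas } = input;
  if (targetMetricValue <= 0) {
    throw new RangeError('targetMetricValue must be positive');
  }
  if (minReplicas > maxReplicas) {
    throw new RangeError('minReplicas must not exceed maxReplicas');
  }
  const raw = Math.ceil(currentReplicas * (currentMetricValue / targetMetricValue));
  return Math.min(maxReplicas, Math.max(minReplicas, raw));
}
```

For example, 4 pods averaging 90% CPU against a 60% target yield ceil(4 × 1.5) = 6 replicas, provided 6 lies within the configured bounds.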

Monitoring and Observability

Metrics Collection

The service exports metrics for:

  1. Service Health

    • API request rates and latencies
    • Error rates and types
    • Resource utilization
  2. Business Metrics

    • Flow execution counts
    • Completion rates and times
    • Step performance statistics
    • Human task completion time
  3. Resource Utilization

    • Database connection pool usage
    • Cache hit/miss rates
    • Worker utilization

Implementation Roadmap

The Executor Service will be implemented in phases:

Phase 1: Foundation

  • Core API development
  • Database schema implementation
  • Basic CRUD operations for definitions
  • Authentication and authorization framework

Phase 2: Execution

  • Temporal integration
  • Flow execution engine
  • Step execution handling
  • Error handling and retries

Phase 3: Features

  • Human task integration
  • Registry management
  • Monitoring and observability
  • Performance optimizations

Phase 4: Enterprise

  • Advanced security features
  • Multi-region deployment
  • High availability configurations
  • Enterprise integrations

Conclusion

The Executor Service is the central component in ZoopFlow's distributed execution architecture, managing definitions, orchestrating executions, and coordinating with client environments. This technical specification provides the foundation for implementing a robust, scalable service that meets the needs of diverse enterprise clients while maintaining security, performance, and reliability.