Skip to content
2 changes: 2 additions & 0 deletions packages/_example/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ FOREST_AUTH_SECRET=
EXECUTOR_AGENT_URL=http://localhost:3351
WORKFLOW_EXECUTOR_URL=http://localhost:3400
EXECUTOR_DATABASE_URL=postgresql://executor:password@localhost:5459/workflow_executor
# At-rest encryption key for secrets the executor stores (openssl rand -hex 32; same value across instances)
FOREST_EXECUTOR_ENCRYPTION_KEY=
Comment thread
hercemer42 marked this conversation as resolved.
# when start:with-executor:multiple-instances command
# EXECUTOR_AGENT_URL=http://host.docker.internal:3351
# WORKFLOW_EXECUTOR_URL=http://localhost:3400
Expand Down
11 changes: 9 additions & 2 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ Workflows currently run entirely in the **frontend** (`forestadmin/frontend`). T
This works for interactive use cases but blocks **automation**: scheduled workflows, API-triggered runs, and headless execution all require a human with a browser open. The goal of this migration is to move workflow execution to the **backend** (client-side agent infrastructure) so workflows can run without a frontend and without human intervention.

### What stays on the front

- Workflow designer (BPMN editor)
- Run monitoring / progress display
- Manual decisions when the AI can't decide (`manual-decision` status)

### What moves to the backend (this package)

- Step execution (condition decisions, AI tasks, record operations)
- AI calls (gateway option selection, tool selection, tool execution)
- Record selection and data access (via AgentPort)

### Constraint: must be ISO with front

The executor must produce the same behavior as the frontend implementation (`forestadmin/frontend`, `app/features/workflow/`). Same tool schemas, same AI interactions, same fallback logic.

## System Architecture
Expand Down Expand Up @@ -59,6 +62,7 @@ src/
├── stores/ # RunStore implementations
│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests
│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations
│ ├── mcp-oauth-credentials-store.ts # McpOAuthCredentialsStore — ai_mcp_oauth_credentials (002 migration)
│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore
├── adapters/ # Port implementations
│ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client
Expand All @@ -72,7 +76,10 @@ src/
│ ├── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
│ └── guidance-step-executor.ts # Manual guidance step (saves user input, no AI)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
│ ├── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger, POST+DELETE /mcp-oauth-credentials
│ └── mcp-oauth-credentials.ts # Deposit-body zod schema (.strict()) + buildMcpOAuthCredentialInput mapper
├── crypto/ # At-rest encryption
│ └── credential-encryption.ts # CredentialEncryption — HKDF (FOREST_EXECUTOR_ENCRYPTION_KEY) + AES-GCM, lazy key, fail-closed
└── index.ts # Barrel exports
```

Expand All @@ -91,7 +98,7 @@ src/
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations).
- **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when *no* field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when _no_ field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice.
Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ npm install -g @forestadmin/workflow-executor
| `FOREST_SERVER_URL` | — | `https://api.forestadmin.com` | Orchestrator URL |
| `POLLING_INTERVAL_S` | — | `30` | Poll cadence for pending steps |
| `STOP_TIMEOUT_S` | — | `30` | Graceful shutdown deadline |
| `FOREST_EXECUTOR_ENCRYPTION_KEY` | —† | — | At-rest encryption key for secrets the executor stores (HKDF-derived, AES-256-GCM). Generate with `openssl rand -hex 32`; set the **same** value across all executor instances. (†Read lazily — required only when a feature that persists secrets is used, currently OAuth-protected MCP servers.) |

Optional AI configuration (all-or-nothing — falls back to server AI if any is missing):

Expand Down
3 changes: 3 additions & 0 deletions packages/workflow-executor/example/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
FOREST_ENV_SECRET=
FOREST_AUTH_SECRET=

# At-rest encryption key for secrets the executor stores (openssl rand -hex 32; same value across all instances)
FOREST_EXECUTOR_ENCRYPTION_KEY=

# Your locally running Forest Admin agent
AGENT_URL=http://localhost:3351

Expand Down
7 changes: 7 additions & 0 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import createConsoleLogger from './adapters/console-logger';
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import CredentialEncryption from './crypto/credential-encryption';
import {
DEFAULT_AI_INVOKE_TIMEOUT_S,
DEFAULT_FOREST_SERVER_URL,
Expand All @@ -24,7 +25,9 @@ import {
import ExecutorHttpServer from './http/executor-http-server';
import Runner from './runner';
import SchemaCache from './schema-cache';
import DatabaseMcpOAuthCredentialsStore from './stores/database-mcp-oauth-credentials-store';
import DatabaseStore from './stores/database-store';
import InMemoryMcpOAuthCredentialsStore from './stores/in-memory-mcp-oauth-credentials-store';
import InMemoryStore from './stores/in-memory-store';

const FORCE_EXIT_DELAY_S = 5;
Expand Down Expand Up @@ -211,6 +214,8 @@ export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecuto
authSecret: options.authSecret,
workflowPort: deps.workflowPort,
logger: deps.logger,
mcpOAuthCredentialsStore: new InMemoryMcpOAuthCredentialsStore(),
credentialEncryption: new CredentialEncryption(),
});

return createWorkflowExecutor(runner, server, deps.logger);
Expand Down Expand Up @@ -239,6 +244,8 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo
authSecret: options.authSecret,
workflowPort: deps.workflowPort,
logger: deps.logger,
mcpOAuthCredentialsStore: new DatabaseMcpOAuthCredentialsStore({ sequelize }),
credentialEncryption: new CredentialEncryption(),
});

return createWorkflowExecutor(runner, server, deps.logger);
Expand Down
90 changes: 90 additions & 0 deletions packages/workflow-executor/src/crypto/credential-encryption.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { createCipheriv, createDecipheriv, hkdfSync, randomFillSync } from 'crypto';

import { ExecutorEncryptionKeyMissingError } from '../errors';

const ENV_KEY = 'FOREST_EXECUTOR_ENCRYPTION_KEY';
// Fixed context label bound into the HKDF derivation — domain-separates this key from any other
// use of the same secret. Changing it would make every existing row undecryptable.
const HKDF_INFO = 'forest-executor:mcp-oauth-credentials';
const HKDF_DIGEST = 'sha256';
const KEY_BYTES = 32; // AES-256
const IV_BYTES = 12; // GCM standard nonce length
const AUTH_TAG_BYTES = 16;
const ALGORITHM = 'aes-256-gcm';
const CURRENT_ENC_KEY_VERSION = 1;

export interface EncryptedValue {
// Packed layout: iv | authTag | ciphertext — stored as a single BLOB column.
ciphertext: Buffer;
encKeyVersion: number;
}

// Concatenate byte arrays without going through Buffer.concat — keeps everything in the concrete
// Uint8Array<ArrayBuffer> domain the Node crypto types expect.
function concatBytes(parts: Uint8Array[]): Uint8Array {
const total = parts.reduce((length, part) => length + part.length, 0);
const out = new Uint8Array(total);
let offset = 0;

for (const part of parts) {
out.set(part, offset);
offset += part.length;
}

return out;
}

// At-rest encryption for secrets the executor stores. The HKDF key (from
// FOREST_EXECUTOR_ENCRYPTION_KEY) is read lazily — an executor that stores no such secrets boots
// without it — and fails closed: a missing key throws rather than persisting or returning an
// unprotected value.
export default class CredentialEncryption {
private readonly encKeyVersion: number;

constructor(encKeyVersion: number = CURRENT_ENC_KEY_VERSION) {
this.encKeyVersion = encKeyVersion;
}

encrypt(plaintext: string): EncryptedValue {
const iv = randomFillSync(new Uint8Array(IV_BYTES));
const cipher = createCipheriv(ALGORITHM, this.deriveKey(), iv);
const encrypted = concatBytes([
new Uint8Array(cipher.update(plaintext, 'utf8')),
new Uint8Array(cipher.final()),
]);
const authTag = new Uint8Array(cipher.getAuthTag());

return {
ciphertext: Buffer.from(concatBytes([iv, authTag, encrypted])),
encKeyVersion: this.encKeyVersion,
};
}

decrypt(value: Buffer): string {
const bytes = new Uint8Array(value);
const iv = bytes.subarray(0, IV_BYTES);
const authTag = bytes.subarray(IV_BYTES, IV_BYTES + AUTH_TAG_BYTES);
const encrypted = bytes.subarray(IV_BYTES + AUTH_TAG_BYTES);

const decipher = createDecipheriv(ALGORITHM, this.deriveKey(), iv);
decipher.setAuthTag(authTag);

const decrypted = concatBytes([
new Uint8Array(decipher.update(encrypted)),
new Uint8Array(decipher.final()),
]);

return Buffer.from(decrypted).toString('utf8');
}

private deriveKey(): Uint8Array {
const secret = process.env[ENV_KEY];

if (!secret) throw new ExecutorEncryptionKeyMissingError();

// Empty salt is intentional: the fixed HKDF_INFO label gives domain separation and the
// single high-entropy secret needs no salt. Wrap hkdfSync's ArrayBuffer as a concrete
// Uint8Array<ArrayBuffer> to satisfy CipherKey (Buffer's ArrayBufferLike backing does not).
return new Uint8Array(hkdfSync(HKDF_DIGEST, secret, new Uint8Array(0), HKDF_INFO, KEY_BYTES));
Comment thread
hercemer42 marked this conversation as resolved.
}
}
Comment thread
hercemer42 marked this conversation as resolved.
11 changes: 11 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,17 @@ export class ConfigurationError extends Error {
}
}

// Boundary error — the deposit endpoint maps it to a typed HTTP response so the frontend can tell
// an operator to provision the key, not a generic or re-consent failure.
export class ExecutorEncryptionKeyMissingError extends Error {
readonly code = 'executor_encryption_key_missing';

constructor() {
super('FOREST_EXECUTOR_ENCRYPTION_KEY is not set');
this.name = 'ExecutorEncryptionKeyMissingError';
}
}

// Run lifecycle/access errors raised by the Runner. Each extends a domain category, so toHttpError
// maps them by category (404/409/403) — no per-error HTTP binding.
export class RunNotFoundError extends NotFoundError {
Expand Down
Loading
Loading