diff --git a/packages/_example/.env.example b/packages/_example/.env.example index 7c4ebaf6e7..85da10b99d 100644 --- a/packages/_example/.env.example +++ b/packages/_example/.env.example @@ -21,6 +21,8 @@ FOREST_AUTH_SECRET= EXECUTOR_AGENT_URL=http://localhost:3351 WORKFLOW_EXECUTOR_URL=http://localhost:3400 EXECUTOR_DATABASE_URL=postgresql://executor:password@localhost:5459/workflow_executor +# At-rest encryption key for secrets the executor stores (openssl rand -hex 32; same value across instances) +FOREST_EXECUTOR_ENCRYPTION_KEY= # when start:with-executor:multiple-instances command # EXECUTOR_AGENT_URL=http://host.docker.internal:3351 # WORKFLOW_EXECUTOR_URL=http://localhost:3400 diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 1e5b1d22ec..ff45ae4348 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -13,16 +13,19 @@ Workflows currently run entirely in the **frontend** (`forestadmin/frontend`). T This works for interactive use cases but blocks **automation**: scheduled workflows, API-triggered runs, and headless execution all require a human with a browser open. The goal of this migration is to move workflow execution to the **backend** (client-side agent infrastructure) so workflows can run without a frontend and without human intervention. ### What stays on the front + - Workflow designer (BPMN editor) - Run monitoring / progress display - Manual decisions when the AI can't decide (`manual-decision` status) ### What moves to the backend (this package) + - Step execution (condition decisions, AI tasks, record operations) - AI calls (gateway option selection, tool selection, tool execution) - Record selection and data access (via AgentPort) ### Constraint: must be ISO with front + The executor must produce the same behavior as the frontend implementation (`forestadmin/frontend`, `app/features/workflow/`). Same tool schemas, same AI interactions, same fallback logic. ## System Architecture @@ -59,6 +62,7 @@ src/ ├── stores/ # RunStore implementations │ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests │ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations +│ ├── mcp-oauth-credentials-store.ts # McpOAuthCredentialsStore — ai_mcp_oauth_credentials (002 migration) │ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore ├── adapters/ # Port implementations │ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client @@ -72,7 +76,10 @@ src/ │ ├── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow) │ └── guidance-step-executor.ts # Manual guidance step (saves user input, no AI) ├── http/ # HTTP server (optional, for frontend data access) -│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger +│ ├── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger, POST+DELETE /mcp-oauth-credentials +│ └── mcp-oauth-credentials.ts # Deposit-body zod schema (.strict()) + buildMcpOAuthCredentialInput mapper +├── crypto/ # At-rest encryption +│ └── credential-encryption.ts # CredentialEncryption — HKDF (FOREST_EXECUTOR_ENCRYPTION_KEY) + AES-GCM, lazy key, fail-closed └── index.ts # Barrel exports ``` @@ -91,7 +98,7 @@ src/ - **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` runs before `doExecute()` so neither cache hits nor uncertain-state errors reach the activity log emitted by `AgentWithLog`. The `executing` write-ahead marker is saved in the `beforeCall` thunk the executor passes to `AgentWithLog`'s write methods (run after `createPending`, just before the side effect) so an activity-log creation failure never leaves an orphan `executing` marker. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe. - **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations). - **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry. -- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when *no* field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. +- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when _no_ field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. - **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. - **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor. - **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice. diff --git a/packages/workflow-executor/README.md b/packages/workflow-executor/README.md index 56635d6806..fd2d310d34 100644 --- a/packages/workflow-executor/README.md +++ b/packages/workflow-executor/README.md @@ -26,6 +26,7 @@ npm install -g @forestadmin/workflow-executor | `FOREST_SERVER_URL` | — | `https://api.forestadmin.com` | Orchestrator URL | | `POLLING_INTERVAL_S` | — | `30` | Poll cadence for pending steps | | `STOP_TIMEOUT_S` | — | `30` | Graceful shutdown deadline | +| `FOREST_EXECUTOR_ENCRYPTION_KEY` | —† | — | At-rest encryption key for secrets the executor stores (HKDF-derived, AES-256-GCM). Generate with `openssl rand -hex 32`; set the **same** value across all executor instances. (†Read lazily — required only when a feature that persists secrets is used, currently OAuth-protected MCP servers.) | Optional AI configuration (all-or-nothing — falls back to server AI if any is missing): diff --git a/packages/workflow-executor/example/.env.example b/packages/workflow-executor/example/.env.example index fb96106528..354b34b67a 100644 --- a/packages/workflow-executor/example/.env.example +++ b/packages/workflow-executor/example/.env.example @@ -2,6 +2,9 @@ FOREST_ENV_SECRET= FOREST_AUTH_SECRET= +# At-rest encryption key for secrets the executor stores (openssl rand -hex 32; same value across all instances) +FOREST_EXECUTOR_ENCRYPTION_KEY= + # Your locally running Forest Admin agent AGENT_URL=http://localhost:3351 diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 3c0dc0ddd7..1eb3950424 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -13,6 +13,7 @@ import createConsoleLogger from './adapters/console-logger'; import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory'; import ServerAiAdapter from './adapters/server-ai-adapter'; +import CredentialEncryption from './crypto/credential-encryption'; import { DEFAULT_AI_INVOKE_TIMEOUT_S, DEFAULT_FOREST_SERVER_URL, @@ -24,7 +25,9 @@ import { import ExecutorHttpServer from './http/executor-http-server'; import Runner from './runner'; import SchemaCache from './schema-cache'; +import DatabaseMcpOAuthCredentialsStore from './stores/database-mcp-oauth-credentials-store'; import DatabaseStore from './stores/database-store'; +import InMemoryMcpOAuthCredentialsStore from './stores/in-memory-mcp-oauth-credentials-store'; import InMemoryStore from './stores/in-memory-store'; const FORCE_EXIT_DELAY_S = 5; @@ -211,6 +214,8 @@ export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecuto authSecret: options.authSecret, workflowPort: deps.workflowPort, logger: deps.logger, + mcpOAuthCredentialsStore: new InMemoryMcpOAuthCredentialsStore(), + credentialEncryption: new CredentialEncryption(), }); return createWorkflowExecutor(runner, server, deps.logger); @@ -239,6 +244,8 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo authSecret: options.authSecret, workflowPort: deps.workflowPort, logger: deps.logger, + mcpOAuthCredentialsStore: new DatabaseMcpOAuthCredentialsStore({ sequelize }), + credentialEncryption: new CredentialEncryption(), }); return createWorkflowExecutor(runner, server, deps.logger); diff --git a/packages/workflow-executor/src/crypto/credential-encryption.ts b/packages/workflow-executor/src/crypto/credential-encryption.ts new file mode 100644 index 0000000000..6e68bd4fe2 --- /dev/null +++ b/packages/workflow-executor/src/crypto/credential-encryption.ts @@ -0,0 +1,90 @@ +import { createCipheriv, createDecipheriv, hkdfSync, randomFillSync } from 'crypto'; + +import { ExecutorEncryptionKeyMissingError } from '../errors'; + +const ENV_KEY = 'FOREST_EXECUTOR_ENCRYPTION_KEY'; +// Fixed context label bound into the HKDF derivation — domain-separates this key from any other +// use of the same secret. Changing it would make every existing row undecryptable. +const HKDF_INFO = 'forest-executor:mcp-oauth-credentials'; +const HKDF_DIGEST = 'sha256'; +const KEY_BYTES = 32; // AES-256 +const IV_BYTES = 12; // GCM standard nonce length +const AUTH_TAG_BYTES = 16; +const ALGORITHM = 'aes-256-gcm'; +const CURRENT_ENC_KEY_VERSION = 1; + +export interface EncryptedValue { + // Packed layout: iv | authTag | ciphertext — stored as a single BLOB column. + ciphertext: Buffer; + encKeyVersion: number; +} + +// Concatenate byte arrays without going through Buffer.concat — keeps everything in the concrete +// Uint8Array domain the Node crypto types expect. +function concatBytes(parts: Uint8Array[]): Uint8Array { + const total = parts.reduce((length, part) => length + part.length, 0); + const out = new Uint8Array(total); + let offset = 0; + + for (const part of parts) { + out.set(part, offset); + offset += part.length; + } + + return out; +} + +// At-rest encryption for secrets the executor stores. The HKDF key (from +// FOREST_EXECUTOR_ENCRYPTION_KEY) is read lazily — an executor that stores no such secrets boots +// without it — and fails closed: a missing key throws rather than persisting or returning an +// unprotected value. +export default class CredentialEncryption { + private readonly encKeyVersion: number; + + constructor(encKeyVersion: number = CURRENT_ENC_KEY_VERSION) { + this.encKeyVersion = encKeyVersion; + } + + encrypt(plaintext: string): EncryptedValue { + const iv = randomFillSync(new Uint8Array(IV_BYTES)); + const cipher = createCipheriv(ALGORITHM, this.deriveKey(), iv); + const encrypted = concatBytes([ + new Uint8Array(cipher.update(plaintext, 'utf8')), + new Uint8Array(cipher.final()), + ]); + const authTag = new Uint8Array(cipher.getAuthTag()); + + return { + ciphertext: Buffer.from(concatBytes([iv, authTag, encrypted])), + encKeyVersion: this.encKeyVersion, + }; + } + + decrypt(value: Buffer): string { + const bytes = new Uint8Array(value); + const iv = bytes.subarray(0, IV_BYTES); + const authTag = bytes.subarray(IV_BYTES, IV_BYTES + AUTH_TAG_BYTES); + const encrypted = bytes.subarray(IV_BYTES + AUTH_TAG_BYTES); + + const decipher = createDecipheriv(ALGORITHM, this.deriveKey(), iv); + decipher.setAuthTag(authTag); + + const decrypted = concatBytes([ + new Uint8Array(decipher.update(encrypted)), + new Uint8Array(decipher.final()), + ]); + + return Buffer.from(decrypted).toString('utf8'); + } + + private deriveKey(): Uint8Array { + const secret = process.env[ENV_KEY]; + + if (!secret) throw new ExecutorEncryptionKeyMissingError(); + + // Empty salt is intentional: the fixed HKDF_INFO label gives domain separation and the + // single high-entropy secret needs no salt. Wrap hkdfSync's ArrayBuffer as a concrete + // Uint8Array to satisfy CipherKey (Buffer's ArrayBufferLike backing does not). + return new Uint8Array(hkdfSync(HKDF_DIGEST, secret, new Uint8Array(0), HKDF_INFO, KEY_BYTES)); + } +} diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 8ad2f7e113..f73019c34e 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -321,6 +321,17 @@ export class ConfigurationError extends Error { } } +// Boundary error — the deposit endpoint maps it to a typed HTTP response so the frontend can tell +// an operator to provision the key, not a generic or re-consent failure. +export class ExecutorEncryptionKeyMissingError extends Error { + readonly code = 'executor_encryption_key_missing'; + + constructor() { + super('FOREST_EXECUTOR_ENCRYPTION_KEY is not set'); + this.name = 'ExecutorEncryptionKeyMissingError'; + } +} + // Run lifecycle/access errors raised by the Runner. Each extends a domain category, so toHttpError // maps them by category (404/409/403) — no per-error HTTP binding. export class RunNotFoundError extends NotFoundError { diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index 51c97c3dd1..7d713acf7a 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -1,4 +1,6 @@ +import type CredentialEncryption from '../crypto/credential-encryption'; import type { Logger } from '../ports/logger-port'; +import type { McpOAuthCredentialsStore } from '../ports/mcp-oauth-credentials-store'; import type { WorkflowPort } from '../ports/workflow-port'; import type Runner from '../runner'; import type { Server } from 'http'; @@ -11,14 +13,19 @@ import koaJwt from 'koa-jwt'; import { type BearerClaims, BearerClaimsSchema } from './bearer-claims'; import { + BadRequestHttpError, ForbiddenHttpError, ServiceUnavailableHttpError, UnauthorizedHttpError, toHttpError, } from './http-errors'; +import { + buildMcpOAuthCredentialInput, + depositCredentialsBodySchema, +} from './mcp-oauth-credentials'; import serializeStepForWire from './step-serializer'; import createConsoleLogger from '../adapters/console-logger'; -import { extractErrorMessage } from '../errors'; +import { ExecutorEncryptionKeyMissingError, extractErrorMessage } from '../errors'; export interface ExecutorHttpServerOptions { port: number; @@ -26,17 +33,21 @@ export interface ExecutorHttpServerOptions { authSecret: string; workflowPort: WorkflowPort; logger?: Logger; + mcpOAuthCredentialsStore: McpOAuthCredentialsStore; + credentialEncryption: CredentialEncryption; } export default class ExecutorHttpServer { private readonly app: Koa; private readonly options: ExecutorHttpServerOptions; private readonly logger: Logger; + private readonly mcpOAuthCredentialsStore: McpOAuthCredentialsStore; private server: Server | null = null; constructor(options: ExecutorHttpServerOptions) { this.options = options; this.logger = options.logger ?? createConsoleLogger(); + this.mcpOAuthCredentialsStore = options.mcpOAuthCredentialsStore; this.app = new Koa(); // Error-translation middleware — the single place converting thrown errors (typed HTTP @@ -134,11 +145,22 @@ export default class ExecutorHttpServer { ); router.post('/runs/:runId/trigger', this.handleTrigger.bind(this)); + const { mcpOAuthCredentialsStore: credentialsStore, credentialEncryption } = this.options; + + router.post('/mcp-oauth-credentials', ctx => + this.handleDepositCredentials(ctx, credentialsStore, credentialEncryption), + ); + router.delete('/mcp-oauth-credentials/:mcpServerId', ctx => + this.handleDeleteCredentials(ctx, credentialsStore), + ); + this.app.use(router.routes()); this.app.use(router.allowedMethods()); } async start(): Promise { + await this.mcpOAuthCredentialsStore.init(this.logger); + return new Promise((resolve, reject) => { this.server = http.createServer(this.app.callback()); this.server.once('error', reject); @@ -210,4 +232,49 @@ export default class ExecutorHttpServer { ctx.status = 200; ctx.body = { triggered: true }; } + + private async handleDepositCredentials( + ctx: Koa.Context, + store: McpOAuthCredentialsStore, + encryption: CredentialEncryption, + ): Promise { + const userId = (ctx.state.user as BearerClaims).id; + const parsed = depositCredentialsBodySchema.safeParse(ctx.request.body ?? {}); + + if (!parsed.success) { + const details = parsed.error.issues + .map(issue => `${issue.path.join('.') || 'body'}: ${issue.message}`) + .join('; '); + + throw new BadRequestHttpError(`Invalid request body — ${details}`); + } + + try { + await store.upsert(buildMcpOAuthCredentialInput({ body: parsed.data, userId, encryption })); + } catch (err) { + // The frontend must tell this missing-key config error apart from a generic failure (to route + // the user to an admin rather than retry), so it returns a typed { code } the middleware won't. + if (err instanceof ExecutorEncryptionKeyMissingError) { + ctx.status = 503; + ctx.body = { code: err.code }; + + return; + } + + throw err; + } + + ctx.status = 200; + ctx.body = { stored: true }; + } + + private async handleDeleteCredentials( + ctx: Koa.Context, + store: McpOAuthCredentialsStore, + ): Promise { + const userId = (ctx.state.user as BearerClaims).id; + + await store.delete(userId, ctx.params.mcpServerId); + ctx.status = 204; + } } diff --git a/packages/workflow-executor/src/http/mcp-oauth-credentials.ts b/packages/workflow-executor/src/http/mcp-oauth-credentials.ts new file mode 100644 index 0000000000..ed205d9330 --- /dev/null +++ b/packages/workflow-executor/src/http/mcp-oauth-credentials.ts @@ -0,0 +1,54 @@ +import type CredentialEncryption from '../crypto/credential-encryption'; +import type { McpOAuthCredentialInput } from '../ports/mcp-oauth-credentials-store'; + +import { z } from 'zod'; + +// String lengths mirror the DB column limits, so oversized values are rejected here at the boundary +// rather than at insert. .strict() matters for security: it blocks a body-supplied user id, since +// identity comes only from the JWT. +export const depositCredentialsBodySchema = z + .object({ + mcpServerId: z.string().min(1).max(255), + refreshToken: z.string().min(1), + clientId: z.string().max(255).optional(), + clientSecret: z.string().optional(), + clientSecretExpiresAt: z + .string() + .refine(value => !Number.isNaN(Date.parse(value)), { message: 'must be a parseable date' }) + .optional(), + tokenEndpoint: z.string().min(1).max(2048), + tokenEndpointAuthMethod: z.string().max(64).optional(), + scopes: z.string().max(2048).optional(), + }) + .strict(); + +export type DepositCredentialsBody = z.infer; + +// Translates a validated deposit body into the at-rest record: encrypts the refresh token (and +// client secret when present) and maps optional fields to their nullable columns. encrypt() throws +// ExecutorEncryptionKeyMissingError when the key is unset; the caller maps that to a 503. +export function buildMcpOAuthCredentialInput({ + body, + userId, + encryption, +}: { + body: DepositCredentialsBody; + userId: number; + encryption: CredentialEncryption; +}): McpOAuthCredentialInput { + const refreshToken = encryption.encrypt(body.refreshToken); + const clientSecret = body.clientSecret ? encryption.encrypt(body.clientSecret) : null; + + return { + userId, + mcpServerId: body.mcpServerId, + refreshTokenEnc: refreshToken.ciphertext, + clientId: body.clientId ?? null, + clientSecretEnc: clientSecret?.ciphertext ?? null, + clientSecretExpiresAt: body.clientSecretExpiresAt ? new Date(body.clientSecretExpiresAt) : null, + tokenEndpoint: body.tokenEndpoint, + tokenEndpointAuthMethod: body.tokenEndpointAuthMethod ?? null, + scopes: body.scopes ?? null, + encKeyVersion: refreshToken.encKeyVersion, + }; +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 7fa349a083..e18ed25d2b 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -100,6 +100,7 @@ export { AiModelPortError, AgentProbeError, ConfigurationError, + ExecutorEncryptionKeyMissingError, InvalidPreRecordedArgsError, UnsupportedStepTypeError, UnsupportedActionFormError, @@ -126,6 +127,16 @@ export { default as SchemaResolver } from './schema-resolver'; export { default as InMemoryStore } from './stores/in-memory-store'; export { default as DatabaseStore } from './stores/database-store'; export type { DatabaseStoreOptions } from './stores/database-store'; +export { default as DatabaseMcpOAuthCredentialsStore } from './stores/database-mcp-oauth-credentials-store'; +export type { DatabaseMcpOAuthCredentialsStoreOptions } from './stores/database-mcp-oauth-credentials-store'; +export { default as InMemoryMcpOAuthCredentialsStore } from './stores/in-memory-mcp-oauth-credentials-store'; +export type { + McpOAuthCredentialsStore, + McpOAuthCredentialInput, + StoredMcpOAuthCredential, +} from './ports/mcp-oauth-credentials-store'; +export { default as CredentialEncryption } from './crypto/credential-encryption'; +export type { EncryptedValue } from './crypto/credential-encryption'; export { buildDatabaseRunStore, buildInMemoryRunStore } from './stores/build-run-store'; export { buildInMemoryExecutor, buildDatabaseExecutor } from './build-workflow-executor'; export { runCli } from './cli-core'; diff --git a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts new file mode 100644 index 0000000000..cdb5bd829f --- /dev/null +++ b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts @@ -0,0 +1,29 @@ +import type { Logger } from './logger-port'; + +export interface McpOAuthCredentialInput { + userId: number; + mcpServerId: string; + refreshTokenEnc: Buffer; + clientId?: string | null; + clientSecretEnc?: Buffer | null; + clientSecretExpiresAt?: Date | null; + tokenEndpoint: string; + tokenEndpointAuthMethod?: string | null; + scopes?: string | null; + encKeyVersion: number; +} + +export interface StoredMcpOAuthCredential extends McpOAuthCredentialInput { + id: number; +} + +// Persists OAuth MCP credentials, one row per (userId, mcpServerId). Backed by a Sequelize store +// (real executors) or an in-memory one (--in-memory / dev). Holds opaque encrypted bytes — +// encryption happens upstream in buildMcpOAuthCredentialInput — so implementations do no crypto. +export interface McpOAuthCredentialsStore { + init(logger?: Logger): Promise; + close(logger?: Logger): Promise; + get(userId: number, mcpServerId: string): Promise; + upsert(credential: McpOAuthCredentialInput): Promise; + delete(userId: number, mcpServerId: string): Promise; +} diff --git a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts new file mode 100644 index 0000000000..92e54dc2aa --- /dev/null +++ b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts @@ -0,0 +1,205 @@ +import type { Logger } from '../ports/logger-port'; +import type { + McpOAuthCredentialInput, + McpOAuthCredentialsStore, + StoredMcpOAuthCredential, +} from '../ports/mcp-oauth-credentials-store'; +import type { QueryInterface, Sequelize } from 'sequelize'; + +import { DataTypes } from 'sequelize'; +import { SequelizeStorage, Umzug } from 'umzug'; + +import { extractErrorMessage } from '../errors'; + +const TABLE_NAME = 'ai_mcp_oauth_credentials'; + +export interface DatabaseMcpOAuthCredentialsStoreOptions { + sequelize: Sequelize; +} + +interface CredentialRow { + id: number; + user_id: number; + mcp_server_id: string; + refresh_token_enc: Buffer; + client_id: string | null; + client_secret_enc: Buffer | null; + client_secret_expires_at: string | Date | null; + token_endpoint: string; + token_endpoint_auth_method: string | null; + scopes: string | null; + enc_key_version: number; +} + +export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredentialsStore { + private readonly sequelize: Sequelize; + + constructor(options: DatabaseMcpOAuthCredentialsStoreOptions) { + this.sequelize = options.sequelize; + } + + async init(logger?: Logger): Promise { + const umzug = new Umzug({ + migrations: [ + { + name: '002_create_mcp_oauth_credentials', + up: async ({ context }: { context: QueryInterface }) => { + await context.createTable(TABLE_NAME, { + id: { type: DataTypes.INTEGER, primaryKey: true, autoIncrement: true }, + userId: { type: DataTypes.INTEGER, allowNull: false, field: 'user_id' }, + mcpServerId: { + type: DataTypes.STRING(255), + allowNull: false, + field: 'mcp_server_id', + }, + refreshTokenEnc: { + type: DataTypes.BLOB, + allowNull: false, + field: 'refresh_token_enc', + }, + clientId: { type: DataTypes.STRING(255), allowNull: true, field: 'client_id' }, + clientSecretEnc: { + type: DataTypes.BLOB, + allowNull: true, + field: 'client_secret_enc', + }, + clientSecretExpiresAt: { + type: DataTypes.DATE, + allowNull: true, + field: 'client_secret_expires_at', + }, + tokenEndpoint: { + type: DataTypes.STRING(2048), + allowNull: false, + field: 'token_endpoint', + }, + tokenEndpointAuthMethod: { + type: DataTypes.STRING(64), + allowNull: true, + field: 'token_endpoint_auth_method', + }, + scopes: { type: DataTypes.STRING(2048), allowNull: true, field: 'scopes' }, + encKeyVersion: { + type: DataTypes.INTEGER, + allowNull: false, + field: 'enc_key_version', + }, + createdAt: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW, + field: 'created_at', + }, + updatedAt: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW, + field: 'updated_at', + }, + }); + + await context.addIndex(TABLE_NAME, ['user_id', 'mcp_server_id'], { + unique: true, + name: 'idx_user_id_mcp_server_id', + }); + }, + down: async ({ context }: { context: QueryInterface }) => { + await context.dropTable(TABLE_NAME); + }, + }, + ], + context: this.sequelize.getQueryInterface(), + storage: new SequelizeStorage({ sequelize: this.sequelize }), + logger: undefined, + }); + + try { + await umzug.up(); + } catch (error) { + logger?.('Error', 'MCP OAuth credentials migration failed', { + error: extractErrorMessage(error), + }); + throw error; + } + } + + async get(userId: number, mcpServerId: string): Promise { + const [rows] = await this.sequelize.query( + `SELECT * FROM ${TABLE_NAME} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`, + { replacements: { userId, mcpServerId } }, + ); + + const row = (rows as CredentialRow[])[0]; + + return row ? DatabaseMcpOAuthCredentialsStore.toCredential(row) : null; + } + + async upsert(credential: McpOAuthCredentialInput): Promise { + await this.sequelize.transaction(async transaction => { + const now = new Date(); + const replacements = { + userId: credential.userId, + mcpServerId: credential.mcpServerId, + refreshTokenEnc: credential.refreshTokenEnc, + clientId: credential.clientId ?? null, + clientSecretEnc: credential.clientSecretEnc ?? null, + clientSecretExpiresAt: credential.clientSecretExpiresAt ?? null, + tokenEndpoint: credential.tokenEndpoint ?? null, + tokenEndpointAuthMethod: credential.tokenEndpointAuthMethod ?? null, + scopes: credential.scopes ?? null, + encKeyVersion: credential.encKeyVersion, + now, + }; + + // Delete + insert in transaction: dialect-agnostic upsert (avoids ON CONFLICT / ON DUPLICATE). + await this.sequelize.query( + `DELETE FROM ${TABLE_NAME} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`, + { replacements, transaction }, + ); + await this.sequelize.query( + `INSERT INTO ${TABLE_NAME} ` + + '(user_id, mcp_server_id, refresh_token_enc, client_id, client_secret_enc, ' + + 'client_secret_expires_at, token_endpoint, token_endpoint_auth_method, scopes, ' + + 'enc_key_version, created_at, updated_at) VALUES ' + + '(:userId, :mcpServerId, :refreshTokenEnc, :clientId, :clientSecretEnc, ' + + ':clientSecretExpiresAt, :tokenEndpoint, :tokenEndpointAuthMethod, :scopes, ' + + ':encKeyVersion, :now, :now)', + { replacements, transaction }, + ); + }); + } + + async delete(userId: number, mcpServerId: string): Promise { + await this.sequelize.query( + `DELETE FROM ${TABLE_NAME} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`, + { replacements: { userId, mcpServerId } }, + ); + } + + async close(logger?: Logger): Promise { + try { + await this.sequelize.close(); + } catch (error) { + logger?.('Error', 'Failed to close database connection', { + error: extractErrorMessage(error), + }); + } + } + + private static toCredential(row: CredentialRow): StoredMcpOAuthCredential { + return { + id: Number(row.id), + userId: Number(row.user_id), + mcpServerId: row.mcp_server_id, + refreshTokenEnc: row.refresh_token_enc, + clientId: row.client_id ?? null, + clientSecretEnc: row.client_secret_enc ?? null, + clientSecretExpiresAt: + row.client_secret_expires_at == null ? null : new Date(row.client_secret_expires_at), + tokenEndpoint: row.token_endpoint, + tokenEndpointAuthMethod: row.token_endpoint_auth_method ?? null, + scopes: row.scopes ?? null, + encKeyVersion: Number(row.enc_key_version), + }; + } +} diff --git a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts new file mode 100644 index 0000000000..29b7f9de59 --- /dev/null +++ b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts @@ -0,0 +1,42 @@ +import type { + McpOAuthCredentialInput, + McpOAuthCredentialsStore, + StoredMcpOAuthCredential, +} from '../ports/mcp-oauth-credentials-store'; + +// In-memory MCP OAuth credentials store for --in-memory / dev. Same throwaway semantics as +// InMemoryStore (the run store): state is lost on restart. It holds the already-encrypted Buffers +// produced by buildMcpOAuthCredentialInput — there is no crypto here, just a Map keyed by +// (userId, mcpServerId), mirroring the DB store's one-row-per-key contract. +export default class InMemoryMcpOAuthCredentialsStore implements McpOAuthCredentialsStore { + private readonly data = new Map(); + private nextId = 1; + + async init(): Promise { + // No-op: nothing to migrate for an in-memory store. + } + + async close(): Promise { + // No-op: nothing to close. + } + + async get(userId: number, mcpServerId: string): Promise { + return this.data.get(InMemoryMcpOAuthCredentialsStore.key(userId, mcpServerId)) ?? null; + } + + async upsert(credential: McpOAuthCredentialInput): Promise { + // Overwrite in place — one row per (userId, mcpServerId). A fresh id each time mirrors the DB + // store's delete-then-insert; nothing relies on id stability. + const key = InMemoryMcpOAuthCredentialsStore.key(credential.userId, credential.mcpServerId); + this.data.set(key, { ...credential, id: this.nextId }); + this.nextId += 1; + } + + async delete(userId: number, mcpServerId: string): Promise { + this.data.delete(InMemoryMcpOAuthCredentialsStore.key(userId, mcpServerId)); + } + + private static key(userId: number, mcpServerId: string): string { + return `${userId}:${mcpServerId}`; + } +} diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index c9e1131af1..d306df115f 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -1,9 +1,13 @@ import ForestServerWorkflowPort from '../src/adapters/forest-server-workflow-port'; import { buildDatabaseExecutor, buildInMemoryExecutor } from '../src/build-workflow-executor'; +import CredentialEncryption from '../src/crypto/credential-encryption'; import { DEFAULT_SCHEMA_CACHE_TTL_S } from '../src/defaults'; +import ExecutorHttpServer from '../src/http/executor-http-server'; import Runner from '../src/runner'; import SchemaCache from '../src/schema-cache'; +import DatabaseMcpOAuthCredentialsStore from '../src/stores/database-mcp-oauth-credentials-store'; import DatabaseStore from '../src/stores/database-store'; +import InMemoryMcpOAuthCredentialsStore from '../src/stores/in-memory-mcp-oauth-credentials-store'; import InMemoryStore from '../src/stores/in-memory-store'; jest.mock('../src/runner'); @@ -56,6 +60,17 @@ describe('buildInMemoryExecutor', () => { ); }); + it('wires the in-memory OAuth credentials store and encryption into the HTTP server', () => { + buildInMemoryExecutor(BASE_OPTIONS); + + expect(ExecutorHttpServer).toHaveBeenCalledWith( + expect.objectContaining({ + mcpOAuthCredentialsStore: expect.any(InMemoryMcpOAuthCredentialsStore), + credentialEncryption: expect.any(CredentialEncryption), + }), + ); + }); + it('creates ForestServerWorkflowPort with default forestServerUrl', () => { buildInMemoryExecutor(BASE_OPTIONS); @@ -257,6 +272,17 @@ describe('buildDatabaseExecutor', () => { ); }); + it('wires the database OAuth credentials store and encryption into the HTTP server', () => { + buildDatabaseExecutor(DB_OPTIONS); + + expect(ExecutorHttpServer).toHaveBeenCalledWith( + expect.objectContaining({ + mcpOAuthCredentialsStore: expect.any(DatabaseMcpOAuthCredentialsStore), + credentialEncryption: expect.any(CredentialEncryption), + }), + ); + }); + it('creates Sequelize with uri and passes remaining options through', () => { buildDatabaseExecutor(DB_OPTIONS); diff --git a/packages/workflow-executor/test/crypto/credential-encryption.test.ts b/packages/workflow-executor/test/crypto/credential-encryption.test.ts new file mode 100644 index 0000000000..5a174e12be --- /dev/null +++ b/packages/workflow-executor/test/crypto/credential-encryption.test.ts @@ -0,0 +1,212 @@ +/** + * Spec for the at-rest credential encryption helper. + * + * Behaviour: + * - Key is derived in-process via HKDF (`crypto.hkdfSync`, fixed context label) from a dedicated + * `FOREST_EXECUTOR_ENCRYPTION_KEY` env var — separate from `FOREST_AUTH_SECRET`. + * - The key is read LAZILY (never required at construction / boot). + * - AES-GCM is used (authenticated encryption — tampering must be detected on decrypt). + * - Each encrypted value carries an `encKeyVersion` (persisted per-row by the store). + * - Fail closed: a missing key (or a failed decrypt) must throw, never return plaintext/garbage. + * + * Version-aware key selection (rotation) is not yet supported, so `decrypt` takes only the + * packed ciphertext; `encrypt` still surfaces `encKeyVersion` for the store to persist. + */ +import CredentialEncryption from '../../src/crypto/credential-encryption'; +import { ExecutorEncryptionKeyMissingError } from '../../src/errors'; + +const ENV_KEY = 'FOREST_EXECUTOR_ENCRYPTION_KEY'; +// 32-byte key as 64 hex chars (mirrors the envSecret format validated elsewhere). +const TEST_KEY = 'a'.repeat(64); +const OTHER_KEY = 'b'.repeat(64); + +describe('CredentialEncryption', () => { + const original = process.env[ENV_KEY]; + + beforeEach(() => { + process.env[ENV_KEY] = TEST_KEY; + }); + + afterEach(() => { + if (original === undefined) delete process.env[ENV_KEY]; + else process.env[ENV_KEY] = original; + }); + + describe('round-trip', () => { + it('decrypts back to the exact plaintext that was encrypted', () => { + const enc = new CredentialEncryption(); + const plaintext = 'refresh-token-abc123'; + + const { ciphertext } = enc.encrypt(plaintext); + + expect(enc.decrypt(ciphertext)).toBe(plaintext); + }); + + it('round-trips multi-byte unicode without corruption', () => { + const enc = new CredentialEncryption(); + const plaintext = 'tökén-🔐-Ω-secret'; + + const { ciphertext } = enc.encrypt(plaintext); + + expect(enc.decrypt(ciphertext)).toBe(plaintext); + }); + + it('round-trips an empty string (boundary: zero-length plaintext)', () => { + const enc = new CredentialEncryption(); + + const { ciphertext } = enc.encrypt(''); + + expect(enc.decrypt(ciphertext)).toBe(''); + }); + }); + + describe('output shape', () => { + it('returns ciphertext as a Buffer (blob-storable)', () => { + const enc = new CredentialEncryption(); + + const { ciphertext } = enc.encrypt('secret'); + + expect(Buffer.isBuffer(ciphertext)).toBe(true); + }); + + it('tags each value with a positive integer encKeyVersion', () => { + const enc = new CredentialEncryption(); + + const { encKeyVersion } = enc.encrypt('secret'); + + expect(Number.isInteger(encKeyVersion)).toBe(true); + expect(encKeyVersion).toBeGreaterThanOrEqual(1); + }); + + it('does not leak the plaintext into the ciphertext bytes', () => { + const enc = new CredentialEncryption(); + const plaintext = 'super-secret-refresh-token'; + + const { ciphertext } = enc.encrypt(plaintext); + + expect(ciphertext.toString('utf8')).not.toContain(plaintext); + expect(ciphertext.toString('latin1')).not.toContain(plaintext); + }); + }); + + describe('non-determinism (random IV per encryption)', () => { + it('produces different ciphertext for the same plaintext on repeated calls', () => { + const enc = new CredentialEncryption(); + + const a = enc.encrypt('same-plaintext'); + const b = enc.encrypt('same-plaintext'); + + expect(a.ciphertext.toString('hex')).not.toBe(b.ciphertext.toString('hex')); + }); + + it('still decrypts both independently to the same plaintext', () => { + const enc = new CredentialEncryption(); + + const a = enc.encrypt('same-plaintext'); + const b = enc.encrypt('same-plaintext'); + + expect(enc.decrypt(a.ciphertext)).toBe('same-plaintext'); + expect(enc.decrypt(b.ciphertext)).toBe('same-plaintext'); + }); + }); + + describe('authenticity (AES-GCM) — fail closed on tampering', () => { + it('throws when a ciphertext byte is flipped', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + const tampered = Buffer.from(ciphertext.toString('hex'), 'hex'); + const last = tampered.length - 1; + tampered[last] = (tampered[last] + 1) % 256; + + expect(() => enc.decrypt(tampered)).toThrow(); + }); + + it('throws when a byte in the IV region is flipped', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + // Packed layout is iv | authTag | ciphertext; the IV is the first 12 bytes. + const tampered = Buffer.from(ciphertext.toString('hex'), 'hex'); + tampered[0] = (tampered[0] + 1) % 256; + + expect(() => enc.decrypt(tampered)).toThrow(); + }); + + it('throws when a byte in the auth-tag region is flipped', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + // The 16-byte auth tag immediately follows the 12-byte IV. + const tampered = Buffer.from(ciphertext.toString('hex'), 'hex'); + tampered[12] = (tampered[12] + 1) % 256; + + expect(() => enc.decrypt(tampered)).toThrow(); + }); + + it('throws when the ciphertext is truncated', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + const truncated = ciphertext.subarray(0, ciphertext.length - 1); + + expect(() => enc.decrypt(truncated)).toThrow(); + }); + + it('throws when decrypting under a different key (cross-key, fail closed)', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + // Rotate the host key out from under the same payload. + process.env[ENV_KEY] = OTHER_KEY; + const other = new CredentialEncryption(); + + expect(() => other.decrypt(ciphertext)).toThrow(); + }); + }); + + describe('key derivation', () => { + it('derives the same key across instances (empty-salt HKDF is deterministic)', () => { + const writer = new CredentialEncryption(); + const { ciphertext } = writer.encrypt('cross-instance-secret'); + + // A separate instance reading the same env key must decrypt the payload — this is what lets a + // restarted or horizontally-scaled executor read rows written by another instance, and pins + // the empty-salt / fixed-info derivation as stable rather than per-instance. + const reader = new CredentialEncryption(); + + expect(reader.decrypt(ciphertext)).toBe('cross-instance-secret'); + }); + + it('carries an explicit encKeyVersion through to the encrypted value', () => { + const enc = new CredentialEncryption(2); + + expect(enc.encrypt('secret').encKeyVersion).toBe(2); + }); + }); + + describe('lazy key reading', () => { + it('does not throw at construction when the key is unset', () => { + delete process.env[ENV_KEY]; + + expect(() => new CredentialEncryption()).not.toThrow(); + }); + + it('throws ExecutorEncryptionKeyMissingError on encrypt when the key is unset', () => { + delete process.env[ENV_KEY]; + const enc = new CredentialEncryption(); + + expect(() => enc.encrypt('secret')).toThrow(ExecutorEncryptionKeyMissingError); + }); + + it('throws ExecutorEncryptionKeyMissingError on decrypt when the key is unset', () => { + const enc = new CredentialEncryption(); + const { ciphertext } = enc.encrypt('secret'); + + delete process.env[ENV_KEY]; + const cold = new CredentialEncryption(); + + expect(() => cold.decrypt(ciphertext)).toThrow(ExecutorEncryptionKeyMissingError); + }); + }); +}); diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index 6fa8759f1e..a3d2f844db 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -1,10 +1,12 @@ import type { Logger } from '../../src/ports/logger-port'; +import type { McpOAuthCredentialsStore } from '../../src/ports/mcp-oauth-credentials-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type Runner from '../../src/runner'; import jsonwebtoken from 'jsonwebtoken'; import request from 'supertest'; +import CredentialEncryption from '../../src/crypto/credential-encryption'; import { MalformedRunError, RunAlreadyInFlightError, @@ -14,6 +16,7 @@ import { WorkflowPortError, } from '../../src/errors'; import ExecutorHttpServer from '../../src/http/executor-http-server'; +import InMemoryMcpOAuthCredentialsStore from '../../src/stores/in-memory-mcp-oauth-credentials-store'; const AUTH_SECRET = 'test-auth-secret'; @@ -49,6 +52,8 @@ function createServer( runner?: Runner; workflowPort?: WorkflowPort; logger?: jest.MockedFunction; + mcpOAuthCredentialsStore?: McpOAuthCredentialsStore; + credentialEncryption?: CredentialEncryption; } = {}, ) { return new ExecutorHttpServer({ @@ -57,10 +62,37 @@ function createServer( authSecret: AUTH_SECRET, workflowPort: overrides.workflowPort ?? createMockWorkflowPort(), logger: overrides.logger, + mcpOAuthCredentialsStore: + overrides.mcpOAuthCredentialsStore ?? new InMemoryMcpOAuthCredentialsStore(), + credentialEncryption: overrides.credentialEncryption ?? new CredentialEncryption(), }); } describe('ExecutorHttpServer', () => { + describe('start()', () => { + it('initializes the MCP OAuth credentials store with the logger', async () => { + const init = jest.fn().mockResolvedValue(undefined); + const logger: jest.MockedFunction = jest.fn(); + const server = new ExecutorHttpServer({ + port: 0, + runner: createMockRunner(), + authSecret: AUTH_SECRET, + workflowPort: createMockWorkflowPort(), + logger, + mcpOAuthCredentialsStore: { init } as unknown as McpOAuthCredentialsStore, + credentialEncryption: {} as unknown as CredentialEncryption, + }); + + await server.start(); + + try { + expect(init).toHaveBeenCalledWith(logger); + } finally { + await server.stop(); + } + }); + }); + describe('JWT authentication', () => { it('should return 401 when no token is provided', async () => { const server = createServer(); diff --git a/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts new file mode 100644 index 0000000000..01170dc776 --- /dev/null +++ b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts @@ -0,0 +1,416 @@ +/** + * Spec for the OAuth credential deposit endpoint. + * + * Behaviour: + * - POST /mcp-oauth-credentials and DELETE deposit/disconnect, on the SAME HTTP server as /trigger. + * - Authenticated by the existing koaJwt middleware (forest_session_token, FOREST_AUTH_SECRET). + * - user_id is taken from the validated token, NEVER from the request body. + * - The executor encrypts the refresh token (+ client secret) and upserts one row per (user, server). + * - When FOREST_EXECUTOR_ENCRYPTION_KEY is unset, encryption fails closed and the endpoint returns + * a distinct, typed `executor_encryption_key_missing` (HTTP 503) — never a generic failure. + * - Dormant for now: nothing reads the table at runtime yet; only this deposit path writes it. + * + * Endpoint contract: + * - ExecutorHttpServer options gain: `mcpOAuthCredentialsStore` (upsert/get/delete) and + * `credentialEncryption` (encrypt/decrypt). Both injected like `runner` / `workflowPort`. + * - POST body (camelCase JSON): { mcpServerId, refreshToken, clientId?, clientSecret?, + * clientSecretExpiresAt?, tokenEndpoint?, tokenEndpointAuthMethod?, scopes? }. + * - DELETE path: /mcp-oauth-credentials/:mcpServerId. + * - Typed key-missing response: HTTP 503 with body { code: 'executor_encryption_key_missing' }. + */ +import jsonwebtoken from 'jsonwebtoken'; +import request from 'supertest'; + +import { ExecutorEncryptionKeyMissingError } from '../../src/errors'; +import ExecutorHttpServer from '../../src/http/executor-http-server'; + +const AUTH_SECRET = 'test-auth-secret'; + +function signToken(payload: object, secret = AUTH_SECRET, options?: jsonwebtoken.SignOptions) { + return jsonwebtoken.sign(payload, secret, { expiresIn: '1h', ...options }); +} + +function createMockRunner() { + return { + state: 'running', + start: jest.fn().mockResolvedValue(undefined), + stop: jest.fn().mockResolvedValue(undefined), + triggerPoll: jest.fn().mockResolvedValue(undefined), + getRunStepExecutions: jest.fn().mockResolvedValue([]), + }; +} + +function createMockWorkflowPort() { + return { + getAvailableRuns: jest.fn().mockResolvedValue({ pending: [], malformed: [] }), + getAvailableRun: jest.fn(), + updateStepExecution: jest.fn().mockResolvedValue(undefined), + getCollectionSchema: jest.fn(), + getMcpServerConfigs: jest.fn().mockResolvedValue({}), + hasRunAccess: jest.fn().mockResolvedValue(true), + }; +} + +function createMockStore() { + return { + init: jest.fn().mockResolvedValue(undefined), + upsert: jest.fn().mockResolvedValue(undefined), + get: jest.fn().mockResolvedValue(null), + delete: jest.fn().mockResolvedValue(undefined), + close: jest.fn().mockResolvedValue(undefined), + }; +} + +function createMockEncryption() { + return { + // Deterministic stub: the route under test only needs an opaque blob + version back. + encrypt: jest.fn((plaintext: string) => ({ + ciphertext: Buffer.from(`enc(${plaintext})`), + encKeyVersion: 1, + })), + decrypt: jest.fn(), + }; +} + +function createServer(overrides: Record = {}) { + return new ExecutorHttpServer({ + port: 0, + runner: createMockRunner(), + authSecret: AUTH_SECRET, + workflowPort: createMockWorkflowPort(), + mcpOAuthCredentialsStore: createMockStore(), + credentialEncryption: createMockEncryption(), + ...overrides, + } as never); +} + +const validBody = { + mcpServerId: 'mcp-server-1', + refreshToken: 'refresh-token-xyz', + clientId: 'client-abc', + clientSecret: 'client-secret-123', + tokenEndpoint: 'https://auth.example.com/token', + tokenEndpointAuthMethod: 'client_secret_post', + scopes: 'read write', +}; + +describe('POST /mcp-oauth-credentials', () => { + describe('authentication', () => { + it('returns 401 when no token is provided', async () => { + const server = createServer(); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .send(validBody); + + expect(response.status).toBe(401); + expect(response.body).toEqual({ error: 'Unauthorized' }); + }); + + it('returns 401 when the token is signed with the wrong secret', async () => { + const server = createServer(); + const token = signToken({ id: 1 }, 'wrong-secret'); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(response.status).toBe(401); + }); + + it('does not write to the store when unauthenticated', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + + await request(server.callback).post('/mcp-oauth-credentials').send(validBody); + + expect(store.upsert).not.toHaveBeenCalled(); + }); + }); + + describe('user identity from token', () => { + it('upserts using the user id from the token', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 7 }); + + await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(store.upsert).toHaveBeenCalledWith( + expect.objectContaining({ userId: 7, mcpServerId: 'mcp-server-1' }), + ); + }); + + it('rejects a body that tries to supply a user id (the token is the only source)', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 7 }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send({ ...validBody, userId: 999, user_id: 999 }); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + + it('returns 401 when the token carries no numeric id (rejected by the claims middleware)', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ email: 'no-id@example.com' }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(response.status).toBe(401); + expect(store.upsert).not.toHaveBeenCalled(); + }); + }); + + describe('encryption before persistence', () => { + it('encrypts the refresh token and stores only the ciphertext (never plaintext)', async () => { + const store = createMockStore(); + const encryption = createMockEncryption(); + const server = createServer({ + mcpOAuthCredentialsStore: store, + credentialEncryption: encryption, + }); + const token = signToken({ id: 1 }); + + await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(encryption.encrypt).toHaveBeenCalledWith('refresh-token-xyz'); + const persisted = store.upsert.mock.calls[0][0]; + expect(Buffer.isBuffer(persisted.refreshTokenEnc)).toBe(true); + expect(persisted.refreshTokenEnc.toString()).toBe('enc(refresh-token-xyz)'); + expect(persisted.encKeyVersion).toBe(1); + // The plaintext must not have been handed to the store under any field. + expect(JSON.stringify(persisted)).not.toContain('refresh-token-xyz'); + }); + + it('encrypts the client secret when one is provided', async () => { + const store = createMockStore(); + const encryption = createMockEncryption(); + const server = createServer({ + mcpOAuthCredentialsStore: store, + credentialEncryption: encryption, + }); + const token = signToken({ id: 1 }); + + await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(encryption.encrypt).toHaveBeenCalledWith('client-secret-123'); + expect(store.upsert.mock.calls[0][0].clientSecretEnc.toString()).toBe( + 'enc(client-secret-123)', + ); + }); + + it('stores a null client secret for a public / PKCE client (no clientSecret in body)', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + const { clientSecret, ...publicBody } = validBody; + + await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send({ ...publicBody, tokenEndpointAuthMethod: 'none' }); + + expect(store.upsert).toHaveBeenCalledWith(expect.objectContaining({ clientSecretEnc: null })); + }); + }); + + describe('fail closed when the encryption key is missing', () => { + it('returns 503 with a typed executor_encryption_key_missing code', async () => { + const encryption = createMockEncryption(); + encryption.encrypt.mockImplementation(() => { + throw new ExecutorEncryptionKeyMissingError(); + }); + const server = createServer({ credentialEncryption: encryption }); + const token = signToken({ id: 1 }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(response.status).toBe(503); + expect(response.body).toEqual( + expect.objectContaining({ code: 'executor_encryption_key_missing' }), + ); + }); + + it('does not persist anything when the key is missing', async () => { + const store = createMockStore(); + const encryption = createMockEncryption(); + encryption.encrypt.mockImplementation(() => { + throw new ExecutorEncryptionKeyMissingError(); + }); + const server = createServer({ + mcpOAuthCredentialsStore: store, + credentialEncryption: encryption, + }); + const token = signToken({ id: 1 }); + + await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(store.upsert).not.toHaveBeenCalled(); + }); + }); + + describe('body validation', () => { + it('returns 400 when the refresh token is missing', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + const { refreshToken, ...noRefresh } = validBody; + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(noRefresh); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + + it('returns 400 when mcpServerId is missing', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + const { mcpServerId, ...noServer } = validBody; + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(noServer); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + + it('returns 400 when tokenEndpoint is missing', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + const { tokenEndpoint, ...noEndpoint } = validBody; + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(noEndpoint); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + + it('returns 400 when a field has the wrong type', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send({ ...validBody, refreshToken: 12345 }); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + + it('returns 400 when clientSecretExpiresAt is not a parseable date', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send({ ...validBody, clientSecretExpiresAt: 'not-a-date' }); + + expect(response.status).toBe(400); + expect(store.upsert).not.toHaveBeenCalled(); + }); + }); + + describe('store failure', () => { + it('returns 500 when the store rejects', async () => { + const store = createMockStore(); + store.upsert.mockRejectedValue(new Error('db down')); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 1 }); + + const response = await request(server.callback) + .post('/mcp-oauth-credentials') + .set('Authorization', `Bearer ${token}`) + .send(validBody); + + expect(response.status).toBe(500); + }); + }); +}); + +describe('DELETE /mcp-oauth-credentials/:mcpServerId', () => { + it('returns 401 when no token is provided', async () => { + const server = createServer(); + + const response = await request(server.callback).delete('/mcp-oauth-credentials/mcp-server-1'); + + expect(response.status).toBe(401); + }); + + it('deletes the credential for (token user, mcpServerId) and returns 204 with no body', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 7 }); + + const response = await request(server.callback) + .delete('/mcp-oauth-credentials/mcp-server-1') + .set('Authorization', `Bearer ${token}`); + + expect(response.status).toBe(204); + expect(response.body).toEqual({}); + expect(store.delete).toHaveBeenCalledWith(7, 'mcp-server-1'); + }); + + it("does not delete another user's credential", async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ id: 7 }); + + await request(server.callback) + .delete('/mcp-oauth-credentials/mcp-server-1') + .set('Authorization', `Bearer ${token}`); + + expect(store.delete).not.toHaveBeenCalledWith(999, expect.anything()); + }); + + it('returns 401 when the token carries no numeric id (rejected by the claims middleware)', async () => { + const store = createMockStore(); + const server = createServer({ mcpOAuthCredentialsStore: store }); + const token = signToken({ email: 'no-id@example.com' }); + + const response = await request(server.callback) + .delete('/mcp-oauth-credentials/mcp-server-1') + .set('Authorization', `Bearer ${token}`); + + expect(response.status).toBe(401); + expect(store.delete).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/workflow-executor/test/http/mcp-oauth-credentials.test.ts b/packages/workflow-executor/test/http/mcp-oauth-credentials.test.ts new file mode 100644 index 0000000000..aef9726c98 --- /dev/null +++ b/packages/workflow-executor/test/http/mcp-oauth-credentials.test.ts @@ -0,0 +1,82 @@ +import type CredentialEncryption from '../../src/crypto/credential-encryption'; +import type { DepositCredentialsBody } from '../../src/http/mcp-oauth-credentials'; + +import { ExecutorEncryptionKeyMissingError } from '../../src/errors'; +import { buildMcpOAuthCredentialInput } from '../../src/http/mcp-oauth-credentials'; + +function createEncryption(): CredentialEncryption { + return { + encrypt: jest.fn((plaintext: string) => ({ + ciphertext: Buffer.from(`enc(${plaintext})`), + encKeyVersion: 1, + })), + decrypt: jest.fn(), + } as unknown as CredentialEncryption; +} + +const fullBody: DepositCredentialsBody = { + mcpServerId: 'mcp-server-1', + refreshToken: 'refresh-token-xyz', + clientId: 'client-abc', + clientSecret: 'client-secret-123', + clientSecretExpiresAt: '2030-01-02T03:04:05.000Z', + tokenEndpoint: 'https://auth.example.com/token', + tokenEndpointAuthMethod: 'client_secret_post', + scopes: 'read write', +}; + +describe('buildMcpOAuthCredentialInput', () => { + it('encrypts the refresh token and maps the record for the given user', () => { + const encryption = createEncryption(); + + const input = buildMcpOAuthCredentialInput({ body: fullBody, userId: 7, encryption }); + + expect(encryption.encrypt).toHaveBeenCalledWith('refresh-token-xyz'); + expect(input.userId).toBe(7); + expect(input.mcpServerId).toBe('mcp-server-1'); + expect(input.refreshTokenEnc.toString()).toBe('enc(refresh-token-xyz)'); + expect(input.tokenEndpoint).toBe('https://auth.example.com/token'); + expect(input.tokenEndpointAuthMethod).toBe('client_secret_post'); + expect(input.scopes).toBe('read write'); + expect(input.encKeyVersion).toBe(1); + }); + + it('encrypts the client secret and parses the expiry when both are provided', () => { + const encryption = createEncryption(); + + const input = buildMcpOAuthCredentialInput({ body: fullBody, userId: 7, encryption }); + + expect(encryption.encrypt).toHaveBeenCalledWith('client-secret-123'); + expect(input.clientSecretEnc?.toString()).toBe('enc(client-secret-123)'); + expect(input.clientSecretExpiresAt).toEqual(new Date('2030-01-02T03:04:05.000Z')); + }); + + it('leaves optional client fields null for a public / PKCE client', () => { + const encryption = createEncryption(); + const publicBody: DepositCredentialsBody = { + mcpServerId: 'mcp-server-1', + refreshToken: 'refresh-token-xyz', + tokenEndpoint: 'https://auth.example.com/token', + }; + + const input = buildMcpOAuthCredentialInput({ body: publicBody, userId: 7, encryption }); + + expect(encryption.encrypt).toHaveBeenCalledTimes(1); + expect(input.clientId).toBeNull(); + expect(input.clientSecretEnc).toBeNull(); + expect(input.clientSecretExpiresAt).toBeNull(); + expect(input.tokenEndpointAuthMethod).toBeNull(); + expect(input.scopes).toBeNull(); + }); + + it('propagates ExecutorEncryptionKeyMissingError so the caller can fail closed', () => { + const encryption = createEncryption(); + (encryption.encrypt as jest.Mock).mockImplementation(() => { + throw new ExecutorEncryptionKeyMissingError(); + }); + + expect(() => buildMcpOAuthCredentialInput({ body: fullBody, userId: 7, encryption })).toThrow( + ExecutorEncryptionKeyMissingError, + ); + }); +}); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 482e827972..3ad67f71ec 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -9,9 +9,11 @@ import jsonwebtoken from 'jsonwebtoken'; import request from 'supertest'; import { z } from 'zod'; +import CredentialEncryption from '../../src/crypto/credential-encryption'; import ExecutorHttpServer from '../../src/http/executor-http-server'; import Runner from '../../src/runner'; import SchemaCache from '../../src/schema-cache'; +import InMemoryMcpOAuthCredentialsStore from '../../src/stores/in-memory-mcp-oauth-credentials-store'; import InMemoryStore from '../../src/stores/in-memory-store'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; @@ -213,6 +215,8 @@ function createIntegrationSetup(overrides?: { runner, authSecret: AUTH_SECRET, workflowPort, + mcpOAuthCredentialsStore: new InMemoryMcpOAuthCredentialsStore(), + credentialEncryption: new CredentialEncryption(), }); return { runner, server, workflowPort, agentPort, runStore, aiClient, model }; diff --git a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts new file mode 100644 index 0000000000..be57f07ac5 --- /dev/null +++ b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts @@ -0,0 +1,313 @@ +/** + * Spec for the database (Sequelize) MCP OAuth credentials store + its Umzug migration. + * + * Behaviour: + * - One row per (user_id, mcp_server_id) — UNIQUE (user_id, mcp_server_id); upsert in place. + * - Refresh token + client secret are stored as encrypted BLOBs; the store persists opaque bytes + * (encryption itself is exercised in credential-encryption.test.ts — the store does not encrypt). + * - client_id, client_secret_enc, client_secret_expires_at, scopes are nullable + * (null for public / PKCE clients). + * - enc_key_version is stored per row. + * - Deleted on disconnect / permanent refresh failure. + * - Migration `002_create_mcp_oauth_credentials` is added alongside `001_create_workflow_step_executions`. + * + * Store contract: + * import DatabaseMcpOAuthCredentialsStore from '../../src/stores/database-mcp-oauth-credentials-store'; + * const store = new DatabaseMcpOAuthCredentialsStore({ sequelize }); + * await store.init(); // runs the 002 migration (table exists after) + * await store.upsert(credential); // keyed by (userId, mcpServerId) + * const row = await store.get(userId, mcpServerId); // StoredCredential | null + * await store.delete(userId, mcpServerId); + * await store.close(); + * + * Field names are camelCase, mapping to the snake_case columns. + */ +import type { Sequelize as SequelizeType } from 'sequelize'; + +import { Sequelize } from 'sequelize'; + +import DatabaseMcpOAuthCredentialsStore from '../../src/stores/database-mcp-oauth-credentials-store'; + +interface CredentialInput { + userId: number; + mcpServerId: string; + refreshTokenEnc: Buffer; + clientId?: string | null; + clientSecretEnc?: Buffer | null; + clientSecretExpiresAt?: Date | null; + tokenEndpoint: string; + tokenEndpointAuthMethod?: string | null; + scopes?: string | null; + encKeyVersion: number; +} + +function makeCredential(overrides: Partial = {}): CredentialInput { + return { + userId: 42, + mcpServerId: 'mcp-server-1', + refreshTokenEnc: Buffer.from('enc-refresh-token'), + clientId: 'client-abc', + clientSecretEnc: Buffer.from('enc-client-secret'), + clientSecretExpiresAt: null, + tokenEndpoint: 'https://auth.example.com/token', + tokenEndpointAuthMethod: 'client_secret_post', + scopes: 'read write', + encKeyVersion: 1, + ...overrides, + }; +} + +// Asserts presence and narrows the type — avoids non-null assertions (`!`), which the codebase avoids. +function unwrap(value: T | null | undefined): T { + if (value === null || value === undefined) { + throw new Error('expected a stored credential, got null/undefined'); + } + + return value; +} + +describe('DatabaseMcpOAuthCredentialsStore (SQLite)', () => { + let sequelize: SequelizeType; + let store: DatabaseMcpOAuthCredentialsStore; + + beforeEach(async () => { + sequelize = new Sequelize({ dialect: 'sqlite', storage: ':memory:', logging: false }); + store = new DatabaseMcpOAuthCredentialsStore({ sequelize }); + await store.init(); + }); + + afterEach(async () => { + await store.close(); + }); + + describe('get', () => { + it('returns null for an unknown (userId, mcpServerId)', async () => { + expect(await store.get(999, 'no-such-server')).toBeNull(); + }); + + it('returns the stored credential for a known (userId, mcpServerId)', async () => { + const credential = makeCredential(); + + await store.upsert(credential); + const row = await store.get(credential.userId, credential.mcpServerId); + + expect(row).toEqual( + expect.objectContaining({ + userId: 42, + mcpServerId: 'mcp-server-1', + clientId: 'client-abc', + tokenEndpoint: 'https://auth.example.com/token', + tokenEndpointAuthMethod: 'client_secret_post', + scopes: 'read write', + encKeyVersion: 1, + }), + ); + }); + + it('preserves the encrypted blobs byte-for-byte', async () => { + const refreshTokenEnc = Buffer.from([0x00, 0x01, 0xfe, 0xff, 0x10]); + const clientSecretEnc = Buffer.from([0xde, 0xad, 0xbe, 0xef]); + + await store.upsert(makeCredential({ refreshTokenEnc, clientSecretEnc })); + const row = unwrap(await store.get(42, 'mcp-server-1')); + + expect(row.refreshTokenEnc.toString('hex')).toBe(refreshTokenEnc.toString('hex')); + expect(unwrap(row.clientSecretEnc).toString('hex')).toBe(clientSecretEnc.toString('hex')); + }); + }); + + describe('upsert', () => { + it('updates the existing row in place for the same (userId, mcpServerId)', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old'), encKeyVersion: 1 })); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('new'), encKeyVersion: 2 })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + + expect(row.refreshTokenEnc.toString()).toBe('new'); + expect(row.encKeyVersion).toBe(2); + }); + + it('keeps exactly one row after re-upserting the same key (UNIQUE constraint)', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('v1') })); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('v2') })); + + // Pollution-proof: count only rows for this known key, never the whole table. + const [rows] = await sequelize.query( + 'SELECT COUNT(*) AS c FROM ai_mcp_oauth_credentials WHERE user_id = 42 AND mcp_server_id = :id', + { replacements: { id: 'mcp-server-1' } }, + ); + expect(Number((rows[0] as { c: number }).c)).toBe(1); + }); + + it('stores nullable client fields as null for a public / PKCE client', async () => { + await store.upsert( + makeCredential({ + clientId: null, + clientSecretEnc: null, + clientSecretExpiresAt: null, + tokenEndpointAuthMethod: 'none', + scopes: null, + }), + ); + + const row = await store.get(42, 'mcp-server-1'); + + expect(row).toEqual( + expect.objectContaining({ + clientId: null, + clientSecretEnc: null, + clientSecretExpiresAt: null, + scopes: null, + }), + ); + }); + + it('persists client_secret_expires_at when provided', async () => { + const expiresAt = new Date('2030-01-02T03:04:05.000Z'); + + await store.upsert(makeCredential({ clientSecretExpiresAt: expiresAt })); + const row = unwrap(await store.get(42, 'mcp-server-1')); + + expect(new Date(unwrap(row.clientSecretExpiresAt)).toISOString()).toBe( + expiresAt.toISOString(), + ); + }); + }); + + describe('isolation', () => { + it('keeps credentials for the same server but different users separate', async () => { + await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); + await store.upsert(makeCredential({ userId: 2, refreshTokenEnc: Buffer.from('user-2') })); + + const rowOne = unwrap(await store.get(1, 'mcp-server-1')); + const rowTwo = unwrap(await store.get(2, 'mcp-server-1')); + + expect(rowOne.refreshTokenEnc.toString()).toBe('user-1'); + expect(rowTwo.refreshTokenEnc.toString()).toBe('user-2'); + }); + + it('keeps credentials for the same user but different servers separate', async () => { + await store.upsert( + makeCredential({ mcpServerId: 'server-a', refreshTokenEnc: Buffer.from('a') }), + ); + await store.upsert( + makeCredential({ mcpServerId: 'server-b', refreshTokenEnc: Buffer.from('b') }), + ); + + const rowA = unwrap(await store.get(42, 'server-a')); + const rowB = unwrap(await store.get(42, 'server-b')); + + expect(rowA.refreshTokenEnc.toString()).toBe('a'); + expect(rowB.refreshTokenEnc.toString()).toBe('b'); + }); + }); + + describe('delete', () => { + it('removes the credential for a (userId, mcpServerId)', async () => { + await store.upsert(makeCredential()); + + await store.delete(42, 'mcp-server-1'); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + + it('does not affect other users when deleting one user', async () => { + await store.upsert(makeCredential({ userId: 1 })); + await store.upsert(makeCredential({ userId: 2 })); + + await store.delete(1, 'mcp-server-1'); + + expect(await store.get(1, 'mcp-server-1')).toBeNull(); + expect(await store.get(2, 'mcp-server-1')).not.toBeNull(); + }); + + it('is a no-op (does not throw) when deleting a non-existent credential', async () => { + await expect(store.delete(999, 'no-such-server')).resolves.toBeUndefined(); + }); + }); + + describe('migration / init', () => { + it('creates the ai_mcp_oauth_credentials table on init', async () => { + const [rows] = await sequelize.query( + "SELECT name FROM sqlite_master WHERE type='table' AND name='ai_mcp_oauth_credentials'", + ); + + expect(rows).toHaveLength(1); + }); + + it('runs init idempotently', async () => { + await expect(store.init()).resolves.toBeUndefined(); + }); + + it('rejects an insert with a null token_endpoint at the DB level', async () => { + // token_endpoint is NOT NULL: the refresh grant has nowhere to go without it. + await expect( + sequelize.query( + 'INSERT INTO ai_mcp_oauth_credentials ' + + '(user_id, mcp_server_id, refresh_token_enc, enc_key_version, created_at, updated_at) ' + + "VALUES (7, 'mcp-server-1', :blob, 1, :now, :now)", + { replacements: { blob: Buffer.from('no-endpoint'), now: new Date() } }, + ), + ).rejects.toThrow(); + }); + + it('enforces the UNIQUE (user_id, mcp_server_id) constraint at the DB level', async () => { + // Direct insert bypassing upsert proves the constraint exists in the schema, not just app logic. + await store.upsert(makeCredential()); + + await expect( + sequelize.query( + 'INSERT INTO ai_mcp_oauth_credentials ' + + '(user_id, mcp_server_id, refresh_token_enc, token_endpoint, enc_key_version, ' + + 'created_at, updated_at) ' + + "VALUES (42, 'mcp-server-1', :blob, :tokenEndpoint, 1, :now, :now)", + { + replacements: { + blob: Buffer.from('dup'), + tokenEndpoint: 'https://auth.example.com/token', + now: new Date(), + }, + }, + ), + ).rejects.toThrow(); + }); + }); + + describe('failure handling', () => { + it('logs and rethrows when the migration fails', async () => { + const badSequelize = new Sequelize({ + dialect: 'sqlite', + storage: ':memory:', + logging: false, + }); + const badStore = new DatabaseMcpOAuthCredentialsStore({ sequelize: badSequelize }); + + // Break the query interface so createTable fails mid-migration. + jest + .spyOn(badSequelize.getQueryInterface(), 'createTable') + .mockRejectedValueOnce(new Error('disk full')); + + const logger = jest.fn(); + await expect(badStore.init(logger)).rejects.toThrow('disk full'); + expect(logger).toHaveBeenCalledWith( + 'Error', + 'MCP OAuth credentials migration failed', + expect.objectContaining({ error: 'disk full' }), + ); + + await badSequelize.close(); + }); + + it('close() catches and logs the error instead of throwing', async () => { + const logger = jest.fn(); + jest.spyOn(sequelize, 'close').mockRejectedValueOnce(new Error('close failed')); + + await expect(store.close(logger)).resolves.toBeUndefined(); + expect(logger).toHaveBeenCalledWith( + 'Error', + 'Failed to close database connection', + expect.objectContaining({ error: 'close failed' }), + ); + }); + }); +}); diff --git a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts new file mode 100644 index 0000000000..e230630488 --- /dev/null +++ b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts @@ -0,0 +1,136 @@ +/** + * Spec for the in-memory MCP OAuth credentials store (dev / --in-memory). + * + * Behaviour mirrors the database store's contract, minus persistence: + * - One entry per (userId, mcpServerId) — upsert overwrites in place. + * - Stores opaque encrypted bytes (no crypto in the store itself). + * - get returns null for unknown keys; delete is a no-op for unknown keys. + * - State is process-local and lost on restart (same throwaway semantics as InMemoryStore). + */ +import type { McpOAuthCredentialInput } from '../../src/ports/mcp-oauth-credentials-store'; + +import InMemoryMcpOAuthCredentialsStore from '../../src/stores/in-memory-mcp-oauth-credentials-store'; + +function makeCredential(overrides: Partial = {}): McpOAuthCredentialInput { + return { + userId: 42, + mcpServerId: 'mcp-server-1', + refreshTokenEnc: Buffer.from('enc-refresh-token'), + clientId: 'client-abc', + clientSecretEnc: Buffer.from('enc-client-secret'), + clientSecretExpiresAt: null, + tokenEndpoint: 'https://auth.example.com/token', + tokenEndpointAuthMethod: 'client_secret_post', + scopes: 'read write', + encKeyVersion: 1, + ...overrides, + }; +} + +// Asserts presence and narrows the type — avoids non-null assertions (`!`), which the codebase avoids. +function unwrap(value: T | null | undefined): T { + if (value === null || value === undefined) { + throw new Error('expected a stored credential, got null/undefined'); + } + + return value; +} + +describe('InMemoryMcpOAuthCredentialsStore', () => { + let store: InMemoryMcpOAuthCredentialsStore; + + beforeEach(() => { + store = new InMemoryMcpOAuthCredentialsStore(); + }); + + describe('get', () => { + it('returns null for an unknown (userId, mcpServerId)', async () => { + expect(await store.get(999, 'no-such-server')).toBeNull(); + }); + + it('returns the stored credential for a known (userId, mcpServerId)', async () => { + await store.upsert(makeCredential()); + + expect(await store.get(42, 'mcp-server-1')).toEqual( + expect.objectContaining({ + userId: 42, + mcpServerId: 'mcp-server-1', + tokenEndpoint: 'https://auth.example.com/token', + encKeyVersion: 1, + }), + ); + }); + + it('preserves the encrypted blobs byte-for-byte', async () => { + const refreshTokenEnc = Buffer.from([0x00, 0x01, 0xfe, 0xff]); + await store.upsert(makeCredential({ refreshTokenEnc })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + + expect(row.refreshTokenEnc.toString('hex')).toBe(refreshTokenEnc.toString('hex')); + }); + }); + + describe('upsert', () => { + it('overwrites the existing entry in place for the same key', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old'), encKeyVersion: 1 })); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('new'), encKeyVersion: 2 })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + + expect(row.refreshTokenEnc.toString()).toBe('new'); + expect(row.encKeyVersion).toBe(2); + }); + + it('assigns a positive integer id', async () => { + await store.upsert(makeCredential()); + + expect(unwrap(await store.get(42, 'mcp-server-1')).id).toBeGreaterThanOrEqual(1); + }); + }); + + describe('isolation', () => { + it('keeps entries for different users and servers separate', async () => { + await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); + await store.upsert(makeCredential({ userId: 2, refreshTokenEnc: Buffer.from('user-2') })); + await store.upsert( + makeCredential({ mcpServerId: 'server-b', refreshTokenEnc: Buffer.from('b') }), + ); + + expect(unwrap(await store.get(1, 'mcp-server-1')).refreshTokenEnc.toString()).toBe('user-1'); + expect(unwrap(await store.get(2, 'mcp-server-1')).refreshTokenEnc.toString()).toBe('user-2'); + expect(unwrap(await store.get(42, 'server-b')).refreshTokenEnc.toString()).toBe('b'); + }); + }); + + describe('delete', () => { + it('removes the credential for a (userId, mcpServerId)', async () => { + await store.upsert(makeCredential()); + + await store.delete(42, 'mcp-server-1'); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + + it('is a no-op (does not throw) for an unknown credential', async () => { + await expect(store.delete(999, 'no-such-server')).resolves.toBeUndefined(); + }); + + it('does not affect other users when deleting one user', async () => { + await store.upsert(makeCredential({ userId: 1 })); + await store.upsert(makeCredential({ userId: 2 })); + + await store.delete(1, 'mcp-server-1'); + + expect(await store.get(1, 'mcp-server-1')).toBeNull(); + expect(await store.get(2, 'mcp-server-1')).not.toBeNull(); + }); + }); + + describe('lifecycle', () => { + it('init and close are no-ops that resolve', async () => { + await expect(store.init()).resolves.toBeUndefined(); + await expect(store.close()).resolves.toBeUndefined(); + }); + }); +});