diff --git a/apps/cloud/src/services/mcp-worker-transport.test.ts b/apps/cloud/src/services/mcp-worker-transport.test.ts new file mode 100644 index 000000000..b0b04203e --- /dev/null +++ b/apps/cloud/src/services/mcp-worker-transport.test.ts @@ -0,0 +1,107 @@ +import { describe, expect, it } from "@effect/vitest"; + +import { JsonRpcRequestIdQueue, PREVIOUS_REQUEST_TIMEOUT_MS } from "./mcp-worker-transport"; + +const jsonRpcRequest = (body: unknown): Request => + new Request("https://example.invalid/mcp", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }); + +describe("JsonRpcRequestIdQueue", () => { + it("serialises requests with the same id", async () => { + const queue = new JsonRpcRequestIdQueue(); + const order: string[] = []; + + let releaseFirst!: () => void; + const firstStarted = new Promise((resolve) => { + const firstRunning = new Promise((release) => { + releaseFirst = release; + }); + queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), async () => { + order.push("first:start"); + resolve(); + await firstRunning; + order.push("first:end"); + }); + }); + + await firstStarted; + + const secondDone = queue.run( + jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), + async () => { + order.push("second"); + }, + ); + + // Second must wait for first. 
+ await new Promise((r) => setTimeout(r, 20)); + expect(order).toEqual(["first:start"]); + + releaseFirst(); + await secondDone; + expect(order).toEqual(["first:start", "first:end", "second"]); + }); + + it("does not block requests with different ids", async () => { + const queue = new JsonRpcRequestIdQueue(); + let release!: () => void; + const hung = new Promise((resolve) => { + release = resolve; + }); + + queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), () => hung); + + const otherDone = await Promise.race([ + queue + .run(jsonRpcRequest({ jsonrpc: "2.0", id: 2, method: "tools/call" }), async () => "done") + .then((v) => ({ kind: "settled" as const, v })), + new Promise<{ kind: "blocked" }>((r) => + setTimeout(() => r({ kind: "blocked" }), 200), + ), + ]); + + expect(otherDone.kind).toBe("settled"); + release(); + }); + + it("regression: caps wait on a hung previous request and dispatches anyway", async () => { + // Override the timeout for fast CI — the production default is + // PREVIOUS_REQUEST_TIMEOUT_MS (60s) which we cap test-side to 100ms. + // Same behaviour, same code path; only the wall-clock budget changes. + const queue = new JsonRpcRequestIdQueue({ previousTimeoutMs: 100 }); + const order: string[] = []; + + // Kick off a request and never release it — the poisoned-queue + // shape that used to cascade for the full upstream 180s timeout. 
+ const firstStarted = new Promise((resolve) => { + queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), async () => { + order.push("first:start"); + resolve(); + await new Promise(() => undefined); // hang forever + }); + }); + await firstStarted; + + const result = await queue.run( + jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), + async () => { + order.push("second"); + return "ok"; + }, + ); + + expect(result).toBe("ok"); + expect(order).toEqual(["first:start", "second"]); + }); + + it("exposes a sane production timeout", () => { + // Sanity guard: must stay below the 180s upstream timeout that + // Claude / Cowork enforce, but be long enough to outlast a normal + // dynamic-worker execution under load. + expect(PREVIOUS_REQUEST_TIMEOUT_MS).toBeGreaterThan(10_000); + expect(PREVIOUS_REQUEST_TIMEOUT_MS).toBeLessThan(180_000); + }); +}); diff --git a/apps/cloud/src/services/mcp-worker-transport.ts b/apps/cloud/src/services/mcp-worker-transport.ts index 8ce4c6cdd..8a4794fc5 100644 --- a/apps/cloud/src/services/mcp-worker-transport.ts +++ b/apps/cloud/src/services/mcp-worker-transport.ts @@ -79,8 +79,22 @@ const extractJsonRpcRequestIdKeys = async (request: Request): Promise>(); + private readonly previousTimeoutMs: number; + + constructor(options: { readonly previousTimeoutMs?: number } = {}) { + this.previousTimeoutMs = options.previousTimeoutMs ?? 
PREVIOUS_REQUEST_TIMEOUT_MS; + } async run(request: Request, run: () => Promise): Promise { const ids = [...new Set(await extractJsonRpcRequestIdKeys(request))]; @@ -96,7 +110,18 @@ class JsonRpcRequestIdQueue { } try { - await Promise.all(previous.map((p) => p.catch(() => undefined))); + if (previous.length > 0) { + const settled = Promise.all(previous.map((p) => p.catch(() => undefined))); + const timeout = new Promise<"timeout">((resolve) => + setTimeout(() => resolve("timeout"), this.previousTimeoutMs), + ); + const outcome = await Promise.race([settled.then(() => "settled" as const), timeout]); + if (outcome === "timeout") { + console.warn( + `[mcp-worker-transport] previous in-flight request for ids=${JSON.stringify(ids)} did not release within ${this.previousTimeoutMs}ms; proceeding anyway`, + ); + } + } return await run(); } finally { for (const id of ids) { diff --git a/bun.lock b/bun.lock index dcfe21a14..bc304ce02 100644 --- a/bun.lock +++ b/bun.lock @@ -451,8 +451,10 @@ "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "effect": "catalog:", + "sucrase": "^3.35.1", }, "devDependencies": { + "@effect/vitest": "catalog:", "@types/node": "catalog:", "bun-types": "catalog:", "tsup": "catalog:", diff --git a/packages/core/execution/src/description.ts b/packages/core/execution/src/description.ts index ebb616c46..fcf4792e2 100644 --- a/packages/core/execution/src/description.ts +++ b/packages/core/execution/src/description.ts @@ -54,6 +54,7 @@ const formatDescription = (sources: readonly Source[]): string => { "- For tools that return large collections (e.g. 
`getStates`, `getAll`), filter results in code rather than calling per-item tools.", "- Do not use `fetch` — all API calls go through `tools.*`.", "- If execution pauses for interaction, resume it with the returned `resumePayload`.", + "- TypeScript type syntax (`: T`, `as T`, generics, interfaces, type aliases) is stripped before execution — feel free to write idiomatic TypeScript using the shapes from `tools.describe.tool()`. Decorators and `enum` are not supported.", ]; if (sources.length > 0) { diff --git a/packages/core/execution/src/engine.test.ts b/packages/core/execution/src/engine.test.ts new file mode 100644 index 000000000..49c3b8565 --- /dev/null +++ b/packages/core/execution/src/engine.test.ts @@ -0,0 +1,88 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Data, Effect, Exit } from "effect"; + +import { createExecutor, definePlugin, makeTestConfig } from "@executor-js/sdk"; +import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core"; + +import { createExecutionEngine } from "./engine"; + +// Regression for the hang reported as the executor-MCP "180s timeout" against +// Cowork (Claude web). Cowork goes down the `executeWithPause` branch because +// it doesn't advertise managed elicitation. When the dynamic worker fails +// fast (e.g. user submits TS with a `:` type annotation, "Unexpected token +// ':'" inside ~25ms), the failure was swallowed and the request hung until +// the client gave up at 180s. The cause was `Effect.race` having +// prefer-success semantics in Effect v4: the racing pause-signal Deferred +// never resolves, so a fiber failure is never observed by the racer. 
+ +class FakeRuntimeError extends Data.TaggedError("FakeRuntimeError")<{ + readonly message: string; +}> {} + +const failingExecutor: CodeExecutor = { + execute: () => + Effect.fail(new FakeRuntimeError({ message: "Unexpected token ':'" })), +}; + +const succeedingExecutor: CodeExecutor = { + execute: () => + Effect.succeed({ result: "ok", logs: [] } satisfies ExecuteResult), +}; + +const emptyPlugin = definePlugin(() => ({ + id: "empty-test" as const, + storage: () => ({}), + staticSources: () => [], +})); + +const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const })); + +describe("executeWithPause failure propagation", () => { + it.effect("surfaces a fast codeExecutor failure as an Exit.Failure", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ + executor, + codeExecutor: failingExecutor, + }); + + const exit = yield* Effect.exit(engine.executeWithPause("noop")); + expect(Exit.isFailure(exit)).toBe(true); + }), + ); + + it.effect("does not hang when codeExecutor fails", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ + executor, + codeExecutor: failingExecutor, + }); + + // Race the executeWithPause against a short sleep. With the bug + // present this resolves to "hung" because the failure is swallowed + // by the prefer-success race against the pause Deferred. 
+ const outcome = yield* Effect.race( + Effect.exit(engine.executeWithPause("noop")).pipe( + Effect.map((exit) => ({ kind: "settled" as const, exit })), + ), + Effect.sleep("500 millis").pipe(Effect.as({ kind: "hung" as const })), + ); + + expect(outcome.kind).toBe("settled"); + }), + ); + + it.effect("control: succeedingExecutor returns completed", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ + executor, + codeExecutor: succeedingExecutor, + }); + + const result = yield* engine.executeWithPause("noop"); + expect(result.status).toBe("completed"); + }), + ); +}); diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index d9670efd8..b902c93d3 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -353,12 +353,20 @@ export const createExecutionEngine = < * Race a running fiber against a pause signal. Returns when either * the fiber completes or an elicitation handler fires (whichever * comes first). Re-used by both executeWithPause and resume. + * + * `Effect.raceFirst` (not `Effect.race`) — `race` has prefer-success + * semantics in Effect v4 ("first successful result"), which means a + * fiber failure waits indefinitely for the pause Deferred to succeed. + * For a fast `codeExecutor.execute` failure (e.g. a syntax error + * inside the dynamic worker) the pause signal never fires, so the + * outer Effect hangs until the upstream client gives up. `raceFirst` + * settles on whichever side completes first, success or failure. 
*/ const awaitCompletionOrPause = ( fiber: Fiber.Fiber, pauseSignal: Deferred.Deferred>, ): Effect.Effect => - Effect.race( + Effect.raceFirst( Fiber.join(fiber).pipe( Effect.map((result): ExecutionResult => ({ status: "completed", result })), ), diff --git a/packages/kernel/core/package.json b/packages/kernel/core/package.json index bb55fcadd..febf8f384 100644 --- a/packages/kernel/core/package.json +++ b/packages/kernel/core/package.json @@ -41,9 +41,11 @@ "@standard-schema/spec": "^1.0.0", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", - "effect": "catalog:" + "effect": "catalog:", + "sucrase": "^3.35.1" }, "devDependencies": { + "@effect/vitest": "catalog:", "@types/node": "catalog:", "bun-types": "catalog:", "tsup": "catalog:", diff --git a/packages/kernel/core/src/index.ts b/packages/kernel/core/src/index.ts index 2b1643f4a..dcc792fa9 100644 --- a/packages/kernel/core/src/index.ts +++ b/packages/kernel/core/src/index.ts @@ -3,3 +3,4 @@ export * from "./validation"; export * from "./json-schema"; export * from "./effect-errors"; export * from "./code-recovery"; +export * from "./strip-types"; diff --git a/packages/kernel/core/src/strip-types.test.ts b/packages/kernel/core/src/strip-types.test.ts new file mode 100644 index 000000000..05d355e5a --- /dev/null +++ b/packages/kernel/core/src/strip-types.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "@effect/vitest"; + +import { stripTypeScript } from "./strip-types"; + +describe("stripTypeScript", () => { + it("removes variable type annotations", () => { + const out = stripTypeScript('const x: string = "hello"; return x;'); + expect(out).not.toContain(": string"); + expect(out).toContain('const x'); + expect(out).toContain('"hello"'); + }); + + it("removes function param and return type annotations", () => { + const out = stripTypeScript("function f(x: number): number { return x + 1; } return f(5);"); + expect(out).not.toContain(": number"); + // Result must still parse as JS — verify by Function ctor. 
+ expect(() => new Function(out)).not.toThrow(); + }); + + it("removes `as` casts", () => { + const out = stripTypeScript("const x = (1 as number) + 2; return x;"); + expect(out).not.toContain("as number"); + expect(() => new Function(out)).not.toThrow(); + }); + + it("removes generic type arguments on call expressions", () => { + const out = stripTypeScript("const arr = Array.from([]); return arr;"); + expect(out).not.toContain(""); + expect(() => new Function(out)).not.toThrow(); + }); + + it("removes interface declarations", () => { + const out = stripTypeScript("interface User { name: string; } const u = { name: 'a' }; return u;"); + expect(out).not.toContain("interface User"); + expect(out).not.toContain(": string"); + expect(() => new Function(out)).not.toThrow(); + }); + + it("removes type alias declarations", () => { + const out = stripTypeScript("type Foo = string; const x = 'a'; return x;"); + expect(out).not.toContain("type Foo"); + expect(() => new Function(out)).not.toThrow(); + }); + + it("preserves plain JavaScript unchanged in semantics", () => { + const out = stripTypeScript("const x = 5; return x * 2;"); + expect(new Function(out)()).toBe(10); + }); + + it("rejects truly invalid syntax", () => { + // No TS interpretation will save this — sucrase should throw. + expect(() => stripTypeScript("const = 5;")).toThrow(); + }); + + it("regression: customer's failure shape (`Unexpected token ':'`)", () => { + // Closest reasonable shape to the trace at + // axiom://7bf76f79c5d807272781e9554040aab3 — typed annotation in + // a function expression. + const code = ` + const fetchDeals = async (sourceId: string): Promise> => { + const result = await tools.executor.sources.list(); + return result.items; + }; + return fetchDeals('dealcloud'); + `; + const out = stripTypeScript(code); + expect(out).not.toContain(": string"); + expect(out).not.toContain("Promise<"); + // Still parses as JS (we don't actually invoke `tools` here). 
+ expect(() => new Function(out)).not.toThrow(); + }); +}); diff --git a/packages/kernel/core/src/strip-types.ts b/packages/kernel/core/src/strip-types.ts new file mode 100644 index 000000000..bfe19b7e8 --- /dev/null +++ b/packages/kernel/core/src/strip-types.ts @@ -0,0 +1,34 @@ +import { transform } from "sucrase"; + +/** + * Strip TypeScript type syntax (`: T`, `as T`, `<T>`, type aliases, + * interfaces, etc.) from user-submitted code so it can run in a + * JavaScript-only sandbox (workerd's WorkerLoader, QuickJS, raw V8). + * Deno-style runtimes that handle TypeScript natively should skip + * this step — they get better source-map fidelity by parsing the + * original input. + * + * The execute tool description tells callers to write TypeScript, and + * `tools.describe.tool` hands them TypeScript shapes — without stripping, + * a single `: number` annotation throws "Unexpected token ':'" inside + * the sandbox, which used to surface as a 180s client-side timeout + * before the engine `raceFirst` fix. + * + * Sucrase's TypeScript transform is purely syntactic — no semantic + * checks, no decorator metadata — which keeps the cost low and matches + * what `tsc --isolatedModules` / Node's experimental type-stripping do. + * + * On parse failure we rethrow the original error so the caller can map + * it into the runtime's tagged error type. We deliberately do NOT fall + * back to the raw input — passing TS syntax through to a JS-only + * sandbox trades a clean error here for an opaque one downstream. + */ +export const stripTypeScript = (code: string): string => + transform(code, { + transforms: ["typescript"], + // No JSX in user code, no React-specific transforms. `disableESTransforms` + // stops sucrase from down-levelling newer ES syntax (optional chaining, + // class fields, etc.) — we want only type-syntax removal. 
+ disableESTransforms: true, + keepUnusedImports: true, + }).code; diff --git a/packages/kernel/runtime-dynamic-worker/src/executor.ts b/packages/kernel/runtime-dynamic-worker/src/executor.ts index e46e1a951..8a3ed5133 100644 --- a/packages/kernel/runtime-dynamic-worker/src/executor.ts +++ b/packages/kernel/runtime-dynamic-worker/src/executor.ts @@ -15,6 +15,7 @@ import * as Effect from "effect/Effect"; import { recoverExecutionBody, + stripTypeScript, type CodeExecutor, type ExecuteResult, type SandboxToolInvoker, @@ -399,24 +400,36 @@ const startDynamicWorker = ( options: DynamicWorkerExecutorOptions, code: string, timeoutMs: number, -): Effect.Effect => - Effect.sync((): DynamicWorkerEntrypoint => { - const recoveredBody = recoverExecutionBody(code); - const executorModule = buildExecutorModule(recoveredBody, timeoutMs); - const { [ENTRY_MODULE]: _, ...safeModules } = options.modules ?? {}; - - const worker = options.loader.get(`executor-${crypto.randomUUID()}`, () => ({ - compatibilityDate: "2025-06-01", - compatibilityFlags: ["nodejs_compat"], - mainModule: ENTRY_MODULE, - modules: { - ...safeModules, - [ENTRY_MODULE]: executorModule, - }, - globalOutbound: options.globalOutbound ?? null, - })); - - return asDynamicWorkerEntrypoint(worker.getEntrypoint()); +): Effect.Effect => + Effect.try({ + try: (): DynamicWorkerEntrypoint => { + const recoveredBody = recoverExecutionBody(code); + // The dynamic Worker isolate only accepts plain JavaScript; TS type + // syntax in user code (`: T`, `as T`, generics) would otherwise + // surface as "Unexpected token ':'" inside `evaluate()` and bubble + // out via DynamicWorkerExecutionError. Stripping here gives the + // model a clear syntax-error message at the front door instead. + const strippedBody = stripTypeScript(recoveredBody); + const executorModule = buildExecutorModule(strippedBody, timeoutMs); + const { [ENTRY_MODULE]: _, ...safeModules } = options.modules ?? 
{}; + + const worker = options.loader.get(`executor-${crypto.randomUUID()}`, () => ({ + compatibilityDate: "2025-06-01", + compatibilityFlags: ["nodejs_compat"], + mainModule: ENTRY_MODULE, + modules: { + ...safeModules, + [ENTRY_MODULE]: executorModule, + }, + globalOutbound: options.globalOutbound ?? null, + })); + + return asDynamicWorkerEntrypoint(worker.getEntrypoint()); + }, + catch: (cause) => + new DynamicWorkerExecutionError({ + message: renderTransportMessage(serializeWorkerErrorValue(cause)), + }), }).pipe( Effect.withSpan("executor.runtime.startup", { attributes: { diff --git a/packages/kernel/runtime-quickjs/src/index.ts b/packages/kernel/runtime-quickjs/src/index.ts index f4a2e22f5..3104bce7c 100644 --- a/packages/kernel/runtime-quickjs/src/index.ts +++ b/packages/kernel/runtime-quickjs/src/index.ts @@ -1,5 +1,6 @@ import { recoverExecutionBody, + stripTypeScript, type CodeExecutor, type ExecuteResult, type SandboxToolInvoker, @@ -87,7 +88,11 @@ const normalizeExecutionError = (cause: unknown, deadlineMs: number, timeoutMs: }; const buildExecutionSource = (code: string): string => { - const body = recoverExecutionBody(code); + // QuickJS evaluates plain JavaScript only; strip any TS type syntax + // first. A parse failure here throws a SyntaxError which the outer + // `Effect.tryPromise` maps to `QuickJsExecutionError` with the + // sucrase-formatted message intact. + const body = stripTypeScript(recoverExecutionBody(code)); return [ '"use strict";',