Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions apps/cloud/src/services/mcp-worker-transport.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { describe, expect, it } from "@effect/vitest";

import { JsonRpcRequestIdQueue, PREVIOUS_REQUEST_TIMEOUT_MS } from "./mcp-worker-transport";

/**
 * Builds a POST `Request` whose payload is `body` serialised as JSON,
 * addressed to a placeholder MCP endpoint.
 *
 * @param body - Value to serialise with `JSON.stringify` as the request body.
 * @returns A request carrying a `content-type: application/json` header.
 */
const jsonRpcRequest = (body: unknown): Request => {
  const init = {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(body),
  };
  return new Request("https://example.invalid/mcp", init);
};

// Behavioural tests for JsonRpcRequestIdQueue: same-id requests are
// serialised, different-id requests run independently, and a wait on a hung
// predecessor is bounded by the configured timeout.
describe("JsonRpcRequestIdQueue", () => {
it("serialises requests with the same id", async () => {
const queue = new JsonRpcRequestIdQueue();
// Records the order in which the callbacks reach each checkpoint.
const order: string[] = [];

// `releaseFirst` lets the test decide when the first callback may finish;
// `firstStarted` resolves once the first callback has begun running.
let releaseFirst!: () => void;
const firstStarted = new Promise<void>((resolve) => {
const firstRunning = new Promise<void>((release) => {
releaseFirst = release;
});
// The promise returned by this run() call is intentionally not awaited here.
queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), async () => {
order.push("first:start");
resolve();
await firstRunning;
order.push("first:end");
});
});

await firstStarted;

// Same JSON-RPC id as the first request, submitted while the first is still held open.
const secondDone = queue.run(
jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }),
async () => {
order.push("second");
},
);

// Second must wait for first.
// Give the second callback 20ms of real time in which it could (wrongly) start.
await new Promise((r) => setTimeout(r, 20));
expect(order).toEqual(["first:start"]);

// Once the first callback is released, both must finish, in submission order.
releaseFirst();
await secondDone;
expect(order).toEqual(["first:start", "first:end", "second"]);
});

it("does not block requests with different ids", async () => {
const queue = new JsonRpcRequestIdQueue();
// `hung` stays pending until `release()` is called at the end of the test.
let release!: () => void;
const hung = new Promise<void>((resolve) => {
release = resolve;
});

// Occupy id 1 with a callback that does not finish during the assertion below.
queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), () => hung);

// Race the id-2 request against a 200ms timer: "settled" means it ran without
// waiting for id 1, "blocked" means the timer fired first.
const otherDone = await Promise.race([
queue
.run(jsonRpcRequest({ jsonrpc: "2.0", id: 2, method: "tools/call" }), async () => "done")
.then((v) => ({ kind: "settled" as const, v })),
new Promise<{ kind: "blocked" }>((r) =>
setTimeout(() => r({ kind: "blocked" }), 200),
),
]);

expect(otherDone.kind).toBe("settled");
// Let the id-1 callback finish so it does not stay pending after the test.
release();
});

it("regression: caps wait on a hung previous request and dispatches anyway", async () => {
// Override the timeout for fast CI — the production default is
// PREVIOUS_REQUEST_TIMEOUT_MS (60s) which we cap test-side to 100ms.
// Same behaviour, same code path; only the wall-clock budget changes.
const queue = new JsonRpcRequestIdQueue({ previousTimeoutMs: 100 });
const order: string[] = [];

// Kick off a request and never release it — the poisoned-queue
// shape that used to cascade for the full upstream 180s timeout.
const firstStarted = new Promise<void>((resolve) => {
queue.run(jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }), async () => {
order.push("first:start");
resolve();
await new Promise(() => undefined); // hang forever
});
});
await firstStarted;

// Same id as the hung request: this await only completes if the queue stops
// waiting for the first request and runs the callback anyway.
const result = await queue.run(
jsonRpcRequest({ jsonrpc: "2.0", id: 1, method: "tools/call" }),
async () => {
order.push("second");
return "ok";
},
);

// The callback's return value is passed through, and "first:end" never
// appears because the first callback is still hanging.
expect(result).toBe("ok");
expect(order).toEqual(["first:start", "second"]);
});

it("exposes a sane production timeout", () => {
// Sanity guard: must stay below the 180s upstream timeout that
// Claude / Cowork enforce, but be long enough to outlast a normal
// dynamic-worker execution under load.
expect(PREVIOUS_REQUEST_TIMEOUT_MS).toBeGreaterThan(10_000);
expect(PREVIOUS_REQUEST_TIMEOUT_MS).toBeLessThan(180_000);
});
});
29 changes: 27 additions & 2 deletions apps/cloud/src/services/mcp-worker-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,22 @@ const extractJsonRpcRequestIdKeys = async (request: Request): Promise<ReadonlyAr
}
};

class JsonRpcRequestIdQueue {
/**
 * Hard ceiling, in milliseconds, on how long a same-id JSON-RPC request will
 * wait for an earlier in-flight one to finish.
 *
 * Stays well under the 180s upstream client timeout that Claude / Cowork
 * enforce, so a poisoned queue slot can't block the next request long enough
 * for the client to give up. If a previous request hasn't released within the
 * budget, we proceed anyway — at worst the MCP SDK rejects the second reply
 * for a duplicate id, which is recoverable; a perma-stuck queue is not.
 */
export const PREVIOUS_REQUEST_TIMEOUT_MS = 60_000;

export class JsonRpcRequestIdQueue {
private readonly inFlight = new Map<string, Promise<void>>();
private readonly previousTimeoutMs: number;

constructor(options: { readonly previousTimeoutMs?: number } = {}) {
this.previousTimeoutMs = options.previousTimeoutMs ?? PREVIOUS_REQUEST_TIMEOUT_MS;
}

async run<A>(request: Request, run: () => Promise<A>): Promise<A> {
const ids = [...new Set(await extractJsonRpcRequestIdKeys(request))];
Expand All @@ -96,7 +110,18 @@ class JsonRpcRequestIdQueue {
}

try {
await Promise.all(previous.map((p) => p.catch(() => undefined)));
if (previous.length > 0) {
  // A predecessor that rejected still counts as released, so rejections are
  // swallowed here; only "has it finished?" matters for ordering.
  const settled = Promise.all(previous.map((p) => p.catch(() => undefined))).then(
    () => "settled" as const,
  );

  // Watchdog: bounds the wait so a predecessor that never settles cannot
  // hold this request back indefinitely.
  let watchdog: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    watchdog = setTimeout(() => resolve("timeout"), this.previousTimeoutMs);
  });

  try {
    const outcome = await Promise.race([settled, timeout]);
    if (outcome === "timeout") {
      console.warn(
        `[mcp-worker-transport] previous in-flight request for ids=${JSON.stringify(ids)} did not release within ${this.previousTimeoutMs}ms; proceeding anyway`,
      );
    }
  } finally {
    // Cancel the watchdog once the race is decided. Without this, every
    // request that waited on a predecessor leaves a timer pending for the
    // full budget even when the predecessor finished promptly.
    clearTimeout(watchdog);
  }
}
return await run();
} finally {
for (const id of ids) {
Expand Down
2 changes: 2 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/execution/src/description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const formatDescription = (sources: readonly Source[]): string => {
"- For tools that return large collections (e.g. `getStates`, `getAll`), filter results in code rather than calling per-item tools.",
"- Do not use `fetch` — all API calls go through `tools.*`.",
"- If execution pauses for interaction, resume it with the returned `resumePayload`.",
"- TypeScript type syntax (`: T`, `as T`, generics, interfaces, type aliases) is stripped before execution — feel free to write idiomatic TypeScript using the shapes from `tools.describe.tool()`. Decorators and `enum` are not supported.",
];

if (sources.length > 0) {
Expand Down
88 changes: 88 additions & 0 deletions packages/core/execution/src/engine.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { describe, expect, it } from "@effect/vitest";
import { Data, Effect, Exit } from "effect";

import { createExecutor, definePlugin, makeTestConfig } from "@executor-js/sdk";
import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core";

import { createExecutionEngine } from "./engine";

// Regression for the hang reported as the executor-MCP "180s timeout" against
// Cowork (Claude web). Cowork goes down the `executeWithPause` branch because
// it doesn't advertise managed elicitation. When the dynamic worker fails
// fast (e.g. user submits TS with a `:` type annotation, "Unexpected token
// ':'" inside ~25ms), the failure was swallowed and the request hung until
// the client gave up at 180s. The cause was `Effect.race` having
// prefer-success semantics in Effect v4: the racing pause-signal Deferred
// never resolves, so a fiber failure is never observed by the racer.

/**
 * Tagged error used by the stub executors in this file; carries only a
 * human-readable `message`.
 */
class FakeRuntimeError extends Data.TaggedError("FakeRuntimeError")<{
readonly message: string;
}> {}

/**
 * Stub code executor whose `execute` fails immediately with a
 * `FakeRuntimeError` carrying the "Unexpected token ':'" message.
 */
const failingExecutor: CodeExecutor<FakeRuntimeError> = {
  execute() {
    const failure = new FakeRuntimeError({ message: "Unexpected token ':'" });
    return Effect.fail(failure);
  },
};

/**
 * Stub code executor whose `execute` succeeds immediately with the result
 * `"ok"` and no logs.
 */
const succeedingExecutor: CodeExecutor<FakeRuntimeError> = {
  execute() {
    return Effect.succeed({ result: "ok", logs: [] } satisfies ExecuteResult);
  },
};

/**
 * Minimal plugin for these tests: empty storage and no static sources.
 */
const emptyPlugin = definePlugin(() => {
  return {
    id: "empty-test" as const,
    storage: () => {
      return {};
    },
    staticSources: () => {
      return [];
    },
  };
});

/** Creates an executor from a test config whose only plugin is `emptyPlugin`. */
const makeExecutor = () => {
  const plugins = [emptyPlugin()] as const;
  return createExecutor(makeTestConfig({ plugins }));
};

// Verifies that `executeWithPause` reports a code-executor failure instead of
// waiting on the pause signal, and that the success path still completes.
describe("executeWithPause failure propagation", () => {
  it.effect("surfaces a fast codeExecutor failure as an Exit.Failure", () =>
    Effect.gen(function* () {
      const executor = yield* makeExecutor();
      const engine = createExecutionEngine({
        executor,
        codeExecutor: failingExecutor,
      });

      // Capture the outcome as an Exit so the failure is asserted, not thrown.
      const exit = yield* Effect.exit(engine.executeWithPause("noop"));
      expect(Exit.isFailure(exit)).toBe(true);
    }),
  );

  // `it.live`, not `it.effect`: the 500ms sleep below is a wall-clock
  // watchdog. Under the test clock that `it.effect` provides, a sleep only
  // elapses when the clock is advanced explicitly, so a regression would
  // stall until the runner's own timeout instead of producing "hung".
  it.live("does not hang when codeExecutor fails", () =>
    Effect.gen(function* () {
      const executor = yield* makeExecutor();
      const engine = createExecutionEngine({
        executor,
        codeExecutor: failingExecutor,
      });

      // Race the executeWithPause against a short sleep. With the bug
      // present this resolves to "hung" because the failure is swallowed
      // by the prefer-success race against the pause Deferred.
      // Both sides always succeed (the left is wrapped in `Effect.exit`), so
      // this race is simply decided by whichever side finishes first.
      const outcome = yield* Effect.race(
        Effect.exit(engine.executeWithPause("noop")).pipe(
          Effect.map((exit) => ({ kind: "settled" as const, exit })),
        ),
        Effect.sleep("500 millis").pipe(Effect.as({ kind: "hung" as const })),
      );

      expect(outcome.kind).toBe("settled");
    }),
  );

  it.effect("control: succeedingExecutor returns completed", () =>
    Effect.gen(function* () {
      const executor = yield* makeExecutor();
      const engine = createExecutionEngine({
        executor,
        codeExecutor: succeedingExecutor,
      });

      // Control case: the success path must still report completion.
      const result = yield* engine.executeWithPause("noop");
      expect(result.status).toBe("completed");
    }),
  );
});
10 changes: 9 additions & 1 deletion packages/core/execution/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,20 @@ export const createExecutionEngine = <
* Race a running fiber against a pause signal. Returns when either
* the fiber completes or an elicitation handler fires (whichever
* comes first). Re-used by both executeWithPause and resume.
*
* `Effect.raceFirst` (not `Effect.race`) — `race` has prefer-success
* semantics in Effect v4 ("first successful result"), which means a
* fiber failure waits indefinitely for the pause Deferred to succeed.
* For a fast `codeExecutor.execute` failure (e.g. a syntax error
* inside the dynamic worker) the pause signal never fires, so the
* outer Effect hangs until the upstream client gives up. `raceFirst`
* settles on whichever side completes first, success or failure.
*/
const awaitCompletionOrPause = (
fiber: Fiber.Fiber<ExecuteResult, E>,
pauseSignal: Deferred.Deferred<InternalPausedExecution<E>>,
): Effect.Effect<ExecutionResult, E> =>
Effect.race(
Effect.raceFirst(
Fiber.join(fiber).pipe(
Effect.map((result): ExecutionResult => ({ status: "completed", result })),
),
Expand Down
4 changes: 3 additions & 1 deletion packages/kernel/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
"@standard-schema/spec": "^1.0.0",
"ajv": "^8.17.1",
"ajv-formats": "^3.0.1",
"effect": "catalog:"
"effect": "catalog:",
"sucrase": "^3.35.1"
},
"devDependencies": {
"@effect/vitest": "catalog:",
"@types/node": "catalog:",
"bun-types": "catalog:",
"tsup": "catalog:",
Expand Down
1 change: 1 addition & 0 deletions packages/kernel/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./validation";
export * from "./json-schema";
export * from "./effect-errors";
export * from "./code-recovery";
export * from "./strip-types";
72 changes: 72 additions & 0 deletions packages/kernel/core/src/strip-types.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { describe, expect, it } from "@effect/vitest";

import { stripTypeScript } from "./strip-types";

// Checks that `stripTypeScript` removes TypeScript-only syntax while leaving
// output that still parses as a JavaScript function body.
describe("stripTypeScript", () => {
  /** Asserts that `code` can be compiled as a JavaScript function body. */
  const expectParsesAsJs = (code: string): void => {
    expect(() => new Function(code)).not.toThrow();
  };

  it("removes variable type annotations", () => {
    const stripped = stripTypeScript('const x: string = "hello"; return x;');
    expect(stripped).not.toContain(": string");
    expect(stripped).toContain('const x');
    expect(stripped).toContain('"hello"');
  });

  it("removes function param and return type annotations", () => {
    const source = "function f(x: number): number { return x + 1; } return f(5);";
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain(": number");
    // The output must still be syntactically valid JavaScript.
    expectParsesAsJs(stripped);
  });

  it("removes `as` casts", () => {
    const source = "const x = (1 as number) + 2; return x;";
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain("as number");
    expectParsesAsJs(stripped);
  });

  it("removes generic type arguments on call expressions", () => {
    const source = "const arr = Array.from<string>([]); return arr;";
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain("<string>");
    expectParsesAsJs(stripped);
  });

  it("removes interface declarations", () => {
    const source = "interface User { name: string; } const u = { name: 'a' }; return u;";
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain("interface User");
    expect(stripped).not.toContain(": string");
    expectParsesAsJs(stripped);
  });

  it("removes type alias declarations", () => {
    const source = "type Foo = string; const x = 'a'; return x;";
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain("type Foo");
    expectParsesAsJs(stripped);
  });

  it("preserves plain JavaScript unchanged in semantics", () => {
    const stripped = stripTypeScript("const x = 5; return x * 2;");
    // Execute the stripped body and compare the returned value.
    const compiled = new Function(stripped);
    expect(compiled()).toBe(10);
  });

  it("rejects truly invalid syntax", () => {
    // Not valid under any TypeScript reading either, so stripping is expected to throw.
    const attempt = () => stripTypeScript("const = 5;");
    expect(attempt).toThrow();
  });

  it("regression: customer's failure shape (`Unexpected token ':'`)", () => {
    // Closest reasonable shape to the trace at
    // axiom://7bf76f79c5d807272781e9554040aab3 — a typed parameter and
    // return annotation on an async arrow function.
    const source = `
const fetchDeals = async (sourceId: string): Promise<Array<{ id: string }>> => {
const result = await tools.executor.sources.list();
return result.items;
};
return fetchDeals('dealcloud');
`;
    const stripped = stripTypeScript(source);
    expect(stripped).not.toContain(": string");
    expect(stripped).not.toContain("Promise<");
    // Only parse the result; `tools` is not defined here, so it is never invoked.
    expectParsesAsJs(stripped);
  });
});
Loading
Loading