From c154c398d2a653230667bac804c113657468b98d Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Tue, 30 Jun 2026 16:49:46 -0400 Subject: [PATCH 1/6] feat(cloud-agent): add event-driven agent host package - Add @mieweb/cloud-agent with hostAgent() API - Session DO for queue-driven turns, suspend/resume, alarms - Storage module for sessions, events, messages, activity_events, summaries - Add @mieweb/cloud-agent-cli with message-first dispatcher - Support --call (streaming) and -txt/--put (fire-and-forget) modes - Update README with new packages Co-authored-by: Cursor --- packages/README.md | 2 + packages/cloud-agent-cli/bin/agent-cli.js | 16 + packages/cloud-agent-cli/package.json | 37 +++ packages/cloud-agent-cli/src/client.ts | 230 ++++++++++++++ packages/cloud-agent-cli/src/index.ts | 28 ++ packages/cloud-agent-cli/src/parse.test.ts | 135 ++++++++ packages/cloud-agent-cli/src/parse.ts | 136 ++++++++ packages/cloud-agent-cli/src/run.ts | 156 +++++++++ packages/cloud-agent-cli/src/types.ts | 50 +++ packages/cloud-agent/package.json | 40 +++ packages/cloud-agent/src/host.ts | 178 +++++++++++ packages/cloud-agent/src/index.ts | 59 ++++ packages/cloud-agent/src/session.ts | 297 +++++++++++++++++ packages/cloud-agent/src/storage.test.ts | 172 ++++++++++ packages/cloud-agent/src/storage.ts | 350 +++++++++++++++++++++ packages/cloud-agent/src/types.ts | 244 ++++++++++++++ 16 files changed, 2130 insertions(+) create mode 100755 packages/cloud-agent-cli/bin/agent-cli.js create mode 100644 packages/cloud-agent-cli/package.json create mode 100644 packages/cloud-agent-cli/src/client.ts create mode 100644 packages/cloud-agent-cli/src/index.ts create mode 100644 packages/cloud-agent-cli/src/parse.test.ts create mode 100644 packages/cloud-agent-cli/src/parse.ts create mode 100644 packages/cloud-agent-cli/src/run.ts create mode 100644 packages/cloud-agent-cli/src/types.ts create mode 100644 packages/cloud-agent/package.json create mode 100644 packages/cloud-agent/src/host.ts create mode 100644 packages/cloud-agent/src/index.ts create mode 100644 packages/cloud-agent/src/session.ts create mode 100644 packages/cloud-agent/src/storage.test.ts create mode 100644 packages/cloud-agent/src/storage.ts create mode 100644 packages/cloud-agent/src/types.ts diff --git a/packages/README.md b/packages/README.md index ec7c4cc..3f81871 100644 --- a/packages/README.md +++ b/packages/README.md @@ -20,6 +20,8 @@ implement the same Cloudflare-shaped contract. | [`cloud-workers`](cloud-workers) | The `DurableObject` base. Backs the **`mieweb:workers`** virtual import: re-exports `cloudflare:workers` on Cloudflare (workerd export condition), pure-JS base everywhere else. | | [`cloud`](cloud) | Umbrella entry. Re-exports the contracts + `DurableObject` from one stable import surface (`@mieweb/cloud`). | | [`cloud-local`](cloud-local) | Local/Node **adapters** (the POC): D1→SQLite, R2→filesystem, KV→in-memory, Queues→in-process, Durable Objects→in-process registry. Vectorize/Workers AI surface explicit `UnsupportedBindingError`. Includes the Node **host harness** that runs the unchanged worker handler and a migration runner. | +| [`cloud-agent`](cloud-agent) | Event-driven **agent host**. Binds an agent definition + `AgentRuntime` to a Durable Object with queue-driven turns, suspend/resume, and alarms. Provides `hostAgent()` which returns DO class + worker wiring helpers. | +| [`cloud-agent-cli`](cloud-agent-cli) | Message-first **CLI dispatcher** for `cloud-agent`. Agent identity from `basename(argv[0])`. Agent-specific packages (`jerry`, `lisa`, etc.) wrap this with their config. | | [`cli`](cli) | The **`mieweb`** CLI. On the `cloudflare` target it delegates verbatim to `wrangler`; on other targets it drives the matching adapter. | ## How it wires into the app diff --git a/packages/cloud-agent-cli/bin/agent-cli.js b/packages/cloud-agent-cli/bin/agent-cli.js new file mode 100755 index 0000000..34ca9cb --- /dev/null +++ b/packages/cloud-agent-cli/bin/agent-cli.js @@ -0,0 +1,16 @@ +#!/usr/bin/env node +/** + * Generic agent CLI dispatcher. + * Agent name is determined by basename(argv[0]). + */ + +import { run } from '../src/index.js'; +import { basename } from 'path'; + +const agent = basename(process.argv[1]).replace(/\.(js|mjs|ts)$/, ''); + +run({ + agent, + baseUrl: process.env.JERRY_URL ?? 'http://127.0.0.1:8787', + version: '0.1.0', +}); diff --git a/packages/cloud-agent-cli/package.json b/packages/cloud-agent-cli/package.json new file mode 100644 index 0000000..88e1a52 --- /dev/null +++ b/packages/cloud-agent-cli/package.json @@ -0,0 +1,37 @@ +{ + "name": "@mieweb/cloud-agent-cli", + "version": "0.1.0", + "description": "Message-first CLI dispatcher for @mieweb/cloud-agent. Agent identity is determined by basename(argv[0]) (busybox/git multicall pattern).", + "type": "module", + "license": "MIT", + "author": "MIEWEB", + "homepage": "https://github.com/mieweb/cloud#readme", + "bugs": "https://github.com/mieweb/cloud/issues", + "repository": { + "type": "git", + "url": "git+https://github.com/mieweb/cloud.git", + "directory": "packages/cloud-agent-cli" + }, + "publishConfig": { + "access": "public" + }, + "exports": { + ".": { + "types": "./src/index.ts", + "default": "./src/index.ts" + } + }, + "bin": { + "agent-cli": "./bin/agent-cli.js" + }, + "files": [ + "src", + "bin" + ], + "scripts": { + "test": "node --import tsx --test src/**/*.test.ts" + }, + "devDependencies": { + "@types/node": "^20.0.0" + } +} diff --git a/packages/cloud-agent-cli/src/client.ts b/packages/cloud-agent-cli/src/client.ts new file mode 100644 index 0000000..0283c70 --- /dev/null +++ b/packages/cloud-agent-cli/src/client.ts @@ -0,0 +1,230 @@ +/** + * HTTP client for agent communication. + * Supports streaming (SSE) and fire-and-forget modes. + */ + +import type { StreamEvent } from "./types.js"; + +interface RequestOptions { + profile?: unknown; + cwd?: string; +} + +/** + * Send a message and stream the response. + * Uses Server-Sent Events for real-time streaming. + */ +export async function* streamCall( + baseUrl: string, + sessionId: string, + message: string, + options: RequestOptions +): AsyncGenerator { + const url = `${baseUrl}/v1/sessions/${sessionId}/messages`; + + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "text/event-stream, application/json", + }, + body: JSON.stringify({ + message, + profile: options.profile, + context: { cwd: options.cwd }, + }), + }); + + if (!response.ok) { + const text = await response.text(); + let errorMessage: string; + try { + const json = JSON.parse(text); + errorMessage = json.error ?? text; + } catch { + errorMessage = text; + } + yield { type: "error", message: errorMessage }; + return; + } + + const contentType = response.headers.get("content-type") ?? ""; + + // Handle SSE streaming + if (contentType.includes("text/event-stream")) { + yield* parseSSE(response); + return; + } + + // Handle JSON response (non-streaming) + const json = await response.json() as { + ok?: boolean; + message?: string; + status?: string; + suspended?: boolean; + error?: string; + }; + + if (json.error) { + yield { type: "error", message: json.error }; + return; + } + + yield { type: "start" }; + + if (json.message) { + yield { type: "text", text: json.message }; + } + + if (json.suspended) { + yield { + type: "suspended", + reason: json.status ?? "waiting_for_user", + message: json.message, + }; + } else { + yield { type: "finish", finishReason: "stop" }; + } +} + +/** + * Parse Server-Sent Events from a response. + */ +async function* parseSSE(response: Response): AsyncGenerator { + const reader = response.body?.getReader(); + if (!reader) { + yield { type: "error", message: "No response body" }; + return; + } + + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse complete SSE events + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; // Keep incomplete line in buffer + + for (const line of lines) { + if (line.startsWith("data: ")) { + const data = line.slice(6); + if (data === "[DONE]") { + yield { type: "finish", finishReason: "stop" }; + return; + } + + try { + const event = JSON.parse(data); + yield normalizeEvent(event); + } catch { + // Ignore malformed JSON + } + } + } + } + } finally { + reader.releaseLock(); + } +} + +/** + * Normalize server event to StreamEvent. + */ +function normalizeEvent(event: unknown): StreamEvent { + if (typeof event !== "object" || event === null) { + return { type: "error", message: "Invalid event" }; + } + + const e = event as Record; + + switch (e.type) { + case "start": + return { type: "start" }; + case "text-delta": + case "text": + return { type: "text", text: String(e.text ?? "") }; + case "tool-call": + return { + type: "tool-call", + toolName: String(e.toolName ?? ""), + input: e.input, + }; + case "tool-result": + return { + type: "tool-result", + toolName: String(e.toolName ?? ""), + output: e.output, + }; + case "finish": + return { type: "finish", finishReason: String(e.finishReason ?? "stop") }; + case "error": + return { type: "error", message: String(e.message ?? "Unknown error") }; + case "suspend": + case "suspended": + return { + type: "suspended", + reason: String(e.reason ?? "waiting_for_user"), + message: e.message as string | undefined, + }; + default: + return { type: "error", message: `Unknown event type: ${e.type}` }; + } +} + +/** + * Send a message and return immediately (fire-and-forget). + * The agent processes the message asynchronously. + */ +export async function fireAndForget( + baseUrl: string, + sessionId: string, + message: string, + options: RequestOptions +): Promise<{ eventId: string; status: string }> { + const url = `${baseUrl}/v1/sessions/${sessionId}/enqueue`; + + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + message, + profile: options.profile, + context: { cwd: options.cwd }, + }), + }); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Request failed: ${text}`); + } + + const json = await response.json() as { eventId: string; status: string }; + return json; +} + +/** + * Get session status. + */ +export async function getStatus( + baseUrl: string, + sessionId: string +): Promise<{ status: string; continuation?: unknown }> { + const url = `${baseUrl}/v1/sessions/${sessionId}/status`; + + const response = await fetch(url); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`Request failed: ${text}`); + } + + return response.json() as Promise<{ status: string; continuation?: unknown }>; +} diff --git a/packages/cloud-agent-cli/src/index.ts b/packages/cloud-agent-cli/src/index.ts new file mode 100644 index 0000000..49a9d64 --- /dev/null +++ b/packages/cloud-agent-cli/src/index.ts @@ -0,0 +1,28 @@ +/** + * @mieweb/cloud-agent-cli — Message-first CLI dispatcher for @mieweb/cloud-agent. + * + * Agent identity is determined by basename(argv[0]) (busybox/git multicall pattern). + * Agent-specific packages wrap this with their config. + * + * @example + * ```ts + * // packages/cli/bin/jerry.js + * import { run } from '@mieweb/cloud-agent-cli'; + * + * run({ + * agent: 'jerry', + * baseUrl: process.env.JERRY_URL ?? 'http://127.0.0.1:8787', + * version: '0.1.0', + * }); + * ``` + */ + +export { run } from "./run.js"; +export { parseArgs } from "./parse.js"; +export { streamCall, fireAndForget, getStatus } from "./client.js"; +export type { + CliConfig, + ParsedCommand, + CliOptions, + StreamEvent, +} from "./types.js"; diff --git a/packages/cloud-agent-cli/src/parse.test.ts b/packages/cloud-agent-cli/src/parse.test.ts new file mode 100644 index 0000000..2cd89d2 --- /dev/null +++ b/packages/cloud-agent-cli/src/parse.test.ts @@ -0,0 +1,135 @@ +/** + * Tests for CLI argument parsing. + */ + +import { describe, it, beforeEach, afterEach } from "node:test"; +import assert from "node:assert"; +import { parseArgs } from "./parse.js"; + +describe("parseArgs", () => { + let originalEnv: NodeJS.ProcessEnv; + + beforeEach(() => { + originalEnv = { ...process.env }; + delete process.env.JERRY_SESSION; + }); + + afterEach(() => { + process.env = originalEnv; + }); + + describe("message-first behavior", () => { + it("treats non-flag args as a message for --call", () => { + const { command } = parseArgs(["hello", "world"]); + assert.strictEqual(command.type, "call"); + if (command.type === "call") { + assert.strictEqual(command.message, "hello world"); + } + }); + + it("joins all args with spaces", () => { + const { command } = parseArgs(["summarize", "my", "last", "2", "hours"]); + assert.strictEqual(command.type, "call"); + if (command.type === "call") { + assert.strictEqual(command.message, "summarize my last 2 hours"); + } + }); + + it("returns help when no args", () => { + const { command } = parseArgs([]); + assert.strictEqual(command.type, "help"); + }); + }); + + describe("flags", () => { + it("parses --help", () => { + const { command } = parseArgs(["--help"]); + assert.strictEqual(command.type, "help"); + }); + + it("parses -h", () => { + const { command } = parseArgs(["-h"]); + assert.strictEqual(command.type, "help"); + }); + + it("parses --version", () => { + const { command } = parseArgs(["--version"]); + assert.strictEqual(command.type, "version"); + }); + + it("parses -v", () => { + const { command } = parseArgs(["-v"]); + assert.strictEqual(command.type, "version"); + }); + + it("parses -txt with message", () => { + const { command } = parseArgs(["-txt", "quick", "note"]); + assert.strictEqual(command.type, "put"); + if (command.type === "put") { + assert.strictEqual(command.message, "quick note"); + } + }); + + it("parses --put with message", () => { + const { command } = parseArgs(["--put", "enqueue", "this"]); + assert.strictEqual(command.type, "put"); + if (command.type === "put") { + assert.strictEqual(command.message, "enqueue this"); + } + }); + + it("parses --debug with message", () => { + const { command, options } = parseArgs(["--debug", "test", "message"]); + assert.strictEqual(command.type, "debug"); + assert.strictEqual(options.debug, true); + if (command.type === "debug") { + assert.strictEqual(command.message, "test message"); + } + }); + + it("parses --config", () => { + const { command } = parseArgs(["--config"]); + assert.strictEqual(command.type, "config"); + }); + + it("parses --report", () => { + const { command } = parseArgs(["--report", "daily"]); + assert.strictEqual(command.type, "report"); + if (command.type === "report") { + assert.deepStrictEqual(command.args, ["daily"]); + } + }); + }); + + describe("session handling", () => { + it("uses JERRY_SESSION from env", () => { + process.env.JERRY_SESSION = "test-session-123"; + const { command, options } = parseArgs(["hello"]); + assert.strictEqual(options.sessionId, "test-session-123"); + }); + + it("parses --session flag", () => { + const { command, options } = parseArgs(["--session", "my-session", "hello", "world"]); + assert.strictEqual(options.sessionId, "my-session"); + assert.strictEqual(command.type, "call"); + if (command.type === "call") { + assert.strictEqual(command.message, "hello world"); + } + }); + }); + + describe("edge cases", () => { + it("treats unknown flags as message content", () => { + const { command } = parseArgs(["--unknown", "flag"]); + assert.strictEqual(command.type, "call"); + if (command.type === "call") { + assert.strictEqual(command.message, "--unknown flag"); + } + }); + + it("returns help for -txt with no message", () => { + const { command } = parseArgs(["-txt"]); + assert.strictEqual(command.type, "help"); + }); + }); +}); diff --git a/packages/cloud-agent-cli/src/parse.ts b/packages/cloud-agent-cli/src/parse.ts new file mode 100644 index 0000000..1b74d6f --- /dev/null +++ b/packages/cloud-agent-cli/src/parse.ts @@ -0,0 +1,136 @@ +/** + * CLI argument parsing. + * Message-first: all args are joined as a message unless the first arg is a flag. + */ + +import type { ParsedCommand, CliOptions } from "./types.js"; + +/** + * Parse CLI arguments into a command and options. + */ +export function parseArgs(args: string[]): { + command: ParsedCommand; + options: CliOptions; +} { + const options: CliOptions = { + sessionId: process.env.JERRY_SESSION, + cwd: process.cwd(), + }; + + if (args.length === 0) { + return { command: { type: "help" }, options }; + } + + const first = args[0]; + + // Check for flags + if (first.startsWith("-")) { + return parseFlag(first, args.slice(1), options); + } + + // Default: --call with message + const message = args.join(" "); + return { + command: { type: "call", message, sessionId: options.sessionId }, + options, + }; +} + +/** + * Parse a flag-based command. + */ +function parseFlag( + flag: string, + rest: string[], + options: CliOptions +): { command: ParsedCommand; options: CliOptions } { + switch (flag) { + case "-h": + case "--help": + return { command: { type: "help" }, options }; + + case "-v": + case "--version": + return { command: { type: "version" }, options }; + + case "-txt": + case "--put": + if (rest.length === 0) { + return { command: { type: "help" }, options }; + } + return { + command: { + type: "put", + message: rest.join(" "), + sessionId: options.sessionId, + }, + options, + }; + + case "--call": + if (rest.length === 0) { + return { command: { type: "help" }, options }; + } + return { + command: { + type: "call", + message: rest.join(" "), + sessionId: options.sessionId, + }, + options, + }; + + case "-d": + case "--debug": + if (rest.length === 0) { + return { command: { type: "help" }, options }; + } + return { + command: { + type: "debug", + message: rest.join(" "), + sessionId: options.sessionId, + }, + options: { ...options, debug: true }, + }; + + case "--report": + return { + command: { type: "report", args: rest }, + options, + }; + + case "--config": + return { + command: { type: "config", args: rest }, + options, + }; + + case "-s": + case "--session": + if (rest.length === 0) { + return { command: { type: "help" }, options }; + } + options.sessionId = rest[0]; + if (rest.length === 1) { + return { command: { type: "help" }, options }; + } + // Parse remaining args as message + const message = rest.slice(1).join(" "); + return { + command: { type: "call", message, sessionId: options.sessionId }, + options, + }; + + default: + // Unknown flag, treat as message + return { + command: { + type: "call", + message: [flag, ...rest].join(" "), + sessionId: options.sessionId, + }, + options, + }; + } +} diff --git a/packages/cloud-agent-cli/src/run.ts b/packages/cloud-agent-cli/src/run.ts new file mode 100644 index 0000000..8f9e60d --- /dev/null +++ b/packages/cloud-agent-cli/src/run.ts @@ -0,0 +1,156 @@ +/** + * CLI run function for message-first agent interaction. + */ + +import type { CliConfig, CliOptions } from "./types.js"; +import { parseArgs } from "./parse.js"; +import { streamCall, fireAndForget } from "./client.js"; + +const DEFAULT_BASE_URL = "http://127.0.0.1:8787"; + +/** + * Run the CLI with the given config. + * Agent-specific wrappers call this with their config. + */ +export async function run(config: CliConfig): Promise { + const baseUrl = config.baseUrl ?? DEFAULT_BASE_URL; + const { command, options } = parseArgs(process.argv.slice(2)); + + switch (command.type) { + case "help": + printHelp(config.agent); + break; + + case "version": + console.log(`${config.agent} ${config.version ?? "0.0.0"}`); + break; + + case "call": + await handleCall(baseUrl, config, command.message, options); + break; + + case "put": + await handlePut(baseUrl, config, command.message, options); + break; + + case "debug": + options.debug = true; + await handleCall(baseUrl, config, command.message, options); + break; + + case "report": + console.log("Report mode not yet implemented"); + break; + + case "config": + console.log("Config:", JSON.stringify(config, null, 2)); + break; + } +} + +/** + * Handle --call (default): send message, stream reply. + */ +async function handleCall( + baseUrl: string, + config: CliConfig, + message: string, + options: CliOptions +): Promise { + const sessionId = options.sessionId ?? generateSessionId(); + + if (options.debug) { + console.error(`[debug] agent=${config.agent} session=${sessionId}`); + console.error(`[debug] baseUrl=${baseUrl}`); + console.error(`[debug] cwd=${options.cwd ?? process.cwd()}`); + } + + try { + for await (const event of streamCall(baseUrl, sessionId, message, { + profile: config.profile, + cwd: options.cwd ?? process.cwd(), + })) { + if (event.type === "text") { + process.stdout.write(event.text); + } else if (event.type === "tool-call" && options.debug) { + console.error(`[tool] ${event.toolName}(${JSON.stringify(event.input)})`); + } else if (event.type === "tool-result" && options.debug) { + console.error(`[tool-result] ${event.toolName}: ${JSON.stringify(event.output)}`); + } else if (event.type === "error") { + console.error(`\nError: ${event.message}`); + process.exitCode = 1; + } else if (event.type === "suspended") { + console.log(`\n[${event.reason}] ${event.message ?? ""}`); + console.log(`Session: ${sessionId}`); + } else if (event.type === "finish") { + if (options.debug) { + console.error(`\n[finish] reason=${event.finishReason}`); + } + } + } + console.log(); // Final newline + } catch (err) { + console.error(`Error: ${err instanceof Error ? err.message : err}`); + process.exitCode = 1; + } +} + +/** + * Handle -txt/--put: enqueue message, return immediately. + */ +async function handlePut( + baseUrl: string, + config: CliConfig, + message: string, + options: CliOptions +): Promise { + const sessionId = options.sessionId ?? generateSessionId(); + + try { + const result = await fireAndForget(baseUrl, sessionId, message, { + profile: config.profile, + cwd: options.cwd ?? process.cwd(), + }); + console.log(`Queued: session=${sessionId} eventId=${result.eventId}`); + } catch (err) { + console.error(`Error: ${err instanceof Error ? err.message : err}`); + process.exitCode = 1; + } +} + +/** + * Generate a session ID (for new conversations). + */ +function generateSessionId(): string { + return `session-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +/** + * Print help message. + */ +function printHelp(agent: string): void { + console.log(` +${agent} — message-first CLI + +Usage: + ${agent} Send message and stream reply (default --call) + ${agent} -txt Enqueue message, return immediately + ${agent} --put Same as -txt + ${agent} --help Show this help + ${agent} --version Show version + ${agent} --debug Send with debug output + ${agent} --config Show current config + +Arguments are joined with spaces, so quotes are only needed to preserve +multiple spaces or escape shell metacharacters. + +Examples: + ${agent} summarize my last 2 hours + ${agent} I just made this PR + ${agent} -txt quick note for the day + +Environment: + JERRY_URL Override base URL (default: http://127.0.0.1:8787) + JERRY_SESSION Override session ID +`.trim()); +} diff --git a/packages/cloud-agent-cli/src/types.ts b/packages/cloud-agent-cli/src/types.ts new file mode 100644 index 0000000..f0ad66c --- /dev/null +++ b/packages/cloud-agent-cli/src/types.ts @@ -0,0 +1,50 @@ +/** + * @mieweb/cloud-agent-cli type definitions + */ + +/** + * CLI configuration passed to run(). + */ +export interface CliConfig { + /** Agent name (used for routing and display) */ + agent: string; + /** Base URL for the agent server (default: http://127.0.0.1:8787) */ + baseUrl?: string; + /** Privacy profile to send with requests */ + profile?: unknown; + /** Version string for --version */ + version?: string; +} + +/** + * Parsed command from CLI arguments. + */ +export type ParsedCommand = + | { type: "call"; message: string; sessionId?: string } + | { type: "put"; message: string; sessionId?: string } + | { type: "help" } + | { type: "version" } + | { type: "debug"; message: string; sessionId?: string } + | { type: "report"; args: string[] } + | { type: "config"; args: string[] }; + +/** + * Options extracted from CLI flags. + */ +export interface CliOptions { + sessionId?: string; + debug?: boolean; + cwd?: string; +} + +/** + * Streaming response event from the agent. + */ +export type StreamEvent = + | { type: "start" } + | { type: "text"; text: string } + | { type: "tool-call"; toolName: string; input: unknown } + | { type: "tool-result"; toolName: string; output: unknown } + | { type: "finish"; finishReason: string } + | { type: "error"; message: string } + | { type: "suspended"; reason: string; message?: string }; diff --git a/packages/cloud-agent/package.json b/packages/cloud-agent/package.json new file mode 100644 index 0000000..45df0e1 --- /dev/null +++ b/packages/cloud-agent/package.json @@ -0,0 +1,40 @@ +{ + "name": "@mieweb/cloud-agent", + "version": "0.1.0", + "description": "Event-driven agent host for @mieweb/cloud. Binds an agent definition + AgentRuntime to a Durable Object with queue-driven turns, suspend/resume, and alarms.", + "type": "module", + "license": "MIT", + "author": "MIEWEB", + "homepage": "https://github.com/mieweb/cloud#readme", + "bugs": "https://github.com/mieweb/cloud/issues", + "repository": { + "type": "git", + "url": "git+https://github.com/mieweb/cloud.git", + "directory": "packages/cloud-agent" + }, + "publishConfig": { + "access": "public" + }, + "exports": { + ".": { + "types": "./src/index.ts", + "default": "./src/index.ts" + }, + "./types": { + "types": "./src/types.ts", + "default": "./src/types.ts" + } + }, + "files": [ + "src" + ], + "scripts": { + "test": "node --import tsx --test src/**/*.test.ts" + }, + "dependencies": { + "@mieweb/cloud-types": "workspace:*" + }, + "devDependencies": { + "@types/node": "^20.0.0" + } +} diff --git a/packages/cloud-agent/src/host.ts b/packages/cloud-agent/src/host.ts new file mode 100644 index 0000000..659a2b5 --- /dev/null +++ b/packages/cloud-agent/src/host.ts @@ -0,0 +1,178 @@ +/** + * hostAgent() — binds an agent definition + AgentRuntime to a Durable Object + * with queue-driven turns, suspend/resume, and alarms. + */ + +import type { + HostAgentConfig, + HostAgentResult, + HostEnv, + TurnJob, +} from "./types.js"; +import { createSessionClass } from "./session.js"; +import { initSchema, insertActivityEvent } from "./storage.js"; + +const json = (data: unknown, status = 200) => + new Response(JSON.stringify(data), { + status, + headers: { "content-type": "application/json" }, + }); + +/** + * Extract session ID from request. + * Looks for :id in path /v1/sessions/:id/... or X-Session-Id header. + */ +function extractSessionId(request: Request): string | null { + const url = new URL(request.url); + const match = url.pathname.match(/\/v1\/sessions\/([^/]+)/); + if (match) return match[1]; + return request.headers.get("X-Session-Id"); +} + +/** + * Extract user ID from request headers. + */ +function extractUserId(request: Request): string | undefined { + return request.headers.get("X-User-Id") ?? undefined; +} + +/** + * hostAgent() creates the wiring for an event-driven agent. + * + * Returns: + * - SessionClass: The DO class to export from the worker + * - handleFetch: Routes fetch requests to the appropriate DO + * - handleQueue: Processes queue messages by forwarding to DOs + * - handleScheduled: Optional cron handler + */ +export function hostAgent(config: HostAgentConfig): HostAgentResult { + const { agent, createRuntime } = config; + + const SessionClass = createSessionClass(agent, createRuntime); + + async function handleFetch( + request: Request, + env: HostEnv + ): Promise { + const url = new URL(request.url); + const path = url.pathname; + + await initSchema(env.DB); + + if (path === "/health") { + return json({ ok: true, agent: agent.name }); + } + + if (path === "/v1/events" && request.method === "POST") { + const events = await request.json() as Array<{ + source: string; + payload: unknown; + occurredAt: string; + }>; + + const ids: string[] = []; + for (const event of events) { + const id = await insertActivityEvent( + env.DB, + event.source, + event.payload, + event.occurredAt + ); + ids.push(id); + } + + return json({ ok: true, count: ids.length, ids }); + } + + const sessionId = extractSessionId(request); + if (!sessionId) { + return json({ error: "Missing session ID" }, 400); + } + + const id = env.SESSION.idFromName(sessionId); + const stub = env.SESSION.get(id); + + if (path.endsWith("/messages") && request.method === "POST") { + const body = await request.json() as { message: string; profile?: unknown }; + const userId = extractUserId(request); + + const doRequest = new Request(`${url.origin}/message`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + message: body.message, + userId, + profile: body.profile, + }), + }); + + return stub.fetch(doRequest); + } + + if (path.endsWith("/enqueue") && request.method === "POST") { + const body = await request.json() as { message: string }; + + const eventId = crypto.randomUUID(); + await env.JOBS.send({ + sessionId, + eventId, + message: body.message, + }); + + return json({ ok: true, sessionId, eventId, status: "queued" }); + } + + if (path.endsWith("/status")) { + const doRequest = new Request(`${url.origin}/status`); + return stub.fetch(doRequest); + } + + return json({ error: "Not found", path }, 404); + } + + async function handleQueue( + batch: { messages: Array<{ body: TurnJob; ack: () => void }> }, + env: HostEnv + ): Promise { + for (const message of batch.messages) { + const job = message.body; + + try { + const id = env.SESSION.idFromName(job.sessionId); + const stub = env.SESSION.get(id); + + const response = await stub.fetch( + new Request("http://internal/turn", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(job), + }) + ); + + if (!response.ok) { + const error = await response.text(); + console.error(`Turn failed for session ${job.sessionId}:`, error); + } + + message.ack(); + } catch (err) { + console.error(`Queue processing error for session ${job.sessionId}:`, err); + message.ack(); + } + } + } + + async function handleScheduled( + event: { cron: string }, + _env: HostEnv + ): Promise { + console.log(`Scheduled event: ${event.cron}`); + } + + return { + SessionClass, + handleFetch, + handleQueue, + handleScheduled, + }; +} diff --git a/packages/cloud-agent/src/index.ts b/packages/cloud-agent/src/index.ts new file mode 100644 index 0000000..5d32208 --- /dev/null +++ b/packages/cloud-agent/src/index.ts @@ -0,0 +1,59 @@ +/** + * @mieweb/cloud-agent — Event-driven agent host for @mieweb/cloud. + * + * Binds an agent definition + AgentRuntime to a Durable Object with + * queue-driven turns, suspend/resume, and alarms. + * + * @example + * ```ts + * import { hostAgent } from '@mieweb/cloud-agent'; + * + * const { SessionClass, handleFetch, handleQueue } = hostAgent({ + * agent: { name: 'jerry', instructions: '...', tools: [...] }, + * createRuntime: (profile) => resolveRuntime(profile), + * store: { db: env.DB }, + * }); + * + * export { SessionClass as AgentSession }; + * export default { + * fetch: (req, env) => handleFetch(req, env), + * queue: (batch, env) => handleQueue(batch, env), + * }; + * ``` + */ + +export { hostAgent } from "./host.js"; +export { createSessionClass } from "./session.js"; +export { + initSchema, + getOrCreateSession, + updateSessionStatus, + insertEvent, + getSessionEvents, + insertMessage, + getSessionMessages, + insertActivityEvent, + getActivityEvents, + insertSummary, +} from "./storage.js"; +export type { + SessionStatus, + EventType, + LifecycleEvent, + MessageRole, + ConversationMessage, + ContinuationState, + Session, + TurnJob, + AgentDefinition, + AgentRuntime, + TurnInput, + RuntimeEvent, + HostStore, + Trigger, + HostEnv, + HostAgentConfig, + ToolContext, + HostAgentResult, + AgentSessionDO, +} from "./types.js"; diff --git a/packages/cloud-agent/src/session.ts b/packages/cloud-agent/src/session.ts new file mode 100644 index 0000000..9aa2fa9 --- /dev/null +++ b/packages/cloud-agent/src/session.ts @@ -0,0 +1,297 @@ +/** + * AgentSession Durable Object. + * Handles the agent turn lifecycle: queue-driven turns, suspend/resume, alarms. + */ + +import type { CloudStatefulState } from "@mieweb/cloud-types"; +import type { + HostEnv, + AgentDefinition, + AgentRuntime, + TurnJob, + ContinuationState, + ToolContext, +} from "./types.js"; +import { + getOrCreateSession, + updateSessionStatus, + insertEvent, + insertMessage, + getSessionMessages, + initSchema, +} from "./storage.js"; + +const json = (data: unknown, status = 200) => + new Response(JSON.stringify(data), { + status, + headers: { "content-type": "application/json" }, + }); + +/** + * Creates an AgentSession DO class bound to the given agent definition and runtime factory. + */ +export function createSessionClass( + agent: AgentDefinition, + createRuntime: (profile?: unknown) => AgentRuntime +) { + return class AgentSession { + private state: CloudStatefulState; + private env: HostEnv; + private turnInProgress = false; + private suspendReason: "waiting_for_user" | "waiting_for_approval" | null = null; + private suspendMessage: string | null = null; + + constructor(state: CloudStatefulState, env: HostEnv) { + this.state = state; + this.env = env; + } + + async fetch(request: Request): Promise { + const url = new URL(request.url); + const sessionId = this.state.id.toString(); + + try { + await initSchema(this.env.DB); + + if (url.pathname === "/status") { + return this.handleStatus(sessionId); + } + + if (url.pathname === "/message" && request.method === "POST") { + return this.handleMessage(request, sessionId); + } + + if (url.pathname === "/turn" && request.method === "POST") { + return this.handleTurn(request, sessionId); + } + + if (url.pathname === "/alarm" && request.method === "POST") { + return this.handleAlarmTrigger(request, sessionId); + } + + return json({ error: "not found", path: url.pathname }, 404); + } catch (err) { + console.error("AgentSession error:", err); + return json( + { error: String(err instanceof Error ? err.message : err) }, + 500 + ); + } + } + + /** + * Alarm handler - fires when a scheduled wake occurs. + */ + async alarm(): Promise { + const sessionId = this.state.id.toString(); + const payload = await this.state.storage.get("alarm_payload"); + await this.state.storage.delete("alarm_payload"); + + await insertEvent(this.env.DB, sessionId, "scheduled_wake", payload); + + await this.env.JOBS.send({ + sessionId, + eventId: crypto.randomUUID(), + scheduledPayload: payload, + }); + } + + /** + * Get session status. + */ + private async handleStatus(sessionId: string): Promise { + const session = await getOrCreateSession(this.env.DB, sessionId); + return json({ + sessionId, + status: session.status, + continuation: session.continuation, + }); + } + + /** + * Handle incoming message - record event and enqueue turn. + */ + private async handleMessage( + request: Request, + sessionId: string + ): Promise { + const { message, userId, profile: _profile } = await request.json() as { + message: string; + userId?: string; + profile?: unknown; + }; + + const session = await getOrCreateSession(this.env.DB, sessionId, userId); + + if (session.status === "running") { + return json( + { error: "Turn already in progress", status: session.status }, + 409 + ); + } + + const eventId = await insertEvent(this.env.DB, sessionId, "user_message", { + message, + }); + await insertMessage(this.env.DB, sessionId, "user", message); + + const isResume = + session.status === "waiting_for_user" || + session.status === "waiting_for_approval"; + + if (isResume) { + await insertEvent(this.env.DB, sessionId, "resumed"); + } + + await this.env.JOBS.send({ + sessionId, + eventId, + message, + isResume, + }); + + await updateSessionStatus(this.env.DB, sessionId, "running"); + + return json({ + ok: true, + sessionId, + eventId, + status: "queued", + wasResume: isResume, + }); + } + + /** + * Handle a turn job (called from queue consumer via fetch). + */ + private async handleTurn( + request: Request, + sessionId: string + ): Promise { + if (this.turnInProgress) { + return json({ error: "Turn already in progress" }, 409); + } + + this.turnInProgress = true; + this.suspendReason = null; + this.suspendMessage = null; + + try { + // Read job from request (used for context, e.g. scheduledPayload) + const job = await request.json() as TurnJob; + void job; // Used in future for scheduledPayload handling + // Ensure session exists (also validates sessionId) + await getOrCreateSession(this.env.DB, sessionId); + + const messages = await getSessionMessages(this.env.DB, sessionId); + const coreMessages = messages.map((m) => ({ + role: m.role, + content: m.content, + })); + + const runtime = createRuntime(); + + // Tool context for tools to access bindings and control flow + // TODO: Pass this to tools when they're implemented + const toolContext: ToolContext = { + sessionId, + db: this.env.DB, + vectors: this.env.VECTORS, + bucket: this.env.BUCKET, + scheduleWake: async (at, payload) => { + const when = typeof at === "string" ? new Date(at) : at; + await this.state.storage.put("alarm_payload", payload); + await this.state.storage.setAlarm(when); + }, + suspendForUser: (message) => { + this.suspendReason = "waiting_for_user"; + this.suspendMessage = message; + }, + suspendForApproval: (message) => { + this.suspendReason = "waiting_for_approval"; + this.suspendMessage = message; + }, + }; + void toolContext; // Will be passed to tools when implemented + + let assistantContent = ""; + let finishReason = "stop"; + + for await (const event of runtime.runTurn({ + messages: coreMessages, + tools: agent.tools, + system: agent.instructions, + maxSteps: 10, + })) { + if (event.type === "text-delta") { + assistantContent += event.text; + } else if (event.type === "finish") { + finishReason = event.finishReason; + } else if (event.type === "suspend") { + this.suspendReason = event.reason; + this.suspendMessage = event.message ?? null; + } else if (event.type === "error") { + await insertEvent(this.env.DB, sessionId, "error", { + message: event.message, + }); + await updateSessionStatus(this.env.DB, sessionId, "idle"); + return json({ error: event.message }, 500); + } + } + + if (this.suspendReason) { + const continuation: ContinuationState = { + pendingMessage: this.suspendMessage ?? undefined, + suspendedAt: new Date().toISOString(), + reason: this.suspendReason, + }; + + await insertMessage(this.env.DB, sessionId, "assistant", assistantContent || this.suspendMessage); + await insertEvent(this.env.DB, sessionId, this.suspendReason, { + message: this.suspendMessage, + }); + await updateSessionStatus(this.env.DB, sessionId, this.suspendReason, continuation); + + return json({ + ok: true, + sessionId, + status: this.suspendReason, + message: assistantContent || this.suspendMessage, + suspended: true, + }); + } + + if (assistantContent) { + await insertMessage(this.env.DB, sessionId, "assistant", assistantContent); + await insertEvent(this.env.DB, sessionId, "agent_message", { + content: assistantContent, + finishReason, + }); + } + + await updateSessionStatus(this.env.DB, sessionId, "idle"); + + return json({ + ok: true, + sessionId, + status: "idle", + message: assistantContent, + finishReason, + }); + } finally { + this.turnInProgress = false; + } + } + + /** + * Handle alarm trigger (for internal routing). + */ + private async handleAlarmTrigger( + _request: Request, + sessionId: string + ): Promise { + await this.alarm(); + return json({ ok: true, sessionId, alarm: "triggered" }); + } + }; +} diff --git a/packages/cloud-agent/src/storage.test.ts b/packages/cloud-agent/src/storage.test.ts new file mode 100644 index 0000000..d4deeaf --- /dev/null +++ b/packages/cloud-agent/src/storage.test.ts @@ -0,0 +1,172 @@ +/** + * Tests for cloud-agent storage operations. + * Uses a mock CloudDatabase for testing. + */ + +import { describe, it, beforeEach } from "node:test"; +import assert from "node:assert"; + +/** + * Mock CloudDatabase implementation for testing. + * Stores data in memory. + */ +class MockDatabase { + private tables: Map = new Map(); + private execStatements: string[] = []; + + async exec(sql: string): Promise { + this.execStatements.push(sql); + const createTableMatches = sql.matchAll(/CREATE TABLE IF NOT EXISTS (\w+)/g); + for (const match of createTableMatches) { + if (!this.tables.has(match[1])) { + this.tables.set(match[1], []); + } + } + } + + prepare(sql: string) { + const db = this; + let boundValues: unknown[] = []; + + return { + bind(...values: unknown[]) { + boundValues = values; + return this; + }, + async run() { + const insertMatch = sql.match(/INSERT INTO (\w+)/i); + if (insertMatch) { + const table = insertMatch[1]; + const rows = db.tables.get(table) ?? []; + rows.push({ values: boundValues }); + db.tables.set(table, rows); + return { meta: { last_row_id: rows.length } }; + } + return {}; + }, + async first(): Promise { + const selectMatch = sql.match(/SELECT .* FROM (\w+) WHERE id = \?/i); + if (selectMatch) { + const table = selectMatch[1]; + const rows = db.tables.get(table) ?? []; + const row = rows.find((r: any) => r.values?.[0] === boundValues[0]); + if (row) { + return row as T; + } + } + return null; + }, + async all(): Promise<{ results: T[] }> { + const selectMatch = sql.match(/SELECT .* FROM (\w+)/i); + if (selectMatch) { + const table = selectMatch[1]; + const rows = db.tables.get(table) ?? []; + return { results: rows as T[] }; + } + return { results: [] }; + }, + }; + } + + getExecStatements() { + return this.execStatements; + } + + getTable(name: string) { + return this.tables.get(name); + } +} + +import { + initSchema, + getOrCreateSession, + updateSessionStatus, + insertEvent, + insertMessage, + getSessionMessages, + insertActivityEvent, + insertSummary, +} from "./storage.js"; + +describe("storage", () => { + let db: MockDatabase; + + beforeEach(() => { + db = new MockDatabase(); + }); + + describe("initSchema", () => { + it("creates all required tables", async () => { + await initSchema(db as any); + const statements = db.getExecStatements(); + assert.ok(statements.length > 0); + assert.ok(statements[0].includes("CREATE TABLE IF NOT EXISTS sessions")); + assert.ok(statements[0].includes("CREATE TABLE IF NOT EXISTS events")); + assert.ok(statements[0].includes("CREATE TABLE IF NOT EXISTS messages")); + assert.ok(statements[0].includes("CREATE TABLE IF NOT EXISTS activity_events")); + assert.ok(statements[0].includes("CREATE TABLE IF NOT EXISTS summaries")); + }); + }); + + describe("getOrCreateSession", () => { + it("creates a new session when none exists", async () => { + await initSchema(db as any); + const session = await getOrCreateSession(db as any, "test-session-1", "user-1"); + assert.strictEqual(session.id, "test-session-1"); + assert.strictEqual(session.userId, "user-1"); + assert.strictEqual(session.status, "idle"); + assert.ok(session.conversationId); + }); + }); + + describe("insertEvent", () => { + it("inserts a lifecycle event", async () => { + await initSchema(db as any); + const id = await insertEvent(db as any, "session-1", "user_message", { text: "hello" }); + assert.ok(id); + const events = db.getTable("events"); + assert.ok(events && events.length > 0); + }); + }); + + describe("insertMessage", () => { + it("inserts a conversation message", async () => { + await initSchema(db as any); + const id = await insertMessage(db as any, "session-1", "user", "Hello world"); + assert.ok(id); + const messages = db.getTable("messages"); + assert.ok(messages && messages.length > 0); + }); + }); + + describe("insertActivityEvent", () => { + it("inserts an activity event from collector", async () => { + await initSchema(db as any); + const id = await insertActivityEvent( + db as any, + "aw", + { bucket: "aw-watcher-window" }, + "2024-01-01T12:00:00Z" + ); + assert.ok(id); + const events = db.getTable("activity_events"); + assert.ok(events && events.length > 0); + }); + }); + + describe("insertSummary", () => { + it("inserts an activity summary", async () => { + await initSchema(db as any); + const id = await insertSummary( + db as any, + "session-1", + "2024-01-01T10:00:00Z", + "2024-01-01T12:00:00Z", + { totalMinutes: 120 } + ); + assert.ok(id); + const summaries = db.getTable("summaries"); + assert.ok(summaries && summaries.length > 0); + }); + }); +}); diff --git a/packages/cloud-agent/src/storage.ts b/packages/cloud-agent/src/storage.ts new file mode 100644 index 0000000..a0bcf65 --- /dev/null +++ b/packages/cloud-agent/src/storage.ts @@ -0,0 +1,350 @@ +/** + * Storage operations for sessions, events, messages, and summaries. + * All operations use CloudDatabase (D1-compatible). + */ + +import type { CloudDatabase } from "@mieweb/cloud-types"; +import type { + Session, + SessionStatus, + LifecycleEvent, + ConversationMessage, + ContinuationState, + EventType, + MessageRole, +} from "./types.js"; + +function generateId(): string { + return crypto.randomUUID(); +} + +function nowISO(): string { + return new Date().toISOString(); +} + +/** + * Initialize the schema. Idempotent - safe to call on every request. + */ +export async function initSchema(db: CloudDatabase): Promise { + await db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + user_id TEXT, + status TEXT NOT NULL DEFAULT 'idle', + conversation_id TEXT, + continuation TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + type TEXT NOT NULL, + payload TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS activity_events ( + id TEXT PRIMARY KEY, + source TEXT NOT NULL, + payload TEXT, + occurred_at TEXT NOT NULL, + ingested_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS summaries ( + id TEXT PRIMARY KEY, + session_id TEXT, + range_start TEXT NOT NULL, + range_end TEXT NOT NULL, + summary TEXT, + created_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); + CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id); + CREATE INDEX IF NOT EXISTS idx_activity_events_occurred ON activity_events(occurred_at); + `); +} + +/** + * Get or create a session by ID. + */ +export async function getOrCreateSession( + db: CloudDatabase, + sessionId: string, + userId?: string +): Promise { + const existing = await db + .prepare("SELECT * FROM sessions WHERE id = ?") + .bind(sessionId) + .first<{ + id: string; + user_id: string | null; + status: string; + conversation_id: string | null; + continuation: string | null; + created_at: string; + updated_at: string; + }>(); + + if (existing) { + return { + id: existing.id, + userId: existing.user_id ?? undefined, + status: existing.status as SessionStatus, + conversationId: existing.conversation_id ?? undefined, + continuation: existing.continuation + ? JSON.parse(existing.continuation) + : undefined, + createdAt: existing.created_at, + updatedAt: existing.updated_at, + }; + } + + const now = nowISO(); + const conversationId = generateId(); + await db + .prepare( + `INSERT INTO sessions (id, user_id, status, conversation_id, created_at, updated_at) + VALUES (?, ?, 'idle', ?, ?, ?)` + ) + .bind(sessionId, userId ?? null, conversationId, now, now) + .run(); + + return { + id: sessionId, + userId, + status: "idle", + conversationId, + createdAt: now, + updatedAt: now, + }; +} + +/** + * Update session status and optionally continuation state. + */ +export async function updateSessionStatus( + db: CloudDatabase, + sessionId: string, + status: SessionStatus, + continuation?: ContinuationState | null +): Promise { + const now = nowISO(); + await db + .prepare( + `UPDATE sessions + SET status = ?, continuation = ?, updated_at = ? + WHERE id = ?` + ) + .bind( + status, + continuation ? JSON.stringify(continuation) : null, + now, + sessionId + ) + .run(); +} + +/** + * Insert a lifecycle event. + */ +export async function insertEvent( + db: CloudDatabase, + sessionId: string, + type: EventType, + payload?: unknown +): Promise { + const id = generateId(); + const now = nowISO(); + await db + .prepare( + `INSERT INTO events (id, session_id, type, payload, created_at) + VALUES (?, ?, ?, ?, ?)` + ) + .bind(id, sessionId, type, payload ? JSON.stringify(payload) : null, now) + .run(); + return id; +} + +interface EventRow { + id: string; + session_id: string; + type: string; + payload: string | null; + created_at: string; +} + +/** + * Get recent events for a session. + */ +export async function getSessionEvents( + db: CloudDatabase, + sessionId: string, + limit = 100 +): Promise { + const rows = await db + .prepare( + `SELECT id, session_id, type, payload, created_at + FROM events + WHERE session_id = ? + ORDER BY created_at DESC + LIMIT ?` + ) + .bind(sessionId, limit) + .all(); + + return (rows.results ?? []).map((row: EventRow) => ({ + id: row.id, + sessionId: row.session_id, + type: row.type as EventType, + payload: row.payload ? JSON.parse(row.payload) : null, + createdAt: row.created_at, + })); +} + +/** + * Insert a conversation message. + */ +export async function insertMessage( + db: CloudDatabase, + sessionId: string, + role: MessageRole, + content: unknown +): Promise { + const id = generateId(); + const now = nowISO(); + await db + .prepare( + `INSERT INTO messages (id, session_id, role, content, created_at) + VALUES (?, ?, ?, ?, ?)` + ) + .bind(id, sessionId, role, JSON.stringify(content), now) + .run(); + return id; +} + +interface MessageRow { + id: string; + session_id: string; + role: string; + content: string | null; + created_at: string; +} + +/** + * Get conversation messages for a session. + */ +export async function getSessionMessages( + db: CloudDatabase, + sessionId: string +): Promise { + const rows = await db + .prepare( + `SELECT id, session_id, role, content, created_at + FROM messages + WHERE session_id = ? + ORDER BY created_at ASC` + ) + .bind(sessionId) + .all(); + + return (rows.results ?? []).map((row: MessageRow) => ({ + id: row.id, + sessionId: row.session_id, + role: row.role as MessageRole, + content: row.content ? JSON.parse(row.content) : null, + createdAt: row.created_at, + })); +} + +/** + * Insert an activity event from collector. + */ +export async function insertActivityEvent( + db: CloudDatabase, + source: string, + payload: unknown, + occurredAt: string +): Promise { + const id = generateId(); + const now = nowISO(); + await db + .prepare( + `INSERT INTO activity_events (id, source, payload, occurred_at, ingested_at) + VALUES (?, ?, ?, ?, ?)` + ) + .bind(id, source, JSON.stringify(payload), occurredAt, now) + .run(); + return id; +} + +interface ActivityEventRow { + id: string; + source: string; + payload: string | null; + occurred_at: string; +} + +/** + * Get activity events in a time range. + */ +export async function getActivityEvents( + db: CloudDatabase, + start: string, + end: string, + source?: string +): Promise> { + const query = source + ? `SELECT id, source, payload, occurred_at + FROM activity_events + WHERE occurred_at >= ? AND occurred_at <= ? AND source = ? + ORDER BY occurred_at ASC` + : `SELECT id, source, payload, occurred_at + FROM activity_events + WHERE occurred_at >= ? AND occurred_at <= ? + ORDER BY occurred_at ASC`; + + const rows = source + ? await db.prepare(query).bind(start, end, source).all() + : await db.prepare(query).bind(start, end).all(); + + return (rows.results ?? []).map((row: ActivityEventRow) => ({ + id: row.id, + source: row.source, + payload: row.payload ? JSON.parse(row.payload) : null, + occurredAt: row.occurred_at, + })); +} + +/** + * Insert a summary. + */ +export async function insertSummary( + db: CloudDatabase, + sessionId: string | null, + rangeStart: string, + rangeEnd: string, + summary: unknown +): Promise { + const id = generateId(); + const now = nowISO(); + await db + .prepare( + `INSERT INTO summaries (id, session_id, range_start, range_end, summary, created_at) + VALUES (?, ?, ?, ?, ?, ?)` + ) + .bind(id, sessionId, rangeStart, rangeEnd, JSON.stringify(summary), now) + .run(); + return id; +} diff --git a/packages/cloud-agent/src/types.ts b/packages/cloud-agent/src/types.ts new file mode 100644 index 0000000..e64e75a --- /dev/null +++ b/packages/cloud-agent/src/types.ts @@ -0,0 +1,244 @@ +/** + * @mieweb/cloud-agent type definitions + * + * Types for the event-driven agent host that binds an agent definition + + * AgentRuntime to a Durable Object with queue-driven turns, suspend/resume, + * and alarms. + */ + +import type { + CloudDatabase, + CloudQueue, + CloudStatefulNamespace, + CloudStatefulState, + CloudVectorIndex, + CloudBucket, +} from "@mieweb/cloud-types"; + +/** + * Session status states. + * - `idle`: No turn in progress, ready for new messages + * - `running`: Turn currently executing + * - `waiting_for_user`: Agent asked a question, waiting for user response + * - `waiting_for_approval`: Tool needs human approval + * - `scheduled`: Alarm scheduled for future wake + */ +export type SessionStatus = + | "idle" + | "running" + | "waiting_for_user" + | "waiting_for_approval" + | "scheduled"; + +/** + * Event types in the lifecycle log. + */ +export type EventType = + | "user_message" + | "agent_message" + | "external" + | "scheduled_wake" + | "waiting_for_user" + | "waiting_for_approval" + | "resumed" + | "error"; + +/** + * A lifecycle event in the event log. + */ +export interface LifecycleEvent { + id: string; + sessionId: string; + type: EventType; + payload: unknown; + createdAt: string; +} + +/** + * Conversation message role. + */ +export type MessageRole = "user" | "assistant" | "system" | "tool"; + +/** + * A message in the conversation history. + */ +export interface ConversationMessage { + id: string; + sessionId: string; + role: MessageRole; + content: unknown; + createdAt: string; +} + +/** + * Continuation state for suspend/resume. + * Persisted when the agent suspends waiting for user input or approval. + */ +export interface ContinuationState { + /** The pending question or approval request */ + pendingMessage?: string; + /** Partial tool call state if suspended mid-tool */ + partialToolState?: unknown; + /** Timestamp when suspension occurred */ + suspendedAt: string; + /** Reason for suspension */ + reason: "waiting_for_user" | "waiting_for_approval"; +} + +/** + * Session record in the database. + */ +export interface Session { + id: string; + userId?: string; + status: SessionStatus; + conversationId?: string; + continuation?: ContinuationState; + createdAt: string; + updatedAt: string; +} + +/** + * Turn job queued for processing. + */ +export interface TurnJob { + sessionId: string; + eventId: string; + message?: string; + isResume?: boolean; + scheduledPayload?: unknown; +} + +/** + * Agent definition: instructions and tools. + */ +export interface AgentDefinition { + /** Agent name (used for routing) */ + name: string; + /** System instructions / persona */ + instructions: string; + /** Tools available to the agent (will be passed to runtime) */ + tools?: unknown; +} + +/** + * Minimal AgentRuntime interface expected by the host. + * Matches the AgentRuntime port from @mieweb/jerry-agent-runtime. + */ +export interface AgentRuntime { + /** Execute a turn and yield events */ + runTurn(input: TurnInput): AsyncIterable; +} + +/** + * Input for a single turn of the agent runtime. + */ +export interface TurnInput { + messages: Array<{ role: string; content: unknown }>; + tools?: unknown; + system?: string; + maxSteps?: number; +} + +/** + * Events emitted during a turn. + */ +export type RuntimeEvent = + | { type: "start" } + | { type: "text-delta"; text: string } + | { type: "tool-call"; toolCallId: string; toolName: string; input: unknown } + | { + type: "tool-result"; + toolCallId: string; + toolName: string; + output: unknown; + } + | { type: "finish"; finishReason: string; usage?: unknown } + | { type: "error"; message: string; cause?: unknown } + | { type: "suspend"; reason: "waiting_for_user" | "waiting_for_approval"; message?: string }; + +/** + * Store bindings required by the host. + */ +export interface HostStore { + db: CloudDatabase; + vectors?: CloudVectorIndex; + bucket?: CloudBucket; +} + +/** + * Trigger definition for the host. + */ +export type Trigger = + | { type: "fetch"; path: string; method?: string } + | { type: "queue"; topic?: string } + | { type: "scheduled"; cron: string }; + +/** + * Environment bindings expected by the host. + */ +export interface HostEnv { + DB: CloudDatabase; + JOBS: CloudQueue; + SESSION: CloudStatefulNamespace; + VECTORS?: CloudVectorIndex; + BUCKET?: CloudBucket; +} + +/** + * Host configuration for hostAgent(). + */ +export interface HostAgentConfig { + /** Agent definition (name, instructions, tools) */ + agent: AgentDefinition; + /** Factory function to create runtime for a turn (receives profile from request) */ + createRuntime: (profile?: unknown) => AgentRuntime; + /** Store bindings (built from env) */ + store: HostStore; + /** Trigger definitions (optional, defaults to standard routes) */ + triggers?: Trigger[]; +} + +/** + * Tool context passed to tools during execution. + */ +export interface ToolContext { + sessionId: string; + db: CloudDatabase; + vectors?: CloudVectorIndex; + bucket?: CloudBucket; + /** Schedule a future wake-up */ + scheduleWake: (at: Date | string, payload?: unknown) => Promise; + /** Suspend the turn waiting for user input */ + suspendForUser: (message: string) => void; + /** Suspend the turn waiting for approval */ + suspendForApproval: (message: string) => void; +} + +/** + * Result from hostAgent() - wiring helpers for the worker. + */ +export interface HostAgentResult { + /** The DO class to export */ + SessionClass: new (state: CloudStatefulState, env: HostEnv) => AgentSessionDO; + /** Handle a fetch request (routes to DO) */ + handleFetch: ( + request: Request, + env: HostEnv + ) => Promise; + /** Handle a queue batch (forwards to DO) */ + handleQueue: ( + batch: { messages: Array<{ body: TurnJob; ack: () => void }> }, + env: HostEnv + ) => Promise; + /** Handle scheduled events (optional) */ + handleScheduled?: (event: { cron: string }, env: HostEnv) => Promise; +} + +/** + * The AgentSession Durable Object interface. + */ +export interface AgentSessionDO { + fetch(request: Request): Promise; + alarm?(): Promise; +} From ecb8aa768625be05dda2388fa18b3da90aaf9414 Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Tue, 30 Jun 2026 17:58:11 -0400 Subject: [PATCH 2/6] feat(cloud-agent): run --call turns synchronously and inject tools via createTools POST /messages now executes the turn inline instead of only enqueueing, and hostAgent accepts an optional createTools(ctx) factory so agents like Jerry can bind runtime tools with DB/vector/alarm context. Co-authored-by: Cursor --- packages/cloud-agent/src/host.ts | 4 +-- packages/cloud-agent/src/session.ts | 40 ++++++++++++++--------------- packages/cloud-agent/src/types.ts | 7 +++++ 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/packages/cloud-agent/src/host.ts b/packages/cloud-agent/src/host.ts index 659a2b5..361407e 100644 --- a/packages/cloud-agent/src/host.ts +++ b/packages/cloud-agent/src/host.ts @@ -46,9 +46,9 @@ function extractUserId(request: Request): string | undefined { * - handleScheduled: Optional cron handler */ export function hostAgent(config: HostAgentConfig): HostAgentResult { - const { agent, createRuntime } = config; + const { agent, createRuntime, createTools } = config; - const SessionClass = createSessionClass(agent, createRuntime); + const SessionClass = createSessionClass(agent, createRuntime, createTools); async function handleFetch( request: Request, diff --git a/packages/cloud-agent/src/session.ts b/packages/cloud-agent/src/session.ts index 9aa2fa9..e7683b3 100644 --- a/packages/cloud-agent/src/session.ts +++ b/packages/cloud-agent/src/session.ts @@ -32,7 +32,8 @@ const json = (data: unknown, status = 200) => */ export function createSessionClass( agent: AgentDefinition, - createRuntime: (profile?: unknown) => AgentRuntime + createRuntime: (profile?: unknown) => AgentRuntime, + createTools?: (ctx: ToolContext) => unknown ) { return class AgentSession { private state: CloudStatefulState; @@ -115,7 +116,7 @@ export function createSessionClass( request: Request, sessionId: string ): Promise { - const { message, userId, profile: _profile } = await request.json() as { + const { message, userId, profile } = await request.json() as { message: string; userId?: string; profile?: unknown; @@ -143,22 +144,22 @@ export function createSessionClass( await insertEvent(this.env.DB, sessionId, "resumed"); } - await this.env.JOBS.send({ - sessionId, - eventId, - message, - isResume, - }); - await updateSessionStatus(this.env.DB, sessionId, "running"); - return json({ - ok: true, - sessionId, - eventId, - status: "queued", - wasResume: isResume, + // Synchronous turn for --call (POST /messages). Async enqueue uses /enqueue. + const turnRequest = new Request("http://internal/turn", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + sessionId, + eventId, + message, + isResume, + profile, + } satisfies TurnJob), }); + + return this.handleTurn(turnRequest, sessionId); } /** @@ -189,10 +190,8 @@ export function createSessionClass( content: m.content, })); - const runtime = createRuntime(); + const runtime = createRuntime(job.profile); - // Tool context for tools to access bindings and control flow - // TODO: Pass this to tools when they're implemented const toolContext: ToolContext = { sessionId, db: this.env.DB, @@ -212,14 +211,15 @@ export function createSessionClass( this.suspendMessage = message; }, }; - void toolContext; // Will be passed to tools when implemented + + const tools = createTools?.(toolContext) ?? agent.tools; let assistantContent = ""; let finishReason = "stop"; for await (const event of runtime.runTurn({ messages: coreMessages, - tools: agent.tools, + tools, system: agent.instructions, maxSteps: 10, })) { diff --git a/packages/cloud-agent/src/types.ts b/packages/cloud-agent/src/types.ts index e64e75a..1f214e3 100644 --- a/packages/cloud-agent/src/types.ts +++ b/packages/cloud-agent/src/types.ts @@ -107,6 +107,8 @@ export interface TurnJob { message?: string; isResume?: boolean; scheduledPayload?: unknown; + /** Privacy profile override from the request (runtime, model, egress). */ + profile?: unknown; } /** @@ -193,6 +195,11 @@ export interface HostAgentConfig { agent: AgentDefinition; /** Factory function to create runtime for a turn (receives profile from request) */ createRuntime: (profile?: unknown) => AgentRuntime; + /** + * Build tools for a turn from host bindings (DB, vectors, alarms, …). + * When omitted, static `agent.tools` is used. + */ + createTools?: (ctx: ToolContext) => unknown; /** Store bindings (built from env) */ store: HostStore; /** Trigger definitions (optional, defaults to standard routes) */ From 054795c3983356f4f42ce28bd63d0db75476a26c Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Wed, 1 Jul 2026 16:16:13 -0400 Subject: [PATCH 3/6] chore: update pnpm-lock.yaml to match package.json Add lockfile entries for @types/node@^20.0.0 in cloud-agent and cloud-agent-cli so CI pnpm install --frozen-lockfile succeeds. Co-authored-by: Cursor --- pnpm-lock.yaml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cacc103..43bbc4f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -42,6 +42,22 @@ importers: specifier: workspace:* version: link:../cloud-workers + packages/cloud-agent: + dependencies: + '@mieweb/cloud-types': + specifier: workspace:* + version: link:../cloud-types + devDependencies: + '@types/node': + specifier: ^20.0.0 + version: 20.19.43 + + packages/cloud-agent-cli: + devDependencies: + '@types/node': + specifier: ^20.0.0 + version: 20.19.43 + packages/cloud-local: dependencies: '@hono/node-server': @@ -393,6 +409,9 @@ packages: '@types/node@12.20.55': resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==} + '@types/node@20.19.43': + resolution: {integrity: sha512-6oYBAi5ikg4Pl+kGsoYtawUMBT2zZMCvPNF7pVLnHZfd1zf38DRiWn/gT01RYCdUqkv7Fhr+C9ot4/tb+2sVvA==} + '@types/node@22.19.20': resolution: {integrity: sha512-6tELRwSDYWW9EdZhbeZmYGZ1/7Djkt+Ah3/ScEYT9cDord7UJzasR/4D3VONg9tQI5CDp+/CZC1AXj2pCFOvpw==} @@ -1530,6 +1549,10 @@ snapshots: '@types/node@12.20.55': {} + '@types/node@20.19.43': + dependencies: + undici-types: 6.21.0 + '@types/node@22.19.20': dependencies: undici-types: 6.21.0 From ed365460562b8ce73e0ad6d5c7bf329b22416d23 Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Wed, 1 Jul 2026 16:17:39 -0400 Subject: [PATCH 4/6] fix: add tsx devDependency for cloud-agent test scripts Tests use `node --import tsx` but tsx was not declared, causing CI unit job failures on a clean install. Co-authored-by: Cursor --- packages/cloud-agent-cli/package.json | 3 +- packages/cloud-agent/package.json | 3 +- pnpm-lock.yaml | 293 ++++++++++++++++++++++++++ 3 files changed, 297 insertions(+), 2 deletions(-) diff --git a/packages/cloud-agent-cli/package.json b/packages/cloud-agent-cli/package.json index 88e1a52..4e3ae26 100644 --- a/packages/cloud-agent-cli/package.json +++ b/packages/cloud-agent-cli/package.json @@ -32,6 +32,7 @@ "test": "node --import tsx --test src/**/*.test.ts" }, "devDependencies": { - "@types/node": "^20.0.0" + "@types/node": "^20.0.0", + "tsx": "^4.19.0" } } diff --git a/packages/cloud-agent/package.json b/packages/cloud-agent/package.json index 45df0e1..991f91d 100644 --- a/packages/cloud-agent/package.json +++ b/packages/cloud-agent/package.json @@ -35,6 +35,7 @@ "@mieweb/cloud-types": "workspace:*" }, "devDependencies": { - "@types/node": "^20.0.0" + "@types/node": "^20.0.0", + "tsx": "^4.19.0" } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 43bbc4f..f147ca5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -51,12 +51,18 @@ importers: '@types/node': specifier: ^20.0.0 version: 20.19.43 + tsx: + specifier: ^4.19.0 + version: 4.22.4 packages/cloud-agent-cli: devDependencies: '@types/node': specifier: ^20.0.0 version: 20.19.43 + tsx: + specifier: ^4.19.0 + version: 4.22.4 packages/cloud-local: dependencies: @@ -277,6 +283,162 @@ packages: '@cloudflare/workers-types@4.20260607.1': resolution: {integrity: sha512-TSiusluJ8+5esTMYwxGFuT1SNU/PRzPmt9VMsmAlzjIK0mhc24Zsc1bbGEVH5qyMZ8hrdRtrAPdt2+T8Vph2+Q==} + '@esbuild/aix-ppc64@0.28.1': + resolution: {integrity: sha512-Svl7tq8k/08+p6CXPpRjQ1fKX+1odH/BQbb48fV6fj3CWHhsoIOoY87w1oHXm0qEpkIK3ZfVgp0hed3XBXzXMQ==} + engines: {node: '>=18'} + cpu: [ppc64] + os: [aix] + + '@esbuild/android-arm64@0.28.1': + resolution: {integrity: sha512-34EGEbCIAgosYz6goLcopX6Mo7NyGv9tfwEM2/7Ce2VcVRk568iSvniGWcUXIy7wEDR1wzolcxcriFVrWYcwBg==} + engines: {node: '>=18'} + cpu: [arm64] + os: [android] + + '@esbuild/android-arm@0.28.1': + resolution: {integrity: sha512-0k2F129Xdio1TdJfzJ8sy1Q47vUD2NnwdhiAf7drUN1EBTfPf4hsFCtmMgu/6m8JSzsBrlmVjudMBQqOfG8usQ==} + engines: {node: '>=18'} + cpu: [arm] + os: [android] + + '@esbuild/android-x64@0.28.1': + resolution: {integrity: sha512-dbwY7ltSMDWsRatcRpCnES4F+im88OCUgGZjy52shC7GqHRE/cYlxNbB4Z4UpJswpcc4Qxd2oE/ufM0p61IKng==} + engines: {node: '>=18'} + cpu: [x64] + os: [android] + + '@esbuild/darwin-arm64@0.28.1': + resolution: {integrity: sha512-TZbWkQY7kvTAXbXUT7uVACR5cMHsDiSz9z7ZKAX/RTq/WJEk3QyRr0wZpNhBDX+/0CtdqUIJlOiodQcta6tY3Q==} + engines: {node: '>=18'} + cpu: [arm64] + os: [darwin] + + '@esbuild/darwin-x64@0.28.1': + resolution: {integrity: sha512-zfdzgK9ACBNZLI/CyHTOx81SyNbM6YXn7rxSgX97VjyiPl9W1i4Ka4fgKECEoFCKGpvBj5qArWIGgQjOwkgskQ==} + engines: {node: '>=18'} + cpu: [x64] + os: [darwin] + + '@esbuild/freebsd-arm64@0.28.1': + resolution: {integrity: sha512-wG2EA8ENdEI0qhkSZMjfqrdY+ziCYCPMmtZjjIwOmXFjmyzEHn+UUxk5of+SYsjtfs3VpnlC7QLzSI5hY/rOAw==} + engines: {node: '>=18'} + cpu: [arm64] + os: [freebsd] + + '@esbuild/freebsd-x64@0.28.1': + resolution: {integrity: sha512-i7dZ9vQgnvSCzi/rYCXNgtF/U+eKZNJBzu3eTQbRgHnM7tNSizLOkRFAl3qzVc/Op/u5YkHHa4pf/3DOYHthLQ==} + engines: {node: '>=18'} + cpu: [x64] + os: [freebsd] + + '@esbuild/linux-arm64@0.28.1': + resolution: {integrity: sha512-yHs+0uc8+nvEAfAfxrWQKK5peSNzBc4PegcMO0EJ2hT71uA7vB8Ihg2e77R2P7SG5uYjPbHlLLmve4LLLRCf0g==} + engines: {node: '>=18'} + cpu: [arm64] + os: [linux] + + '@esbuild/linux-arm@0.28.1': + resolution: {integrity: sha512-qVXBOHQS+d5Y722GwJzJUtOLlX7km3CraOaGormF1pDtPd2C/l1SHRPgjLunLGe51Sh5YYWKMFDyV4SxgMQYTQ==} + engines: {node: '>=18'} + cpu: [arm] + os: [linux] + + '@esbuild/linux-ia32@0.28.1': + resolution: {integrity: sha512-d1z4ZuP0ajrfz/FhGT4vv278rX8KnPPJx8i5+AtK7TYbx9Le9F1hyzurZpkEyjkGa9dUGhQow4C1NmeGvqxN2w==} + engines: {node: '>=18'} + cpu: [ia32] + os: [linux] + + '@esbuild/linux-loong64@0.28.1': + resolution: {integrity: sha512-M5sRjUVZrkm1OAPR3dlOYzNmN+loZKGVi1VUQGrwuqLcbR6qeAz+famMhjASeH3YVKvZz+zT1jlh/keC3Rj/lg==} + engines: {node: '>=18'} + cpu: [loong64] + os: [linux] + + '@esbuild/linux-mips64el@0.28.1': + resolution: {integrity: sha512-mRObBZeHh2OxcBFPWE/FjylkRgZdYuiTR3vaTozquCGOH14iP9oN4x4Ge81CoIDYQrXmIxpFumJBu5MtZpnQJQ==} + engines: {node: '>=18'} + cpu: [mips64el] + os: [linux] + + '@esbuild/linux-ppc64@0.28.1': + resolution: {integrity: sha512-slScBsMAb3GFDcdrCgLwZtPYRoH2H/youv10QiZyRjmsP48fznoveWytSgCI/R0ZcUgpc0ZhIUEx6LHts8yrfQ==} + engines: {node: '>=18'} + cpu: [ppc64] + os: [linux] + + '@esbuild/linux-riscv64@0.28.1': + resolution: {integrity: sha512-kw0owk1o0GFETUJyW0jc0G4Yzs0BHZn0JDZ8JRT088vjJYX777BAs1fDGxAC+q831qOs2DTC96mNsG2opdfyyQ==} + engines: {node: '>=18'} + cpu: [riscv64] + os: [linux] + + '@esbuild/linux-s390x@0.28.1': + resolution: {integrity: sha512-/lAIjX8aYFRByhh6L5rYtPEDRqa9de/4V/juOXcta5frjvzXO4/sqEtyytse0g3zZFuWu5cDN0MkLz2qRDD2Ag==} + engines: {node: '>=18'} + cpu: [s390x] + os: [linux] + + '@esbuild/linux-x64@0.28.1': + resolution: {integrity: sha512-u/anNYF2mmVOEDwLtnQ1wOr3EZ9sTNGLWrsYGYwHWzGA3Si84IOkHXlbWTD1NB+9/1lcnweYKO54uhxZydNzfA==} + engines: {node: '>=18'} + cpu: [x64] + os: [linux] + + '@esbuild/netbsd-arm64@0.28.1': + resolution: {integrity: sha512-oks0DYbLwWMmaakTsCb+zL4E+aHRVLom9IJZOAthMQEPiQmydXHkziYEsGYRx0uNV/IjEKGAV941JzH02pflqw==} + engines: {node: '>=18'} + cpu: [arm64] + os: [netbsd] + + '@esbuild/netbsd-x64@0.28.1': + resolution: {integrity: sha512-aeL6lAnN89Hz43Mlh1G8ARasbuoYvSITDEx0tHh5b7jJnHcssqgjy9Yx430GDpmCa6OyrKoS0aNRjKundRizGg==} + engines: {node: '>=18'} + cpu: [x64] + os: [netbsd] + + '@esbuild/openbsd-arm64@0.28.1': + resolution: {integrity: sha512-MEFJe5C3R8pwXdZ5Y21oo6m7ePiS0d9pWucn99O/wvyJZChoIQKrQDxKrGeW8F5+T0okTHesAmDeiHDTIq0V/Q==} + engines: {node: '>=18'} + cpu: [arm64] + os: [openbsd] + + '@esbuild/openbsd-x64@0.28.1': + resolution: {integrity: sha512-i/ZLIOafE0Z8cI/XANJAixoJL/uRAoS2xOA3rb0xN+KK0K177cMAsQYkzHtBrtMXAKuAc7HGgcWiZ/sRC1Nxgw==} + engines: {node: '>=18'} + cpu: [x64] + os: [openbsd] + + '@esbuild/openharmony-arm64@0.28.1': + resolution: {integrity: sha512-ge+Z7EXFNt2BO1oAMsVpiQ8EwndV9i1xXerAeTIK7AtPs3bKFXQM7nlRxDSIUIMeueR1CNXxqztLzdNeReKBJg==} + engines: {node: '>=18'} + cpu: [arm64] + os: [openharmony] + + '@esbuild/sunos-x64@0.28.1': + resolution: {integrity: sha512-BEjgtECkL3vY+SaSQ6nzVfiALUeFxpawyp8Jmf5PtYhf1Ug40N1h/hxlhts+f1FvSvarEigdxS3BlSMI2PJLcQ==} + engines: {node: '>=18'} + cpu: [x64] + os: [sunos] + + '@esbuild/win32-arm64@0.28.1': + resolution: {integrity: sha512-lCv9eK/H6ZJWbE7bh2nw54CZ9M2nupBxJcTsdk/QQnWkdSjKGuxmmH8/GWrlT1eMmZfn4dGcCjRte397WqfQXA==} + engines: {node: '>=18'} + cpu: [arm64] + os: [win32] + + '@esbuild/win32-ia32@0.28.1': + resolution: {integrity: sha512-zvb/mB2bSCoJOpoCBgYKKpX6YM6mJBlBUVUtVj41DlZJVEB6/0CKlRYxP5wWl1C1ILiCoAU5wZZ4q1P3qeS6Eg==} + engines: {node: '>=18'} + cpu: [ia32] + os: [win32] + + '@esbuild/win32-x64@0.28.1': + resolution: {integrity: sha512-bm4Mowrv+GXMlpWX++EcXw/iLyd1o3+bJkC2DkWXYVvgZCqD/bSj9ctZeAMC3cIxgjRVR2Dufaiu4YPxr5gW1A==} + engines: {node: '>=18'} + cpu: [x64] + os: [win32] + '@hono/node-server@1.19.14': resolution: {integrity: sha512-GwtvgtXxnWsucXvbQXkRgqksiH2Qed37H9xHZocE5sA3N8O8O8/8FA3uclQXxXVzc9XBZuEOMK7+r02FmSpHtw==} engines: {node: '>=18.14.1'} @@ -524,6 +686,11 @@ packages: resolution: {integrity: sha512-rRqJg/6gd538VHvR3PSrdRBb/1Vy2YfzHqzvbhGIQpDRKIa4FgV/54b5Q1xYSxOOwKvjXweS26E0Q+nAMwp2pQ==} engines: {node: '>=8.6'} + esbuild@0.28.1: + resolution: {integrity: sha512-HrJrvZv5ayxBzPfwphOoNzkzOIIlifzk0KJrGK2c8R4+LKpMtpYLQeUdjnwjWv/LZlkH2laZk+4w78pi99D4Vw==} + engines: {node: '>=18'} + hasBin: true + esprima@4.0.1: resolution: {integrity: sha512-eGuFFw7Upda+g4p+QHvnW0RyTX/SVeJBDM/gCtMARO0cLuT2HcEKnTPvhjV6aGeqrCB/sbNop0Kszm0jsaWU4A==} engines: {node: '>=4'} @@ -580,6 +747,11 @@ packages: resolution: {integrity: sha512-yhlQgA6mnOJUKOsRUFsgJdQCvkKhcz8tlZG5HBQfReYZy46OwLcY+Zia0mtdHsOo9y/hP+CxMN0TU9QxoOtG4g==} engines: {node: '>=6 <7 || >=8'} + fsevents@2.3.3: + resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} + engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} + os: [darwin] + github-from-package@0.0.0: resolution: {integrity: sha512-SyHy3T1v2NUXn29OsWdxmK6RwHD+vkj3v8en8AOBZ1wBQ/hCAQ5bAQTD02kW4W9tUp/3Qh6J8r9EvntiyCmOOw==} @@ -929,6 +1101,11 @@ packages: tslib@2.8.1: resolution: {integrity: sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==} + tsx@4.22.4: + resolution: {integrity: sha512-X8EX+XV4QR5xCsrgxaED954zTDfY8KqlDtskKEL0cHhyS/P8b4IFOvGDQpsC9Q1XnLq915wEfwwY/zzskCtmhg==} + engines: {node: '>=18.0.0'} + hasBin: true + tunnel-agent@0.6.0: resolution: {integrity: sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==} @@ -1381,6 +1558,84 @@ snapshots: '@cloudflare/workers-types@4.20260607.1': {} + '@esbuild/aix-ppc64@0.28.1': + optional: true + + '@esbuild/android-arm64@0.28.1': + optional: true + + '@esbuild/android-arm@0.28.1': + optional: true + + '@esbuild/android-x64@0.28.1': + optional: true + + '@esbuild/darwin-arm64@0.28.1': + optional: true + + '@esbuild/darwin-x64@0.28.1': + optional: true + + '@esbuild/freebsd-arm64@0.28.1': + optional: true + + '@esbuild/freebsd-x64@0.28.1': + optional: true + + '@esbuild/linux-arm64@0.28.1': + optional: true + + '@esbuild/linux-arm@0.28.1': + optional: true + + '@esbuild/linux-ia32@0.28.1': + optional: true + + '@esbuild/linux-loong64@0.28.1': + optional: true + + '@esbuild/linux-mips64el@0.28.1': + optional: true + + '@esbuild/linux-ppc64@0.28.1': + optional: true + + '@esbuild/linux-riscv64@0.28.1': + optional: true + + '@esbuild/linux-s390x@0.28.1': + optional: true + + '@esbuild/linux-x64@0.28.1': + optional: true + + '@esbuild/netbsd-arm64@0.28.1': + optional: true + + '@esbuild/netbsd-x64@0.28.1': + optional: true + + '@esbuild/openbsd-arm64@0.28.1': + optional: true + + '@esbuild/openbsd-x64@0.28.1': + optional: true + + '@esbuild/openharmony-arm64@0.28.1': + optional: true + + '@esbuild/sunos-x64@0.28.1': + optional: true + + '@esbuild/win32-arm64@0.28.1': + optional: true + + '@esbuild/win32-ia32@0.28.1': + optional: true + + '@esbuild/win32-x64@0.28.1': + optional: true + '@hono/node-server@1.19.14(hono@4.12.23)': dependencies: hono: 4.12.23 @@ -1667,6 +1922,35 @@ snapshots: ansi-colors: 4.1.3 strip-ansi: 6.0.1 + esbuild@0.28.1: + optionalDependencies: + '@esbuild/aix-ppc64': 0.28.1 + '@esbuild/android-arm': 0.28.1 + '@esbuild/android-arm64': 0.28.1 + '@esbuild/android-x64': 0.28.1 + '@esbuild/darwin-arm64': 0.28.1 + '@esbuild/darwin-x64': 0.28.1 + '@esbuild/freebsd-arm64': 0.28.1 + '@esbuild/freebsd-x64': 0.28.1 + '@esbuild/linux-arm': 0.28.1 + '@esbuild/linux-arm64': 0.28.1 + '@esbuild/linux-ia32': 0.28.1 + '@esbuild/linux-loong64': 0.28.1 + '@esbuild/linux-mips64el': 0.28.1 + '@esbuild/linux-ppc64': 0.28.1 + '@esbuild/linux-riscv64': 0.28.1 + '@esbuild/linux-s390x': 0.28.1 + '@esbuild/linux-x64': 0.28.1 + '@esbuild/netbsd-arm64': 0.28.1 + '@esbuild/netbsd-x64': 0.28.1 + '@esbuild/openbsd-arm64': 0.28.1 + '@esbuild/openbsd-x64': 0.28.1 + '@esbuild/openharmony-arm64': 0.28.1 + '@esbuild/sunos-x64': 0.28.1 + '@esbuild/win32-arm64': 0.28.1 + '@esbuild/win32-ia32': 0.28.1 + '@esbuild/win32-x64': 0.28.1 + esprima@4.0.1: {} expand-template@2.0.3: @@ -1738,6 +2022,9 @@ snapshots: jsonfile: 4.0.0 universalify: 0.1.2 + fsevents@2.3.3: + optional: true + github-from-package@0.0.0: optional: true @@ -2094,6 +2381,12 @@ snapshots: tslib@2.8.1: optional: true + tsx@4.22.4: + dependencies: + esbuild: 0.28.1 + optionalDependencies: + fsevents: 2.3.3 + tunnel-agent@0.6.0: dependencies: safe-buffer: 5.2.1 From f3d3790ba0b5e9999d0f15a483a96fb576a41b7f Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Thu, 2 Jul 2026 11:55:43 -0400 Subject: [PATCH 5/6] fix(cloud-agent): address Copilot PR review on async turns - Forward profile and userId through the /enqueue path into TurnJob so async turns honor the per-request privacy profile. - Persist the enqueued user message and status transitions inside handleTurn so queued turns actually consume the new message; slim handleMessage to delegate and avoid double-inserts. - Reset session status to idle (and log an error event) when a turn throws, preventing sessions from getting stuck in "running". - Retry failed/exception queue jobs instead of ack-ing them so transient failures are not dropped permanently. Co-authored-by: Cursor --- packages/cloud-agent/src/host.ts | 12 +++-- packages/cloud-agent/src/session.ts | 73 ++++++++++++++++------------- packages/cloud-agent/src/types.ts | 4 +- 3 files changed, 53 insertions(+), 36 deletions(-) diff --git a/packages/cloud-agent/src/host.ts b/packages/cloud-agent/src/host.ts index 361407e..f8c3d67 100644 --- a/packages/cloud-agent/src/host.ts +++ b/packages/cloud-agent/src/host.ts @@ -9,6 +9,7 @@ import type { HostEnv, TurnJob, } from "./types.js"; +import type { CloudMessageBatch } from "@mieweb/cloud-types"; import { createSessionClass } from "./session.js"; import { initSchema, insertActivityEvent } from "./storage.js"; @@ -110,13 +111,16 @@ export function hostAgent(config: HostAgentConfig): HostAgentResult { } if (path.endsWith("/enqueue") && request.method === "POST") { - const body = await request.json() as { message: string }; + const body = await request.json() as { message: string; profile?: unknown }; + const userId = extractUserId(request); const eventId = crypto.randomUUID(); await env.JOBS.send({ sessionId, eventId, message: body.message, + userId, + profile: body.profile, }); return json({ ok: true, sessionId, eventId, status: "queued" }); @@ -131,7 +135,7 @@ export function hostAgent(config: HostAgentConfig): HostAgentResult { } async function handleQueue( - batch: { messages: Array<{ body: TurnJob; ack: () => void }> }, + batch: CloudMessageBatch, env: HostEnv ): Promise { for (const message of batch.messages) { @@ -152,12 +156,14 @@ export function hostAgent(config: HostAgentConfig): HostAgentResult { if (!response.ok) { const error = await response.text(); console.error(`Turn failed for session ${job.sessionId}:`, error); + message.retry(); + continue; } message.ack(); } catch (err) { console.error(`Queue processing error for session ${job.sessionId}:`, err); - message.ack(); + message.retry(); } } } diff --git a/packages/cloud-agent/src/session.ts b/packages/cloud-agent/src/session.ts index e7683b3..4db4bdc 100644 --- a/packages/cloud-agent/src/session.ts +++ b/packages/cloud-agent/src/session.ts @@ -110,7 +110,7 @@ export function createSessionClass( } /** - * Handle incoming message - record event and enqueue turn. + * Handle incoming message - delegate to handleTurn for persistence and execution. */ private async handleMessage( request: Request, @@ -122,39 +122,14 @@ export function createSessionClass( profile?: unknown; }; - const session = await getOrCreateSession(this.env.DB, sessionId, userId); - - if (session.status === "running") { - return json( - { error: "Turn already in progress", status: session.status }, - 409 - ); - } - - const eventId = await insertEvent(this.env.DB, sessionId, "user_message", { - message, - }); - await insertMessage(this.env.DB, sessionId, "user", message); - - const isResume = - session.status === "waiting_for_user" || - session.status === "waiting_for_approval"; - - if (isResume) { - await insertEvent(this.env.DB, sessionId, "resumed"); - } - - await updateSessionStatus(this.env.DB, sessionId, "running"); - - // Synchronous turn for --call (POST /messages). Async enqueue uses /enqueue. const turnRequest = new Request("http://internal/turn", { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ sessionId, - eventId, + eventId: crypto.randomUUID(), message, - isResume, + userId, profile, } satisfies TurnJob), }); @@ -178,11 +153,36 @@ export function createSessionClass( this.suspendMessage = null; try { - // Read job from request (used for context, e.g. scheduledPayload) const job = await request.json() as TurnJob; - void job; // Used in future for scheduledPayload handling - // Ensure session exists (also validates sessionId) - await getOrCreateSession(this.env.DB, sessionId); + const session = await getOrCreateSession( + this.env.DB, + sessionId, + job.userId + ); + + if (session.status === "running") { + return json( + { error: "Turn already in progress", status: session.status }, + 409 + ); + } + + if (job.message) { + const isResume = + session.status === "waiting_for_user" || + session.status === "waiting_for_approval"; + + await insertEvent(this.env.DB, sessionId, "user_message", { + message: job.message, + }); + await insertMessage(this.env.DB, sessionId, "user", job.message); + + if (isResume) { + await insertEvent(this.env.DB, sessionId, "resumed"); + } + } + + await updateSessionStatus(this.env.DB, sessionId, "running"); const messages = await getSessionMessages(this.env.DB, sessionId); const coreMessages = messages.map((m) => ({ @@ -278,6 +278,15 @@ export function createSessionClass( message: assistantContent, finishReason, }); + } catch (err) { + await insertEvent(this.env.DB, sessionId, "error", { + message: String(err instanceof Error ? err.message : err), + }); + await updateSessionStatus(this.env.DB, sessionId, "idle"); + return json( + { error: String(err instanceof Error ? err.message : err) }, + 500 + ); } finally { this.turnInProgress = false; } diff --git a/packages/cloud-agent/src/types.ts b/packages/cloud-agent/src/types.ts index 1f214e3..1acec20 100644 --- a/packages/cloud-agent/src/types.ts +++ b/packages/cloud-agent/src/types.ts @@ -8,6 +8,7 @@ import type { CloudDatabase, + CloudMessageBatch, CloudQueue, CloudStatefulNamespace, CloudStatefulState, @@ -105,6 +106,7 @@ export interface TurnJob { sessionId: string; eventId: string; message?: string; + userId?: string; isResume?: boolean; scheduledPayload?: unknown; /** Privacy profile override from the request (runtime, model, egress). */ @@ -235,7 +237,7 @@ export interface HostAgentResult { ) => Promise; /** Handle a queue batch (forwards to DO) */ handleQueue: ( - batch: { messages: Array<{ body: TurnJob; ack: () => void }> }, + batch: CloudMessageBatch, env: HostEnv ) => Promise; /** Handle scheduled events (optional) */ From f68bd1dcf81397f15c44f0977b53dbea9e85169a Mon Sep 17 00:00:00 2001 From: Shubhdeep Sarkar Date: Thu, 2 Jul 2026 12:08:48 -0400 Subject: [PATCH 6/6] fix(cloud-agent): harden alarm enqueue and event payload persistence - alarm(): create the scheduled_wake event first and reuse its eventId for the queued wake job, and only delete alarm_payload after JOBS.send() succeeds so a failed enqueue can be retried. - insertEvent(): only store NULL when payload is undefined, so falsy payloads (0, false, "") are persisted instead of silently dropped. Co-authored-by: Cursor --- packages/cloud-agent/src/session.ts | 12 +++++++++--- packages/cloud-agent/src/storage.ts | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/cloud-agent/src/session.ts b/packages/cloud-agent/src/session.ts index 4db4bdc..2a6b99d 100644 --- a/packages/cloud-agent/src/session.ts +++ b/packages/cloud-agent/src/session.ts @@ -86,15 +86,21 @@ export function createSessionClass( async alarm(): Promise { const sessionId = this.state.id.toString(); const payload = await this.state.storage.get("alarm_payload"); - await this.state.storage.delete("alarm_payload"); - await insertEvent(this.env.DB, sessionId, "scheduled_wake", payload); + const eventId = await insertEvent( + this.env.DB, + sessionId, + "scheduled_wake", + payload + ); await this.env.JOBS.send({ sessionId, - eventId: crypto.randomUUID(), + eventId, scheduledPayload: payload, }); + + await this.state.storage.delete("alarm_payload"); } /** diff --git a/packages/cloud-agent/src/storage.ts b/packages/cloud-agent/src/storage.ts index a0bcf65..8160143 100644 --- a/packages/cloud-agent/src/storage.ts +++ b/packages/cloud-agent/src/storage.ts @@ -172,7 +172,7 @@ export async function insertEvent( `INSERT INTO events (id, session_id, type, payload, created_at) VALUES (?, ?, ?, ?, ?)` ) - .bind(id, sessionId, type, payload ? JSON.stringify(payload) : null, now) + .bind(id, sessionId, type, payload === undefined ? null : JSON.stringify(payload), now) .run(); return id; }