Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ implement the same Cloudflare-shaped contract.
| [`cloud-workers`](cloud-workers) | The `DurableObject` base. Backs the **`mieweb:workers`** virtual import: re-exports `cloudflare:workers` on Cloudflare (workerd export condition), pure-JS base everywhere else. |
| [`cloud`](cloud) | Umbrella entry. Re-exports the contracts + `DurableObject` from one stable import surface (`@mieweb/cloud`). |
| [`cloud-local`](cloud-local) | Local/Node **adapters** (the POC): D1→SQLite, R2→filesystem, KV→in-memory, Queues→in-process, Durable Objects→in-process registry. Vectorize/Workers AI surface explicit `UnsupportedBindingError`. Includes the Node **host harness** that runs the unchanged worker handler and a migration runner. |
| [`cloud-agent`](cloud-agent) | Event-driven **agent host**. Binds an agent definition + `AgentRuntime` to a Durable Object with queue-driven turns, suspend/resume, and alarms. Provides `hostAgent()` which returns DO class + worker wiring helpers. |
| [`cloud-agent-cli`](cloud-agent-cli) | Message-first **CLI dispatcher** for `cloud-agent`. Agent identity from `basename(argv[0])`. Agent-specific packages (`jerry`, `lisa`, etc.) wrap this with their config. |
| [`cli`](cli) | The **`mieweb`** CLI. On the `cloudflare` target it delegates verbatim to `wrangler`; on other targets it drives the matching adapter. |

## How it wires into the app
Expand Down
16 changes: 16 additions & 0 deletions packages/cloud-agent-cli/bin/agent-cli.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env node
/**
* Generic agent CLI dispatcher.
* Agent name is determined by basename(argv[0]).
*/

import { run } from '../src/index.js';
import { basename } from 'path';

const agent = basename(process.argv[1]).replace(/\.(js|mjs|ts)$/, '');

run({
agent,
baseUrl: process.env.JERRY_URL ?? 'http://127.0.0.1:8787',
version: '0.1.0',
});
38 changes: 38 additions & 0 deletions packages/cloud-agent-cli/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "@mieweb/cloud-agent-cli",
"version": "0.1.0",
"description": "Message-first CLI dispatcher for @mieweb/cloud-agent. Agent identity is determined by basename(argv[0]) (busybox/git multicall pattern).",
"type": "module",
"license": "MIT",
"author": "MIEWEB",
"homepage": "https://github.com/mieweb/cloud#readme",
"bugs": "https://github.com/mieweb/cloud/issues",
"repository": {
"type": "git",
"url": "git+https://github.com/mieweb/cloud.git",
"directory": "packages/cloud-agent-cli"
},
"publishConfig": {
"access": "public"
},
"exports": {
".": {
"types": "./src/index.ts",
"default": "./src/index.ts"
}
},
"bin": {
"agent-cli": "./bin/agent-cli.js"
},
"files": [
"src",
"bin"
],
"scripts": {
"test": "node --import tsx --test src/**/*.test.ts"
},
"devDependencies": {
"@types/node": "^20.0.0",
"tsx": "^4.19.0"
}
}
230 changes: 230 additions & 0 deletions packages/cloud-agent-cli/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/**
* HTTP client for agent communication.
* Supports streaming (SSE) and fire-and-forget modes.
*/

import type { StreamEvent } from "./types.js";

interface RequestOptions {
profile?: unknown;
cwd?: string;
}

/**
* Send a message and stream the response.
* Uses Server-Sent Events for real-time streaming.
*/
export async function* streamCall(
baseUrl: string,
sessionId: string,
message: string,
options: RequestOptions
): AsyncGenerator<StreamEvent> {
const url = `${baseUrl}/v1/sessions/${sessionId}/messages`;

const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream, application/json",
},
body: JSON.stringify({
message,
profile: options.profile,
context: { cwd: options.cwd },
}),
});

if (!response.ok) {
const text = await response.text();
let errorMessage: string;
try {
const json = JSON.parse(text);
errorMessage = json.error ?? text;
} catch {
errorMessage = text;
}
yield { type: "error", message: errorMessage };
return;
}

const contentType = response.headers.get("content-type") ?? "";

// Handle SSE streaming
if (contentType.includes("text/event-stream")) {
yield* parseSSE(response);
return;
}

// Handle JSON response (non-streaming)
const json = await response.json() as {
ok?: boolean;
message?: string;
status?: string;
suspended?: boolean;
error?: string;
};

if (json.error) {
yield { type: "error", message: json.error };
return;
}

yield { type: "start" };

if (json.message) {
yield { type: "text", text: json.message };
}

if (json.suspended) {
yield {
type: "suspended",
reason: json.status ?? "waiting_for_user",
message: json.message,
};
} else {
yield { type: "finish", finishReason: "stop" };
}
}

/**
* Parse Server-Sent Events from a response.
*/
async function* parseSSE(response: Response): AsyncGenerator<StreamEvent> {
const reader = response.body?.getReader();
if (!reader) {
yield { type: "error", message: "No response body" };
return;
}

const decoder = new TextDecoder();
let buffer = "";

try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });

// Parse complete SSE events
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer

for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") {
yield { type: "finish", finishReason: "stop" };
return;
}

try {
const event = JSON.parse(data);
yield normalizeEvent(event);
} catch {
// Ignore malformed JSON
}
}
}
}
} finally {
reader.releaseLock();
}
}

/**
* Normalize server event to StreamEvent.
*/
function normalizeEvent(event: unknown): StreamEvent {
if (typeof event !== "object" || event === null) {
return { type: "error", message: "Invalid event" };
}

const e = event as Record<string, unknown>;

switch (e.type) {
case "start":
return { type: "start" };
case "text-delta":
case "text":
return { type: "text", text: String(e.text ?? "") };
case "tool-call":
return {
type: "tool-call",
toolName: String(e.toolName ?? ""),
input: e.input,
};
case "tool-result":
return {
type: "tool-result",
toolName: String(e.toolName ?? ""),
output: e.output,
};
case "finish":
return { type: "finish", finishReason: String(e.finishReason ?? "stop") };
case "error":
return { type: "error", message: String(e.message ?? "Unknown error") };
case "suspend":
case "suspended":
return {
type: "suspended",
reason: String(e.reason ?? "waiting_for_user"),
message: e.message as string | undefined,
};
default:
return { type: "error", message: `Unknown event type: ${e.type}` };
}
}

/**
* Send a message and return immediately (fire-and-forget).
* The agent processes the message asynchronously.
*/
export async function fireAndForget(
baseUrl: string,
sessionId: string,
message: string,
options: RequestOptions
): Promise<{ eventId: string; status: string }> {
const url = `${baseUrl}/v1/sessions/${sessionId}/enqueue`;

const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
message,
profile: options.profile,
context: { cwd: options.cwd },
}),
});

if (!response.ok) {
const text = await response.text();
throw new Error(`Request failed: ${text}`);
}

const json = await response.json() as { eventId: string; status: string };
return json;
}

/**
* Get session status.
*/
export async function getStatus(
baseUrl: string,
sessionId: string
): Promise<{ status: string; continuation?: unknown }> {
const url = `${baseUrl}/v1/sessions/${sessionId}/status`;

const response = await fetch(url);

if (!response.ok) {
const text = await response.text();
throw new Error(`Request failed: ${text}`);
}

return response.json() as Promise<{ status: string; continuation?: unknown }>;
}
28 changes: 28 additions & 0 deletions packages/cloud-agent-cli/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* @mieweb/cloud-agent-cli — Message-first CLI dispatcher for @mieweb/cloud-agent.
*
* Agent identity is determined by basename(argv[0]) (busybox/git multicall pattern).
* Agent-specific packages wrap this with their config.
*
* @example
* ```ts
* // packages/cli/bin/jerry.js
* import { run } from '@mieweb/cloud-agent-cli';
*
* run({
* agent: 'jerry',
* baseUrl: process.env.JERRY_URL ?? 'http://127.0.0.1:8787',
* version: '0.1.0',
* });
* ```
*/

export { run } from "./run.js";
export { parseArgs } from "./parse.js";
export { streamCall, fireAndForget, getStatus } from "./client.js";
export type {
CliConfig,
ParsedCommand,
CliOptions,
StreamEvent,
} from "./types.js";
Loading
Loading