-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathapp.py
More file actions
122 lines (94 loc) · 4.34 KB
/
Copy pathapp.py
File metadata and controls
122 lines (94 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""codegraff agent served over turboAPI.
The Python mirror of `sdk/typescript/example` (the Next.js demo): the codegraff
agent runs *in-process* and is exposed as a fast HTTP API backed by turboAPI's
Zig HTTP core. Request bodies are validated by dhi (turboAPI's Pydantic-/Zod-
compatible model layer) before Python ever runs the handler.
GET / -> SDK + server info
POST /chat -> run one agent turn, return the full markdown answer
POST /chat/stream -> stream every agent event as Server-Sent Events
GET /conversations -> list stored conversations
Run (see README.md for the 3.14t / source-build caveats):
pip install -r requirements.txt
export CODEGRAFF_API_KEY="cg_sk_..." # your codegraff gateway key
python3.14t app.py # http://127.0.0.1:8000
"""
from __future__ import annotations
import json
import os
from typing import Iterable, Optional
from turboapi import TurboAPI, HTTPException
from turboapi.responses import StreamingResponse
from dhi import BaseModel
from codegraff import AgentEvent, Graff, version as sdk_version
# One long-lived agent for the whole process. BYOK via env, exactly like the
# TS example's `Graff.init({ provider, apiKey, model })`. With no api_key set,
# the SDK falls back to ~/.forge/forge.toml like the graff CLI does.
graff = Graff(
provider=os.environ.get("CODEGRAFF_PROVIDER", "codegraff"),
api_key=os.environ.get("CODEGRAFF_API_KEY"),
model=os.environ.get("CODEGRAFF_MODEL", "deepseek-v4-pro"),
)
app = TurboAPI()
class ChatRequest(BaseModel):
"""Validated by dhi's Zig core *before* the handler runs."""
prompt: str
model: Optional[str] = None
conversation_id: Optional[str] = None
def _collect(events: Iterable[AgentEvent]) -> dict:
"""Collapse a chat() event stream into one JSON answer.
Assistant text arrives as `TaskMessage` events whose `content.kind` is
"Markdown" (the same contract the TS example consumes); tool calls surface
as `ToolCallStart`. See sdk/python/src/wire.rs for the full event shape.
"""
chunks: list[str] = []
tools: list[str] = []
conversation_id: Optional[str] = None
for ev in events:
if ev.type == "ConversationStarted":
conversation_id = ev.data.get("conversationId")
elif ev.type == "TaskMessage":
content = ev.data.get("content") or {}
if content.get("kind") == "Markdown":
chunks.append(content.get("text", ""))
elif ev.type == "ToolCallStart":
tool_call = ev.data.get("tool_call") or {}
name = tool_call.get("name") or tool_call.get("tool_name")
if name:
tools.append(name)
return {
"conversation_id": conversation_id,
"text": "".join(chunks),
"tools_used": tools,
}
@app.get("/")
def root():
return {"sdk": "codegraff", "sdk_version": sdk_version(), "server": "turboapi"}
@app.post("/chat")
def chat(req: ChatRequest):
"""Run one agent turn and return the full markdown answer."""
if not req.prompt:
raise HTTPException(status_code=400, detail="prompt is required")
events = graff.chat(req.prompt, conversation_id=req.conversation_id, model=req.model)
return _collect(events)
@app.post("/chat/stream")
def chat_stream(req: ChatRequest):
"""Stream every agent event to the client as Server-Sent Events.
graff.chat() is a blocking generator that releases the GIL while it waits on
the agent (py.allow_threads in ChatStreamHandle.next_event), so it drives
turboAPI's StreamingResponse without starving the server loop.
"""
if not req.prompt:
raise HTTPException(status_code=400, detail="prompt is required")
def sse() -> Iterable[str]:
for ev in graff.chat(req.prompt, conversation_id=req.conversation_id, model=req.model):
yield f"data: {json.dumps({'type': ev.type, 'data': ev.data})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(sse(), media_type="text/event-stream")
@app.get("/conversations")
def conversations(limit: int = 20):
return {"conversations": graff.list_conversations(limit)}
if __name__ == "__main__":
host = os.environ.get("HOST", "127.0.0.1")
port = int(os.environ.get("PORT", "8000"))
print(f"codegraff {sdk_version()} -> turboapi on http://{host}:{port}")
app.run(host=host, port=port)