feat: session adapter and per-message stream state#1
Conversation
* fix: anthropic tool call issues * fixing pnpm lock * ci: apply automated fixes * reworking model to uimessage conversions * simplifying the message conversion handling * ci: apply automated fixes * more small fixups * simplifying the message conversion handling * small test fixups * ci: apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
|
I had a good look through the code, it's sensibly structured, and follows the patterns from the repo. I also asked Codex5.3 and Opus4.6 (with the later having the context of the Codex review) to do reviews - I'll past them below. |
Codex Review:Findings
onStreamEnd: (message: UIMessage) => {
this.callbacksRef.current.onFinish(message)
this.setStatus('ready')
// Resolve the processing-complete promise so streamResponse can continue
this.processingResolve?.()
this.processingResolve = null
},// Set up promise that resolves when onStreamEnd fires
const processingComplete = this.waitForProcessing()
// Send through session adapter (pushes chunks to subscription queue)
await this.session.send(messages, mergedBody, this.abortController.signal)
// Wait for subscription loop to finish processing all chunks
await processingComplete
// Generate stream ID and start assistant message
this.currentStreamId = this.generateUniqueId('stream')
const messageId = this.processor.startAssistantMessage()
this.currentMessageId = messageId// Case 1: A manual message was created via startAssistantMessage()
if (this.pendingManualMessageId) {
const pendingId = this.pendingManualMessageId
this.pendingManualMessageId = null
if (pendingId !== messageId) {
// Update the message's ID in the messages array
this.messages = this.messages.map((msg) =>
msg.id === pendingId ? { ...msg, id: messageId } : msg,
)
// ... state key move only
}
// Ensure state exists
if (!this.messageStates.has(messageId)) {
this.createMessageState(messageId, role)
this.activeMessageIds.add(messageId)
}
// Per-message stream state
private messageStates: Map<string, MessageStreamState> = new Map()
private activeMessageIds: Set<string> = new Set()
private toolCallToMessage: Map<string, string> = new Map()
private pendingManualMessageId: string | null = nullprivate handleMessagesSnapshotEvent(
chunk: Extract<StreamChunk, { type: 'MESSAGES_SNAPSHOT' }>,
): void {
this.messages = [...chunk.messages]
this.emitMessagesChange()
}
chunk = await new Promise<StreamChunk | null>((resolve) => {
waiters.push(resolve)
signal?.addEventListener('abort', () => resolve(null), {
once: true,
})
})Testing gaps
Brief takeThe architectural direction is strong, but there are a few stream-lifecycle edge cases that could cause incorrect completion/loading behavior or wrong message-role reconciliation in durable/replay scenarios. |
|
Opus review: ReviewOverall, this is a well-structured PR. The architectural decision to unify around I have a few findings of my own, and I'll comment on the other review's findings where I agree or disagree. 1. Double
|
Plan: Address PR Review Findings for Resumable SessionsFix 1: Guard against double
|
|
All fixes implemented and verified. Here's a summary of the changes: Summary Fix 1: Guard against double onStreamEnd (High) processor.ts:651 — Added if (state.isComplete) return in handleTextMessageEndEvent to prevent firing onStreamEnd twice when RUN_FINISHED arrives before TEXT_MESSAGE_END. Fix 2: Clear dead waiters on subscribe exit (High) session-adapter.ts:77 — Added waiters.length = 0 in subscribe cleanup to prevent dead resolve functions from silently dropping chunks on the next subscription cycle. Fix 3: Reset transient state on MESSAGES_SNAPSHOT (Medium) processor.ts:678-681 — Clear messageStates, activeMessageIds, toolCallToMessage, and pendingManualMessageId when a snapshot replaces messages, preventing stale state Fix 4: Remove startAssistantMessage() from streamResponse() (Medium) chat-client.ts — Three changes:
Bonus fix in processor.ts:905-910 — Made handleStepFinishedEvent use ensureAssistantMessage() instead of silently returning when no active assistant exists, matching the Fix 5: Clean up abort listeners on normal resolution (Low) session-adapter.ts:65-72 — Wrapped the waiter callback to removeEventListener when resolved normally, preventing abort listener accumulation on long-lived subscriptions. New Tests
|
Widen TextMessageStartEvent.role to accept all message roles and add optional parentMessageId to ToolCallStartEvent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace single-message instance variables with a Map<string, MessageStreamState> keyed by messageId. Add explicit handlers for TEXT_MESSAGE_START, TEXT_MESSAGE_END, and STATE_SNAPSHOT events. Route tool calls via toolCallToMessage mapping. Maintains backward compat: startAssistantMessage() sets pendingManualMessageId which TEXT_MESSAGE_START associates with. ensureAssistantMessage() auto-creates state for streams without TEXT_MESSAGE_START. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add MessagesSnapshotEvent as a first-class AG-UI event type for conversation hydration. Replace the previous STATE_SNAPSHOT handler (which extracted messages from arbitrary state) with a dedicated MESSAGES_SNAPSHOT handler that accepts a typed messages array. - Add MessagesSnapshotEvent type to AGUIEventType and AGUIEvent unions - Add MESSAGES_SNAPSHOT case in StreamProcessor.processChunk() - Remove STATE_SNAPSHOT handler (falls through to default no-op) - Fix onStreamEnd to fire per-message (not only when no active messages remain) - Fix getActiveAssistantMessageId to return on first reverse match - Fix ensureAssistantMessage to emit onStreamStart and onMessagesChange - Add proposal docs for resumeable session support Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…on model Replace direct ConnectionAdapter usage in ChatClient with a SessionAdapter-based subscription loop. When only a ConnectionAdapter is provided, it is wrapped in a DefaultSessionAdapter internally. This enables persistent session support while preserving existing timing semantics and backwards compatibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…unter reload() now cancels the active stream (abort controllers, subscription, processing promise) before starting a new one. A stream generation counter prevents a superseded stream's async cleanup from clobbering the new stream's state (abortController, isLoading, processor). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Guard against double onStreamEnd when RUN_FINISHED arrives before TEXT_MESSAGE_END - Clear dead waiters on subscribe exit to prevent chunk loss on reconnection - Reset transient processor state (messageStates, activeMessageIds, etc.) on MESSAGES_SNAPSHOT - Remove optimistic startAssistantMessage() from streamResponse(); let stream events create the message naturally via TEXT_MESSAGE_START or ensureAssistantMessage() - Clean up abort listeners on normal waiter resolution to prevent listener accumulation - Make handleStepFinishedEvent use ensureAssistantMessage() for backward compat with streams that lack TEXT_MESSAGE_START Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r race Reset processor stream state (prepareAssistantMessage) in streamResponse() before the subscription loop, preventing stale messageStates from blocking new assistant message creation on reload. Rewrite createDefaultSession with per-subscribe queue isolation: each subscribe() synchronously installs fresh buffer/waiters, drains pre-buffered chunks via splice(0), and removes async cleanup that raced with new subscription cycles. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
5e71198 to
8c628ee
Compare
|
Superseded by TanStack/ai PR |
This PR introduces a
SessionAdapterabstraction and refactors the ChatClient to support durable sessions, where conversation state survives page reloads, network interruptions and reconnections.The core idea: instead of the
ChatClientowning the request-response lifecycle for each LLM call, it delegates to aSessionAdapterthat decouples sending messages from receiving chunks. Chunks arrive through a persistent subscription rather than being returned from the send call. This inversion makes it possible for an external session backend to replay missed events, inject message snapshots, and manage the stream lifecycle independently.What changes
1. Per-message stream state in
StreamProcessor(@tanstack/ai)The
StreamProcessorpreviously tracked a single "current assistant message" with flat instance variables (totalTextContent,toolCalls, etc.). This assumed one message streams at a time.Now it maintains a
Map<string, MessageStreamState>keyed bymessageId, with aSet<string>of active message IDs and atoolCallId → messageIdrouting map. This allows:TEXT_MESSAGE_START/ content /TEXT_MESSAGE_ENDevents)TEXT_MESSAGE_STARTrather than externalstartAssistantMessage()callsTEXT_MESSAGE_ENDfiresonStreamEndfor that message)Backward compatibility is preserved:
startAssistantMessage()still works by setting apendingManualMessageIdthat gets reconciled whenTEXT_MESSAGE_STARTarrives, or operates standalone if it never does.2. AG-UI type alignment (
@tanstack/ai)Three additive type changes to match the AG-UI protocol:
TextMessageStartEvent.rolewidened from'assistant'to'user' | 'assistant' | 'system' | 'tool'— session backends can replay messages of any roleToolCallStartEvent.parentMessageIdadded — enables routing tool calls to the correct message without relying on "current" message stateMessagesSnapshotEventadded — a first-class AG-UI event for hydrating the full conversation transcript on connect/reconnect (distinct fromSTATE_SNAPSHOTwhich carries arbitrary application state)3.
SessionAdapterinterface andcreateDefaultSession(@tanstack/ai-client)subscribe()returns a long-lived async iterable of AG-UI events.send()dispatches messages — responses arrive throughsubscribe(), not as a return value.createDefaultSession(connection)wraps the existingConnectionAdapterin this interface using an async queue:send()callsconnection.connect()and pushes chunks to the queue;subscribe()yields them. This means existingConnectionAdapterusers get the new architecture transparently.4.
ChatClientrefactored to useSessionAdapter(@tanstack/ai-client)The constructor resolves a
SessionAdapter— either provided directly via a newsessionoption, or by wrapping theconnectionoption withcreateDefaultSession. A background subscription loop (consumeSubscription) reads fromsubscribe()and feeds chunks to theStreamProcessor.streamResponse()now coordinates with this loop: it callssession.send()to push chunks into the subscription, then awaits aprocessingResolvepromise that fires whenonStreamEndis triggered by the processor. AstreamGenerationcounter prevents stale stream cleanup from clobbering a newer stream (e.g. whenreload()is called during an active stream).5. Framework hooks thread
sessionoption (ai-react,ai-solid,ai-vue,ai-svelte,ai-preact)Each framework's
useChat/createChathook now passesoptions.sessionto theChatClientconstructor. SinceChatClientOptionsaccepts eitherconnectionorsession, this flows through the existing options type with no additional framework-level changes.Why this approach
The key design decision is Unified SessionAdapter (the
ChatClientalways operates through aSessionAdapter). The alternative — havingChatClientconditionally use either aConnectionAdapterorSessionAdapter— would fork every method that touches the stream lifecycle. By always going throughSessionAdapter, theChatClienthas a single code path, andcreateDefaultSessionmakes the wrapping invisible to existing users.The per-message state refactor is a prerequisite because durable session backends emit
TEXT_MESSAGE_STARTevents for each message (potentially replaying an entire conversation). The processor needs to create and track messages from these events rather than relying on the client to callstartAssistantMessage()in advance.Migration
Existing usage is unchanged:
New session mode:
For example:
(Where
connectUrlis an endpoint to authorize session access. If authorized, the session proxy will handle fetching / materializing message history from the session stream).What's next
This PR establishes the transport and processing foundation. Follow-up work:
@durable-streams/sessions/tanstack-ai— a concreteSessionAdapterimplementation for durable stream backendsSTATE_SNAPSHOT/STATE_DELTAhandling with a managedsessionStatecontainer onChatClientonSessionStateChangeandonCustomEventcallbacks for application-specific eventsisLoadingtrackingTextEngine