From 0593b385d87582c9b6d02879baee025fb876a82c Mon Sep 17 00:00:00 2001 From: Hendrik Mans Date: Sat, 21 Mar 2026 09:22:51 +0100 Subject: [PATCH] fix: prevent subscription resolvers from dropping notifications under backpressure (Refs: beans-x0zh) - Change AgentSessionChanged and ActiveAgentStatuses resolvers to use a latest-value pattern instead of blocking on send - When a new notification arrives while the resolver is blocked sending to the WebSocket transport, refresh the pending state so the frontend always receives the most recent session status - Add test for latest-value delivery under rapid notifications --- ...ption-resolver-dropping-notifications-u.md | 19 +++++ internal/graph/schema.resolvers.go | 82 +++++++++++++------ internal/graph/schema.resolvers_test.go | 46 +++++++++++ 3 files changed, 121 insertions(+), 26 deletions(-) create mode 100644 .beans/beans-x0zh--fix-subscription-resolver-dropping-notifications-u.md diff --git a/.beans/beans-x0zh--fix-subscription-resolver-dropping-notifications-u.md b/.beans/beans-x0zh--fix-subscription-resolver-dropping-notifications-u.md new file mode 100644 index 00000000..c5c94886 --- /dev/null +++ b/.beans/beans-x0zh--fix-subscription-resolver-dropping-notifications-u.md @@ -0,0 +1,19 @@ +--- +# beans-x0zh +title: Fix subscription resolver dropping notifications under backpressure +status: completed +type: bug +priority: normal +created_at: 2026-03-21T08:16:46Z +updated_at: 2026-03-21T08:21:04Z +--- + +The AgentSessionChanged and ActiveAgentStatuses subscription resolvers block on sending to the out channel. While blocked, new notifications on the buffer-1 ch channel are silently dropped. This can cause the frontend to miss entire RUNNING status transitions, resulting in the agent working indicator disappearing while the agent is still active. Fix by using a latest-value pattern that refreshes pending state when new notifications arrive during a blocked send. + +## Summary of Changes + +Changed the AgentSessionChanged and ActiveAgentStatuses subscription resolvers from a simple block-on-send pattern to a latest-value pattern. When the resolver is blocked sending a payload to the WebSocket transport and a new notification arrives, it now refreshes the pending state with the latest session data instead of dropping the notification. This ensures the frontend always receives the most recent session status, preventing the 'Agent is working' spinner from disappearing while the agent is actively working. + +- Modified AgentSessionChanged resolver in schema.resolvers.go +- Modified ActiveAgentStatuses resolver in schema.resolvers.go +- Added test TestAgentSessionSubscription_DeliversLatestState diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 8e99a268..ea8f794b 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -1508,6 +1508,19 @@ func (r *subscriptionResolver) AgentSessionChanged(ctx context.Context, beanID s defer r.AgentMgr.Unsubscribe(beanID, ch) defer close(out) + fetchState := func() *model.AgentSession { + if s := r.AgentMgr.GetSession(beanID); s != nil { + return agentSessionToModel(s) + } + // Session was cleared — send an empty session so the UI resets + return &model.AgentSession{ + BeanID: beanID, + AgentType: "claude", + Status: model.AgentSessionStatusIdle, + Messages: []*model.AgentMessage{}, + } + } + // Emit current state immediately (if session exists) if s := r.AgentMgr.GetSession(beanID); s != nil { select { @@ -1517,30 +1530,34 @@ func (r *subscriptionResolver) AgentSessionChanged(ctx context.Context, beanID s } } - // Then emit on each change + // Use a latest-value pattern: if new notifications arrive while + // we're blocked sending to out, refresh the pending state so the + // frontend always receives the most recent session state. This + // prevents missed status transitions (e.g. IDLE→RUNNING) when + // the WebSocket transport is slow to consume. + var pending *model.AgentSession for { - select { - case <-ctx.Done(): - return - case _, ok := <-ch: - if !ok { + if pending == nil { + // Wait for a notification + select { + case <-ctx.Done(): return - } - s := r.AgentMgr.GetSession(beanID) - var ms *model.AgentSession - if s != nil { - ms = agentSessionToModel(s) - } else { - // Session was cleared — send an empty session so the UI resets - ms = &model.AgentSession{ - BeanID: beanID, - AgentType: "claude", - Status: model.AgentSessionStatusIdle, - Messages: []*model.AgentMessage{}, + case _, ok := <-ch: + if !ok { + return } + pending = fetchState() } + } else { + // Try to send, but refresh if a new notification arrives select { - case out <- ms: + case out <- pending: + pending = nil + case _, ok := <-ch: + if !ok { + return + } + pending = fetchState() case <-ctx.Done(): return } @@ -1573,17 +1590,30 @@ func (r *subscriptionResolver) ActiveAgentStatuses(ctx context.Context) (<-chan return } - // Then emit on each change + // Latest-value pattern (same as AgentSessionChanged above): + // refresh pending state when new notifications arrive during + // a blocked send. + var pending []*model.ActiveAgentStatus for { - select { - case <-ctx.Done(): - return - case _, ok := <-ch: - if !ok { + if pending == nil { + select { + case <-ctx.Done(): return + case _, ok := <-ch: + if !ok { + return + } + pending = activeAgentsToModel(r.AgentMgr.ListRunningSessions()) } + } else { select { - case out <- activeAgentsToModel(r.AgentMgr.ListRunningSessions()): + case out <- pending: + pending = nil + case _, ok := <-ch: + if !ok { + return + } + pending = activeAgentsToModel(r.AgentMgr.ListRunningSessions()) case <-ctx.Done(): return } diff --git a/internal/graph/schema.resolvers_test.go b/internal/graph/schema.resolvers_test.go index c66da690..95c5a304 100644 --- a/internal/graph/schema.resolvers_test.go +++ b/internal/graph/schema.resolvers_test.go @@ -3,10 +3,12 @@ package graph import ( "context" "errors" + "fmt" "os" "path/filepath" "strings" "testing" + "time" "github.com/hmans/beans/internal/agent" "github.com/hmans/beans/internal/graph/model" @@ -1145,6 +1147,50 @@ func TestSubscriptionBeanChanged(t *testing.T) { }) } +func TestAgentSessionSubscription_DeliversLatestState(t *testing.T) { + // Verify that when multiple rapid notifications occur, the subscriber + // eventually receives the latest state (not a stale intermediate). + mgr := agent.NewManager("", nil) + + beanID := "test-latest-value" + resolver := &Resolver{AgentMgr: mgr} + sr := resolver.Subscription() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := sr.AgentSessionChanged(ctx, beanID) + if err != nil { + t.Fatalf("AgentSessionChanged() error = %v", err) + } + + // No session exists yet, so no initial emission. Fire rapid + // notifications by adding multiple info messages without reading. + for i := range 10 { + mgr.AddInfoMessage(beanID, fmt.Sprintf("msg-%d", i)) + } + + // Drain the channel. Use a short idle timeout: once we stop receiving + // updates for 100ms, assume all coalesced notifications have been delivered. + var lastSession *model.AgentSession + for { + select { + case s := <-ch: + lastSession = s + case <-time.After(100 * time.Millisecond): + goto done + } + } +done: + if lastSession == nil { + t.Fatal("received no updates") + } + // The final state must include all info messages + if len(lastSession.Messages) != 10 { + t.Errorf("last update had %d messages, want 10", len(lastSession.Messages)) + } +} + func TestRelationshipFieldsWithFilter(t *testing.T) { resolver, core := setupTestResolver(t) ctx := context.Background()