Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ export namespace SessionProcessor {
return timeout
}

class StallError extends Error {
constructor(message: string) {
super(message)
this.name = "StallError"
}
}

function isStallError(error: unknown): error is StallError {
return error instanceof StallError
}

export type Info = Awaited<ReturnType<typeof create>>
export type Result = Awaited<ReturnType<Info["process"]>>

Expand All @@ -68,6 +79,8 @@ export namespace SessionProcessor {
log.info("process")
needsCompaction = false
const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
const session = await Session.get(input.sessionID)
let stallErr: StallError | undefined
while (true) {
try {
let currentText: MessageV2.TextPart | undefined
Expand All @@ -78,10 +91,10 @@ export namespace SessionProcessor {

for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
if (Date.now() - lastTokenTime > stallTimeout) {
if (session.parentID && Date.now() - lastTokenTime > stallTimeout) {
log.warn("stall", { sessionID: input.sessionID, elapsed: Date.now() - lastTokenTime })
markSessionStalled(input.sessionID)
throw new Error(`LLM stream stalled: no tokens received for ${Math.round(stallTimeout / 60000)} minutes`)
throw new StallError(`LLM stream stalled: no tokens received for ${Math.round(stallTimeout / 60000)} minutes`)
}
switch (value.type) {
case "start":
Expand Down Expand Up @@ -325,7 +338,7 @@ export namespace SessionProcessor {
reason: value.finishReason,
snapshot: await Snapshot.track(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
sessionID: input.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
Expand Down Expand Up @@ -418,12 +431,17 @@ export namespace SessionProcessor {
}
if (needsCompaction) break
}
} catch (e: any) {
} catch (e: unknown) {
log.error("process", {
error: e,
stack: JSON.stringify(e.stack),
stack: JSON.stringify(e instanceof Error ? e.stack : undefined),
})
const error = MessageV2.fromError(e, { providerID: input.model.providerID })
// Stall errors should propagate out — they're not retryable
if (isStallError(e)) {
stallErr = e
break
}
const error = MessageV2.fromError(e as Error, { providerID: input.model.providerID })
if (MessageV2.ContextOverflowError.isInstance(error)) {
// TODO: Handle context overflow error
}
Expand Down Expand Up @@ -481,6 +499,7 @@ export namespace SessionProcessor {
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
if (stallErr) throw stallErr
if (needsCompaction) {
clearSessionStalled(input.sessionID)
return "compact"
Expand All @@ -504,4 +523,4 @@ export namespace SessionProcessor {

export function isSessionStalled(id: string): boolean {
return SessionProcessor.isSessionStalled(id)
}
}
229 changes: 48 additions & 181 deletions packages/opencode/test/session/processor-stall.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { describe, expect, test } from "bun:test"
import path from "path"
import { SessionProcessor, isSessionStalled } from "../../src/session/processor"
import { MessageV2 } from "../../src/session/message-v2"
import { Identifier } from "@/id/id"
import type { Provider } from "@/provider/provider"
import { Session } from "../../src/session"
import { LLM } from "../../src/session/llm"
import { Instance } from "../../src/project/instance"
import { Identifier } from "../../src/id/id"
import type {Provider} from "../../src/provider/provider"

const projectRoot = path.join(__dirname, "../..")

function createModel(): Provider.Model {
return {
Expand Down Expand Up @@ -66,110 +72,52 @@ describe("SessionProcessor stall detection configuration", () => {
})
})

describe("SessionProcessor stall detection behavior", () => {
test("processor can be created with abort signal for stall checks", () => {
const sessionID = Identifier.descending("session")
const msg = createAssistantMessage(sessionID)
const abort = new AbortController()

const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID,
model: createModel(),
abort: abort.signal,
})

expect(processor).toBeDefined()
expect(processor.message.id).toBe(msg.id)
})

test("processor tracks lastTokenTime for stall detection", () => {
const sessionID = Identifier.descending("session")
const msg = createAssistantMessage(sessionID)
const abort = new AbortController()

const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID,
model: createModel(),
abort: abort.signal,
describe("Stall detection parentID gating", () => {
test("subagent session (with parentID) evaluates stall condition correctly", async () => {
await Instance.provide({
directory: projectRoot,
fn: async () => {
const parent = await Session.create({})
const session = await Session.create({ parentID: parent.id })
expect(session.parentID).toBe(parent.id)

// Test the exact condition from processor.ts line 94:
// if (session.parentID && Date.now() - lastTokenTime > stallTimeout)
const lastTokenTime = Date.now() - 1000 // 1 second ago
const stallTimeout = 500 // 500ms timeout
const elapsed = Date.now() - lastTokenTime
const shouldStall = !!session.parentID && elapsed > stallTimeout

// Subagent with parentID: parentID is truthy, so full condition evaluates
expect(shouldStall).toBe(true)

await Session.remove(session.id)
await Session.remove(parent.id)
},
})

expect(processor).toBeDefined()
expect(abort.signal.aborted).toBe(false)
})

test("updates lastTokenTime on reasoning-delta events", async () => {
const now = Date.now()
const before = Date.now()

const lastTokenTime = now

const elapsed = Date.now() - lastTokenTime

expect(elapsed).toBeGreaterThanOrEqual(0)
expect(elapsed).toBeLessThan(1000)
})

test("updates lastTokenTime on text-delta events", async () => {
const now = Date.now()
const before = Date.now()

const lastTokenTime = now
await new Promise((resolve) => setTimeout(resolve, 10))

const lastTokenTime2 = Date.now()
const elapsed = lastTokenTime2 - lastTokenTime

expect(elapsed).toBeGreaterThanOrEqual(5)
})

test("stall error includes timeout duration in message", async () => {
const stallTimeout = 180000
const minutes = Math.round(stallTimeout / 60000)

const error = new Error(`LLM stream stalled: no tokens received for ${minutes} minutes`)

expect(error.message).toContain("stalled")
expect(error.message).toContain("3 minutes")
})

test("stall calculation uses Date.now() - lastTokenTime", async () => {
const lastTokenTime = Date.now()
const elapsed = Date.now() - lastTokenTime

expect(elapsed).toBeGreaterThanOrEqual(0)
expect(elapsed).toBeLessThan(100)
})

test("tool-call events update lastTokenTime", async () => {
const lastTokenTime = Date.now()
await new Promise((resolve) => setTimeout(resolve, 5))

const lastTokenTime2 = Date.now()
const elapsed = lastTokenTime2 - lastTokenTime

expect(elapsed).toBeGreaterThanOrEqual(3)
})

test("tool-result events update lastTokenTime", async () => {
const lastTokenTime = Date.now()
await new Promise((resolve) => setTimeout(resolve, 5))

const lastTokenTime2 = Date.now()
const elapsed = lastTokenTime2 - lastTokenTime
test("root session (no parentID) does NOT evaluate stall condition", async () => {
await Instance.provide({
directory: projectRoot,
fn: async () => {
const session = await Session.create({})
expect(session.parentID).toBeUndefined()

expect(elapsed).toBeGreaterThanOrEqual(3)
})

test("tool-error events update lastTokenTime", async () => {
const lastTokenTime = Date.now()
await new Promise((resolve) => setTimeout(resolve, 5))
// Test the exact condition from processor.ts line 94:
// if (session.parentID && Date.now() - lastTokenTime > stallTimeout)
const lastTokenTime = Date.now() - 1000 // 1 second ago
const stallTimeout = 500 // 500ms timeout
const elapsed = Date.now() - lastTokenTime
const shouldStall = !!session.parentID && elapsed > stallTimeout

const lastTokenTime2 = Date.now()
const elapsed = lastTokenTime2 - lastTokenTime
// Root session: parentID is undefined/falsy, so short-circuits to false
// even though elapsed > stallTimeout would be true
expect(shouldStall).toBe(false)

expect(elapsed).toBeGreaterThanOrEqual(3)
await Session.remove(session.id)
},
})
})
})

Expand Down Expand Up @@ -203,89 +151,8 @@ describe("Stall timeout validation", () => {
})
})

describe("Stall detection integration with abort signal", () => {
test("abort signal is checked before stall check", () => {
const abort = new AbortController()
const processor = SessionProcessor.create({
assistantMessage: createAssistantMessage("test-session"),
sessionID: "test-session",
model: createModel(),
abort: abort.signal,
})

expect(abort.signal.aborted).toBe(false)

abort.abort()

expect(abort.signal.aborted).toBe(true)
expect(() => abort.signal.throwIfAborted()).toThrow()
})

test("abort takes precedence over stall check", () => {
const abort = new AbortController()
const processor = SessionProcessor.create({
assistantMessage: createAssistantMessage("test-session"),
sessionID: "test-session",
model: createModel(),
abort: abort.signal,
})

abort.abort()

expect(abort.signal.aborted).toBe(true)
expect(() => abort.signal.throwIfAborted()).toThrow(DOMException)
})
})

describe("Stall detection event coverage", () => {
const activityEventTypes = [
"reasoning-delta",
"text-delta",
"tool-call",
"tool-result",
"tool-error",
]

test("all LLM activity events are tracked for stall detection", () => {
expect(activityEventTypes).toContain("reasoning-delta")
expect(activityEventTypes).toContain("text-delta")
expect(activityEventTypes).toContain("tool-call")
expect(activityEventTypes).toContain("tool-result")
expect(activityEventTypes).toContain("tool-error")
expect(activityEventTypes).toHaveLength(5)
})
})

describe("Stall timeout validation", () => {
test("rejects invalid OPENCODE_STALL_TIMEOUT_MS values", () => {
const original = process.env.OPENCODE_STALL_TIMEOUT_MS

process.env.OPENCODE_STALL_TIMEOUT_MS = "abc"
const parsed = parseInt(process.env.OPENCODE_STALL_TIMEOUT_MS || "180000", 10)
expect(isNaN(parsed)).toBe(true)

process.env.OPENCODE_STALL_TIMEOUT_MS = "0"
const zero = parseInt(process.env.OPENCODE_STALL_TIMEOUT_MS || "180000", 10)
expect(zero <= 0).toBe(true)

process.env.OPENCODE_STALL_TIMEOUT_MS = "-1000"
const negative = parseInt(process.env.OPENCODE_STALL_TIMEOUT_MS || "180000", 10)
expect(negative <= 0).toBe(true)

if (original) process.env.OPENCODE_STALL_TIMEOUT_MS = original
else delete process.env.OPENCODE_STALL_TIMEOUT_MS
})
})

describe("Stalled sessions tracking", () => {
test("isSessionStalled reports stall status for sessions", () => {
const id = Identifier.descending("session")
// Note: markSessionStalled and clearSessionStalled are internal functions
// They are only called by the processor internally. The public API only
// exposes isSessionStalled for checking status. Testing the internal
// behavior through mark/clear would require exporting them or adding
// test helpers, which would defeat encapsulation.
// For now, we verify the function exists and has the correct type.
expect(typeof isSessionStalled).toBe("function")
expect(isSessionStalled("nonexistent-session")).toBe(false)
})
Expand Down
Loading