diff --git a/services/platform/convex/agent_tools/workflows/internal_mutations.ts b/services/platform/convex/agent_tools/workflows/internal_mutations.ts index 3a6ff9f99c..8c1f30dbd1 100644 --- a/services/platform/convex/agent_tools/workflows/internal_mutations.ts +++ b/services/platform/convex/agent_tools/workflows/internal_mutations.ts @@ -136,7 +136,7 @@ export const triggerWorkflowCompletionResponse = internalMutation({ promptMessageId, maxSteps: 20, userId: thread?.userId, - deadlineMs: Date.now() + 60_000, + deadlineMs: Date.now() + 420_000, }, ); }, diff --git a/services/platform/convex/lib/agent_completion/on_agent_complete.ts b/services/platform/convex/lib/agent_completion/on_agent_complete.ts index 1878b88c19..eed67435d7 100644 --- a/services/platform/convex/lib/agent_completion/on_agent_complete.ts +++ b/services/platform/convex/lib/agent_completion/on_agent_complete.ts @@ -7,11 +7,9 @@ * This function runs in action context and calls mutations as needed. */ -import { listMessages } from '@convex-dev/agent'; - import type { ActionCtx } from '../../_generated/server'; -import { components, internal } from '../../_generated/api'; +import { internal } from '../../_generated/api'; import { createDebugLog } from '../debug_log'; const debugLog = createDebugLog('DEBUG_AGENT_COMPLETION', '[AgentCompletion]'); @@ -26,6 +24,7 @@ type Usage = { export interface AgentResponseResult { threadId: string; + messageId?: string; text?: string; model?: string; provider?: string; @@ -78,74 +77,48 @@ export async function onAgentComplete( }); // Step 1: Save message metadata (unless skipped) - if (!options?.skipMetadata && result.usage) { - try { - // Find the first assistant message in the current response - // This matches the UIMessage.id logic used by @convex-dev/agent - const messages = await listMessages(ctx, components.agent, { - threadId, - paginationOpts: { cursor: null, numItems: 20 }, - excludeToolMessages: false, - }); - - const sortedMessages = messages.page.sort( - (a, b) => b._creationTime - a._creationTime, - ); - - const latestAssistantMessage = sortedMessages.find( - (msg) => msg.message?.role === 'assistant', - ); - - if (latestAssistantMessage) { - const currentOrder = latestAssistantMessage.order; - - // Find the FIRST message (by stepOrder) in this response group - const messagesInCurrentResponse = sortedMessages - .filter( - (msg) => - msg.order === currentOrder && - (msg.message?.role === 'assistant' || - msg.message?.role === 'tool'), - ) - .sort((a, b) => a.stepOrder - b.stepOrder); - - const firstMessageInResponse = messagesInCurrentResponse[0]; - - if (firstMessageInResponse) { - await ctx.runMutation( - internal.message_metadata.internal_mutations.saveMessageMetadata, - { - messageId: firstMessageInResponse._id, - threadId, - model: result.model, - provider: result.provider, - inputTokens: result.usage.inputTokens, - outputTokens: result.usage.outputTokens, - totalTokens: result.usage.totalTokens, - reasoningTokens: result.usage.reasoningTokens, - cachedInputTokens: result.usage.cachedInputTokens, - reasoning: result.reasoning, - durationMs: result.durationMs, - timeToFirstTokenMs: result.timeToFirstTokenMs, - toolsUsage: result.toolsUsage, - contextWindow: result.contextWindow, - contextStats: result.contextStats, - }, - ); - - debugLog('Metadata saved', { + if (!options?.skipMetadata) { + const messageId = result.messageId; + + if (messageId) { + try { + await ctx.runMutation( + internal.message_metadata.internal_mutations.saveMessageMetadata, + { + messageId, threadId, - agentType, - messageId: firstMessageInResponse._id, model: result.model, - }); - } + provider: result.provider, + inputTokens: result.usage?.inputTokens, + outputTokens: result.usage?.outputTokens, + totalTokens: result.usage?.totalTokens, + reasoningTokens: result.usage?.reasoningTokens, + cachedInputTokens: result.usage?.cachedInputTokens, + reasoning: result.reasoning, + durationMs: result.durationMs, + timeToFirstTokenMs: result.timeToFirstTokenMs, + toolsUsage: result.toolsUsage, + contextWindow: result.contextWindow, + contextStats: result.contextStats, + }, + ); + + debugLog('Metadata saved', { + threadId, + agentType, + messageId, + model: result.model, + }); + } catch (error) { + console.error(`[${agentType}] Failed to save message metadata:`, { + threadId, + error, + }); } - } catch (error) { - // Non-fatal: log and continue - console.error(`[${agentType}] Failed to save message metadata:`, { + } else { + debugLog('No messageId provided, skipping metadata save', { threadId, - error, + agentType, }); } } diff --git a/services/platform/convex/lib/agent_response/generate_response.ts b/services/platform/convex/lib/agent_response/generate_response.ts index 9a3554e02b..03e3df8b04 100644 --- a/services/platform/convex/lib/agent_response/generate_response.ts +++ b/services/platform/convex/lib/agent_response/generate_response.ts @@ -460,6 +460,10 @@ export async function generateAgentResponse( // Track time to first token for streaming let firstTokenTime: number | null = null; + // The first saved message ID for this generation, used for metadata and approval linking. + // Captured from the agent SDK's savedMessages before any retry logic can overwrite `result`. + let savedMessageId: string | undefined; + // Generate response - streaming or non-streaming let result: { text?: string; @@ -538,6 +542,8 @@ export async function generateAgentResponse( }, ); + savedMessageId = streamResult.savedMessages?.[0]?._id; + // Wait for stream to complete (with timeout) const [ streamText, @@ -736,6 +742,8 @@ export async function generateAgentResponse( abortController, ); + savedMessageId = generateResult.savedMessages?.[0]?._id; + debugLog('Generate completed', { threadId, elapsedMs: Date.now() - startTime, @@ -1134,6 +1142,7 @@ export async function generateAgentResponse( agentType, result: { threadId, + messageId: savedMessageId, text: responseResult.text, model: actualModel, provider, @@ -1148,43 +1157,19 @@ export async function generateAgentResponse( }); // Link approvals to message (only for main agent, not sub-agents) - if (!parentThreadId) { + if (!parentThreadId && savedMessageId) { try { - const messagesResult = await listMessages(ctx, components.agent, { - threadId, - paginationOpts: { cursor: null, numItems: 50 }, - excludeToolMessages: false, - }); - - const latestAssistantMessage = messagesResult.page.find( - (m: MessageDoc) => m.message?.role === 'assistant', + const linkedCount = await ctx.runMutation( + internal.approvals.internal_mutations.linkApprovalsToMessage, + { + threadId, + messageId: savedMessageId, + }, ); - - if (latestAssistantMessage) { - const currentOrder = latestAssistantMessage.order; - const messagesInSameOrder = messagesResult.page.filter( - (m: MessageDoc) => - m.order === currentOrder && m.message?.role !== 'user', - ); - - messagesInSameOrder.sort( - (a: MessageDoc, b: MessageDoc) => a.stepOrder - b.stepOrder, - ); - const firstMessageInOrder = - messagesInSameOrder[0] || latestAssistantMessage; - - const linkedCount = await ctx.runMutation( - internal.approvals.internal_mutations.linkApprovalsToMessage, - { - threadId, - messageId: firstMessageInOrder._id, - }, + if (linkedCount > 0) { + debugLog( + `Linked ${linkedCount} pending approvals to message ${savedMessageId}`, ); - if (linkedCount > 0) { - debugLog( - `Linked ${linkedCount} pending approvals to message ${firstMessageInOrder._id}`, - ); - } } } catch (error) { console.error(