Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class PipelineConsumer implements Runnable {
P + "tool-use-result",
P + "tool-use-all-complete",
P + "tool-use-latency",
P + "session-cost",
P + "message-output"
);

Expand Down
2 changes: 0 additions & 2 deletions frontend/src/tabs/ExecutionTab.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ function buildRows(events: PipelineEvent[]): TableRow[] {
// Attach cost info to the enriched sub-row if available
if (currentMsgRow && v) {
const cost = v.cost != null ? Number(v.cost) : null;
const llmCalls = v.llm_calls != null ? Number(v.llm_calls) : null;
if (cost != null) {
// Update the enriched sub-row comment to include cost
const enrichedSub = currentMsgRow.subRows!.find(
Expand All @@ -156,7 +155,6 @@ function buildRows(events: PipelineEvent[]): TableRow[] {
const costStr = cost < 0.01 ? `$${cost.toFixed(6)}` : `$${cost.toFixed(4)}`;
const parts = [enrichedSub.comment];
parts.push(`cost: ${costStr}`);
if (llmCalls != null) parts.push(`${llmCalls} LLM call${llmCalls !== 1 ? "s" : ""}`);
enrichedSub.comment = parts.join(", ");
}
}
Expand Down
7 changes: 1 addition & 6 deletions frontend/src/tabs/PipelineEventsTab.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const TOPIC_LABELS: Record<string, string> = {
"tool-use-result": "Tool Result",
"tool-use-all-complete": "Tools Complete",
"tool-use-latency": "Tool Latency",
"session-cost": "Session Cost",
"message-output": "Final Output",
};

Expand All @@ -32,7 +31,6 @@ const TOPIC_ICONS: Record<string, string> = {
"tool-use-result": "RES",
"tool-use-all-complete": "ALL",
"tool-use-latency": "LAT",
"session-cost": "CST",
"message-output": "OUT",
};

Expand All @@ -46,7 +44,6 @@ const TOPIC_COLORS: Record<string, string> = {
"tool-use-result": "#10b981",
"tool-use-all-complete": "#14b8a6",
"tool-use-latency": "#06b6d4",
"session-cost": "#f97316",
"message-output": "#22c55e",
};

Expand Down Expand Up @@ -88,9 +85,7 @@ function eventSummary(event: PipelineEvent): string {
case "enriched-message-input":
return `history: ${Array.isArray(v.history) ? v.history.length : 0} items`;
case "session-context":
return `turns: ${v.llm_calls ?? 0}, cost: ${v.cost != null ? formatDollars(Number(v.cost)) : "-"}`;
case "session-cost":
return `cost: ${v.total_cost != null ? formatDollars(Number(v.total_cost)) : JSON.stringify(v).slice(0, 60)}`;
return `cost: ${v.cost != null ? formatDollars(Number(v.cost)) : "-"}`;
default:
return truncate(JSON.stringify(v), 80);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class MonitoringApp {
P + "tool-use-result",
P + "tool-use-all-complete",
P + "tool-use-latency",
P + "session-cost",
P + "message-output",
P + "session-end",
P + "memoir-context",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import io.flightdeck.streams.processors.EndTurnProcessor;
import io.flightdeck.streams.processors.EnrichInputMessageProcessor;
import io.flightdeck.streams.processors.ExtractToolUseItemsProcessor;
import io.flightdeck.streams.processors.SessionCostAggregationProcessor;
import io.flightdeck.streams.processors.MemoirSessionEndProcessor;
import io.flightdeck.streams.processors.SessionEndProcessor;
import io.flightdeck.streams.processors.TransformToolUseDoneProcessor;
import io.flightdeck.streams.model.SessionCost;
import io.flightdeck.streams.model.ThinkResponse;
import io.flightdeck.streams.serdes.JsonSerde;
import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -47,7 +45,7 @@ public class FlightDeckStreamsApp {

static final String MEMOIR_CONTEXT_STORE = "memoir-context-store";
static final String THINK_RESPONSE_STORE = "think-response-store";
static final String SESSION_COST_TABLE_STORE = "session-cost-table-store";


public static void main(String[] args) {
Properties props = buildConfig();
Expand Down Expand Up @@ -120,20 +118,9 @@ static Topology buildTopology(boolean memoirEnabled) {
.withValueSerde(JsonSerde.of(ThinkResponse.class))
);

// ── Shared KTable: session-cost (aggregated cost per session) ────────
KTable<String, SessionCost> sessionCostTable = builder.table(
Topics.SESSION_COST,
Consumed.with(Serdes.String(), JsonSerde.of(SessionCost.class)),
Materialized.<String, SessionCost>as(
Stores.persistentKeyValueStore(SESSION_COST_TABLE_STORE))
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(SessionCost.class))
);

// ── Register each processor fragment ──────────────────────────────────
EnrichInputMessageProcessor.register(builder, memoirTable, thinkTable, sessionCostTable);
EnrichInputMessageProcessor.register(builder, memoirTable, thinkTable);
ExtractToolUseItemsProcessor.register(builder, thinkStream);
SessionCostAggregationProcessor.register(builder, thinkStream);
EndTurnProcessor.register(builder, thinkStream);
AggregateToolExecutionResultProcessor.register(builder);
TransformToolUseDoneProcessor.register(builder);
Expand Down Expand Up @@ -169,7 +156,6 @@ private static void ensureTopicsExist(Properties streamsProps) {
Topics.TOOL_USE_DLQ,
Topics.TOOL_USE_RESULT,
Topics.TOOL_USE_ALL_COMPLETE,
Topics.SESSION_COST,
Topics.TOOL_USE_LATENCY,
Topics.MESSAGE_OUTPUT
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ private static String requireEnv(String key) {
public static final String TOOL_USE_ALL_COMPLETE = PREFIX + "tool-use-all-complete";

// ── Observability ─────────────────────────────────────────────────────────
/** Aggregated cost (tokens × pricing) per conversation session */
public static final String SESSION_COST = PREFIX + "session-cost";

/** Per-tool latency metrics, keyed by tool_name */
public static final String TOOL_USE_LATENCY = PREFIX + "tool-use-latency";

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
public record ThinkResponse(
@JsonProperty("session_id") String sessionId,
@JsonProperty("user_id") String userId,
@JsonProperty("cost") Double cost,
@JsonProperty("prev_session_cost") Double prevSessionCost,
@JsonProperty("input_tokens") int inputTokens,
@JsonProperty("output_tokens") int outputTokens,
@JsonProperty("total_session_cost") Double totalSessionCost,
@JsonProperty("previous_session_cost") Double previousSessionCost,
@JsonProperty("think_cost") Double thinkCost,
@JsonProperty("think_input_tokens") int thinkInputTokens,
@JsonProperty("think_output_tokens") int thinkOutputTokens,
@JsonProperty("previous_messages") List<MessageInput> previousMessages,
@JsonProperty("last_input_message") MessageInput lastInputMessage,
@JsonProperty("last_input_response") List<MessageInput> lastInputResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public record UserResponse(
@JsonProperty("session_id") String sessionId,
@JsonProperty("user_id") String userId,
@JsonProperty("content") String content,
@JsonProperty("llm_calls") int llmCalls,
@JsonProperty("input_tokens") int inputTokens,
@JsonProperty("output_tokens") int outputTokens,
@JsonProperty("cost") Double cost,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ static UserResponse toUserResponse(String sessionId, ThinkResponse response) {
sessionId,
response.userId(),
content,
1, // this response represents one LLM call
response.inputTokens(),
response.outputTokens(),
totalCost(response.prevSessionCost(), response.cost()),
response.thinkInputTokens(),
response.thinkOutputTokens(),
response.totalSessionCost(),
sourceAgent,
Instant.now().toString()
);
Expand All @@ -147,16 +146,6 @@ static UserResponse toUserResponse(String sessionId, ThinkResponse response) {
* Returns an empty string if the list is null, empty, or contains no
* assistant messages.
*/
/**
* Computes total session cost: prev_session_cost + current call cost.
* Returns null if both are null.
*/
static Double totalCost(Double prevSessionCost, Double callCost) {
if (prevSessionCost == null && callCost == null) return null;
return (prevSessionCost != null ? prevSessionCost : 0.0)
+ (callCost != null ? callCost : 0.0);
}

static String assembleContent(List<MessageInput> messages) {
if (messages == null || messages.isEmpty()) return "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.flightdeck.streams.config.Topics;
import io.flightdeck.streams.model.MessageInput;
import io.flightdeck.streams.model.FullSessionContext;
import io.flightdeck.streams.model.SessionCost;
import io.flightdeck.streams.model.ThinkResponse;
import io.flightdeck.streams.serdes.JsonSerde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -27,13 +26,10 @@
* │◄────────────── think-request-response (KTable — previous turn's full state)
* │
* │ leftJoin
* │◄────────────── session-cost (KTable — aggregated cost per session)
* │
* │ leftJoin
* │◄────────────── memoir-context (KTable — long-term memoir, shared)
* │
* ▼
* enriched-message-input (KStream — history + cost + memoir + latest input)
* enriched-message-input (KStream — history + memoir + latest input)
* </pre>
*
* <p>History is reconstructed from the previous ThinkResponse:
Expand All @@ -46,12 +42,10 @@ public class EnrichInputMessageProcessor {
/**
* @param memoirTable shared KTable for memoir-context (keyed by userId)
* @param thinkTable shared KTable for think-request-response (keyed by sessionId)
* @param sessionCostTable shared KTable for session-cost (keyed by sessionId)
*/
public static void register(StreamsBuilder builder,
KTable<String, String> memoirTable,
KTable<String, ThinkResponse> thinkTable,
KTable<String, SessionCost> sessionCostTable) {
KTable<String, ThinkResponse> thinkTable) {

// ── Left side: incoming user messages ────────────────────────────────
KStream<String, MessageInput> inputStream = builder.stream(
Expand All @@ -75,18 +69,6 @@ public static void register(StreamsBuilder builder,
)
);

// ── Join: enriched ⟕ session-cost (attach aggregated cost) ──────────
enriched = enriched
.leftJoin(
sessionCostTable,
EnrichInputMessageProcessor::enrichWithCost,
Joined.with(
Serdes.String(),
JsonSerde.of(FullSessionContext.class),
JsonSerde.of(SessionCost.class)
)
);

// If memoir is enabled, re-key by userId, join with memoir, re-key back
if (memoirTable != null) {
enriched = enriched
Expand Down Expand Up @@ -144,30 +126,14 @@ static FullSessionContext enrichWithThinkResponse(MessageInput message, ThinkRes
return new FullSessionContext(
message.sessionId(),
userId,
null,
(prevResponse != null) ? prevResponse.totalSessionCost() : null,
history,
message,
null,
Instant.now().toString()
);
}

/**
* Join: attach aggregated session cost from session-cost KTable.
*/
static FullSessionContext enrichWithCost(FullSessionContext enriched, SessionCost sessionCost) {
Double cost = (sessionCost != null) ? sessionCost.estimatedCostUsd() : null;
return new FullSessionContext(
enriched.sessionId(),
enriched.userId(),
cost,
enriched.history(),
enriched.latestInput(),
enriched.memoirContext(),
enriched.timestamp()
);
}

/**
* Join: attach memoir context to the already-enriched session context.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ static MemoirSessionEnd buildSnapshot(ThinkResponse think, String memoirCtx) {
if (think.lastInputResponse() != null) fullHistory.addAll(think.lastInputResponse());

ThinkResponse asResponse = new ThinkResponse(
think.sessionId(), think.userId(), think.cost(), think.prevSessionCost(),
0, 0,
think.sessionId(), think.userId(), think.totalSessionCost(), think.previousSessionCost(),
think.thinkCost(), 0, 0,
fullHistory, // previousMessages = full history for memoir
null, null, // lastInputMessage, lastInputResponse not needed for memoir
null, true, false, 0, 0, 0.0, Instant.now().toString());
Expand Down
Loading
Loading