feat(observability): add instrumented_chat_stream for LiteLLM streaming metrics#361
feat(observability): add instrumented_chat_stream for LiteLLM streaming metrics#361joshua0chen wants to merge 1 commit into
Conversation
|
This PR is targeting The
See |
| async def instrumented_chat_stream( | ||
| raw_stream: AsyncIterator, | ||
| response: Response, | ||
| model_name: str, | ||
| ) -> AsyncIterator[TResponseStreamEvent]: |
There was a problem hiding this comment.
model_name vs agent.model metric attribute mismatch
instrumented_chat_stream records TTFT/TTAT/TPS under {"model": model_name}, while LLMMetricsHooks.on_llm_end records requests and token counters under {"model": str(agent.model) if agent.model else "unknown"}. If callers pass the model identifier in a different format than what agent.model produces (e.g., "openai/gpt-4" vs "gpt-4"), the two metric families will have a different model label value and cannot be correlated in dashboards. The docstring should specify the expected format — or accept the model from the same source as agent.model to guarantee alignment.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/observability/instrumented_chat_stream.py
Line: 54-58
Comment:
**`model_name` vs `agent.model` metric attribute mismatch**
`instrumented_chat_stream` records TTFT/TTAT/TPS under `{"model": model_name}`, while `LLMMetricsHooks.on_llm_end` records requests and token counters under `{"model": str(agent.model) if agent.model else "unknown"}`. If callers pass the model identifier in a different format than what `agent.model` produces (e.g., `"openai/gpt-4"` vs `"gpt-4"`), the two metric families will have a different `model` label value and cannot be correlated in dashboards. The docstring should specify the expected format — or accept the model from the same source as `agent.model` to guarantee alignment.
How can I resolve this? If you propose a fix, please make it concise.| # --- Token detail counters ------------------------------------------- | ||
| # Prefer _hidden_params["usage"] (reconstructed by stream_chunk_builder | ||
| # with all detail fields) over raw per-chunk usage. | ||
| if _last_hidden_params is not None: | ||
| hp_usage = _last_hidden_params.get("usage") | ||
| if hp_usage is not None: | ||
| raw_usage = hp_usage | ||
|
|
||
| cached_tokens = 0 | ||
| reasoning_tokens = 0 | ||
| if raw_usage is not None: | ||
| # prompt_tokens_details.cached_tokens (standard OpenAI field) | ||
| ptd = getattr(raw_usage, "prompt_tokens_details", None) | ||
| if ptd is not None: | ||
| cached_tokens = getattr(ptd, "cached_tokens", 0) or 0 | ||
| # Fallback: LiteLLM PrivateAttr _cache_read_input_tokens | ||
| if not cached_tokens: | ||
| cached_tokens = getattr(raw_usage, "_cache_read_input_tokens", 0) or 0 | ||
|
|
||
| ctd = getattr(raw_usage, "completion_tokens_details", None) | ||
| if ctd is not None: | ||
| reasoning_tokens = getattr(ctd, "reasoning_tokens", 0) or 0 | ||
|
|
||
| if cached_tokens > 0: | ||
| m.cached_input_tokens.add(cached_tokens, attrs) | ||
| if reasoning_tokens > 0: | ||
| m.reasoning_tokens.add(reasoning_tokens, attrs) |
There was a problem hiding this comment.
Potential double-counting of
cached_input_tokens and reasoning_tokens
LLMMetricsHooks.on_llm_end also calls m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs) and m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs). The PR's invariant — that LiteLLM strips these detail fields from the assembled ModelResponse so on_llm_end will always see None and skip them — is not codified in the code or tests. If a future LiteLLM version or a different provider does populate input_tokens_details in the assembled response, both paths will .add() to the same OTel counter for the same request, doubling the values. Adding a brief inline comment (or an integration test assertion) that documents why double-counting cannot occur would make this assumption explicit and catch regressions early.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/observability/instrumented_chat_stream.py
Line: 145-171
Comment:
**Potential double-counting of `cached_input_tokens` and `reasoning_tokens`**
`LLMMetricsHooks.on_llm_end` also calls `m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs)` and `m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs)`. The PR's invariant — that LiteLLM strips these detail fields from the assembled `ModelResponse` so `on_llm_end` will always see `None` and skip them — is not codified in the code or tests. If a future LiteLLM version or a different provider does populate `input_tokens_details` in the assembled response, both paths will `.add()` to the same OTel counter for the same request, doubling the values. Adding a brief inline comment (or an integration test assertion) that documents why double-counting cannot occur would make this assumption explicit and catch regressions early.
How can I resolve this? If you propose a fix, please make it concise.
Summary: Adds instrumented_chat_stream, an async generator wrapper that instruments a LiteLLM ChatCompletions stream with OTel metrics (TTFT, TTAT, TPS, cached input tokens, reasoning tokens)
Context:
Currently we have two LLM streaming paths:
TemporalStreamingModel, which has inline ttft/ttat/tps instrumentation (done in feat(streaming): emit OTel metrics for ttft, tps, token counts #347)ChatCmplStreamHandler, with no SDK-level streaming metricsFor the second path, the existing metrics infrastructure covers token counters but not streaming latency:
Problem:
LLMMetricsHooks.on_llm_endfires after the Runner fully consumes the stream and assembles a final ModelResponse. It records requests, input_tokens, output_tokens, cached_input_tokens, and reasoning_token, but it cannot record TTFT/TTAT/TPS because it never sees individual chunks or their arrival times.Solution:
This PR introduces a reusable SDK helper for agents using LiteLLM streaming
Usage Example (DUA):
https://github.com/scaleapi/agentex-agents/pull/1556
Greptile Summary
This PR introduces
instrumented_chat_stream, an async generator wrapper that augments LiteLLM ChatCompletions streaming calls with OTel metrics (TTFT, TTAT, TPS, cached-token, and reasoning-token counts), filling the gap left byLLMMetricsHooks.on_llm_endwhich cannot observe individual streaming chunks.ChatCmplStreamHandler.handle_stream, capturingtime.perf_counter()timestamps on eachResponseTextDeltaEvent,ResponseReasoningTextDeltaEvent, andResponseFunctionCallArgumentsDeltaEventto derive per-request TTFT, TTAT, and TPS histograms.cached_input_tokens,reasoning_tokens) are extracted from LiteLLM's_hidden_paramsdict — a shared-by-reference object thatstream_chunk_builderpopulates after the stream ends — rather than from the assembled ModelResponse, where LiteLLM strips these fields.Confidence Score: 4/5
Safe to merge — the new file is additive and only fires when callers explicitly wrap their stream with it, leaving all existing paths unchanged.
The core instrumentation logic is sound: timing bookmarks are captured in the right order,
_hidden_paramsis accessed after iteration completes, and the fallback chain for token-detail extraction is well-documented. Two concerns: themodel_namelabel must match whatagent.modelproduces or latency and request/token metrics will diverge in dashboards; and the no-double-count guarantee for cached/reasoning tokens relies on an undocumented LiteLLM internal behavior that could change across upgrades.src/agentex/lib/core/observability/instrumented_chat_stream.py — verify the model_name convention matches agent.model and consider adding a comment or test to protect the no-double-count invariant.
Important Files Changed
model_namematchesagent.modelor timing and request metrics will be unjoined in dashboards. Potential future double-counting of cached/reasoning tokens if LiteLLM behavior changes.Sequence Diagram
sequenceDiagram participant Agent participant instrumented_chat_stream participant _usage_capturing_stream participant ChatCmplStreamHandler participant LiteLLM raw_stream Agent->>instrumented_chat_stream: async for event in ... Note over instrumented_chat_stream: stream_start = perf_counter() instrumented_chat_stream->>ChatCmplStreamHandler: handle_stream(response, _usage_capturing_stream()) loop Each chunk ChatCmplStreamHandler->>_usage_capturing_stream: __anext__() _usage_capturing_stream->>LiteLLM raw_stream: __anext__() LiteLLM raw_stream-->>_usage_capturing_stream: chunk (usage + _hidden_params captured) _usage_capturing_stream-->>ChatCmplStreamHandler: chunk ChatCmplStreamHandler-->>instrumented_chat_stream: TResponseStreamEvent alt TOKEN event (Text/Reasoning/FnCall delta) Note over instrumented_chat_stream: first_token_at / first_answer_at / last_token_at updated else ResponseCompletedEvent Note over instrumented_chat_stream: output_tokens_count captured end instrumented_chat_stream-->>Agent: yield event (unchanged) end Note over instrumented_chat_stream: finally block runs Note over instrumented_chat_stream: record ttft_ms, ttat_ms, tps Note over instrumented_chat_stream: extract cached/reasoning tokens from _hidden_params["usage"] Note over instrumented_chat_stream: record cached_input_tokens, reasoning_tokensPrompt To Fix All With AI
Reviews (1): Last reviewed commit: "instrumented chat stream for observabili..." | Re-trigger Greptile