feat(api): streaming agent turns — LlmClient.stream() + SSE chat endpoint #341

Merged
james merged 1 commit from 340-agent-streaming into main 2026-06-29 01:49:24 +00:00

Token-by-token streaming for the built-in agent (ADR-0029 §3): a stream() method on the LlmClient, a unified streaming agent-loop core, and content-negotiated SSE on the two conversation POST routes. No DB schema change (reuses conversations/messages). Part of epic #47.

What's in it

  • LlmClient.stream(req) (required interface method) yielding text_delta / tool_call / done events; generate() stays for non-loop callers. Implemented on both adapters:
    • Anthropic: uses the SDK's messages.stream(...), forwards text deltas live, accumulates tool_use blocks, and emits done from normalizeResponse(stream.finalMessage()) (reused so that done.result ≡ generate()).
    • OpenAI-compatible: fetch with stream: true and hand-parsed SSE chunks (buffered line split, indexed tool_calls argument accumulation, finish_reason); a shared buildResult(...) keeps streaming and non-streaming in lockstep.
  • Unified loop — the turn body is now a single private async generator (driveLoopEvents). runTurn/resumeTurn keep their exact signatures and drain it (JSON behaviour byte-for-byte unchanged — the #339 loop tests stayed green untouched); streamTurn/streamResume forward its ConversationEvents. Same persistence order, MAX_TOOL_ROUNDS guard, and propose-then-confirm write-pause as before.
  • SSE on the two routes — content-negotiate on Accept: text/event-stream. Setup failures (404/401/409/llm_not_configured) stay real HTTP status codes via a pre-pull of the first event before the 200 is sent; once headers are out, a mid-stream provider failure surfaces as a terminal error event. JSON mode is unchanged.
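The "drain vs. forward" split in the unified-loop bullet can be pictured with a small sketch. This is not the code from this PR: the event type is simplified, the function signatures are hypothetical, and the stand-in `driveLoopEvents` only emits events (the real one calls the LLM, runs tools, and persists rows).

```typescript
// Simplified, hypothetical event type; the real ConversationEvent lives in lib/agent/events.ts.
type ConversationEvent =
  | { type: "text_delta"; text: string }
  | { type: "message"; role: "assistant"; content: string }
  | { type: "done" };

// Stand-in for the single turn body. Assumption: it takes a prompt string.
async function* driveLoopEvents(prompt: string): AsyncGenerator<ConversationEvent> {
  const reply = `echo: ${prompt}`;
  for (const word of reply.split(" ")) {
    yield { type: "text_delta", text: word };
  }
  yield { type: "message", role: "assistant", content: reply };
  yield { type: "done" };
}

// JSON mode: drain the generator, ignore deltas, return only the final messages.
async function runTurn(prompt: string): Promise<{ messages: string[] }> {
  const messages: string[] = [];
  for await (const ev of driveLoopEvents(prompt)) {
    if (ev.type === "message") messages.push(ev.content);
  }
  return { messages };
}

// Streaming mode: forward every event from the same generator unchanged.
async function* streamTurn(prompt: string): AsyncGenerator<ConversationEvent> {
  yield* driveLoopEvents(prompt);
}
```

Because both entry points consume the same generator, any side effects inside the turn body happen in the same order whichever mode the caller picks, which is the property the persisted-row parity tests check.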

Adapter event shape: text_delta | tool_call | done. Conversation/SSE event shape (lib/agent/events.ts): text_delta, tool_call, tool_result, message (carries toMessageDto, never the raw entity), awaiting_confirmation, done, error. SSE wire: event: <type>\ndata: <json>\n\n.
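As an illustration of the wire format above, here is a minimal encoder plus an incremental decoder that buffers partial chunks until a blank line closes a frame. The event names come from this description; the function names, the payload typing, and the assumption that frames use bare `\n` line endings are illustrative only.

```typescript
// Event names as listed for lib/agent/events.ts; payload shapes are not modelled here.
const EVENT_TYPES = [
  "text_delta", "tool_call", "tool_result", "message",
  "awaiting_confirmation", "done", "error",
] as const;
type EventType = (typeof EVENT_TYPES)[number];

// Hypothetical helper: one frame is `event: <type>\ndata: <json>\n\n`.
// JSON.stringify never emits a raw newline, so a single data line is enough.
function encodeSseFrame(type: EventType, data: object): string {
  return `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`;
}

interface SseFrame {
  event: string;
  data: unknown;
}

// Hypothetical helper: feed it arbitrary text chunks; it returns the frames
// completed so far and keeps any trailing partial frame in its buffer.
function createSseDecoder(): (chunk: string) => SseFrame[] {
  let buffer = "";
  return (chunk: string): SseFrame[] => {
    buffer += chunk;
    const frames: SseFrame[] = [];
    let end = buffer.indexOf("\n\n");
    while (end !== -1) {
      const raw = buffer.slice(0, end);
      buffer = buffer.slice(end + 2);
      let event = "message"; // SSE default when a frame has no event: line
      const dataLines: string[] = [];
      for (const line of raw.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) dataLines.push(line.slice(5).trimStart());
      }
      if (dataLines.length > 0) {
        frames.push({ event, data: JSON.parse(dataLines.join("\n")) });
      }
      end = buffer.indexOf("\n\n");
    }
    return frames;
  };
}
```

A client would call the decoder once per network chunk; a chunk that ends mid-frame simply yields no frames until the rest arrives.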

Verification (all run locally on this branch)

  • typecheck ✓ · lint ✓ (0 warnings)
  • test against both engines (ephemeral Postgres via TEST_POSTGRES_URL): 1206 passed, 0 skipped — Postgres leg ran, no engine-specific SQL; +23 tests (adapter streaming, streaming loop event sequences + persisted-row parity, SSE route incl. JSON-mode-unchanged and 401/404-as-JSON-not-SSE)
  • openapi:check ✓ up to date · openapi:coverage 116 pairs (SSE isn't a JSON DTO; no new schema)
  • semgrep full CI pack set ✓ 0 findings (incl. an explicit scan of the new/changed core files)

Out of scope

  • The chat UI client (next ticket consumes this). Voice. Effort/thinking tuning.

Closes #340

🤖 Generated with Claude Code

feat(api): streaming agent turns — LlmClient.stream() + SSE chat endpoint
All checks were successful
Commits / Conventional Commits (pull_request) Successful in 20s
PR / Static analysis (pull_request) Successful in 1m55s
PR / OSV-Scanner (pull_request) Successful in 18s
PR / pnpm audit (pull_request) Successful in 2m45s
PR / OpenAPI (pull_request) Successful in 3m47s
PR / Lint (pull_request) Successful in 4m28s
PR / Typecheck (pull_request) Successful in 4m57s
PR / Client (web export smoke) (pull_request) Successful in 5m3s
PR / Build (pull_request) Successful in 5m11s
PR / Package age policy (soft) (pull_request) Successful in 1m27s
PR / Test (postgres) (pull_request) Successful in 5m16s
PR / Test (sqlite) (pull_request) Successful in 5m24s
PR / Trivy (image) (pull_request) Successful in 3m9s
Secrets / gitleaks (pull_request) Successful in 57s
PR / E2E (Playwright) (pull_request) Successful in 6m18s
PR / Coverage (soft) (pull_request) Successful in 4m14s
729d3fdb65
Add token-by-token streaming to the built-in agent (ADR-0029 §3): a
`stream()` method on the LlmClient, a unified streaming agent-loop core,
and content-negotiated SSE on the two conversation POST routes. No DB
schema change (reuses conversations/messages).

- LlmClient gains a required `stream(req)` yielding text_delta / tool_call
  / done events; `generate()` stays for non-loop callers. Both adapters
  implement it — Anthropic via the SDK's messages.stream + finalMessage,
  OpenAI-compatible via fetch stream:true + hand-parsed SSE chunks. The
  done.result reuses the same normalization so it can't drift from
  generate().
- The agent loop is refactored to a single private async generator
  (driveLoopEvents) that is the only turn body. runTurn/resumeTurn drain
  it (JSON behaviour byte-for-byte unchanged); streamTurn/streamResume
  forward its ConversationEvents. Same persistence order, MAX_TOOL_ROUNDS
  guard, and propose-then-confirm write-pause as before.
- The messages/resume routes negotiate on Accept: text/event-stream.
  Setup failures (404/401/409/llm_not_configured) stay real HTTP status
  codes via a pre-pull of the first event; once 200 + headers are sent, a
  mid-stream provider failure surfaces as a terminal `error` event.
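The pre-pull described in the last bullet can be sketched as follows. This is an assumption-laden illustration rather than the route code: `SetupError`, `respond`, the response shape, and the `conversation_not_found` code used in the usage note are all hypothetical names.

```typescript
type ConversationEvent =
  | { type: "text_delta"; text: string }
  | { type: "done" }
  | { type: "error"; message: string };

// Hypothetical error type for failures that map to a real HTTP status.
class SetupError extends Error {
  status: number;
  constructor(status: number, message: string) {
    super(message);
    this.status = status;
  }
}

type StreamResponse =
  | { status: number; contentType: "application/json"; body: string }
  | { status: 200; contentType: "text/event-stream"; body: AsyncGenerator<string> };

const toFrame = (ev: ConversationEvent): string =>
  `event: ${ev.type}\ndata: ${JSON.stringify(ev)}\n\n`;

// Replays the pre-pulled first event, then forwards the rest of the stream.
async function* framesFrom(
  first: IteratorResult<ConversationEvent>,
  rest: AsyncGenerator<ConversationEvent>,
): AsyncGenerator<string> {
  if (first.done) return;
  yield toFrame(first.value);
  try {
    for await (const ev of rest) yield toFrame(ev);
  } catch (err) {
    // The 200 and headers are already sent, so report the failure in-band.
    const message = err instanceof Error ? err.message : "stream failed";
    yield toFrame({ type: "error", message });
  }
}

async function respond(events: AsyncGenerator<ConversationEvent>): Promise<StreamResponse> {
  let first: IteratorResult<ConversationEvent>;
  try {
    // Pre-pull: setup failures throw here, before any status line is committed.
    first = await events.next();
  } catch (err) {
    if (!(err instanceof SetupError)) throw err;
    return {
      status: err.status,
      contentType: "application/json",
      body: JSON.stringify({ error: err.message }),
    };
  }
  return { status: 200, contentType: "text/event-stream", body: framesFrom(first, events) };
}
```

With this shape, a generator that throws `new SetupError(404, "conversation_not_found")` before its first yield produces a plain JSON 404, while one that yields a delta and then throws produces a 200 stream whose last frame is `event: error`.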

Closes #340

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

📊 Test coverage

Patch coverage: no testable lines changed.

Overall (app/, lib/, db/, excluding UI per ADR-0019):

| Metric | Value | Soft target |
|---|---|---|
| Lines | 79.3% ✅ | ≥ 50% |
| Branches | 70.9% ⚠️ | ≥ 75% |
| Functions | 80.2% | informational |

Soft thresholds per ADR-0019. Coverage is informational and does not block merge.

james merged commit 8b758329ca into main 2026-06-29 01:49:24 +00:00
james deleted branch 340-agent-streaming 2026-06-29 01:49:24 +00:00