feat(api): streaming agent turns — LlmClient.stream() + SSE chat endpoint #340

Closed
opened 2026-06-29 01:06:48 +00:00 by james

Add token-by-token streaming to the built-in agent (ADR-0029 §3): a stream() method on the #337 LlmClient (deferred there), a streaming variant of the #339 agent loop, and SSE responses on the conversation endpoints so the PWA gets live output. Part of epic #47. No schema change (reuses conversations/messages).
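A minimal TypeScript sketch of the interface change. The field names for `ToolCall`, `LlmResult`, and `LlmRequest` are assumptions (the real #337 types may differ). `collectStream` is a hypothetical helper, not part of the ticket: it drains a stream and checks that the streamed deltas agree with the final `done` result, an invariant the adapter tests can reuse.

```typescript
// Assumed shapes; the real #337 types may carry more fields.
interface ToolCall {
  id: string;
  name: string;
  input: unknown;
}

interface LlmResult {
  text: string;
  toolCalls: ToolCall[];
  stopReason: string; // provider-specific value (assumed to be passed through)
}

interface LlmRequest {
  messages: { role: "user" | "assistant"; content: string }[];
}

type LlmStreamEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; toolCall: ToolCall } // emitted once fully accumulated
  | { type: "done"; result: LlmResult };

interface LlmClient {
  generate(req: LlmRequest): Promise<LlmResult>; // existing, unchanged
  stream(req: LlmRequest): AsyncIterable<LlmStreamEvent>; // new in this ticket
}

// Hypothetical helper: drain a stream and verify the deltas agree with `done`.
async function collectStream(events: AsyncIterable<LlmStreamEvent>): Promise<LlmResult> {
  let text = "";
  const toolCalls: ToolCall[] = [];
  for await (const ev of events) {
    switch (ev.type) {
      case "text_delta":
        text += ev.text;
        break;
      case "tool_call":
        toolCalls.push(ev.toolCall);
        break;
      case "done":
        if (ev.result.text !== text) throw new Error("done.text does not match the streamed deltas");
        if (ev.result.toolCalls.length !== toolCalls.length) throw new Error("tool call count mismatch");
        return ev.result;
    }
  }
  throw new Error("stream ended without a done event");
}
```

Because `done` already carries the full result, a caller that only wants the final answer could use `collectStream(client.stream(req))`; the ticket nonetheless keeps `generate()` as its own method.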

Scope

  • LlmClient.stream(req): AsyncIterable<LlmStreamEvent> on the interface + both adapters:
    • LlmStreamEvent = { type: "text_delta", text } | { type: "tool_call", toolCall } (emitted once a tool call is fully accumulated) | { type: "done", result: LlmResult } (final text + toolCalls + stopReason).
    • Anthropic (anthropic.ts): use the SDK's client.messages.stream(...), accumulating the content_block_delta events (text_delta for text blocks, input_json_delta for the tool_use input JSON), and emit deltas + the final done. Injectable transport for tests.
    • OpenAI-compatible (openai-compatible.ts): POST with stream: true, parse the SSE data: lines (up to the data: [DONE] terminator), accumulate choices[0].delta (content, plus tool_calls deltas merged by index), and emit deltas + done. Injectable fetch (returning an SSE ReadableStream) for tests.
    • Keep the existing non-streaming generate() (callers/tests may still use it).
  • Streaming loop: prefer refactoring lib/agent/loop.ts so that a turn is an internal async event generator consumed by both the existing JSON runTurn/resumeTurn (which collect it into a TurnResult) and the new streaming path. Sharing tool dispatch and persistence keeps the two behaviours identical; the #339 loop tests are the regression guard. Emit conversation-level events: text_delta, tool_call (a tool is being run), tool_result (a read tool finished), awaiting_confirmation (write pause; carries the proposal), message (a row was persisted), done (final status), error. Persistence is unchanged: assistant/tool messages are still saved and write tools still pause.
  • SSE endpoints: content-negotiate on the existing POST /api/conversations/{id}/messages and POST /api/conversations/{id}/resume. When the request sends Accept: text/event-stream, return a streaming Response (Content-Type: text/event-stream, Cache-Control: no-cache, X-Accel-Buffering: no) whose body emits event: <type>\ndata: <json>\n\n for each conversation event; otherwise keep the existing JSON behaviour (back-compatible with #339). Auth via getAuthIdentity (401) and per-user scoping (a cross-user conversation returns 404) are unchanged. The proxy/auth middleware must let the stream through; confirm the service worker (SW) offline shell isn't affected (it only bypasses non-cached routes).
  • Tests: streaming adapters with injected transport (a fake Anthropic stream; a fake fetch returning a canned SSE body) — assert deltas + accumulated tool calls + done; the streaming loop with a fake streaming client (text-only turn, a read-tool round, a write pause); the SSE route (POST with Accept: text/event-stream, read the event stream, assert the event sequence + that JSON mode still works). Both engines for the DB-touching paths. No live provider calls.
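The Anthropic adapter's accumulation can be sketched as a pure transform over the raw stream events, which keeps the transport injectable: in production the iterable is assumed to come from iterating the SDK's `client.messages.stream(...)`, and in tests from a fake generator. The `RawEvent` type below is a trimmed local mirror of the documented wire events, not the SDK's own export, and `anthropicToLlmEvents` is a hypothetical name.

```typescript
interface ToolCall {
  id: string;
  name: string;
  input: unknown;
}
interface LlmResult {
  text: string;
  toolCalls: ToolCall[];
  stopReason: string;
}
type LlmStreamEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; toolCall: ToolCall }
  | { type: "done"; result: LlmResult };

// Trimmed local mirror of the raw Anthropic stream events this sketch reads.
type RawEvent =
  | { type: "message_start" }
  | {
      type: "content_block_start";
      index: number;
      content_block: { type: "text"; text: string } | { type: "tool_use"; id: string; name: string };
    }
  | {
      type: "content_block_delta";
      index: number;
      delta: { type: "text_delta"; text: string } | { type: "input_json_delta"; partial_json: string };
    }
  | { type: "content_block_stop"; index: number }
  | { type: "message_delta"; delta: { stop_reason: string | null } }
  | { type: "message_stop" };

async function* anthropicToLlmEvents(raw: AsyncIterable<RawEvent>): AsyncGenerator<LlmStreamEvent> {
  let text = "";
  let stopReason = "unknown";
  const toolCalls: ToolCall[] = [];
  // tool_use blocks still receiving JSON fragments, keyed by content-block index.
  const pending = new Map<number, { id: string; name: string; json: string }>();

  for await (const ev of raw) {
    switch (ev.type) {
      case "content_block_start":
        if (ev.content_block.type === "tool_use") {
          pending.set(ev.index, { id: ev.content_block.id, name: ev.content_block.name, json: "" });
        }
        break;
      case "content_block_delta":
        if (ev.delta.type === "text_delta") {
          text += ev.delta.text;
          yield { type: "text_delta", text: ev.delta.text };
        } else {
          const block = pending.get(ev.index);
          if (block) block.json += ev.delta.partial_json;
        }
        break;
      case "content_block_stop": {
        const block = pending.get(ev.index);
        if (block) {
          pending.delete(ev.index);
          // The input is only valid JSON once the block stops; a no-argument tool may stream none.
          const toolCall: ToolCall = { id: block.id, name: block.name, input: JSON.parse(block.json || "{}") };
          toolCalls.push(toolCall);
          yield { type: "tool_call", toolCall };
        }
        break;
      }
      case "message_delta":
        if (ev.delta.stop_reason) stopReason = ev.delta.stop_reason;
        break;
      default:
        break; // message_start / message_stop carry nothing this sketch needs
    }
  }
  yield { type: "done", result: { text, toolCalls, stopReason } };
}
```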
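A comparable sketch for the OpenAI-compatible adapter, assuming a `FetchLike` signature for the injected `fetch` and reading only the chunk fields shown (`choices[0].delta.content`, `tool_calls[].index/id/function`, `finish_reason`). `sseData` and `streamChat` are hypothetical names. Tool-call arguments arrive as JSON fragments keyed by `index`, so they are parsed only after the stream ends.

```typescript
interface ToolCall {
  id: string;
  name: string;
  input: unknown;
}
interface LlmResult {
  text: string;
  toolCalls: ToolCall[];
  stopReason: string;
}
type LlmStreamEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; toolCall: ToolCall }
  | { type: "done"; result: LlmResult };

// Assumed subset of an OpenAI-style streaming chunk; only these fields are read.
interface Chunk {
  choices: {
    delta: {
      content?: string | null;
      tool_calls?: { index: number; id?: string; function?: { name?: string; arguments?: string } }[];
    };
    finish_reason: string | null;
  }[];
}

// Hypothetical injectable transport; the global `fetch` should satisfy this signature.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<Response>;

// Yield the payload of each `data:` line, stopping at the `[DONE]` sentinel.
async function* sseData(body: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    buffer += done ? decoder.decode() : decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = done ? "" : lines.pop() ?? ""; // keep a partial trailing line for the next read
    for (const raw of lines) {
      const line = raw.replace(/\r$/, "");
      if (!line.startsWith("data:")) continue; // blank separators, comments, other fields
      const payload = line.slice(5).trim();
      if (payload === "[DONE]") {
        await reader.cancel();
        return;
      }
      yield payload;
    }
    if (done) return;
  }
}

async function* streamChat(fetchImpl: FetchLike, url: string, apiKey: string, request: object): AsyncGenerator<LlmStreamEvent> {
  const res = await fetchImpl(url, {
    method: "POST",
    headers: { "content-type": "application/json", authorization: `Bearer ${apiKey}` },
    body: JSON.stringify({ ...request, stream: true }),
  });
  if (!res.ok || !res.body) throw new Error(`provider returned HTTP ${res.status}`);

  let text = "";
  let stopReason = "unknown";
  // id/name arrive in the first fragment for an index; arguments arrive in pieces.
  const partial = new Map<number, { id: string; name: string; args: string }>();

  for await (const payload of sseData(res.body)) {
    const choice = (JSON.parse(payload) as Chunk).choices[0];
    if (!choice) continue; // e.g. a trailing usage-only chunk
    if (choice.delta.content) {
      text += choice.delta.content;
      yield { type: "text_delta", text: choice.delta.content };
    }
    for (const tc of choice.delta.tool_calls ?? []) {
      const acc = partial.get(tc.index) ?? { id: "", name: "", args: "" };
      if (tc.id) acc.id = tc.id;
      if (tc.function?.name) acc.name = tc.function.name;
      acc.args += tc.function?.arguments ?? "";
      partial.set(tc.index, acc);
    }
    if (choice.finish_reason) stopReason = choice.finish_reason;
  }

  const toolCalls: ToolCall[] = Array.from(partial.entries())
    .sort(([a], [b]) => a - b)
    .map(([, t]) => ({ id: t.id, name: t.name, input: JSON.parse(t.args || "{}") }));
  for (const toolCall of toolCalls) yield { type: "tool_call", toolCall };
  yield { type: "done", result: { text, toolCalls, stopReason } };
}
```

Buffering the trailing partial line matters because network reads need not align with SSE line boundaries.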
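One way to realise the preferred loop refactor: a single async generator owns tool dispatch and persistence, and the JSON path merely drains it. Everything here is hypothetical scaffolding (`Deps`, `Tool.kind`, the string `history`, the `maxRounds` cap, and the event payload fields); only the event names come from the ticket. Persisting the pending write proposal for `resumeTurn` is omitted.

```typescript
// Event names from the ticket; payload fields are assumptions.
type ConversationEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; name: string }
  | { type: "tool_result"; name: string; output: unknown }
  | { type: "awaiting_confirmation"; proposal: { name: string; input: unknown } }
  | { type: "message"; role: "assistant" | "tool"; content: string }
  | { type: "done"; status: "complete" | "awaiting_confirmation" }
  | { type: "error"; message: string };

// Hypothetical stand-ins for the #337 client and #339 tool registry/persistence.
type ModelEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; toolCall: { id: string; name: string; input: unknown } }
  | { type: "done"; result: { text: string } };
interface Tool {
  kind: "read" | "write";
  run(input: unknown): Promise<unknown>;
}
interface Deps {
  stream(history: string[]): AsyncIterable<ModelEvent>;
  tools: Record<string, Tool>;
  persist(role: "assistant" | "tool", content: string): Promise<void>;
  maxRounds?: number; // hypothetical safety cap on tool rounds
}

// The single source of truth for a turn: both transports consume this generator.
async function* turnEvents(deps: Deps, history: string[]): AsyncGenerator<ConversationEvent> {
  try {
    for (let round = 0; round < (deps.maxRounds ?? 8); round++) {
      let text = "";
      const calls: { id: string; name: string; input: unknown }[] = [];
      for await (const ev of deps.stream(history)) {
        if (ev.type === "text_delta") yield { type: "text_delta", text: ev.text };
        else if (ev.type === "tool_call") calls.push(ev.toolCall);
        else text = ev.result.text;
      }
      await deps.persist("assistant", text);
      yield { type: "message", role: "assistant", content: text };
      history.push(text);
      if (calls.length === 0) {
        yield { type: "done", status: "complete" };
        return;
      }
      for (const call of calls) {
        const tool = deps.tools[call.name];
        if (!tool) throw new Error(`unknown tool ${call.name}`);
        if (tool.kind === "write") {
          // Write tools never run inside the loop: surface the proposal and pause.
          yield { type: "awaiting_confirmation", proposal: { name: call.name, input: call.input } };
          yield { type: "done", status: "awaiting_confirmation" };
          return;
        }
        yield { type: "tool_call", name: call.name };
        const output = await tool.run(call.input);
        const content = JSON.stringify(output);
        await deps.persist("tool", content);
        yield { type: "tool_result", name: call.name, output };
        yield { type: "message", role: "tool", content };
        history.push(content);
      }
    }
    throw new Error("tool round limit reached");
  } catch (err) {
    yield { type: "error", message: err instanceof Error ? err.message : String(err) };
  }
}

// JSON mode: drain the generator into one result (a stand-in for #339's TurnResult).
interface TurnResult {
  status: "complete" | "awaiting_confirmation" | "error";
  text: string;
  proposal?: unknown;
  error?: string;
}
async function collectTurn(events: AsyncIterable<ConversationEvent>): Promise<TurnResult> {
  const result: TurnResult = { status: "complete", text: "" };
  for await (const ev of events) {
    if (ev.type === "text_delta") result.text += ev.text;
    else if (ev.type === "awaiting_confirmation") result.proposal = ev.proposal;
    else if (ev.type === "done") result.status = ev.status;
    else if (ev.type === "error") {
      result.status = "error";
      result.error = ev.message;
    }
  }
  return result;
}
```

The streaming path would hand the same `turnEvents(...)` generator to the SSE encoder instead of `collectTurn`, so both modes share one code path by construction and the #339 tests keep guarding it.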
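For the SSE endpoints, the framing and headers can be sketched with the web-standard `Request`/`Response`/`ReadableStream` (available in Node 18+). `wantsEventStream`, `encodeSseFrame`, and `sseResponse` are hypothetical names, and the loose `ConversationEvent` type stands in for the loop's real event union. A pull-based stream produces the next event only when the consumer asks for one, and `cancel` stops the turn generator if the client disconnects.

```typescript
// Loose stand-in for the loop's event union (hypothetical).
type ConversationEvent = { type: string; [key: string]: unknown };

// Content negotiation: stream only when the client explicitly accepts SSE.
function wantsEventStream(req: Request): boolean {
  return (req.headers.get("accept") ?? "").includes("text/event-stream");
}

// One SSE frame. JSON.stringify escapes newlines inside strings, so the
// payload always fits on a single `data:` line.
function encodeSseFrame(ev: ConversationEvent): string {
  return `event: ${ev.type}\ndata: ${JSON.stringify(ev)}\n\n`;
}

function sseResponse(events: AsyncIterable<ConversationEvent>): Response {
  const encoder = new TextEncoder();
  const iterator = events[Symbol.asyncIterator]();
  const body = new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        const { value, done } = await iterator.next();
        if (done) controller.close();
        else controller.enqueue(encoder.encode(encodeSseFrame(value)));
      } catch (err) {
        // Fallback if the generator throws instead of yielding its own error event.
        controller.enqueue(encoder.encode(encodeSseFrame({ type: "error", message: String(err) })));
        controller.close();
      }
    },
    async cancel() {
      // Client went away: stop the underlying turn generator.
      await iterator.return?.();
    },
  });
  return new Response(body, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "X-Accel-Buffering": "no", // asks nginx-style proxies not to buffer the stream
    },
  });
}
```

In the route handler, the existing `getAuthIdentity` (401) and ownership (404) checks would run first; the handler would then return `sseResponse(...)` when `wantsEventStream(req)` is true and the existing JSON response otherwise.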

Acceptance criteria

  • LlmClient.stream() on both adapters yields text deltas + tool calls + a final result; non-streaming generate() still works.
  • The conversation endpoints stream over SSE when Accept: text/event-stream, emitting deltas / tool events / a write-pause event / done; JSON mode unchanged. Per-user 404/401 hold.
  • Tests use injected transport (no live API); both-engine DB tests; typecheck/lint/semgrep + OpenAPI drift green.
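For the route test (POST with `Accept: text/event-stream`, read the event stream, assert the sequence), a small reader can turn a fetched `Response` into `{ event, data }` frames. `parseSse` and `readSseEvents` are hypothetical test helpers; the parsing follows the SSE rules for `event:`/`data:` fields (default type `message`, multi-line data joined with newlines, comment lines ignored) but skips `id:`/`retry:`, and it assumes every `data` payload is JSON.

```typescript
interface SseFrame {
  event: string;
  data: unknown;
}

// Parse a complete text/event-stream body into frames (test helper).
function parseSse(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of text.replace(/\r\n/g, "\n").split("\n\n")) {
    let event = "message"; // SSE default when no `event:` field is sent
    const data: string[] = [];
    for (const line of block.split("\n")) {
      // Per the SSE rules, a single space after the colon is not part of the value.
      if (line.startsWith("event:")) event = line.slice(6).replace(/^ /, "");
      else if (line.startsWith("data:")) data.push(line.slice(5).replace(/^ /, ""));
      // Lines starting with ":" are comments (e.g. keep-alives) and fall through.
    }
    if (data.length > 0) frames.push({ event, data: JSON.parse(data.join("\n")) });
  }
  return frames;
}

async function readSseEvents(res: Response): Promise<SseFrame[]> {
  const contentType = res.headers.get("content-type");
  if (!contentType?.startsWith("text/event-stream")) {
    throw new Error(`expected an SSE response, got ${contentType}`);
  }
  return parseSse(await res.text());
}
```

Rejecting non-SSE content types also gives the JSON-mode assertion a cheap negative check.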

Out of scope

  • The chat UI client (next ticket consumes this). Voice. Server-side compaction/effort/thinking tuning.

Depends on #337 (adapters), #339 (loop). Implements ADR-0029 §3.

james closed this issue 2026-06-29 01:49:24 +00:00
Reference
james/carol#340