feat(api): streaming agent turns — LlmClient.stream() + SSE chat endpoint #340
Reference
james/carol#340
Add token-by-token streaming to the built-in agent (ADR-0029 §3): a `stream()` method on the #337 `LlmClient` (deferred there), a streaming variant of the #339 agent loop, and SSE responses on the conversation endpoints so the PWA gets live output. Part of epic #47. No schema change (reuses `conversations`/`messages`).

Scope

- `LlmClient.stream(req): AsyncIterable<LlmStreamEvent>` on the interface + both adapters:
  - `LlmStreamEvent` = `{ type: "text_delta", text }` | `{ type: "tool_call", toolCall }` (emitted once a tool call is fully accumulated) | `{ type: "done", result: LlmResult }` (final text + toolCalls + stopReason).
  - `anthropic.ts`: use the SDK's `client.messages.stream(...)`, accumulate `text_delta` and `input_json_delta` (tool_use) events, emit deltas + the final `done`. Injectable transport for tests.
  - `openai-compatible.ts`: POST with `stream: true`, parse the SSE `data:` lines, accumulate `choices[0].delta` (content + `tool_calls` deltas), emit deltas + `done`. Injectable `fetch` (returns an SSE `ReadableStream`) for tests.
  - `generate()` stays available (callers/tests may still use it).
- Rework `lib/agent/loop.ts` so the turn is an internal async event generator that both the existing JSON `runTurn`/`resumeTurn` (collect to `TurnResult`) and the new streaming path consume, sharing tool-dispatch + persistence so behaviour is identical (the #339 loop tests are the regression guard). Emit conversation-level events: `text_delta`, `tool_call` (a tool is being run), `tool_result` (read tool done), `awaiting_confirmation` (write pause, carries the proposal), `message` (a row was persisted), `done` (final status), `error`. Persistence is unchanged (assistant/tool messages still saved; write tools still pause).
- `POST /api/conversations/{id}/messages` and `POST /api/conversations/{id}/resume`: when `Accept: text/event-stream`, return a streaming `Response` (`text/event-stream`, `Cache-Control: no-cache`, `X-Accel-Buffering: no`) whose body emits `event: <type>\ndata: <json>\n\n` for each conversation event; otherwise the existing JSON behaviour (back-compatible with #339). Auth via `getAuthIdentity` → 401; per-user (cross-user conversation → 404) unchanged.

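To make the OpenAI-compatible item above concrete, here is a minimal sketch of the delta accumulation, not the adapter's actual code. The event union follows the description above. Everything else is an assumption for illustration: the `ToolCall` and `LlmResult` field shapes, the `OpenAiSseAccumulator` helper, and the choice to flush accumulated tool calls when `data: [DONE]` arrives. The caller is assumed to have already decoded the `ReadableStream` bytes to text before calling `feed()`.

```typescript
// Event union as described in the Scope list. ToolCall and LlmResult field
// shapes are assumptions made for this sketch.
type ToolCall = { id: string; name: string; input: unknown };
type LlmResult = { text: string; toolCalls: ToolCall[]; stopReason: string };
type LlmStreamEvent =
  | { type: "text_delta"; text: string }
  | { type: "tool_call"; toolCall: ToolCall }
  | { type: "done"; result: LlmResult };

// Hypothetical helper: turns OpenAI-style SSE text chunks into stream events.
class OpenAiSseAccumulator {
  private buffer = "";
  private text = "";
  private stopReason = "stop";
  private finished = false;
  // Partial tool calls, keyed by the `index` field of each tool_calls delta.
  private partial = new Map<number, { id: string; name: string; args: string }>();

  feed(chunk: string): LlmStreamEvent[] {
    const events: LlmStreamEvent[] = [];
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // keep a trailing incomplete line for later
    for (const raw of lines) {
      const line = raw.trim();
      if (this.finished || !line.startsWith("data:")) continue;
      const payload = line.slice("data:".length).trim();
      if (payload === "[DONE]") {
        this.finish(events);
        continue;
      }
      const choice = JSON.parse(payload).choices?.[0];
      if (!choice) continue;
      const delta = choice.delta ?? {};
      if (typeof delta.content === "string" && delta.content.length > 0) {
        this.text += delta.content;
        events.push({ type: "text_delta", text: delta.content });
      }
      for (const tc of delta.tool_calls ?? []) {
        const slot = this.partial.get(tc.index) ?? { id: "", name: "", args: "" };
        if (tc.id) slot.id = tc.id;
        if (tc.function?.name) slot.name = tc.function.name;
        if (tc.function?.arguments) slot.args += tc.function.arguments;
        this.partial.set(tc.index, slot);
      }
      if (choice.finish_reason) this.stopReason = choice.finish_reason;
    }
    return events;
  }

  // Emits each fully accumulated tool call, then the final `done` event.
  private finish(events: LlmStreamEvent[]): void {
    this.finished = true;
    const toolCalls: ToolCall[] = Array.from(this.partial.entries())
      .sort((a, b) => a[0] - b[0])
      .map((entry) => ({
        id: entry[1].id,
        name: entry[1].name,
        input: JSON.parse(entry[1].args || "{}"),
      }));
    for (const toolCall of toolCalls) events.push({ type: "tool_call", toolCall });
    events.push({
      type: "done",
      result: { text: this.text, toolCalls, stopReason: this.stopReason },
    });
  }
}
```

Because the helper is synchronous and keeps its own line buffer, a test can feed it a canned SSE body split at arbitrary points and assert on the resulting event sequence without touching the network.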
The proxy/auth middleware must let the stream through; confirm the SW/offline shell isn't affected (it only bypasses non-cached routes).

Tests

- Adapter `stream()` (injected `fetch` returning a canned SSE body): assert deltas + accumulated tool calls + `done`.
- The streaming loop with a fake streaming client (text-only turn, a read-tool round, a write pause).
- The SSE route (POST with `Accept: text/event-stream`, read the event stream, assert the event sequence + that JSON mode still works).

Both engines for the DB-touching paths. No live provider calls.

Acceptance criteria

- `LlmClient.stream()` on both adapters yields text deltas + tool calls + a final result; non-streaming `generate()` still works.
- The conversation endpoints stream SSE when sent `Accept: text/event-stream`, emitting deltas / tool events / a write-pause event / done; JSON mode unchanged. Per-user 404/401 hold.

Out of scope

Depends on #337 (adapters), #339 (loop). Implements ADR-0029 §3.
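
The SSE framing described under Scope (`event: <type>\ndata: <json>\n\n`, chosen by the `Accept` header) could be sketched roughly as below. The event names and response headers come from the description. The `data` payload shape and the helpers `wantsEventStream`, `formatSseEvent`, and `parseSseBody` are hypothetical names introduced only for this illustration. The parser handles only the framing produced here, not the full SSE grammar.

```typescript
// Event names come from the Scope list; the payload shape is an assumption.
type ConversationEventType =
  | "text_delta"
  | "tool_call"
  | "tool_result"
  | "awaiting_confirmation"
  | "message"
  | "done"
  | "error";
type ConversationEvent = { type: ConversationEventType; data: unknown };

// Response headers listed in the Scope section.
const SSE_HEADERS: Record<string, string> = {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
  "X-Accel-Buffering": "no",
};

// Hypothetical helper: true when the client asked for an event stream.
function wantsEventStream(accept: string | null): boolean {
  return (accept ?? "").toLowerCase().includes("text/event-stream");
}

// Encodes one event as `event: <type>\ndata: <json>\n\n`.
// JSON.stringify without an indent argument escapes newlines inside strings,
// so the payload always fits on a single data: line.
function formatSseEvent(event: ConversationEvent): string {
  return `event: ${event.type}\ndata: ${JSON.stringify(event.data ?? null)}\n\n`;
}

// Test-side counterpart: splits a captured stream body back into events.
function parseSseBody(body: string): ConversationEvent[] {
  const events: ConversationEvent[] = [];
  for (const block of body.split("\n\n")) {
    let type = "";
    let data = "";
    for (const line of block.split("\n")) {
      if (line.startsWith("event: ")) type = line.slice("event: ".length);
      else if (line.startsWith("data: ")) data += line.slice("data: ".length);
    }
    if (type && data) {
      events.push({ type: type as ConversationEventType, data: JSON.parse(data) });
    }
  }
  return events;
}
```

A route handler would check `wantsEventStream(request.headers.get("accept"))`, and in the streaming case write `formatSseEvent(...)` for each conversation event into a response body sent with `SSE_HEADERS`. A route test can then read the whole body and compare `parseSseBody(body).map((e) => e.type)` against the expected sequence.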