
Streaming responses

SSE and chunked streams for AI chat, live progress, and large payloads — without breaking the filling chain.

since v0.22

TL;DR: `ctx.sse(emit => ...)` for Server-Sent Events, `ctx.stream(async function* () { ... })` for chunked text. Both stay inside the filling chain so middleware (auth, CSRF) still applies.

Why

LLM tokens, progress updates, large CSV exports — anything that benefits from "first byte fast, rest later". The filling chain handles backpressure and shutdown signals so your handler can stay focused on producing data.

SSE — Server-Sent Events

// app/api/chat/stream/route.ts
import { Mandu } from "@mandujs/core";
import { withSession } from "@/server/lib/auth";
import { streamCompletion } from "@/server/ai/openai";

export default Mandu.filling()
  .use(withSession())
  .get((ctx) =>
    ctx.sse(async (emit) => {
      const prompt = ctx.query.q ?? "";
      for await (const token of streamCompletion(prompt)) {
        emit({ data: token });
      }
      emit({ event: "done", data: "" });
    }),
  );

Client side:

const es = new EventSource("/api/chat/stream?q=hello");
es.onmessage = (e) => append(e.data);
es.addEventListener("done", () => es.close());
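
The same pattern covers progress updates: give each emit a named event and subscribe to it with addEventListener. A minimal sketch, assuming a hypothetical runExport(jobId) async iterator that yields completion percentages (setPct is likewise a placeholder for your UI update):

// app/api/export/progress/route.ts
import { Mandu } from "@mandujs/core";
import { withSession } from "@/server/lib/auth";
import { runExport } from "@/server/jobs/export"; // hypothetical job helper

export default Mandu.filling()
  .use(withSession())
  .get((ctx) =>
    ctx.sse(async (emit) => {
      // runExport is assumed to yield progress percentages as numbers
      for await (const pct of runExport(ctx.query.jobId ?? "")) {
        emit({ event: "progress", data: String(pct) });
      }
      emit({ event: "done", data: "" });
    }),
  );

Client side:

const es = new EventSource("/api/export/progress?jobId=42");
es.addEventListener("progress", (e) => setPct(Number((e as MessageEvent).data)));
es.addEventListener("done", () => es.close());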

Chunked text / JSON

// app/api/export/users/route.ts
import { Mandu } from "@mandujs/core";
import { db } from "@/server/db"; // assumed data-access module for this example

export default Mandu.filling().get((ctx) =>
  ctx.stream(async function* () {
    yield "id,email,created_at\n";
    for await (const user of db.users.iter()) {
      yield `${user.id},${user.email},${user.createdAt.toISOString()}\n`;
    }
  }, { contentType: "text/csv" }),
);

The runtime sets Transfer-Encoding: chunked and flushes each yielded chunk immediately.
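
For the JSON side, the usual shape is NDJSON: one JSON document per line, flushed as it is produced. A minimal sketch, assuming a db.activity.iter() async iterator along the lines of the CSV example (the route path and the iterator are placeholders):

// app/api/export/activity/route.ts
import { Mandu } from "@mandujs/core";
import { db } from "@/server/db"; // assumed data-access module, as in the CSV example

export default Mandu.filling().get((ctx) =>
  ctx.stream(async function* () {
    for await (const event of db.activity.iter()) {
      // NDJSON: one JSON document per line
      yield JSON.stringify(event) + "\n";
    }
  }, { contentType: "application/x-ndjson" }),
);

Client side, with the standard fetch reader (handleEvent is a placeholder):

const res = await fetch("/api/export/activity");
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let buf = "";
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buf += decoder.decode(value, { stream: true });
  let nl = buf.indexOf("\n");
  while (nl >= 0) {
    handleEvent(JSON.parse(buf.slice(0, nl)));
    buf = buf.slice(nl + 1);
    nl = buf.indexOf("\n");
  }
}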

🤖 Agent Prompt — Add a streaming endpoint
Add a streaming endpoint to my Mandu app at `app/api/<NAME>/route.ts`.

Pick the kind:
- SSE (live LLM tokens, progress, push updates) →
    return ctx.sse(async (emit) => {
      ... emit({ data }); ...
      emit({ event: 'done', data: '' });
    })
  from a `.get()` handler.

- Chunked text/JSON (CSV export, NDJSON, large payloads) →
    return ctx.stream(async function* () { yield '...'; },
                      { contentType: '...' });

Required invariants:
- Stay inside the filling chain — `.use(withSession())` etc. still applies.
- Never construct a `new Response(stream, ...)` directly.
- For SSE: emit a `done` event before the generator returns.
- For LLM streaming: emit per token; do not accumulate before emitting.
- SSE handlers use `.get(...)`, never `.post(...)`.

After writing the route, test with `curl -N` for SSE or
`curl --no-buffer` for chunked, and run `bun run guard`.

Pitfalls

  • Browsers' EventSource only supports GET. A .post() SSE handler will look fine in curl but won't connect from the browser.
  • Don't wrap LLM streams in another buffer. Most provider SDKs return an async iterator; emit each token as it arrives.
  • Send done before closing. Without it, clients reconnect indefinitely and burn quota.
  • ctx.sse and ctx.stream set their own headers. Don't add Content-Type manually — pass { contentType } as the second arg.

For Agents

AI hint

For Server-Sent Events, return `ctx.sse((emit) => { ... emit({ data }); ... })` from a `.get()` handler. For incremental chunked text, use `ctx.stream(async function* () { yield "..."; })`. Both keep the filling chain (`.use()`, `.guard()`) intact and live in `app/api/<route>/route.ts`.

Invariants
  • SSE handlers must return `ctx.sse(emitter)`; never `new Response(stream, { ... })` manually
  • Chunked text/JSON streams use `ctx.stream(asyncGenerator)` so the runtime sets `Transfer-Encoding: chunked` and flushes correctly
  • Don't mix `.post(...)` and `ctx.sse(...)` on the same route — SSE clients always GET
  • When piping LLM output, await the model's stream and `emit({ data: token })` per token; never accumulate before emitting
  • Always send a `done` event before closing so clients can stop reconnecting
Guard scope
api-route