---
title: "Streaming responses (chunked HTTP, SSE)"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Streaming responses (chunked HTTP, SSE)}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include=FALSE}
knitr::opts_chunk$set(eval = FALSE, comment = "#>")
```

A normal route handler builds the entire response body in one shot and returns it. Sometimes you need the opposite — start sending bytes before the work is done, and keep emitting more over time. Common cases: a Server-Sent Events feed, an LLM token stream, an NDJSON log tail, anything where the client cares about *first byte* time more than *last byte* time.

drogonR supports this through `dr_stream()` and the SSE convenience wrapper `dr_stream_sse()`.

## How it works

A streaming handler returns a `drogon_stream` value (instead of a normal response list). The dispatcher recognises the class, opens an HTTP chunked-transfer response on Drogon's side, and from then on calls your `next_chunk()` function on the main R thread, one chunk at a time. Each call returns the bytes to send and a `done` flag.

```text
R handler returns drogon_stream
        |
        v
Drogon sends 200 + chunked headers ────► client
        |
        v
pump: next_chunk(state, cancelled = FALSE)
        |   returns list(chunk, state, done = FALSE)
        v
Drogon sends one chunk ────► client
        |
        v
pump: next_chunk(state, …) ...
        |   returns list(..., done = TRUE)
        v
Drogon closes the chunked response ────► client
```

If the client disconnects mid-stream, the dispatcher catches the close event from Drogon and runs your `next_chunk()` one final time with `cancelled = TRUE`, so you can free state. The stream is then torn down regardless of what that final call returns.
## `dr_stream()` — the base API

```r
library(drogonR)

app <- dr_app() |>
  dr_get("/numbers", function(req) {
    dr_stream(
      state = list(i = 0L, n = 10L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          chunk = sprintf("%d\n", state$i),
          state = state,
          done  = FALSE)
      },
      content_type = "text/plain")
  })

dr_serve(app, port = 8080L)
```

`next_chunk()` always returns `list(chunk = , state = , done = )`. `chunk` is sent verbatim — format SSE / NDJSON / whatever yourself, or use one of the helpers built on top.

## `dr_stream_sse()` — Server-Sent Events

90% of streaming endpoints just emit `data:` frames. `dr_stream_sse()` takes care of:

* SSE framing (`data: \n\n`, with multi-line `data` automatically split per the spec),
* default headers (`Content-Type: text/event-stream`, `Cache-Control: no-cache`, `X-Accel-Buffering: no` so reverse proxies don't buffer).

```r
app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L, n = 5L),
      generator = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(data = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          data  = sprintf("tick %d", state$i),
          state = state,
          done  = FALSE)
      })
  })

dr_serve(app, port = 8080L)
```

Test it:

```sh
curl -N http://127.0.0.1:8080/sse
# data: tick 1
#
# data: tick 2
# ...
```

Need `event:`, `id:`, or `retry:`? Use `dr_stream()` directly and build the frame yourself — the helper deliberately keeps to just `data:`.

## Threading: keep each pump short

`next_chunk()` always runs on the **main R thread**. R is single-threaded, so this is the only place it could safely run. Heavy work inside one pump blocks every other request and every other stream until it returns.

* **Good:** each pump computes one row, formats it, returns.
* **Bad:** each pump does a 500 ms database query, or `Sys.sleep(1)`, or runs a long loop in pure R.
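The "good" pattern — one cheap unit of work per pump, progress carried in `state` — can be sketched as an NDJSON endpoint. This is illustrative only: `query_next_row()` is a hypothetical non-blocking helper that returns one pre-fetched row (or `NULL` when exhausted), and jsonlite is assumed for serialisation.

```r
library(drogonR)
library(jsonlite)

app <- dr_app() |>
  dr_get("/rows", function(req) {
    dr_stream(
      state = list(sent = 0L),
      next_chunk = function(state, cancelled) {
        if (cancelled) return(list(chunk = "", state = state, done = TRUE))
        # query_next_row() is hypothetical: cheap, non-blocking, one row per call
        row <- query_next_row(state$sent + 1L)
        if (is.null(row)) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$sent <- state$sent + 1L
        # NDJSON framing: one JSON object per line
        list(chunk = paste0(toJSON(row, auto_unbox = TRUE), "\n"),
             state = state,
             done  = FALSE)
      },
      content_type = "application/x-ndjson")
  })
```

Each pump does only a single fetch-and-format step, so other requests interleave between chunks.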
If you have CPU-bound work, split it across many pumps (carry progress in `state`). If you have blocking I/O, read it on a worker process and pass results in via `state` updates from outside, not from inside the pump.

## Cancellation contract

When the client goes away, `next_chunk()` is called **exactly once** with `cancelled = TRUE`. Use it to release per-stream resources (file handles, DB cursors, accumulated buffers). The return value is ignored — the stream is torn down either way.

```r
generator <- function(state, cancelled) {
  if (cancelled) {
    if (!is.null(state$conn)) close(state$conn)
    return(list(data = "", state = state, done = TRUE))
  }
  # ... normal path ...
}
```

The notification arrives as soon as Drogon's I/O thread sees the TCP close (epoll on Linux, kqueue on BSD/macOS), not on the next attempted send — drogonR carries a small Drogon patch (`setUserCloseCallback`) that wires this up. Without it, small SSE chunks would keep ticking long after the kernel had silently absorbed the writes for a closed connection.

## Errors inside `next_chunk()`

If your generator raises an R error mid-stream, the dispatcher catches it (`R_tryEval`), prints `drogonR stream: next_chunk() raised an error; closing stream` to stderr, and tears the session down. Headers are already on the wire by then, so the client sees a truncated chunked response — there is no way to send a 500 once streaming has started. The server stays up and other requests are unaffected.

If you want custom on-error behaviour, `tryCatch()` inside the generator yourself and return a final error frame plus `done = TRUE`.

## Middleware does not wrap individual chunks

[dr_use()] middleware runs **once**, when the request enters R and the handler returns the `drogon_stream` object. After that, the dispatcher pumps `next_chunk()` directly — middleware is *not* called per chunk, and [dr_on_error()] is *not* invoked for errors raised inside `next_chunk()` (those are handled as described above).
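Per-chunk concerns therefore have to live in the generator itself. One way to keep that tidy is a higher-order wrapper — a sketch, where `with_chunk_hook()` and the `hook` callback are hypothetical names, not part of the drogonR API:

```r
# Wrap an existing generator so a hook runs once per emitted chunk.
with_chunk_hook <- function(generator, hook) {
  function(state, cancelled) {
    out <- generator(state, cancelled)
    if (!cancelled && !isTRUE(out$done)) hook(out$data)
    out
  }
}

# Usage sketch: count chunks for metrics without touching the generator.
# dr_stream_sse(
#   state     = list(i = 0L, n = 5L),
#   generator = with_chunk_hook(my_generator,
#                               function(d) message("sent: ", d)))
```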
If you need per-chunk hooks (metrics, mutation), wrap your generator yourself.

## Native (C / C++) streaming

For LLM token streams or any other case where R-side overhead is the bottleneck, register a streaming handler that bypasses R entirely:

```r
app <- dr_app() |>
  dr_get_cpp_stream("/v1/generate",
    package  = "myllmbackend",
    callable = "generate")
```

The backend implements `drogonr_stream_handler_t` from the drogonR API header (shipped under `inst/include/`) and pushes chunks via `dr_send_chunk()` / `dr_close_chunk()` on a drogonR worker thread. R-side middleware and the [dr_on_error()] hook do **not** apply — the request never enters R. See `?dr_routes_cpp_stream`.

## Caveats

* Chunked streaming requires HTTP/1.1 keep-alive. Clients that send `Connection: close` won't receive any chunks — Drogon shortcuts to a single non-chunked response in that case.
* Don't call `dr_serve()` and immediately exit a script — the call is non-blocking. Standalone `Rscript` files that host a server need a loop pump, e.g. `repeat later::run_now(timeoutSecs = 3600)`, or the process will exit before the stream produces anything. Interactive R sessions don't need this.
* Heavy / many-chunk streams are end-to-end tested in `inst/examples/bench-stream-*.R` if you want to gauge throughput on your hardware.
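Putting the second caveat into practice, a minimal standalone `Rscript` file might look like the sketch below (the route and port are illustrative):

```r
#!/usr/bin/env Rscript
# Hypothetical standalone server script: serve one SSE endpoint forever.
library(drogonR)
library(later)

app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L),
      generator = function(state, cancelled) {
        if (cancelled) return(list(data = "", state = state, done = TRUE))
        state$i <- state$i + 1L
        list(data = sprintf("tick %d", state$i), state = state, done = FALSE)
      })
  })

dr_serve(app, port = 8080L)  # non-blocking: returns immediately

# Keep the process alive and pump the event loop so streams can progress.
repeat later::run_now(timeoutSecs = 3600)
```

In an interactive session the last line is unnecessary — the REPL keeps the process alive and the event loop gets pumped between top-level calls.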