ReqServerSentEvents

Req plugin for Server-Sent Events (SSE).

Decodes chunked SSE byte streams into %ReqServerSentEvents.Frame{} structs, transparently wrapping all three of Req's streaming hooks: into: fun, into: :self, and into: collectable.

Hex.pmDocumentationCICoverage

Installation

def deps do
  [
    {:req, "~> 0.5"},
    {:req_server_sent_events, "~> 0.2.0"}
  ]
end

Usage

Attach the plugin to any %Req.Request{} with ReqServerSentEvents.attach/1. It rewrites the into: option in place so that each complete SSE frame is decoded to a %ReqServerSentEvents.Frame{} before reaching your handler.

into: must be set on the request before calling attach/1 — pass it to Req.new/1, not to Req.get/2.

Options

attach/2 accepts an options keyword list:

Option Type Description
:max_frame_sizepos_integer() | nil Cap on the pending-frame buffer. If the buffer grows past this many bytes without a "\n\n" delimiter, a ReqServerSentEvents.FrameTooLargeError is raised. Defaults to nil (unbounded).
Req.new(url: url, into: [])
|> ReqServerSentEvents.attach(max_frame_size: 1_048_576)
|> Req.get!()

into: collectable

Decoded frames are collected into any Collectable. The request blocks until the server closes the connection, making this best suited for finite streams.

{:ok, resp} =
  Req.new(url: "https://example.com/events", into: [])
  |> ReqServerSentEvents.attach()
  |> Req.get()

frames = resp.body  # [%ReqServerSentEvents.Frame{}, ...]

into: fun

Your function receives {:sse_event, %ReqServerSentEvents.Frame{}} instead of {:data, binary}. Return {:cont, {req, resp}} to continue or {:halt, {req, resp}} to stop early.

Req.new(
  url: "https://example.com/events",
  into: fn {:sse_event, frame}, {req, resp} ->
    IO.inspect(frame)
    {:cont, {req, resp}}
  end
)
|> ReqServerSentEvents.attach()
|> Req.get!()

into: :self

Decoded frames are sent to the calling process as {ref, {:sse_event, %Frame{}}}. A {ref, :sse_done} sentinel is sent when the stream ends. Retrieve the ref with ReqServerSentEvents.ref/1.

Note:self() is captured when attach/1 is called. Call attach/1 in the same process that will receive the messages — typically inside a Task.async callback as shown below.

task = Task.async(fn ->
  Req.new(url: "https://example.com/events", into: :self)
  |> ReqServerSentEvents.attach()
  |> Req.get!()
end)

resp = Task.await(task)
ref = ReqServerSentEvents.ref(resp)

For short-lived or finite streams, a plain receive is sufficient:

receive do
  {^ref, {:sse_event, frame}} -> IO.inspect(frame)
  {^ref, :sse_done}           -> :done
after
  30_000 -> :timeout
end

For unbounded streams, wrap the receive in a Stream.resource/3:

Stream.resource(
  fn -> ref end,
  fn ref ->
    receive do
      {^ref, {:sse_event, frame}} -> {[frame], ref}
      {^ref, :sse_done}           -> {:halt, ref}
    after
      30_000 -> {:halt, ref}
    end
  end,
  fn _ -> :ok end
)
|> Enum.each(&IO.inspect/1)

Frame fields

Each decoded event is a %ReqServerSentEvents.Frame{} struct:

Field Type Description
eventString.t() | nil Event type (event: field)
dataString.t() | nil Payload; multiple data: lines joined with "\n"
idString.t() | nil Event ID for Last-Event-ID reconnect header
retrynon_neg_integer() | nil Reconnection delay in milliseconds
comments[String.t()] Lines starting with : (keepalive, diagnostics)

Frames with no data: field (e.g. comment-only keepalives) are passed through to the handler — filter or discard them as needed.

The id and retry fields follow the SSE reconnection protocol: retry is the server-suggested delay in milliseconds before reconnecting, and id should be sent as the Last-Event-ID request header on reconnect. This library decodes both fields but does not implement automatic reconnection — that is the caller's responsibility.

Reconnection example

A minimal into: fun consumer that tracks the most recent id and retry, then reconnects with Last-Event-ID after the suggested delay:

defmodule SSEReconnectExample do
  alias ReqServerSentEvents.Frame

  @default_retry 3_000

  def stream(url, last_id \\ nil) do
    headers = if last_id, do: [{"last-event-id", last_id}], else: []

    {:ok, resp} =
      Req.new(url: url, headers: headers, into: &handle_event/2)
      |> ReqServerSentEvents.attach()
      |> Req.get()

    last_id = resp.private[:last_id] || last_id
    retry_ms = resp.private[:retry] || @default_retry
    Process.sleep(retry_ms)
    stream(url, last_id)
  end

  defp handle_event({:sse_event, %Frame{} = frame}, {req, resp}) do
    resp =
      resp
      |> maybe_put_private(:last_id, frame.id)
      |> maybe_put_private(:retry, frame.retry)

    # ... process frame.data here ...

    {:cont, {req, resp}}
  end

  defp maybe_put_private(resp, _key, nil), do: resp
  defp maybe_put_private(resp, key, value), do: put_in(resp.private[key], value)
end

Wrap the recursive stream/2 call in a Task (or supervised GenServer) so it survives independent of the calling process, and add your own error handling for non-2xx responses, transport failures, and graceful shutdown.

Development

Requirements: Elixir ~> 1.17, Erlang/OTP compatible with your Elixir version.

# Install dependencies
mix deps.get

# Run tests
mix test

# Run unit tests only (skips Bypass integration tests for a faster loop)
mix test --exclude integration

# Run tests with coverage
mix coveralls.html

# Format code
mix format

# Check formatting without writing
mix format --check-formatted

Tests do not require a running server. The plugin's streaming logic is exercised by calling the rewritten into: handlers directly with synthetic byte chunks. The integration tests in test/req_server_sent_events_integration_test.exs use Bypass to spin up a local HTTP server. They can be excluded with mix test --exclude integration for a faster feedback loop.