OffBroadwayWebSocket
An Elixir library providing an Off-Broadway producer for resilient WebSocket ingestion with :gun.
It is designed for exchange-style or session-oriented WebSocket feeds where a Broadway pipeline needs:
- demand-aware delivery
- reconnect and backoff handling
- ping/pong and idle-timeout monitoring
- optional post-upgrade bootstrap frames
- optional stateful inbound frame handling
Documentation
HexDocs includes the API reference and focused guides:
- Getting Started
- Configuration
- Auth Refresh and Handshake Failures
- On-Upgrade Bootstrap
- Frame Handler
- Retry and Liveness
- Telemetry
Installation
Add off_broadway_websocket to your list of dependencies in mix.exs:
def deps do
[
{:off_broadway_websocket, "~> 1.2.1"}
]
endThen fetch dependencies:
mix deps.getQuickstart
defmodule MyApp.Broadway do
use Broadway
alias Broadway.Message
alias Broadway.NoopAcknowledger
def start_link(_args) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {
OffBroadwayWebSocket.Producer,
url: "wss://example.com:443",
path: "/stream/trades",
ws_timeout: 15_000,
telemetry_id: :my_app_ws,
gun_opts: %{
transport: :tls,
protocols: [:http],
tls_opts: [
verify: :verify_peer,
cacertfile: CAStore.file_path(),
verify_fun: {
&:ssl_verify_hostname.verify_fun/3,
[check_hostname: String.to_charlist("example.com")]
}
],
http_opts: %{version: :"HTTP/1.1"},
ws_opts: %{keepalive: 10_000, silence_pings: false}
},
on_upgrade: {MyApp.WebSocket, :subscription_frames, [[]]},
frame_handler: {MyApp.WebSocket, :handle_frame, []},
frame_handler_state: %{subscriptions: %{}}
},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
processors: [default: [min_demand: 0, max_demand: 100, concurrency: 4]]
)
end
def handle_message(_stage, %Message{data: payload} = message, _context) do
message
|> Map.put(:data, payload)
end
def transform(payload, _opts) do
%Broadway.Message{
data: payload,
acknowledger: NoopAcknowledger.init()
}
end
endCore Concepts
On-upgrade bootstrap
Use :on_upgrade when a websocket API requires subscribe or auth frames to be sent
immediately after upgrade and before the connection should be considered ready.
The callback must return one of:
{:ok, []}{:ok, [{:text | :binary, iodata()}, ...]}{:error, reason}
Immediate callback failure or immediate :gun.ws_send/3 failure is treated as a
bootstrap failure and follows the reconnect/backoff path.
Stateful inbound frame handling
Use :frame_handler when raw websocket frames depend on connection-local session
state, for example subscription ids or channel mappings.
The callback receives a normalized inbound payload and the current handler state and must return one of:
{:emit, [payload, ...], new_state}{:skip, new_state}{:error, reason, new_state}
frame_handler_state resets to its initial value on reconnect.
Configuration Overview
Required startup options:
:url:path
Common optional startup options:
:headers:headers_fn:ws_timeout:await_timeout:telemetry_id:gun_opts:ws_retry_opts:ws_retry_fun:on_upgrade:frame_handler:frame_handler_state
See Configuration for the full option contract and defaults.
Use Auth Refresh and Handshake Failures for rotating websocket auth headers and troubleshooting failed upgrades.
Telemetry
Connection telemetry is emitted under:
[:<telemetry_id>, :connection, :success][:<telemetry_id>, :connection, :failure][:<telemetry_id>, :connection, :disconnected][:<telemetry_id>, :connection, :timeout][:<telemetry_id>, :connection, :status]
See Telemetry for event shapes and usage examples.
Boundary
OffBroadwayWebSocket owns:
- websocket connection lifecycle
- reconnect/backoff behavior
- idle-timeout and keepalive handling
- optional bootstrap frame dispatch
- optional connection-local inbound frame handling
Application code owns:
- payload decoding and validation
- domain-specific adaptation
- downstream routing and publishing
- session policy beyond the transport boundary
Running Tests
mix testLicense
Apache License 2.0. See LICENSE.