OffBroadwayWebSocket

An Elixir library providing an Off-Broadway producer for resilient WebSocket ingestion with :gun.

It is designed for exchange-style or session-oriented WebSocket feeds where a Broadway pipeline needs:

Documentation

HexDocs includes the API reference and focused guides:

Installation

Add off_broadway_websocket to your list of dependencies in mix.exs:

def deps do
  [
    {:off_broadway_websocket, "~> 1.2.1"}
  ]
end

Then fetch dependencies:

mix deps.get

Quickstart

defmodule MyApp.Broadway do
  use Broadway

  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          url: "wss://example.com:443",
          path: "/stream/trades",
          ws_timeout: 15_000,
          telemetry_id: :my_app_ws,
          gun_opts: %{
            transport: :tls,
            protocols: [:http],
            tls_opts: [
              verify: :verify_peer,
              cacertfile: CAStore.file_path(),
              verify_fun: {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],
            http_opts: %{version: :"HTTP/1.1"},
            ws_opts: %{keepalive: 10_000, silence_pings: false}
          },
          on_upgrade: {MyApp.WebSocket, :subscription_frames, [[]]},
          frame_handler: {MyApp.WebSocket, :handle_frame, []},
          frame_handler_state: %{subscriptions: %{}}
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [default: [min_demand: 0, max_demand: 100, concurrency: 4]]
    )
  end

  @impl true
  def handle_message(_stage, %Message{} = message, _context) do
    # Pass-through: replace this with real processing of message.data.
    message
  end

  # Wraps each payload emitted by the producer in a Broadway message.
  def transform(payload, _opts) do
    %Message{
      data: payload,
      acknowledger: NoopAcknowledger.init()
    }
  end
end
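
The Quickstart calls CAStore.file_path/0 and :ssl_verify_hostname.verify_fun/3, which come from separate Hex packages. A minimal sketch of the extra mix.exs entries, assuming these packages are not already pulled in transitively; the version requirements for the two extras are illustrative and not taken from this library's documentation:

```elixir
def deps do
  [
    {:off_broadway_websocket, "~> 1.2.1"},
    # Assumed extras for the Quickstart's TLS options:
    # :castore provides CAStore.file_path/0
    {:castore, "~> 1.0"},
    # :ssl_verify_fun provides the :ssl_verify_hostname module
    {:ssl_verify_fun, "~> 1.1"}
  ]
end
```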

Core Concepts

On-upgrade bootstrap

Use :on_upgrade when a WebSocket API requires subscribe or auth frames to be sent immediately after the upgrade, before the connection is considered ready.

The callback must return one of:

An immediate callback failure or an immediate :gun.ws_send/3 failure is treated as a bootstrap failure and follows the reconnect/backoff path.
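
As a rough illustration of an on-upgrade callback, the sketch below builds one subscribe frame per channel. The module name SubscriptionSketch, the single list argument, the JSON message layout, and the {:ok, frames} return shape are all assumptions made for this example; check the Configuration guide for the return values the producer actually accepts. Only the {:text, iodata} frame shape is taken from :gun.ws_send/3.

```elixir
defmodule SubscriptionSketch do
  @moduledoc false

  # Hypothetical on-upgrade callback. The argument list and the {:ok, frames}
  # return shape are assumptions for illustration, not the documented contract.
  def subscription_frames(channels) when is_list(channels) do
    frames =
      Enum.map(channels, fn channel ->
        # {:text, iodata} is a frame shape accepted by :gun.ws_send/3.
        # The JSON is built by hand without escaping; use a JSON encoder
        # in real code.
        {:text, ~s({"op":"subscribe","channel":"#{channel}"})}
      end)

    {:ok, frames}
  end
end
```

In the Quickstart this function would live in MyApp.WebSocket and be referenced through the :on_upgrade MFA tuple.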

Stateful inbound frame handling

Use :frame_handler when raw WebSocket frames depend on connection-local session state, for example subscription IDs or channel mappings.

The callback receives a normalized inbound payload and the current handler state and must return one of:

frame_handler_state resets to its initial value on reconnect.
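
The stateful handling described above can be sketched as a function from payload and state to a result and new state. Everything specific here is hypothetical: the module name FrameHandlerSketch, the {:subscribed, id, channel} and {:data, id, body} payload shapes, and the {:emit, payloads, state} and {:skip, state} return tuples are assumptions for illustration, not the library's documented contract.

```elixir
defmodule FrameHandlerSketch do
  @moduledoc false

  # Hypothetical stateful frame handler. Payload shapes and return tuples
  # are assumptions; see the library guides for the real contract.

  # Subscription acknowledgement: remember which channel the id maps to.
  def handle_frame({:subscribed, id, channel}, state) do
    {:skip, put_in(state, [:subscriptions, id], channel)}
  end

  # Data frame: tag it with the channel recorded for its subscription id,
  # or drop it when the id is unknown.
  def handle_frame({:data, id, body}, state) do
    case Map.fetch(state.subscriptions, id) do
      {:ok, channel} -> {:emit, [%{channel: channel, body: body}], state}
      :error -> {:skip, state}
    end
  end
end
```

Starting from the Quickstart's initial state %{subscriptions: %{}}, an acknowledgement for id 7 on "trades" would make later data frames with id 7 resolve to that channel. Because the state resets on reconnect, a mapping like this is empty again after each reconnect until new acknowledgements arrive.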

Configuration Overview

Required startup options:

Common optional startup options:

See Configuration for the full option contract and defaults.

See Auth Refresh and Handshake Failures for guidance on rotating WebSocket auth headers and troubleshooting failed upgrades.

Telemetry

Connection telemetry is emitted under:

See Telemetry for event shapes and usage examples.

Boundary

OffBroadwayWebSocket owns:

Application code owns:

Running Tests

mix test

License

Apache License 2.0. See LICENSE.