
OffBroadwayWebSocket

An Elixir library providing a Broadway producer for resilient WebSocket connections using gun.
Supports unified gun_opts, idle‐timeout detection (ping/pong & data frames), demand‐based dispatch, and custom retry strategies.


Installation

Add to your mix.exs:

def deps do
  [
    {:off_broadway_websocket, "~> 1.0.0"}
  ]
end

Fetch & compile:

mix deps.get
mix deps.compile
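
The Quickstart below also calls Jason, CAStore and :ssl_verify_hostname, which come from separate Hex packages. Unless your project already pulls them in (directly or transitively), a dependency list along these lines may be needed. The version requirements for the three extra packages are assumptions, so check Hex for current releases:

```elixir
def deps do
  [
    {:off_broadway_websocket, "~> 1.0.0"},
    # Assumed extras used only by the Quickstart example:
    {:jason, "~> 1.4"},          # Jason.decode/1 in handle_message/3
    {:castore, "~> 1.0"},        # CAStore.file_path/0 for :cacertfile
    {:ssl_verify_fun, "~> 1.1"}  # provides :ssl_verify_hostname
  ]
end
```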

Quickstart

defmodule MyApp.Broadway do
  use Broadway
  
  require Logger
  
  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          # Your WebSocket endpoint:
          url: "wss://example.com:443",
          path: "/stream/updates",

          # Idle timeout (ms) for no ping/data before reconnect:
          ws_timeout: 15_000,
          # How long to wait (ms) for Gun to come up:
          await_timeout: 8_000,

          # Retry configuration – must include at least :retries_left and :delay:
          ws_retry_opts: %{
            max_retries:     5,
            retries_left:    5,
            delay:           1_000,   # initial backoff (ms)
            max_delay:       30_000,  # cap for backoff (ms)
            backoff_factor:  2,       # exponential factor
            jitter_fraction: 0.1      # ±10% random jitter
          },
          ws_retry_fun: &MyApp.Backoff.exponential_backoff_with_jitter/1,

          # Gun options (TCP/TLS, HTTP, WS):
          gun_opts: %{
            connect_timeout: 5_000,      # TCP/TLS handshake timeout
            protocols:       [:http],     # application protocols
            transport:       :tls,        # :tcp or :tls

            tls_opts: [
              verify:         :verify_peer,
              cacertfile:     CAStore.file_path(),
              depth:          10,
              reuse_sessions: false,
              verify_fun:     {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],

            ws_opts: %{
              keepalive:     10_000,  # send ping if silent
              silence_pings: false
            },

            http_opts: %{
              version:       :"HTTP/1.1"
            }
          },

          # Prefix for telemetry events:
          telemetry_id: :custom_telemetry,
          # Optional headers sent with the upgrade request (values are strings):
          headers: [
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", "payload"},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ],
      context: []
    )
  end

  @impl true
  def handle_message(_stage, %Message{data: raw} = msg, _ctx) do
    case Jason.decode(raw) do
      {:ok, data} ->
        Logger.debug(fn -> "Data: #{inspect(data)}" end)
        msg

      {:error, err} ->
        Logger.error("Decode error: #{inspect(err)}")
        Message.failed(msg, err)
    end
  end

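  # Wraps each event emitted by the producer in a Broadway message.
  # NoopAcknowledger is used because there is nothing to acknowledge upstream.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: NoopAcknowledger.init()
    }
  end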
end
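
The Quickstart refers to MyApp.Backoff.exponential_backoff_with_jitter/1 without defining it. A minimal sketch of such a function follows. It assumes the producer calls the function with the current retry-state map (the :ws_retry_opts shape shown above) and uses the returned map as the next state; that contract is an assumption here, so verify it against the library documentation before relying on it. The module and its defaults are hypothetical.

```elixir
defmodule MyApp.Backoff do
  @moduledoc """
  Hypothetical backoff helper for the Quickstart.

  Assumption: the function receives the current retry-state map and
  returns the updated map (fewer retries left, new delay).
  """

  def exponential_backoff_with_jitter(%{retries_left: left, delay: delay} = opts) do
    factor = Map.get(opts, :backoff_factor, 2)
    max_delay = Map.get(opts, :max_delay, 30_000)
    jitter_fraction = Map.get(opts, :jitter_fraction, 0.0)

    # Grow the delay exponentially and cap it at :max_delay.
    base = min(delay * factor, max_delay)

    # Random offset within +/- jitter_fraction of the capped delay.
    jitter = base * jitter_fraction * (2 * :rand.uniform() - 1)

    # Round to whole milliseconds and keep the result within 0..max_delay.
    next_delay = (base + jitter) |> round() |> min(max_delay) |> max(0)

    %{opts | retries_left: max(left - 1, 0), delay: next_delay}
  end
end
```

With jitter_fraction set to 0.0 the function is deterministic, which makes it easy to unit test; a non-zero fraction spreads reconnect attempts so that many clients do not retry at the same instant.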

Configuration Options

When configuring OffBroadwayWebSocket.Producer, you may pass any of the options listed under Default Configuration below. Only :url and :path are required.


Default Configuration

Out of the box, OffBroadwayWebSocket.Producer uses these defaults:

| Option | Default | Description |
| --- | --- | --- |
| :url | | WebSocket URL (required) |
| :path | | WebSocket path (required) |
| :ws_timeout | nil | Idle timeout (ms) for ping/data |
| :await_timeout | 10_000 | gun.await_up/2 timeout (ms) |
| :headers | [] | Upgrade HTTP headers |
| :min_demand | 10 | Broadway min_demand |
| :max_demand | 100 | Broadway max_demand |
| :telemetry_id | :websocket_producer | Prefix for telemetry events |
| :gun_opts | %{} | Options passed directly to :gun.open/3, etc. |
| :ws_retry_opts | see Default Backoff Function | Initial retry state |
| :ws_retry_fun | &OffBroadwayWebSocket.State.default_ws_retry_fun/1 | Backoff function contract |

Default Backoff Function

By default, reconnect attempts use a constant delay. The default ws_retry_opts are:

%{
  max_retries:  5,     # total retry attempts
  retries_left: 5,     # decremented on each failure
  delay:        10_000 # constant delay in ms between retries
}
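
To preview how long a retry configuration waits before giving up, the retry state can be stepped through by hand. The sketch below is a hypothetical helper, not part of OffBroadwayWebSocket. It assumes a retry function takes the state map and returns the next one, and it models the constant strategy as a function that only decrements :retries_left.

```elixir
defmodule RetrySchedule do
  # Hypothetical helper: lists the delay (ms) used for each remaining attempt.
  def delays(opts, retry_fun) when is_function(retry_fun, 1) do
    opts
    |> Stream.iterate(retry_fun)
    |> Stream.take_while(&(&1.retries_left > 0))
    |> Enum.map(& &1.delay)
  end
end

# Assumed model of a constant strategy: the delay never changes,
# only the remaining-retries counter goes down.
constant = fn %{retries_left: left} = opts -> %{opts | retries_left: left - 1} end

schedule = RetrySchedule.delays(%{max_retries: 5, retries_left: 5, delay: 10_000}, constant)

IO.inspect(schedule, label: "delays (ms)")
IO.inspect(Enum.sum(schedule), label: "total wait (ms)")
```

Under these assumptions the defaults amount to five attempts spaced 10 seconds apart, so roughly 50 seconds of waiting in total. The same helper accepts any one-argument retry function, which makes it convenient for comparing strategies.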

Telemetry Events

Fired under [:<telemetry_id>, :connection, <event>]:

| Event | Measurements | Metadata | Description |
| --- | --- | --- | --- |
| :success | %{count: 1} | %{url: String} | Handshake completed |
| :failure | %{count: 1} | %{reason: term} | Connect or upgrade failed |
| :disconnected | %{count: 1} | %{reason: term} | Underlying TCP connection dropped |
| :timeout | %{count: 1} | %{} | Idle ping/data timeout |
| :status | %{value: 0} or %{value: 1} | %{} | 0=down, 1=up |

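Attach a handler as usual. This example uses the default :websocket_producer prefix; substitute your own :telemetry_id if you set one: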

:telemetry.attach(
  "log-connection-success",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)
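
To observe every connection event with one handler, :telemetry.attach_many/4 can be used instead of attaching each event separately. The sketch below is one possible arrangement: the module name and handler id are hypothetical, and the event list is taken from the table above. It uses a captured module function as the handler, which :telemetry recommends over anonymous functions.

```elixir
defmodule MyApp.ConnectionTelemetry do
  require Logger

  # Event names as listed in the Telemetry Events table.
  @events [:success, :failure, :disconnected, :timeout, :status]

  # Builds the full event names for a given telemetry prefix.
  def event_names(telemetry_id \\ :websocket_producer) do
    for event <- @events, do: [telemetry_id, :connection, event]
  end

  # Attaches a single logging handler to all connection events.
  def attach(telemetry_id \\ :websocket_producer) do
    :telemetry.attach_many(
      "#{telemetry_id}-connection-logger",
      event_names(telemetry_id),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Logs the event name together with its measurements and metadata.
  def handle_event([_id, :connection, event], measurements, metadata, _config) do
    Logger.info("connection #{event}: #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```

Call MyApp.ConnectionTelemetry.attach(:custom_telemetry) once, for example from your application's start/2 callback, before the Broadway pipeline starts.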

Running Tests

mix test

Dialyzer

mix dialyzer --plt
mix dialyzer

Contributing

PRs and issues welcome! Please follow Elixir conventions and include tests.


License

Apache License 2.0 © 2025