OffBroadwayWebSocket

An Elixir library that provides a Broadway producer for WebSocket connections, built on the gun library. It supports ping/pong timeout monitoring and demand-based message dispatching in an Off-Broadway setup.

Features

Installation

Add off_broadway_websocket to your list of dependencies in mix.exs:

def deps do
  [
    {:off_broadway_websocket, "~> 0.0.5"}
  ]
end

Run the following to fetch and compile the dependency:

mix deps.get
mix deps.compile

Usage

Basic Setup with Broadway

In your project, create a Broadway module to use the OffBroadwayWebSocket.Producer as the producer.

defmodule MyApp.Broadway do
  use Broadway
  require Logger

  alias Broadway.Message

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          url: "wss://example.com",
          path: "/path_to_ws_endpoint",
          reconnect_delay: 5_000,
          ws_timeout: 15_000,
          ws_opts: %{keepalive: 10_000, silence_pings: false},
          http_opts: %{version: :"HTTP/1.1"}
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: raw_message} = message, _context) do
    case decode_message(raw_message) do
      {:ok, %{"type" => "heartbeat"}} ->
        Logger.debug("Heartbeat message received.")
        message

      {:ok, data} ->
        Logger.info("Data received: #{inspect(data)}")
        message

      {:error, error} ->
        Logger.error("Failed to decode message: #{inspect(error)}")
        message
    end
  end

  defp decode_message(message) when is_binary(message) do
    Jason.decode(message)
  end

  defp decode_message(other) do
    {:error, {:unsupported_message_format, other}}
  end

  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, _successful, _failed) do
    :ok
  end
end
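
To run the pipeline, the Broadway module is typically started under your application's supervision tree. The following is a minimal sketch, assuming a standard Mix application; `MyApp.Application` and `MyApp.Supervisor` are hypothetical names and are not part of this library.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module, shown for illustration only.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # `use Broadway` defines child_spec/1, so the pipeline can be listed
      # as a child. The [] argument is passed to MyApp.Broadway.start_link/1.
      {MyApp.Broadway, []}
    ]

    # Restart only the pipeline if it crashes.
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

With this in place, the producer connects when the application boots, and a crash of the pipeline is handled by the supervisor.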

Configuration Options

The complete list of options accepted by http_opts and ws_opts is available in the gun documentation.

Telemetry Events

OffBroadwayWebSocket emits telemetry events for key WebSocket operations. These events can be used for monitoring and integration with tools like Prometheus, Datadog, or other observability platforms.

Event Table

| Event Name | Measurements | Metadata | Description |
| --- | --- | --- | --- |
| [:websocket_producer, :connection, :success] | count: 1 | url: String | Emitted when a connection is successfully established. |
| [:websocket_producer, :connection, :failure] | count: 1 | reason: term() | Emitted when a connection attempt fails. |
| [:websocket_producer, :connection, :disconnected] | count: 1 | reason: term() | Emitted when the WebSocket connection is disconnected. |
| [:websocket_producer, :connection, :timeout] | count: 1 | (none) | Emitted when a ping/pong timeout occurs. |
| [:websocket_producer, :connection, :status] | value: 0 or 1 | (none) | Emitted to indicate the current WebSocket connection status (0 = down, 1 = up). |

Example Usage

You can attach custom handlers to these telemetry events for logging or monitoring purposes. Here’s an example:

:telemetry.attach(
  "log-connection-success",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)

This allows you to customize behavior or integrate the events into observability tools.
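
To cover every event in the table with a single handler, `:telemetry.attach_many/4` can be used with a named module function. The sketch below is one possible arrangement, not part of this library: `MyApp.WebSocketTelemetry`, the handler ID, and the log wording are all hypothetical, and the measurement and metadata shapes are assumed to match the table above.

```elixir
defmodule MyApp.WebSocketTelemetry do
  # Hypothetical helper module, shown for illustration only.
  require Logger

  # Event names copied from the event table.
  @events [
    [:websocket_producer, :connection, :success],
    [:websocket_producer, :connection, :failure],
    [:websocket_producer, :connection, :disconnected],
    [:websocket_producer, :connection, :timeout],
    [:websocket_producer, :connection, :status]
  ]

  def events, do: @events

  # Attaches one handler to all events. Call this once, e.g. at application start.
  def attach do
    :telemetry.attach_many(
      "my-app-websocket-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Telemetry handler: logs a one-line description of the event.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting function, kept separate so it can be tested without telemetry.
  def describe([:websocket_producer, :connection, :status], %{value: 1}, _metadata),
    do: "websocket connection is up"

  def describe([:websocket_producer, :connection, :status], %{value: 0}, _metadata),
    do: "websocket connection is down"

  def describe([:websocket_producer, :connection, kind], _measurements, metadata),
    do: "websocket connection #{kind}: #{inspect(metadata)}"
end
```

Passing a fully qualified capture such as `&__MODULE__.handle_event/4` rather than an anonymous function follows the recommendation in the telemetry documentation, which notes that local or anonymous handlers can carry a performance penalty.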

Running Tests

To run tests:

mix test

Ensure your WebSocket endpoint is reachable and configured properly for end-to-end tests.

Running Dialyzer

For static analysis with Dialyzer, make sure PLTs are built:

mix dialyzer --plt
mix dialyzer

Contributing

Feel free to open issues or submit PRs to enhance the functionality of OffBroadwayWebSocket. Contributions are welcome!

License

This project is licensed under the Apache License, Version 2.0.