PubSubx


PubSubx is a lightweight event router for Elixir. It is built for apps that need more than exact pubsub topics but do not want a larger event bus: hierarchical topic patterns, subscriber-side filtering, structured event envelopes, and Telemetry hooks.

Why use it

Installation

If available in Hex, the package can be installed by adding pub_subx to your list of dependencies in mix.exs:

def deps do
  [
    {:pub_subx, "~> 0.2.0"}
  ]
end

API conventions

The public API keeps the same argument order throughout:

Usage

Add PubSubx to your supervision tree

In your application.ex, start the PubSub server as part of your supervision tree:

def start(_type, _args) do
  children = [
    {MyApp.MyPubSub, []}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Basic subscribe and publish

Subscriptions are automatically removed when the subscriber process exits, so explicit unsubscribe/3 calls are only needed when you want to stop receiving events before the process terminates.

:ok = MyApp.MyPubSub.subscribe("orders.created", self())

:ok = MyApp.MyPubSub.publish("orders.created", %{id: 1},
  metadata: %{source: :checkout},
  correlation_id: "corr-123",
  trace_id: "trace-123"
)

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.payload)
end

Subscribe with an optional filter

The :filter option is optional. Use it only when a subscriber wants to accept just a subset of the events that match a topic pattern.

As with exact subscriptions, the subscriber is automatically deregistered if its process exits.

:ok =
  MyApp.MyPubSub.subscribe("orders.*", self(),
    filter: fn event -> event.payload.region == :eu end
  )

:ok =
  MyApp.MyPubSub.publish("orders.created", %{id: 1, region: :eu},
    metadata: %{source: :checkout},
    correlation_id: "corr-123",
    trace_id: "trace-123"
  )

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.payload)
end
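One detail of the filter above is worth illustrating: `event.payload.region` raises a `KeyError` when a payload has no `:region` key, which can happen once a wildcard pattern such as "orders.*" matches events of different shapes. The sketch below compares that predicate with a pattern-matching variant that returns false instead. Plain maps stand in for `%PubSubx.Event{}` so the snippet runs without the library, and the names `strict_filter` and `safe_filter` are illustrative only. How PubSubx itself treats a filter that raises is not covered here.

```elixir
# Plain maps stand in for %PubSubx.Event{} so this runs without the library.
eu_order = %{topic: "orders.created", payload: %{id: 1, region: :eu}}
us_order = %{topic: "orders.created", payload: %{id: 2, region: :us}}
no_region = %{topic: "orders.cancelled", payload: %{id: 3}}

# Same predicate as in the example above.
# Raises KeyError when the payload has no :region key.
strict_filter = fn event -> event.payload.region == :eu end

# Pattern-matching variant: any event without region: :eu yields false.
safe_filter = fn
  %{payload: %{region: :eu}} -> true
  _other -> false
end

strict_results = Enum.map([eu_order, us_order], strict_filter)
safe_results = Enum.map([eu_order, us_order, no_region], safe_filter)

IO.inspect(strict_results) # [true, false]
IO.inspect(safe_results)   # [true, false, false]
```

Because a struct is also a map, the pattern in `safe_filter` would match an event struct with a `payload` field in the same way.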

Distributed publish across interconnected nodes

If your Erlang nodes are already connected (e.g. via libcluster), PubSubx.Utils.distribute_publish/4 can fan a publish out to all connected nodes or to a selected subset. Local delivery and node_opts: [:visible, :this] are enabled by default; pass include_local?: false only when you want remote-only fanout. Delivery is best-effort: the call does not synchronize subscriptions or wait for acknowledgements. It also requires the PubSub GenServer to be running in each node's application supervision tree under the same PubSub module name.

:ok = MyApp.MyPubSub.subscribe("orders.created", self())

summary =
  PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
    publish: [
      metadata: %{source: :cluster},
      correlation_id: "dist-123"
    ]
  )

IO.inspect(summary.attempted_nodes)

# Remote-only fanout:
# PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
#   include_local?: false
# )

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.metadata)
end
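Since delivery is best-effort, a bare `receive` can block forever if no event arrives. A minimal sketch of a bounded wait follows, assuming only that events expose a `topic` field as in the examples above. `AwaitSketch.await_event/2` is a hypothetical helper, not part of PubSubx, and a plain map stands in for the event so the snippet runs on its own.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper, not part of PubSubx.
  # Waits for a message whose :topic equals `topic`, giving up after `timeout_ms`.
  def await_event(topic, timeout_ms \\ 1_000) do
    receive do
      %{topic: ^topic} = event -> {:ok, event}
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end

# Simulate a delivered event with a plain map standing in for %PubSubx.Event{}.
send(self(), %{topic: "orders.created", metadata: %{source: :cluster}})

found = AwaitSketch.await_event("orders.created")
missing = AwaitSketch.await_event("orders.cancelled", 50)

IO.inspect(found)   # {:ok, %{...}} for the message sent above
IO.inspect(missing) # {:error, :timeout}
```

The `after` clause turns a missing delivery into an explicit `{:error, :timeout}` that the caller can handle, rather than a hung process.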

Topic matching

Distributed publish

PubSubx.Utils.distribute_publish/4 provides best-effort cross-node fanout. It can include the local node, forwards publish options, and emits [:pub_subx, :distribute, :publish].
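To observe the [:pub_subx, :distribute, :publish] event, a handler can be attached with `:telemetry.attach/4`. The sketch below is a standalone script under a few assumptions: it pulls in `:telemetry` via `Mix.install/1` (in an application it would already be a dependency), `MyApp.DistributeObserver` and the handler id are hypothetical names, and the handler forwards measurements and metadata untouched because their exact keys are not listed in this README.

```elixir
# Standalone script: fetch :telemetry (version requirement is an assumption).
Mix.install([{:telemetry, "~> 1.0"}])

defmodule MyApp.DistributeObserver do
  # Hypothetical module, not part of PubSubx.
  # Telemetry handlers receive (event_name, measurements, metadata, config).
  def handle_event([:pub_subx, :distribute, :publish] = name, measurements, metadata, config) do
    # Forward the raw data to the process given in the handler config.
    send(config.notify, {:telemetry_event, name, measurements, metadata})
  end
end

attach_result =
  :telemetry.attach(
    "my-app-distribute-publish",
    [:pub_subx, :distribute, :publish],
    &MyApp.DistributeObserver.handle_event/4,
    %{notify: self()}
  )

IO.inspect(attach_result) # :ok on first attach
```

Handlers run synchronously in the process that emits the event, so keeping them small (here, a single `send/2`) avoids slowing down publishes.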

It does not synchronize subscriptions or wait for acknowledgements.

Telemetry

PubSubx emits the following Telemetry events:

Benchmarks

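Benchmark scaffolding lives in bench/pub_subx_bench.exs and compares the four cases shown in the example run below: pub_subx exact publish, pub_subx wildcard publish, registry exact dispatch, and phoenix_pubsub exact publish.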

Run it with:

mix run bench/pub_subx_bench.exs

The script starts the phoenix_pubsub application it needs before running the comparison, so the command above is the intended way to execute the benchmark.

Example run on a MacBook Pro 13-inch (Mid 2017, no Touch Bar, two Thunderbolt 3 ports), Intel Core i5-7360U 2.30 GHz, 4 cores, 8 GB RAM:

Name                                   ips        average
pub_subx exact publish            917.28 K        1.09 μs
pub_subx wildcard publish         834.32 K        1.20 μs
registry exact dispatch           459.20 K        2.18 μs
phoenix_pubsub exact publish      266.79 K        3.75 μs

Relative to this run:
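The throughput ratios follow directly from the ips column of the table. As a small sketch, the snippet below divides the pub_subx exact publish figure by each other row; the numbers are copied from the example run, and the variable names are illustrative.

```elixir
# ips values copied from the example run (in thousands of iterations per second).
ips = %{
  "pub_subx exact publish" => 917.28,
  "pub_subx wildcard publish" => 834.32,
  "registry exact dispatch" => 459.20,
  "phoenix_pubsub exact publish" => 266.79
}

baseline_name = "pub_subx exact publish"
baseline = Map.fetch!(ips, baseline_name)

# How many times faster the baseline is than each other row, rounded to 2 places.
ratios =
  for {name, value} <- ips, name != baseline_name, into: %{} do
    {name, Float.round(baseline / value, 2)}
  end

IO.inspect(ratios)
# pub_subx wildcard publish:    ~1.1x
# registry exact dispatch:      ~2.0x
# phoenix_pubsub exact publish: ~3.44x
```

In this run, pub_subx exact publish is roughly 2x the throughput of registry exact dispatch and about 3.4x that of phoenix_pubsub exact publish.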

This example is illustrative only. Benchmark results will vary with CPU, Elixir/Erlang versions, scheduler behavior, and system load.

Future direction

If repeated event schemas emerge across multiple users of the library, a later release can add typed event helpers or macros on top of the current event envelope. That is intentionally deferred for now.

The docs can be found at https://hexdocs.pm/pub_subx.

License

This project is licensed under the MIT License. See LICENSE for details.