PubSubx
PubSubx is a lightweight event router for Elixir. It is built for apps that
need more than exact pubsub topics but do not want a larger event bus:
hierarchical topic patterns, subscriber-side filtering, structured event
envelopes, and Telemetry hooks.
Why use it
-
Subscribe with exact topics or wildcard patterns like
"orders.*"and"orders.**". -
Receive
%PubSubx.Event{}envelopes with topic, payload, timestamp, metadata, correlation ID, and trace ID. - Filter events at the subscriber so a broad subscription can still be selective.
- Observe subscribe, unsubscribe, publish, delivery, drop, distributed publish, and subscriber-count events through Telemetry.
Installation
If available in Hex, the package can be
installed by adding pub_subx to your list of dependencies in mix.exs:
def deps do
[
{:pub_subx, "~> 0.2.0"}
]
endAPI conventions
The public API keeps the same argument order throughout:
subscribe(pubsub, topic_pattern, pid, opts \\ [])publish(pubsub, topic, payload, opts \\ [])unsubscribe(pubsub, topic_pattern, pid)
Usage
Add PubSubx to your supervision tree
In your application.ex, start the PubSub server as part of your supervision
tree:
def start(_type, _args) do
children = [
{MyApp.MyPubSub, []}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
endBasic subscribe and publish
Subscriptions are automatically removed when the subscriber process exits, so
explicit unsubscribe/3 calls are only needed when you want to stop receiving
events before the process terminates.
:ok = MyApp.MyPubSub.subscribe("orders.created", self())
:ok = MyApp.MyPubSub.publish("orders.created", %{id: 1},
metadata: %{source: :checkout},
correlation_id: "corr-123",
trace_id: "trace-123"
)
receive do
%PubSubx.Event{} = event ->
IO.inspect(event.topic)
IO.inspect(event.payload)
endSubscribe with an optional filter
The :filter option is optional. Use it only when a subscriber wants to accept
just a subset of the events that match a topic pattern.
As with exact subscriptions, the subscriber is automatically deregistered if its process exits.
:ok =
MyApp.MyPubSub.subscribe("orders.*", self(),
filter: fn event -> event.payload.region == :eu end
)
:ok =
MyApp.MyPubSub.publish("orders.created", %{id: 1, region: :eu},
metadata: %{source: :checkout},
correlation_id: "corr-123",
trace_id: "trace-123"
)
receive do
%PubSubx.Event{} = event ->
IO.inspect(event.topic)
IO.inspect(event.payload)
endDistributed publish across interconnected nodes
If your Erlang nodes are already connected (eg: libcluster usage),
PubSubx.Utils.distribute_publish/4 can fan a publish out to all nodes or
selected nodes. Local delivery and node_opts: [:visible, :this] are enabled
by default; pass include_local?: false only when you want remote-only fanout.
This is best-effort delivery; it does not synchronize subscriptions or wait for
acknowledgements.
It also relies on the PubSub GenServer being started in each application
supervision tree under the same PubSub module name.
:ok = MyApp.MyPubSub.subscribe("orders.created", self())
summary =
PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
publish: [
metadata: %{source: :cluster},
correlation_id: "dist-123"
]
)
IO.inspect(summary.attempted_nodes)
# Remote-only fanout:
# PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
# include_local?: false
# )
receive do
%PubSubx.Event{} = event ->
IO.inspect(event.topic)
IO.inspect(event.metadata)
endTopic matching
- Exact atom topics remain exact-only.
-
Binary topics can be hierarchical:
"orders.created","orders.eu.created". *matches one segment.**matches zero or more trailing segments and must be the last segment.
Distributed publish
PubSubx.Utils.distribute_publish/4 provides best-effort cross-node fanout.
It can include the local node, forwards publish options, and emits
[:pub_subx, :distribute, :publish].
It does not:
- synchronize subscriptions across nodes
- wait for acknowledgements
- retry failed remote deliveries
Telemetry
PubSubx emits the following Telemetry events:
[:pub_subx, :subscribe][:pub_subx, :unsubscribe][:pub_subx, :publish][:pub_subx, :delivery][:pub_subx, :drop][:pub_subx, :subscriber_count][:pub_subx, :distribute, :publish]
Benchmarks
Benchmark scaffolding lives in bench/pub_subx_bench.exs
and compares:
PubSubxexact publishPubSubxwildcard publishPhoenix.PubSubexact publish-
plain
Registryexact dispatch
Run it with:
mix run bench/pub_subx_bench.exs
The script starts the phoenix_pubsub application it needs before running the
comparison, so the command above is the intended way to execute the benchmark.
Example run on a MacBook Pro 13-inch (Mid 2017, no Touch Bar, two Thunderbolt 3 ports), Intel Core i5-7360U 2.30 GHz, 4 cores, 8 GB RAM:
Name ips average
pub_subx exact publish 917.28 K 1.09 μs
pub_subx wildcard publish 834.32 K 1.20 μs
registry exact dispatch 459.20 K 2.18 μs
phoenix_pubsub exact publish 266.79 K 3.75 μsRelative to this run:
pub_subx exact publishwas243.82%faster thanphoenix_pubsub exact publishpub_subx exact publishwas99.76%faster thanregistry exact dispatchpub_subx wildcard publishwas212.73%faster thanphoenix_pubsub exact publishpub_subx wildcard publishwas81.69%faster thanregistry exact dispatchpub_subx wildcard publishwas9.94%slower thanpub_subx exact publish
This example is illustrative only. Benchmark results will vary with CPU, Elixir/Erlang versions, scheduler behavior, and system load.
Future direction
If repeated event schemas emerge across multiple users of the library, a later release can add typed event helpers or macros on top of the current event envelope. That is intentionally deferred for now.
The docs can be found at https://hexdocs.pm/pub_subx.
License
This project is licensed under the MIT License. See LICENSE for details.