OffBroadway.EMQTT
An MQTT connector based on emqtt for Broadway.
MQTT is a lightweight publish/subscribe protocol widely used in IoT, industrial automation, and telemetry. This library connects a Broadway pipeline to an MQTT broker, using the MQTT protocol itself for backpressure and message reliability rather than an in-process buffer.
Installation
def deps do
[
{:off_broadway_emqtt, "~> 0.3.0"}
]
end
By default, :emqtt compiles the Quic transport library. To build without it:
BUILD_WITHOUT_QUIC=1 mix deps.compileUsage
defmodule MyBroadway do
use Broadway
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {OffBroadway.EMQTT.Producer,
topics: [
{"sensors/status", :at_most_once}, # QoS 0
{"sensors/temperature", :at_least_once}, # QoS 1
{"sensors/humidity", :exactly_once} # QoS 2
],
max_inflight: 100,
config: [
host: "mqtt.example.com",
port: 1883,
username: "my-user",
password: "my-password",
clientid: "my-pipeline"
]
},
concurrency: 1
],
processors: [default: [concurrency: 10]],
batchers: [default: [batch_size: 100, batch_timeout: 500, concurrency: 5]]
)
end
@impl true
def handle_message(_, message, _context) do
IO.inspect(message.data, label: "payload")
IO.inspect(message.metadata.topic, label: "topic")
message
end
@impl true
def handle_batch(_, messages, _, _) do
messages
end
end
Each MQTT message is delivered as a Broadway message where data is the raw payload binary
and metadata contains the remaining fields from the MQTT packet (topic, qos, packet_id, etc.).
Reliability and message delivery
QoS levels
Use QoS 1 (:at_least_once) or QoS 2 (:exactly_once) for reliable delivery. Messages are only
acknowledged to the broker after Broadway successfully processes them. If Broadway fails to process
a message, the broker redelivers it (with QoS 1/2).
QoS 0 (:at_most_once) provides no delivery guarantee. The broker fires and forgets.
Backpressure via max_inflight
The max_inflight option limits how many unACKed QoS 1/2 messages the broker will send before
waiting for acknowledgements. This is the primary backpressure mechanism: the broker stops
delivering new messages once the window is full.
For MQTT v5 (config: [proto_ver: :v5]), Receive-Maximum is set automatically in the CONNECT
properties so the broker enforces the limit server-side.
Session persistence across restarts
clean_start defaults to false. When a producer restarts (after a crash or deployment), the
broker recognises the clientid and redelivers any QoS 1/2 messages that were not acknowledged
before the restart. No messages are lost between restarts.
If you want a fresh session on every connect (discarding unACKed messages), set
config: [clean_start: true] explicitly.
[!NOTE] Each producer instance connects with the configured
clientidplus an index suffix:my-pipeline_0,my-pipeline_1, and so on. Aconcurrency: 1pipeline configured withclientid: "my-pipeline"appears on the broker asmy-pipeline_0. If you are upgrading from v0.2.x and rely on an exact clientid for a persistent session or ACL, either change your config toclientid: "my-pipeline_0"or accept that the session will be treated as new on the first v0.3.0 connect.
Reconnection
By default, if the MQTT connection is lost the producer process stops and Broadway's supervisor
restarts it. The fresh producer creates a new emqtt connection and re-subscribes to all topics.
With clean_start: false (the default), the broker redelivers any unACKed QoS 1/2 messages.
Alternatively, you can enable emqtt's built-in reconnect by passing reconnect in config:
config: [
host: "mqtt.example.com",
clientid: "my-pipeline",
reconnect: :infinity,
reconnect_timeout: 5,
clean_start: false
][!IMPORTANT] emqtt's reconnect reopens the TCP connection but does not re-subscribe. You must set
clean_start: falseso the broker restores the session and delivers messages again. If you enablereconnectwithclean_start: true, messages silently stop arriving after the first reconnect.
on_success and on_failure
| Option | Value | Behaviour |
|---|---|---|
on_success | :ack (default) | ACKs the message to the broker after successful processing |
on_success | :noop | Does not ACK; broker will redeliver |
on_failure | :noop (default) | Does not ACK; broker will redeliver (QoS 1/2) |
on_failure | :ack | ACKs even on failure; message is not redelivered |
Multiple consumers (concurrency)
To distribute messages across multiple producer instances, set concurrency > 1 and provide a
shared_group name. Without shared_group, every producer instance receives every message,
causing duplicates.
producer: [
module: {OffBroadway.EMQTT.Producer,
topics: [{"work/queue", :at_least_once}],
shared_group: "my-pipeline",
config: [host: "mqtt.example.com", clientid: "worker"]
},
concurrency: 3
]
This creates three MQTT connections (worker_0, worker_1, worker_2) all in the shared group
my-pipeline. The broker distributes messages across them using MQTT shared subscriptions
($share/my-pipeline/work/queue).
max_inflight applies per connection. With concurrency: 3 and max_inflight: 100, up to 300
unACKed messages can be in-flight across the pipeline at once.
Custom message handler
Implement OffBroadway.EMQTT.MessageHandler to customise how MQTT messages are converted to
Broadway messages. The default handler places the raw payload binary in data and the rest of
the MQTT packet fields in metadata.
defmodule MyApp.JsonHandler do
@behaviour OffBroadway.EMQTT.MessageHandler
@impl true
def handle_message(message, ack_ref, _opts) do
{payload, metadata} = Map.pop(message, :payload)
%Broadway.Message{
data: Jason.decode!(payload),
metadata: metadata,
acknowledger: {OffBroadway.EMQTT.Acknowledger, ack_ref, %{}}
}
end
endproducer: [
module: {OffBroadway.EMQTT.Producer,
topics: [{"events/#", :at_least_once}],
message_handler: MyApp.JsonHandler,
config: [host: "mqtt.example.com"]
},
concurrency: 1
]Telemetry
| Event | Measurements | Metadata |
|---|---|---|
[:off_broadway_emqtt, :producer, :init] | %{time: integer} | %{broadway_name: term, producer_index: integer} |
[:off_broadway_emqtt, :producer, :terminate] | %{time: integer} | %{broadway_name: term, producer_index: integer, client_id: string | nil, reason: term} |
[:off_broadway_emqtt, :connection, :up] | %{time: integer} | %{client_id: string, producer_index: integer} |
[:off_broadway_emqtt, :connection, :down] | %{time: integer} | %{client_id: string, producer_index: integer, reason: term} |
[:off_broadway_emqtt, :subscription, :success] | %{time: integer} | %{client_id: string, producer_index: integer, topic: string, granted_qos: 0..2} |
[:off_broadway_emqtt, :subscription, :error] | %{time: integer} | %{client_id: string, producer_index: integer, topic: string, reason: term} |
[:off_broadway_emqtt, :receive_message, :start] | %{time: integer, count: 1} | %{client_id: string, producer_index: integer, topic: string, qos: integer} |
[:off_broadway_emqtt, :receive_message, :ack] | %{time: integer, count: 1} | %{topic: string, qos: integer, status: :on_success | :on_failure} |
See the Producer moduledoc for common connection.down reason shapes (auth failures, TLS errors, transport errors).
Changelog
See CHANGELOG.md for upgrade instructions and release history.