MQTT v5 server-side sessions and routing

MQTT v5 session and topic routing - embeddable in Erlang projects.

This library handles pools of MQTT sessions. Transports (connections) attach to MQTT sessions to relay packets.

The sessions handle packets, queues, and a user-context state.

Access control and authentication are handled by a runtime module, which is configured with the runtime key in the mqtt_sessions application env.

The default and example runtime is src/mqtt_sessions_runtime.erl.
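As a minimal sketch of selecting a custom runtime, the fragment below sets the runtime key in a sys.config file. The module name my_app_mqtt_runtime is a hypothetical placeholder for your own runtime module; only the application name and the runtime key come from the text above.

```erlang
%% sys.config fragment (sketch).
%% my_app_mqtt_runtime is a hypothetical module name used for illustration.
[
  {mqtt_sessions, [
    {runtime, my_app_mqtt_runtime}
  ]}
].
```

The same value could also be set at startup with application:set_env(mqtt_sessions, runtime, my_app_mqtt_runtime), provided this happens before any sessions are created.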

Note that this library does not handle TCP/IP connections. It handles the complete MQTT session logic. Other libraries are used for transporting the MQTT packets to/from external clients.

When a subscription is made to a $SYS topic the subscriber is mapped to the default pool. This makes it possible to share system information between different pools.

How connects work

mqtt_sessions keeps the MQTT session state separate from the transport that carries packets.

An incoming MQTT CONNECT does not always create a brand new session. It can:

This means that the same MQTT session can survive disconnects and later be reattached by another connection source, as long as the session has not expired and the protocol flags allow the reconnect.

The attached transport can come from different sources. For example, one session can be driven by a websocket bridge used by a browser, while another session can be driven by a native MQTT listener connection. The session process only manages MQTT state; the actual transport process is external and can vary per reconnect.

Disconnect timeout behaviour

Sessions normally keep the negotiated session_expiry_interval after a disconnect.

Runtimes can mark a session as:

When a session is both websocket-originated and anonymous, mqtt_sessions uses a shorter timeout after the first disconnect:

This is intended to reduce memory use for one-page anonymous visits, such as headless crawler traffic that creates many short-lived browser sessions without reusing them.

Authenticated websocket sessions and non-websocket sessions keep their normal session expiry behaviour.
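The decision described in this section can be sketched as a small pure function. This is an illustration only, not the library's code: the module name, function name, and argument order are hypothetical, and capping the short timeout at the negotiated expiry is an assumption.

```erlang
-module(disconnect_timeout_sketch).
-export([expiry_after_disconnect/4]).

%% Hypothetical sketch, not the mqtt_sessions implementation.
%% IsWebsocket, IsAnonymous : booleans describing the session
%% SessionExpiry            : negotiated session_expiry_interval
%% ShortTimeout             : the shorter timeout, in the same unit

%% Websocket-originated and anonymous: use the shorter timeout.
%% Assumption: never extend a session beyond its negotiated expiry.
expiry_after_disconnect(true, true, SessionExpiry, ShortTimeout) ->
    min(SessionExpiry, ShortTimeout);
%% Every other combination keeps the negotiated expiry.
expiry_after_disconnect(_IsWebsocket, _IsAnonymous, SessionExpiry, _ShortTimeout) ->
    SessionExpiry.
```

Keeping the decision in one function of two booleans mirrors the idea that the session process does not need to look inside the user context to apply it.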

Runtime callbacks

The runtime module is responsible for authentication, authorization, and for describing some session characteristics derived from the opaque user_context().

The runtime callbacks are:

The last three callbacks are used by the session process to decide whether a disconnected session should get the shortened timeout described above, without inspecting the runtime-specific user_context() directly.

Packet size protection

mqtt_sessions distinguishes between incoming and outgoing MQTT packet limits.

The incoming limit is controlled with the application env key max_incoming_packet_size.

The outgoing limit is controlled with the application env key max_outgoing_packet_size.
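A possible sys.config fragment setting both limits is shown below. The numbers are illustrative values, not the library defaults, and the sketch assumes the limits are expressed in bytes.

```erlang
%% sys.config fragment (sketch).
%% Values are hypothetical examples; bytes are assumed as the unit.
[
  {mqtt_sessions, [
    {max_incoming_packet_size, 65536},
    {max_outgoing_packet_size, 1048576}
  ]}
].
```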

Incoming packets:

Outgoing packets:

Defaults:

Retained memory protection

Retained messages can be bounded with the application env key max_retained_memory, expressed in bytes.

When configured, mqtt_sessions:

If max_retained_memory is not configured, mqtt_sessions defaults it to 500 MB.
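For example, a deployment that wants a tighter bound than the default might configure something like the following. The 100 MiB figure is an arbitrary illustration.

```erlang
%% sys.config fragment (sketch).
%% 104857600 bytes = 100 * 1024 * 1024, an arbitrary example value.
[
  {mqtt_sessions, [
    {max_retained_memory, 104857600}
  ]}
].
```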

Incoming publish rate limiting

mqtt_sessions can rate-limit incoming client PUBLISH packets per session using:

max_incoming_messages_rate is the sustained number of incoming publish messages per second. The period for this limit is 1 second.

max_incoming_messages_burst is the extra burst capacity on top of the regular rate. This is not a separate time window; it is additional token-bucket capacity that can be spent immediately and then refills at the regular per-second rate.
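A sketch of configuring both keys in sys.config follows. The values are hypothetical and chosen only to show the shape of the configuration: a sustained rate of 100 messages per second with room for 200 extra messages in a short spike.

```erlang
%% sys.config fragment (sketch).
%% Values are hypothetical examples, not the library defaults.
[
  {mqtt_sessions, [
    {max_incoming_messages_rate, 100},
    {max_incoming_messages_burst, 200}
  ]}
].
```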

The limiter uses a token-bucket model per session. This means a client can send messages at the configured steady rate, while also consuming a limited burst budget for short spikes.
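To make the token-bucket model concrete, here is a minimal, self-contained sketch. It is not the limiter used inside mqtt_sessions: the module and function names are hypothetical, and treating the bucket capacity as rate plus burst is an assumption based on the description above. Time is passed in explicitly (in milliseconds) so the function never blocks and is easy to test.

```erlang
-module(token_bucket_sketch).
-export([new/3, take/2]).

%% Hypothetical sketch, not the mqtt_sessions implementation.
%% Rate  : tokens refilled per second (cf. max_incoming_messages_rate)
%% Burst : extra capacity on top of Rate (cf. max_incoming_messages_burst)
%% Now   : timestamp in milliseconds, for example
%%         erlang:monotonic_time(millisecond)

%% Create a full bucket. Assumption: capacity = Rate + Burst.
new(Rate, Burst, Now) ->
    Capacity = Rate + Burst,
    #{rate => Rate, capacity => Capacity,
      tokens => float(Capacity), last => Now}.

%% Try to spend one token for one incoming PUBLISH.
%% Returns {ok, Bucket} when allowed, {error, Bucket} when the bucket
%% is empty. It never waits; the caller decides how to fail.
take(#{rate := Rate, capacity := Cap, tokens := Tokens, last := Last} = B, Now) ->
    Elapsed = max(0, Now - Last),
    %% Refill proportionally to elapsed time, capped at capacity.
    Refilled = min(float(Cap), Tokens + Elapsed * Rate / 1000),
    case Refilled >= 1.0 of
        true  -> {ok, B#{tokens := Refilled - 1.0, last := Now}};
        false -> {error, B#{tokens := Refilled, last := Now}}
    end.
```

With new(2, 1, 0), three calls to take/2 at time 0 succeed and a fourth returns an error; half a second later one more token has been refilled and the next call succeeds again.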

When the bucket is empty, mqtt_sessions does not block inside the session process. This is important because blocking there would let unrelated Erlang messages pile up in the mailbox. Instead it fails fast to keep memory use bounded:

Defaults:

TODO

Current status of the list below:

  1. Add instrumentation