# Nipper

> Lightweight, embeddable MQTT broker for BEAM-based edge applications

Nipper is a minimal MQTT 3.1.1 broker designed to run within your Elixir/Phoenix application's supervision tree. It is a good fit for edge computing, IoT gateways, and embedded systems where you need local MQTT messaging without the overhead of a separate broker service.
## Features

### Core MQTT Protocol

- ✅ Full MQTT 3.1.1 protocol implementation
- ✅ QoS 0 & QoS 1 message delivery
- ✅ Persistent & clean sessions
- ✅ Keep-alive mechanism with configurable timeouts
- ✅ Will messages and retained messages
- ✅ Topic subscriptions with wildcard support
- ✅ Binary payload support
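Wildcard subscriptions follow the standard MQTT 3.1.1 rules: `+` matches exactly one topic level and `#` matches the remaining levels (including zero). A minimal sketch of that matching logic, for illustration only (the `TopicMatch` module below is hypothetical, not Nipper's internal implementation):

```elixir
defmodule TopicMatch do
  @moduledoc "Illustrative MQTT topic-filter matching: `+` = one level, `#` = all remaining levels."

  def match?(filter, topic) do
    do_match(String.split(filter, "/"), String.split(topic, "/"))
  end

  # `#` matches everything from here on, including nothing at all.
  defp do_match(["#"], _levels), do: true
  # `+` consumes exactly one topic level.
  defp do_match(["+" | fr], [_ | tr]), do: do_match(fr, tr)
  # A literal level must match exactly.
  defp do_match([l | fr], [l | tr]), do: do_match(fr, tr)
  # Both exhausted at the same time: a match.
  defp do_match([], []), do: true
  defp do_match(_, _), do: false
end

TopicMatch.match?("sensors/+/temperature", "sensors/kitchen/temperature") #=> true
TopicMatch.match?("sensors/#", "sensors/kitchen/humidity")                #=> true
TopicMatch.match?("sensors/+", "sensors/kitchen/humidity")                #=> false
```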
### Security & DoS Protection

- ✅ Rate limiting with a sliding-window algorithm
  - Connection rate limiting (per IP)
  - Message rate limiting (per client)
  - Authentication failure protection
- ✅ Packet size enforcement with configurable limits
- ✅ Resource monitoring and automatic cleanup
- ✅ Memory usage monitoring with automatic GC
- ✅ Pluggable authentication system
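The sliding-window idea behind the rate limiter can be pictured as timestamp pruning: keep the timestamps of recent events, drop those outside the window, and deny once the window is full. A conceptual sketch only (the `SlidingWindow` module is hypothetical; Nipper's actual limiter is ETS-backed):

```elixir
defmodule SlidingWindow do
  @moduledoc "Illustrative sliding-window rate limit: at most `limit` events per `window_ms`."

  def allow?(timestamps, limit, window_ms, now_ms) do
    # Drop events that have fallen out of the window, then check the count.
    recent = Enum.filter(timestamps, fn t -> now_ms - t < window_ms end)

    if length(recent) < limit do
      {:ok, [now_ms | recent]}
    else
      {:deny, recent}
    end
  end
end

# With a limit of 2 events per 1_000 ms:
{:ok, ts1} = SlidingWindow.allow?([], 2, 1_000, 0)
{:ok, ts2} = SlidingWindow.allow?(ts1, 2, 1_000, 100)
{:deny, _} = SlidingWindow.allow?(ts2, 2, 1_000, 200)   # window full
{:ok, _}   = SlidingWindow.allow?(ts2, 2, 1_000, 1_500) # old events expired
```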
### Architecture & Performance

- ✅ Embedded-first: runs within your supervision tree
- ✅ BEAM-native: leverages OTP patterns and Phoenix.PubSub
- ✅ GenStage pipeline for event streaming
- ✅ High-performance TCP with ThousandIsland
- ✅ Non-blocking I/O and efficient message routing
- ✅ Automatic resource cleanup on process crashes
### Observability & Monitoring

- ✅ Comprehensive telemetry using `:telemetry`
- ✅ Connection lifecycle tracking
- ✅ Message flow monitoring
- ✅ Rate limiting alerts
- ✅ Memory usage alerts
- ✅ Resource monitoring events
### Development & Testing

- ✅ 207+ comprehensive tests (unit + integration)
- ✅ Property-based testing with StreamData
- ✅ Code coverage tracking
- ✅ Static analysis with Credo and Dialyzer
- ✅ Benchmarking support
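Property-based tests with StreamData look roughly like this. A generic sketch of the style, not an actual test from Nipper's suite:

```elixir
defmodule TopicPropertyTest do
  use ExUnit.Case
  use ExUnitProperties

  property "joining and re-splitting topic levels round-trips" do
    check all levels <- list_of(string(:alphanumeric, min_length: 1), min_length: 1) do
      # Build a topic string from generated levels and make sure
      # splitting it recovers the original levels.
      topic = Enum.join(levels, "/")
      assert String.split(topic, "/") == levels
    end
  end
end
```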
## Quick Start

### Installation

Add `nipper` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:nipper, "~> 0.1.0"}
  ]
end
```

### Basic Setup
Add `Nipper.Supervisor` to your application's supervision tree:

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Your existing children...
      Nipper.Supervisor
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

### Configuration
Configure Nipper in `config/config.exs`:

```elixir
config :nipper,
  # Listener configuration
  listeners: [
    default: [
      port: 1883,
      transport: :tcp,
      acceptors: 100,
      max_connections: 10_000
    ]
  ],
  # Authentication
  auth: [
    module: Nipper.Auth.AllowAll # For development
  ],
  # Protocol limits
  protocol: [
    max_packet_size: 65_536,
    max_client_id_length: 128,
    default_keepalive: 60
  ],
  # Rate limiting (DoS protection)
  rate_limiter: [
    connection_limit: 10,  # connections per minute per IP
    message_limit: 100,    # messages per minute per client
    auth_limit: 5          # auth failures per 5 minutes per IP
  ]
```

### Testing Your Setup
Start your application and test with a simple MQTT client:

```shell
# Terminal 1: start your application
iex -S mix

# Terminal 2: test with mosquitto_sub/mosquitto_pub
mosquitto_sub -h localhost -p 1883 -t "test/topic" &
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello, Nipper!"
```

## Advanced Usage
### Custom Authentication

Create a custom authentication module:

```elixir
defmodule MyApp.MqttAuth do
  @behaviour Nipper.Auth

  @impl true
  def authenticate(conn_info) do
    %{client_id: _client_id, username: username, password: password} = conn_info

    case verify_credentials(username, password) do
      {:ok, user_info} ->
        {:ok, %{user_id: user_info.id, permissions: user_info.permissions}}

      :error ->
        {:error, :not_authorized}
    end
  end

  defp verify_credentials(username, password) do
    # Your authentication logic here
  end
end
```

Then configure it:

```elixir
config :nipper,
  auth: [module: MyApp.MqttAuth]
```

### Telemetry Integration
Set up telemetry handlers for monitoring:

```elixir
# In your application startup
:telemetry.attach_many(
  "mqtt-metrics",
  [
    [:nipper, :client, :connected],
    [:nipper, :client, :disconnected],
    [:nipper, :message, :published],
    [:nipper, :rate_limit, :exceeded],
    [:nipper, :memory_monitor, :warning]
  ],
  &MyApp.TelemetryHandler.handle_event/4,
  %{}
)
```

### Upstream Integration
Configure event streaming to external systems:

```elixir
config :nipper,
  upstream: [
    enabled: true,
    transport: MyApp.UpstreamTransport,
    producer: [batch_size: 100, batch_timeout: 1000],
    batcher: [max_batches: 10]
  ]
```

## Architecture
```
┌─────────────────────────────────────────────────────┐
│                  Nipper.Supervisor                  │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Listener   │  │ RateLimiter │  │ ResourceMon │  │
│  │(ThousandIs.)│  │    (ETS)    │  │ (GenServer) │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
│                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  MemoryMon  │  │   Router    │  │  Upstream   │  │
│  │ (GenServer) │  │(Phoenix.PS) │  │  Producer   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
└─────────────────────────────────────────────────────┘
```

### Key Components
- `Nipper.Listener`: ThousandIsland-based TCP server
- `Nipper.Connection`: Per-client connection handler
- `Nipper.RateLimiter`: DoS protection with sliding windows
- `Nipper.ResourceMonitor`: Process monitoring and cleanup
- `Nipper.MemoryMonitor`: Memory usage tracking and GC
- `Nipper.Router`: Message routing via Phoenix.PubSub
- `Nipper.Upstream.Producer`: Event streaming pipeline
## Development

### Running Tests

```shell
# Unit tests
mix test --exclude integration

# All tests
mix test

# With coverage
mix test --cover
```

### Code Quality
```shell
# Static analysis
mix credo --strict
mix dialyzer

# Documentation
mix docs
```

### Benchmarking
```shell
# Connection benchmarks
mix run benchmarks/connections.exs

# Message throughput
mix run benchmarks/messages.exs
```

## Performance
### Benchmarks (on modern hardware)

- **Connections**: 1000+ concurrent clients
- **Message throughput**: 10,000+ messages/second
- **Memory usage**: <100 MB for 1000 clients
- **Latency**: <1 ms message routing
### Resource Limits (configurable)

- Max connections: 10,000 (default)
- Max packet size: 64 KiB (65,536 bytes, default)
- Rate limits: 10 conn/min per IP, 100 msg/min per client
- Memory thresholds: 100 MB warning, 500 MB critical
## Roadmap

### Version 0.2.0 (Planned)

- [ ] MQTT 5.0 protocol support
- [ ] SSL/TLS transport layer
- [ ] Persistent message storage
- [ ] WebSocket transport support
- [ ] Enhanced authentication providers

### Version 0.3.0 (Planned)

- [ ] Cluster support and horizontal scaling
- [ ] MQTT bridge functionality
- [ ] Message transformation plugins
- [ ] Docker containerization
- [ ] Kubernetes deployment manifests
## Known Limitations

This is version 0.1.0, suitable for development and small-scale deployments:

- Single-node only (clustering planned for 0.3.0)
- In-memory only (persistence planned for 0.2.0)
- MQTT 3.1.1 only (5.0 support planned for 0.2.0)
- TCP only (WebSocket/SSL planned for 0.2.0)
## Contributing

1. Fork it
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Add tests and ensure they pass (`mix test`)
4. Run code quality checks (`mix credo`, `mix dialyzer`)
5. Commit your changes (`git commit -am 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Create a Pull Request
## License

This project is licensed under the MIT License. See the LICENSE file for details.

## Acknowledgments

- ThousandIsland for high-performance TCP
- Phoenix.PubSub for scalable messaging
- GenStage for event streaming
- The Elixir and OTP communities for the excellent foundations

---

Built with ❤️ for the BEAM ecosystem