Wamp
An Elixir implementation of the WAMP (Web Application Messaging Protocol) providing both Publish & Subscribe and Remote Procedure Call messaging patterns over a single connection.
Features
- Publish & Subscribe - Loosely coupled, many-to-many event distribution
- Remote Procedure Calls - Routed, point-to-point procedure invocation
- Phoenix Integration - WebSocket transport adapter for Phoenix
- Extensible Architecture - Custom broker and dealer modules via behaviours
- Authentication - Pluggable challenge-response authentication (anonymous, ticket, WAMP-CRA, SCRAM)
- Subscriber Filtering - Allow/Deny listing by session ID, auth ID, or auth role
- Call Cancellation - Cancel in-flight RPC calls with skip, kill, or killnowait modes
- Shared Registration - Multiple callees can register the same procedure URI
Installation
Add wamp to your list of dependencies in mix.exs:
def deps do
[
{:wamp, "~> 0.2.0"}
]
endQuick Start
1. Define a Router
The router is the central component that manages client sessions and coordinates message routing.
defmodule MyApp.Router do
use Wamp.Router,
otp_app: :my_app
end2. Define a Client
Clients connect to a router and can act as callers, callees, publishers, and subscribers.
defmodule MyApp.Client do
use Wamp.Client,
otp_app: :my_app
# Auto-register procedures on connect
procedure "com.myapp.add", MyApp.Math, :add
procedure "com.myapp.multiply", MyApp.Math, :multiply
# Auto-subscribe to topics on connect
channel "com.myapp.events", MyApp.EventHandler
end3. Implement Procedure Handlers
Procedure handler functions receive (args, kwargs, details) and must return a tagged tuple:
defmodule MyApp.Math do
def add(args, _kwargs, _details) do
{:ok, [Enum.sum(args)]}
end
def multiply([a, b], _kwargs, _details) do
{:ok, [a * b]}
end
def divide([a, b], _kwargs, _details) do
{:ok, [a / b], %{"remainder" => rem(a, b)}}
end
endAccepted return types:
| Return | Description |
|---|---|
{:ok, args} | Success with a list of positional arguments |
{:ok, args, kwargs} | Success with positional arguments and a keyword map |
{:error, uri} | Error with a WAMP error URI |
{:error, uri, args} | Error with URI and positional arguments |
{:error, uri, args, kwargs} | Error with URI, positional arguments, and keyword map |
4. Implement Event Subscribers
defmodule MyApp.EventHandler do
use Wamp.Subscriber
def handle_event({_pubid, _details, args, kwargs}, sub) do
IO.inspect(args, label: "Event received")
{:noreply, sub}
end
end5. Configure and Start
# config/config.exs
config :my_app, MyApp.Router,
realm: "realm1"
config :my_app, MyApp.Client,
realm: "realm1",
router: MyApp.RouterAdd to your supervision tree:
children = [
{MyApp.Router, realm: "realm1"},
{MyApp.Client, realm: "realm1", router: MyApp.Router}
]
Supervisor.start_link(children, strategy: :one_for_one)Architecture
+------------------+
| Wamp.Router |
| (GenServer) |
+--------+---------+
|
+-----------------------+-------------------+
| |
+--------v---------+ +--------v---------+
| Wamp.PubSub | | Wamp.RPC |
| .Broker | | .Dealer |
| (GenServer) | | (GenServer) |
+---------+--------+ +--------+---------+
| |
+-------+-------+ +------+------+
| | | |
Subscribers Publishers Callees Callers
(Wamp.Client) (Wamp.Client)The library is organized around these core modules:
| Module | Description |
|---|---|
Wamp.Router | Central message router managing sessions and coordination |
Wamp.PubSub.Broker | Handles topic subscriptions and event distribution |
Wamp.RPC.Dealer | Handles procedure registration and call routing |
Wamp.Client | Client-side protocol (caller, callee, publisher, subscriber) |
Wamp.Subscriber | GenServer wrapper for event subscription handlers |
Wamp.Transport.Phoenix | Phoenix WebSocket transport adapter |
Wamp.Spec | WAMP message type constants |
Usage
Remote Procedure Calls
# Register a procedure at runtime
MyApp.Client.register("com.myapp.echo", {MyApp.Handlers, :echo})
# Call a procedure
request_id = MyApp.Client.call("com.myapp.add", [1, 2])
# Wait for the result
{:ok, {[3], %{}}} = MyApp.Client.await(request_id)
# Or check if result is ready
case MyApp.Client.yielded(request_id) do
true -> MyApp.Client.yield(request_id)
false -> # still pending
end
# Unregister
MyApp.Client.unregister("com.myapp.echo")Publish & Subscribe
# Subscribe to a topic
MyApp.Client.subscribe("com.myapp.events")
# Publish an event (fire-and-forget)
MyApp.Client.publish("com.myapp.events", ["hello"])
# Publish with keyword arguments
MyApp.Client.publish("com.myapp.events", [], %{"message" => "hello"})
# Publish with acknowledgment
{:ok, publication_id} = MyApp.Client.ack_publish("com.myapp.events", ["hello"])
# Unsubscribe
MyApp.Client.unsubscribe("com.myapp.events")Publishing Options
# Exclude yourself from receiving the event
MyApp.Client.publish("topic", args, %{}, %{"exclude_me" => true})
# Disclose publisher identity to subscribers
MyApp.Client.publish("topic", args, %{}, %{"disclose_me" => true})
# Target specific sessions
MyApp.Client.publish("topic", args, %{}, %{"eligible" => [session_id1, session_id2]})
# Exclude specific sessions
MyApp.Client.publish("topic", args, %{}, %{"exclude" => [session_id3]})
# Filter by auth role
MyApp.Client.publish("topic", args, %{}, %{"eligible_authrole" => ["admin"]})Error Handling in Procedures
Return an {:error, uri, ...} tuple to signal an error to the caller:
def divide([a, b], _kwargs, _details) do
if b == 0 do
{:error, "com.myapp.error.division_by_zero", ["Cannot divide by zero"]}
else
{:ok, [a / b]}
end
end
Alternatively, raise Wamp.Client.InvocationError for exception-based error handling:
def divide([a, b], _kwargs, _details) do
if b == 0 do
raise Wamp.Client.InvocationError,
uri: "com.myapp.error.division_by_zero",
args: ["Cannot divide by zero"],
kwargs: %{}
end
{:ok, [a / b]}
endPhoenix WebSocket Integration
1. Define a Transport Module
defmodule MyAppWeb.WampTransport do
use Wamp.Transport.Phoenix,
router: MyApp.Router
def connection(socket) do
# Perform connection-level authorization here
{:ok, socket}
end
end2. Configure Your Endpoint
# lib/my_app_web/endpoint.ex
socket "/ws", MyAppWeb.WampTransport,
websocket: [
subprotocols: ["wamp.2.json", "wamp.2.msgpack"]
]The transport supports both JSON and MessagePack serialization, negotiated via WebSocket subprotocol.
Custom Broker and Dealer
Custom Broker
Implement Wamp.Spec.Broker to control subscription approval and event filtering:
defmodule MyApp.Broker do
@behaviour Wamp.Broker
def publish({topic, event}, publisher, subscribers) do
# Filter subscribers or reject publication
authorized = Enum.filter(subscribers, &authorized?(&1, topic))
{:ok, authorized}
end
def subscribe(topic, opts, session) do
if can_subscribe?(session, topic) do
{:ok, []} # attributes
else
{:error, "wamp.error.not_authorized"}
end
end
def unsubscribed(_subscription) do
:ok
end
endCustom Dealer
Implement Wamp.Spec.Dealer to control registration approval and procedure selection:
defmodule MyApp.Dealer do
@behaviour Wamp.Dealer
def register({uri, opts}, session) do
if can_register?(session, uri) do
{:ok, nil} # attributes
else
{:error, "wamp.error.not_authorized"}
end
end
def select({uri, call}, procedures, caller) do
# Select which callee handles the call
# Implement round-robin, load balancing, etc.
{:ok, Enum.random(procedures)}
# Or return custom invocation details:
# {:ok, proc, %{callee: caller.id}}
end
def unregistered(_procedure) do
:ok
end
endUse your custom modules:
defmodule MyApp.Router do
use Wamp.Router,
otp_app: :my_app,
broker: MyApp.Broker,
dealer: MyApp.Dealer
endAuthentication
Override challenge/1 and check_challenge/3 in your router:
defmodule MyApp.Router do
use Wamp.Router, otp_app: :my_app
# Return the auth method and challenge data
def challenge(%{id: sid}) do
{:ticket, %{}}
end
# Verify the client's response
def check_challenge({:ticket, _challenge}, {token, _details}, session) do
case verify_token(token) do
{:ok, user} ->
{:ok, %{
authid: user.id,
authrole: user.role,
authmethod: :ticket,
authprovider: :my_app
}}
:error ->
{:error, "wamp.error.not_authorized"}
end
end
endSupported authentication methods:
| Method | Tuple | Description |
|---|---|---|
| Anonymous | {:anonymous, details} | No authentication (default) |
| Ticket | {:ticket, details} | Token-based authentication |
| WAMP-CRA | {:wampcra, details} | Challenge-Response Authentication |
| SCRAM | {:scram, details} | Salted Challenge Response |
Router Introspection
# Get all subscriptions
MyApp.Router.subscriptions()
# Get subscriptions for a topic
MyApp.Router.subscriptions("com.myapp.events")
# Get all registered procedures
MyApp.Router.procedures()
# Get procedures for a URI
MyApp.Router.procedures("com.myapp.add")
# Get active invocations
MyApp.Router.invocations()
# Revoke a subscription (server-initiated)
MyApp.Router.revoke_subscription(subscription_id, "administrative action")
# Revoke a registration (server-initiated)
MyApp.Router.revoke_registration(registration_id, "administrative action")WAMP Protocol Support
Implemented
- Session lifecycle (HELLO, WELCOME, ABORT, GOODBYE)
- Challenge-response authentication
- Publish & Subscribe with filtering
- Remote Procedure Calls with cancellation
- Publisher exclusion and identification
- Subscriber allow/deny listing
- Shared procedure registration
- Registration and subscription revocation
- Progressive call results (infrastructure)
- Publication acknowledgment
Not Yet Implemented
- Pattern-based subscriptions
- Pattern-based registrations
- Payload encryption
- Event retention
License
MIT License. See LICENSE for details.