Wamp

An Elixir implementation of the WAMP (Web Application Messaging Protocol) providing both Publish & Subscribe and Remote Procedure Call messaging patterns over a single connection.

Features

Installation

Add wamp to your list of dependencies in mix.exs:

def deps do
    [
        {:wamp, "~> 0.2.0"}
    ]
end

Quick Start

1. Define a Router

The router is the central component that manages client sessions and coordinates message routing.

defmodule MyApp.Router do
    use Wamp.Router,
        otp_app: :my_app
end

2. Define a Client

Clients connect to a router and can act as callers, callees, publishers, and subscribers.

defmodule MyApp.Client do
    use Wamp.Client,
        otp_app: :my_app

    # Auto-register procedures on connect
    procedure "com.myapp.add", MyApp.Math, :add
    procedure "com.myapp.multiply", MyApp.Math, :multiply

    # Auto-subscribe to topics on connect
    channel "com.myapp.events", MyApp.EventHandler
end

3. Implement Procedure Handlers

Procedure handler functions receive (args, kwargs, details) and must return a tagged tuple:

defmodule MyApp.Math do
    def add(args, _kwargs, _details) do
        {:ok, [Enum.sum(args)]}
    end

    def multiply([a, b], _kwargs, _details) do
        {:ok, [a * b]}
    end

    def divide([a, b], _kwargs, _details) do
        {:ok, [a / b], %{"remainder" => rem(a, b)}}
    end
end

Accepted return types:

Return Description
{:ok, args} Success with a list of positional arguments
{:ok, args, kwargs} Success with positional arguments and a keyword map
{:error, uri} Error with a WAMP error URI
{:error, uri, args} Error with URI and positional arguments
{:error, uri, args, kwargs} Error with URI, positional arguments, and keyword map

4. Implement Event Subscribers

defmodule MyApp.EventHandler do
    use Wamp.Subscriber

    def handle_event({_pubid, _details, args, kwargs}, sub) do
        IO.inspect(args, label: "Event received")
        {:noreply, sub}
    end
end

5. Configure and Start

# config/config.exs
config :my_app, MyApp.Router,
    realm: "realm1"

config :my_app, MyApp.Client,
    realm: "realm1",
    router: MyApp.Router

Add to your supervision tree:

children = [
    {MyApp.Router, realm: "realm1"},
    {MyApp.Client, realm: "realm1", router: MyApp.Router}
]

Supervisor.start_link(children, strategy: :one_for_one)

Architecture

                                +------------------+
                                |   Wamp.Router    |
                                |   (GenServer)    |
                                +--------+---------+
                                         |
                 +-----------------------+-------------------+
                 |                                           |
        +--------v---------+                        +--------v---------+
        | Wamp.PubSub      |                        |  Wamp.RPC        |
        |   .Broker        |                        |   .Dealer        |
        | (GenServer)      |                        | (GenServer)      |
        +---------+--------+                        +--------+---------+
                  |                                          |
          +-------+-------+                           +------+------+
          |               |                           |             |
     Subscribers    Publishers                      Callees     Callers
     (Wamp.Client)                                 (Wamp.Client)

The library is organized around these core modules:

Module Description
Wamp.Router Central message router managing sessions and coordination
Wamp.PubSub.Broker Handles topic subscriptions and event distribution
Wamp.RPC.Dealer Handles procedure registration and call routing
Wamp.Client Client-side protocol (caller, callee, publisher, subscriber)
Wamp.Subscriber GenServer wrapper for event subscription handlers
Wamp.Transport.Phoenix Phoenix WebSocket transport adapter
Wamp.Spec WAMP message type constants

Usage

Remote Procedure Calls

# Register a procedure at runtime
MyApp.Client.register("com.myapp.echo", {MyApp.Handlers, :echo})

# Call a procedure
request_id = MyApp.Client.call("com.myapp.add", [1, 2])

# Wait for the result
{:ok, {[3], %{}}} = MyApp.Client.await(request_id)

# Or check if result is ready
case MyApp.Client.yielded(request_id) do
    true  -> MyApp.Client.yield(request_id)
    false -> # still pending
end

# Unregister
MyApp.Client.unregister("com.myapp.echo")

Publish & Subscribe

# Subscribe to a topic
MyApp.Client.subscribe("com.myapp.events")

# Publish an event (fire-and-forget)
MyApp.Client.publish("com.myapp.events", ["hello"])

# Publish with keyword arguments
MyApp.Client.publish("com.myapp.events", [], %{"message" => "hello"})

# Publish with acknowledgment
{:ok, publication_id} = MyApp.Client.ack_publish("com.myapp.events", ["hello"])

# Unsubscribe
MyApp.Client.unsubscribe("com.myapp.events")

Publishing Options

# Exclude yourself from receiving the event
MyApp.Client.publish("topic", args, %{}, %{"exclude_me" => true})

# Disclose publisher identity to subscribers
MyApp.Client.publish("topic", args, %{}, %{"disclose_me" => true})

# Target specific sessions
MyApp.Client.publish("topic", args, %{}, %{"eligible" => [session_id1, session_id2]})

# Exclude specific sessions
MyApp.Client.publish("topic", args, %{}, %{"exclude" => [session_id3]})

# Filter by auth role
MyApp.Client.publish("topic", args, %{}, %{"eligible_authrole" => ["admin"]})

Error Handling in Procedures

Return an {:error, uri, ...} tuple to signal an error to the caller:

def divide([a, b], _kwargs, _details) do
    if b == 0 do
        {:error, "com.myapp.error.division_by_zero", ["Cannot divide by zero"]}
    else
        {:ok, [a / b]}
    end
end

Alternatively, raise Wamp.Client.InvocationError for exception-based error handling:

def divide([a, b], _kwargs, _details) do
    if b == 0 do
        raise Wamp.Client.InvocationError,
            uri: "com.myapp.error.division_by_zero",
            args: ["Cannot divide by zero"],
            kwargs: %{}
    end
    {:ok, [a / b]}
end

Phoenix WebSocket Integration

1. Define a Transport Module

defmodule MyAppWeb.WampTransport do
    use Wamp.Transport.Phoenix,
        router: MyApp.Router

    def connection(socket) do
        # Perform connection-level authorization here
        {:ok, socket}
    end
end

2. Configure Your Endpoint

# lib/my_app_web/endpoint.ex
socket "/ws", MyAppWeb.WampTransport,
    websocket: [
        subprotocols: ["wamp.2.json", "wamp.2.msgpack"]
    ]

The transport supports both JSON and MessagePack serialization, negotiated via WebSocket subprotocol.

Custom Broker and Dealer

Custom Broker

Implement Wamp.Spec.Broker to control subscription approval and event filtering:

defmodule MyApp.Broker do
    @behaviour Wamp.Broker

    def publish({topic, event}, publisher, subscribers) do
        # Filter subscribers or reject publication
        authorized = Enum.filter(subscribers, &authorized?(&1, topic))
        {:ok, authorized}
    end

    def subscribe(topic, opts, session) do
        if can_subscribe?(session, topic) do
            {:ok, []}  # attributes
        else
            {:error, "wamp.error.not_authorized"}
        end
    end

    def unsubscribed(_subscription) do
        :ok
    end
end

Custom Dealer

Implement Wamp.Spec.Dealer to control registration approval and procedure selection:

defmodule MyApp.Dealer do
    @behaviour Wamp.Dealer

    def register({uri, opts}, session) do
        if can_register?(session, uri) do
            {:ok, nil}  # attributes
        else
            {:error, "wamp.error.not_authorized"}
        end
    end

    def select({uri, call}, procedures, caller) do
        # Select which callee handles the call
        # Implement round-robin, load balancing, etc.
        {:ok, Enum.random(procedures)}
        # Or return custom invocation details:
        # {:ok, proc, %{callee: caller.id}}
    end

    def unregistered(_procedure) do
        :ok
    end
end

Use your custom modules:

defmodule MyApp.Router do
    use Wamp.Router,
        otp_app: :my_app,
        broker: MyApp.Broker,
        dealer: MyApp.Dealer
end

Authentication

Override challenge/1 and check_challenge/3 in your router:

defmodule MyApp.Router do
    use Wamp.Router, otp_app: :my_app

    # Return the auth method and challenge data
    def challenge(%{id: sid}) do
        {:ticket, %{}}
    end

    # Verify the client's response
    def check_challenge({:ticket, _challenge}, {token, _details}, session) do
        case verify_token(token) do
            {:ok, user} ->
                {:ok, %{
                  authid: user.id,
                  authrole: user.role,
                  authmethod: :ticket,
                  authprovider: :my_app
                }}

            :error ->
                {:error, "wamp.error.not_authorized"}
        end
    end
end

Supported authentication methods:

Method Tuple Description
Anonymous {:anonymous, details} No authentication (default)
Ticket {:ticket, details} Token-based authentication
WAMP-CRA {:wampcra, details} Challenge-Response Authentication
SCRAM {:scram, details} Salted Challenge Response

Router Introspection

# Get all subscriptions
MyApp.Router.subscriptions()

# Get subscriptions for a topic
MyApp.Router.subscriptions("com.myapp.events")

# Get all registered procedures
MyApp.Router.procedures()

# Get procedures for a URI
MyApp.Router.procedures("com.myapp.add")

# Get active invocations
MyApp.Router.invocations()

# Revoke a subscription (server-initiated)
MyApp.Router.revoke_subscription(subscription_id, "administrative action")

# Revoke a registration (server-initiated)
MyApp.Router.revoke_registration(registration_id, "administrative action")

WAMP Protocol Support

Implemented

Not Yet Implemented

License

MIT License. See LICENSE for details.