
PhoenixGenApi

Build dynamic API gateways on top of Phoenix Channels. Register APIs at runtime from any node in the cluster — no restarts, no redeploys.

Version: 2.16.0 | Elixir: ~> 1.18 | OTP: ~> 27 | License: MPL-2.0

How It Works

┌──────────┐     WebSocket      ┌───────────────────┐     RPC      ┌──────────────┐
│  Client  │ ◄────────────────► │  Phoenix Gateway  │ ◄──────────► │ Service Node │
└──────────┘   (Phoenix Ch.)    │  (uses this lib)  │   (Erlang)   │  (your app)  │
                                └───────────────────┘              └──────────────┘
  1. Clients send requests through a Phoenix Channel.
  2. The Gateway looks up the matching FunConfig, selects a node, validates arguments & permissions, then executes the remote function.
  3. The Service Node runs the function and returns the result.

Service nodes can register new APIs at any time — the gateway picks them up automatically (pull) or receives them immediately (push).
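
The three steps above can be pictured as a small pipeline. The sketch below is not the library's code: GatewaySketch is a hypothetical module, plain maps stand in for FunConfig and Request, only the :string type is validated, and the function runs locally with apply/3 where the real gateway would make an RPC call to the selected node.

```elixir
defmodule GatewaySketch do
  # Hypothetical stand-in for the gateway's dispatch step; not PhoenixGenApi code.

  # `configs` maps {service, request_type} to a plain-map config.
  def dispatch(configs, request) do
    with {:ok, config} <- lookup(configs, request),
         {:ok, args} <- validate_args(config, request.args) do
      node = Enum.random(config.nodes)
      {mod, fun, extra} = config.mfa
      # The real gateway would call the chosen node over RPC; here we run locally.
      {:ok, node, apply(mod, fun, args ++ extra)}
    end
  end

  defp lookup(configs, %{service: service, request_type: type}) do
    case Map.fetch(configs, {service, type}) do
      {:ok, config} -> {:ok, config}
      :error -> {:error, :unknown_api}
    end
  end

  # Only the :string type is checked in this sketch.
  defp validate_args(%{arg_types: types, arg_orders: order}, args) do
    if Enum.all?(order, fn name -> types[name] == :string and is_binary(args[name]) end) do
      {:ok, Enum.map(order, &Map.fetch!(args, &1))}
    else
      {:error, :invalid_args}
    end
  end
end

configs = %{
  {"text_service", "upcase"} => %{
    nodes: [:"app@host"],
    mfa: {String, :upcase, []},
    arg_types: %{"text" => :string},
    arg_orders: ["text"]
  }
}

request = %{service: "text_service", request_type: "upcase", args: %{"text" => "hello"}}
IO.inspect(GatewaySketch.dispatch(configs, request))
```

An unknown {service, request_type} pair or a non-string argument falls out of the with chain as an error tuple, so the function is never invoked with unvalidated input.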

Features

Installation

Requires Elixir ~> 1.18 and OTP ~> 27.

def deps do
  [
    {:phoenix_gen_api, "~> 2.16"}
  ]
end

Use :libcluster to form the Erlang cluster.

Quick Start

1. Define your API on the service node

defmodule MyApp.Api do
  def get_user(user_id) do
    %{id: user_id, name: "Alice"}
  end
end

2. Create a FunConfig

A FunConfig tells the gateway how to call your function:

alias PhoenixGenApi.Structs.FunConfig

%FunConfig{
  request_type: "get_user",
  service: "user_service",
  nodes: [:"app@host"],
  choose_node_mode: :random,
  timeout: 5_000,
  mfa: {MyApp.Api, :get_user, []},
  arg_types: %{"user_id" => :string},
  response_type: :sync,
  version: "1.0.0"
}

3. Register the config on the gateway

Option A — Pull mode (gateway fetches from service node on startup):

On the service node, define a supporter module:

defmodule MyApp.GenApi.Supporter do
  alias PhoenixGenApi.Structs.FunConfig

  def get_config(_arg) do
    {:ok, [
      %FunConfig{
        request_type: "get_user",
        service: "user_service",
        nodes: [Node.self()],
        choose_node_mode: :random,
        timeout: 5_000,
        mfa: {MyApp.Api, :get_user, []},
        arg_types: %{"user_id" => :string},
        response_type: :sync,
        version: "1.0.0"
      }
    ]}
  end
end

Then configure the gateway to pull from it (see Gateway Configuration below).

Option B — Push mode (service node pushes to gateway on startup):

alias PhoenixGenApi.ConfigPusher
alias PhoenixGenApi.Structs.FunConfig

fun_configs = [%FunConfig{ ... }]

push_config = ConfigPusher.from_service_config(
  :user_service,
  [Node.self()],
  fun_configs,
  config_version: "1.0.0"
)

ConfigPusher.push_on_startup(:"gateway@host", push_config)

Option C — Add directly on the gateway node at runtime:

PhoenixGenApi.ConfigDb.add(%FunConfig{ ... })

4. Add PhoenixGenApi to your Channel

defmodule MyAppWeb.ApiChannel do
  use Phoenix.Channel
  use PhoenixGenApi, event: "api"

  # handle_in, handle_info for :push, :async_call, :stream_response
  # are automatically injected by `use PhoenixGenApi`
end

5. Call from the client

Send a JSON message over the channel:

{
  "service": "user_service",
  "request_type": "get_user",
  "request_id": "req_1",
  "args": { "user_id": "123" }
}

Receive the response:

{
  "request_id": "req_1",
  "result": { "id": "123", "name": "Alice" },
  "success": true,
  "async": false,
  "has_more": false
}

That's it — you have a working API gateway.


Core Concepts

FunConfig

FunConfig is the central data structure. It maps a {service, request_type} pair to a function call.

Field                Type                                                         Description
request_type         String.t()                                                   API endpoint name (e.g. "get_user")
service              atom | String.t()                                            Service group name (e.g. "user_service")
nodes                [atom] | {m, f, a} | :local                                  Target nodes or :local for same-node execution
choose_node_mode     :random | :hash | {:hash, key} | :round_robin                Node selection strategy
timeout              integer | :infinity                                          Execution timeout in ms (100–300 000 or :infinity)
mfa                  {module, function, args}                                     The function to call. Args are prepended with converted request args
arg_types            %{name => type}                                              Argument type declarations for validation
arg_orders           [String.t()] | :map                                          Argument ordering (or :map to pass a map)
response_type        :sync | :async | :stream | :none                             How the result is delivered
check_permission     false | :any_authenticated | {:arg, name} | {:role, roles}   Permission mode
permission_callback  {m, f, a} | nil                                              Custom permission check
request_info         boolean()                                                    Pass the full %Request{} as first arg to the MFA (legacy field, currently unused in execution)
version              String.t()                                                   API version (default "0.0.0")
disabled             boolean()                                                    Disable this version without removing it
retry                nil | number | {:same_node, n} | {:all_nodes, n}             Retry configuration
before_execute       {m, f} | {m, f, a} | nil                                     Hook called before execution
after_execute        {m, f} | {m, f, a} | nil                                     Hook called after execution
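
To make the choose_node_mode values concrete, here is a minimal sketch of how each strategy could pick a node. It is illustrative only and the library's own selection logic may differ. NodeChoiceSketch is a hypothetical module, and two points are assumptions rather than documented behavior: that plain :hash keys on the request ID, and that {:hash, key} keys on the named request argument.

```elixir
defmodule NodeChoiceSketch do
  # Illustrative only; the library's own selection logic may differ.

  def choose(nodes, :random, _request, _counter), do: Enum.random(nodes)

  # Assumption: plain :hash keys on the request ID.
  def choose(nodes, :hash, request, _counter), do: by_hash(nodes, request.request_id)

  # Assumption: {:hash, key} keys on the named request argument.
  def choose(nodes, {:hash, key}, request, _counter), do: by_hash(nodes, request.args[key])

  # Round robin needs a counter kept by the caller (for example in an Agent).
  def choose(nodes, :round_robin, _request, counter) do
    Enum.at(nodes, rem(counter, length(nodes)))
  end

  defp by_hash(nodes, term), do: Enum.at(nodes, :erlang.phash2(term, length(nodes)))
end

nodes = [:"n1@host", :"n2@host", :"n3@host"]
request = %{request_id: "req_1", args: %{"user_id" => "123"}}

# The same key always maps to the same node while the node list is unchanged.
IO.inspect(NodeChoiceSketch.choose(nodes, {:hash, "user_id"}, request, 0))
IO.inspect(Enum.map(0..3, &NodeChoiceSketch.choose(nodes, :round_robin, request, &1)))
```

Hash-based selection is useful when a service node keeps per-user state, since repeated requests for the same key land on the same node.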

Request

The client payload is decoded into a %Request{}:

Field         Type                Description
user_id       String.t() | nil    Authenticated user ID (can be set from socket assigns)
device_id     String.t() | nil    Device identifier
request_type  String.t()          Which API to call
request_id    String.t()          Client-generated ID to match responses
service       String.t()          Target service
args          map()               Request arguments
user_roles    [String.t()] | nil  User roles for RBAC
version       String.t() | nil    Requested API version

Response

The gateway replies with a %Response{}:

Field       Type              Description
request_id  String.t()        Matches the request
result      any()             Returned data
success     boolean()         Whether the call succeeded
error       String.t() | nil  Error message on failure
async       boolean()         true if more messages will follow
has_more    boolean()         true for intermediate stream chunks
can_retry   boolean()         true if the client may retry

Configuration

Gateway Configuration

In the gateway node's config.exs:

config :phoenix_gen_api, :gen_api,
  pull_timeout: 5_000,
  pull_interval: 30_000,
  detail_error: false,
  service_configs: [
    %{
      service: "user_service",
      nodes: [:"app@host"],
      module: MyApp.GenApi.Supporter,
      function: :get_config,
      args: []
    }
  ]

Option           Type     Default  Description
pull_timeout     integer  5_000    Timeout in ms for each pull RPC call
pull_interval    integer  30_000   Interval in ms between automatic pulls
detail_error     boolean  false    Include detailed error messages in responses
service_configs  [map()]  []       List of service configurations for pull mode

Each service_config map supports:

Key               Type                  Description
service           String.t() | atom()   Service name (used as lookup key)
nodes             [atom()] | {m, f, a}  Target nodes or MFA that resolves to node list
module            module()              Remote module implementing the config function
function          atom()                Function on the remote module
args              list()                Extra arguments passed to the config function
version_module    module() | nil        Module for lightweight version check RPC
version_function  atom() | nil          Function returning current config version
version_args      list()                Args for the version check function

Remote Node (Pull Mode)

Mark the node as a client so it doesn't start gateway-only services (RateLimiter, WorkerPool, ConfigDb, etc.):

config :phoenix_gen_api, :client_mode, true

When client_mode: true, the application starts with an empty supervision tree and no ETS tables. This is the standard setting for service/worker nodes that push configs to the gateway.

Define a supporter module that returns FunConfig lists:

defmodule MyApp.GenApi.Supporter do
  alias PhoenixGenApi.Structs.FunConfig

  def get_config(_arg) do
    {:ok, my_fun_configs()}
  end

  def my_fun_configs do
    [
      %FunConfig{
        request_type: "get_user",
        service: "user_service",
        nodes: [Node.self()],
        choose_node_mode: :random,
        timeout: 5_000,
        mfa: {MyApp.Api, :get_user, []},
        arg_types: %{"user_id" => :string},
        response_type: :sync,
        version: "1.0.0"
      }
    ]
  end
end

Active Push (Remote → Gateway)

Remote nodes can push configs immediately on startup instead of waiting for a pull:

alias PhoenixGenApi.ConfigPusher
alias PhoenixGenApi.Structs.{FunConfig, PushConfig}

fun_configs = [%FunConfig{ ... }]

push_config = %PushConfig{
  service: :user_service,
  nodes: [Node.self()],
  config_version: "1.0.0",
  fun_configs: fun_configs,
  # Optional: enable periodic pull after initial push
  module: MyApp.GenApi.Supporter,
  function: :get_config,
  args: []
}

ConfigPusher.push_on_startup(:"gateway@host", push_config)

Push is idempotent — if config_version matches, the push is skipped. Use force: true to override:

ConfigPusher.push(:"gateway@host", push_config, force: true)
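
The skip-unless-forced rule can be modeled in a few lines. This is a sketch under stated assumptions, not the gateway's implementation: PushSketch is a hypothetical module and the registry is a plain map from service to config_version. The return shapes mirror the {:ok, :accepted} and {:ok, :skipped, :version_matches} tuples shown in this README.

```elixir
defmodule PushSketch do
  # Hypothetical model of version-gated pushes; `registry` maps service => version.

  def push(registry, service, version, opts \\ []) do
    force? = Keyword.get(opts, :force, false)

    case Map.fetch(registry, service) do
      # Same version already registered and no force flag: leave the registry alone.
      {:ok, ^version} when not force? ->
        {registry, {:ok, :skipped, :version_matches}}

      # New service, different version, or forced push: store the version.
      _ ->
        {Map.put(registry, service, version), {:ok, :accepted}}
    end
  end
end

{registry, first} = PushSketch.push(%{}, :user_service, "1.0.0")
{registry, second} = PushSketch.push(registry, :user_service, "1.0.0")
{_registry, forced} = PushSketch.push(registry, :user_service, "1.0.0", force: true)
IO.inspect({first, second, forced})
```

Because a repeated push with the same version changes nothing, a service node can call push on every restart without coordinating with the gateway.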

Verify a service's config version before pushing:

case ConfigPusher.verify(:"gateway@host", :user_service, "1.0.0") do
  {:ok, :matched} -> :already_registered
  {:ok, :mismatch, _} -> ConfigPusher.push(:"gateway@host", push_config)
  {:error, :not_found} -> ConfigPusher.push(:"gateway@host", push_config)
end

Gateway-side API:

# Receive a push (called via RPC from remote node)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config)
{:ok, :skipped, :version_matches} = PhoenixGenApi.push_config(push_config)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config, force: true)

# Verify a service's config version
{:ok, :matched} = PhoenixGenApi.verify_config("user_service", "1.0.0")
{:ok, :mismatch, "0.9.0"} = PhoenixGenApi.verify_config("user_service", "1.0.0")
{:error, :not_found} = PhoenixGenApi.verify_config("unknown", "1.0.0")

# Check pushed services (IEx helper)
PhoenixGenApi.pushed_services_status()

Channel Integration

defmodule MyAppWeb.ApiChannel do
  use Phoenix.Channel
  use PhoenixGenApi, event: "api"
end

Options:

Option             Default            Description
:event             "phoenix_gen_api"  Channel event name for incoming requests and outgoing pushes
:override_user_id  true               Override user_id from socket.assigns.user_id (only if it's a non-empty string)
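
The :override_user_id rule is easy to misread, so here is a small sketch of the documented condition. OverrideSketch is a hypothetical helper working on plain maps, not the handler that use PhoenixGenApi injects.

```elixir
defmodule OverrideSketch do
  # Hypothetical helper mirroring the documented rule; not the injected handler itself.

  # Replace the client-supplied user_id only when the socket assigns hold a
  # non-empty string.
  def maybe_override(request, %{user_id: id}) when is_binary(id) and id != "" do
    Map.put(request, :user_id, id)
  end

  # Missing, nil, empty, or non-string assigns leave the request untouched.
  def maybe_override(request, _assigns), do: request
end

IO.inspect(OverrideSketch.maybe_override(%{user_id: "spoofed"}, %{user_id: "user_123"}))
IO.inspect(OverrideSketch.maybe_override(%{user_id: "client_value"}, %{user_id: ""}))
```

With the override enabled, a client cannot impersonate another user by sending a different user_id in the payload once the socket has been authenticated.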

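The use macro injects handle_in for the configured event, plus handle_info handlers for :push, :async_call, and :stream_response messages.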

It also derives JSON.Encoder for Response using the configured JSON library.


Function Versioning

Run multiple versions of the same API side-by-side:

# Version 1.0.0
%FunConfig{
  request_type: "get_user",
  service: "user_service",
  version: "1.0.0",
  mfa: {MyApp.Users, :get_user_v1, []},
  arg_types: %{"id" => :string},
  response_type: :sync
}

# Version 2.0.0 — adds a "fields" argument
%FunConfig{
  request_type: "get_user",
  service: "user_service",
  version: "2.0.0",
  mfa: {MyApp.Users, :get_user_v2, []},
  arg_types: %{"id" => :string, "fields" => :list_string},
  response_type: :sync
}

Clients request a specific version:

{
  "service": "user_service",
  "request_type": "get_user",
  "request_id": "req_1",
  "version": "2.0.0",
  "args": { "id": "123", "fields": ["name", "email"] }
}

If no version is sent, "0.0.0" is used. Use ConfigDb.get_latest/2 to resolve the highest enabled version.
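
The lookup rules can be sketched over a plain list of configs. This is an illustration, not ConfigDb's implementation: VersionSketch is a hypothetical module, the {:error, :not_found} return is an assumption, and versions are compared semantically with Version.compare/2 from the Elixir standard library.

```elixir
defmodule VersionSketch do
  # Illustrative resolver over plain maps; not ConfigDb's implementation.

  # A request without a version is looked up as "0.0.0".
  def resolve(configs, requested) do
    version = requested || "0.0.0"

    case Enum.find(configs, fn config -> config.version == version end) do
      nil -> {:error, :not_found}
      %{disabled: true} -> {:error, :disabled}
      config -> {:ok, config}
    end
  end

  # Highest enabled version, compared semantically rather than as strings.
  def latest(configs) do
    configs
    |> Enum.reject(fn config -> config.disabled end)
    |> Enum.max_by(fn config -> config.version end, Version, fn -> nil end)
    |> case do
      nil -> {:error, :not_found}
      config -> {:ok, config}
    end
  end
end

configs = [
  %{version: "1.9.0", disabled: false},
  %{version: "1.10.0", disabled: false},
  %{version: "2.0.0", disabled: true}
]

IO.inspect(VersionSketch.latest(configs))
IO.inspect(VersionSketch.resolve(configs, "2.0.0"))
IO.inspect(VersionSketch.resolve(configs, nil))
```

Semantic comparison matters here: a plain string comparison would rank "1.9.0" above "1.10.0".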

Managing Versions

alias PhoenixGenApi.ConfigDb

# Get a specific version
{:ok, config} = ConfigDb.get("user_service", "get_user", "1.0.0")

# Get the latest enabled version
{:ok, latest} = ConfigDb.get_latest("user_service", "get_user")

# Disable a version (requests for it then return {:error, :disabled})
:ok = ConfigDb.disable("user_service", "get_user", "1.0.0")

# Re-enable
:ok = ConfigDb.enable("user_service", "get_user", "1.0.0")

# Delete
:ok = ConfigDb.delete("user_service", "get_user", "1.0.0")

# List all functions with their versions
ConfigDb.get_all_functions()

Behavior:


Rate Limiter

Sliding-window rate limiter backed by ETS. Uses a multi-instance architecture for high concurrency — instances are supervised under a :one_for_one supervisor and requests are routed via consistent hashing (:erlang.phash2) by default.
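
For readers unfamiliar with the technique, here is a minimal sliding-window limiter on ETS. It is a teaching sketch and not the library's code: SlidingWindowSketch is a hypothetical module that stores one timestamp per accepted request (a sliding-window log) in a single table, without sharding. The error tuple borrows the retry_after_ms field used in the Runtime API example in this README.

```elixir
defmodule SlidingWindowSketch do
  # Minimal sliding-window log limiter on ETS; a teaching sketch, not the library's code.

  def new, do: :ets.new(:sliding_window_sketch, [:duplicate_bag, :public])

  # Returns :ok, or {:error, :rate_limited, %{retry_after_ms: ms}} when over the limit.
  def check(table, key, max_requests, window_ms, now_ms \\ System.monotonic_time(:millisecond)) do
    cutoff = now_ms - window_ms
    # Drop timestamps that have slid out of the window.
    :ets.select_delete(table, [{{key, :"$1"}, [{:"=<", :"$1", cutoff}], [true]}])
    stamps = for {_key, ts} <- :ets.lookup(table, key), do: ts

    if length(stamps) < max_requests do
      :ets.insert(table, {key, now_ms})
      :ok
    else
      # The oldest remaining timestamp decides when a slot frees up.
      {:error, :rate_limited, %{retry_after_ms: Enum.min(stamps) + window_ms - now_ms}}
    end
  end
end

table = SlidingWindowSketch.new()
# Two requests per 1 000 ms; the explicit timestamps make the run deterministic.
IO.inspect(SlidingWindowSketch.check(table, "user_123", 2, 1_000, 0))
IO.inspect(SlidingWindowSketch.check(table, "user_123", 2, 1_000, 100))
IO.inspect(SlidingWindowSketch.check(table, "user_123", 2, 1_000, 200))
IO.inspect(SlidingWindowSketch.check(table, "user_123", 2, 1_000, 1_000))
```

Unlike a fixed window, the limit here applies to any window_ms-long interval, so a burst straddling a window boundary cannot double the allowed rate.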

Configuration

config :phoenix_gen_api, :rate_limiter,
  enabled: true,
  fail_open: true,
  # Multi-instance sharding (default: number of online schedulers)
  instance_count: :auto,  # :auto or positive integer
  routing_strategy: :hash,  # :hash (consistent) or :random
  global_limits: [
    # 2 000 requests per minute per user
    %{key: :user_id, max_requests: 2000, window_ms: 60_000},
    # 10 000 requests per minute per device
    %{key: :device_id, max_requests: 10000, window_ms: 60_000}
  ],
  api_limits: [
    # Expensive operation: 10 requests per minute per user
    %{
      service: "data_service",
      request_type: "export_data",
      key: :user_id,
      max_requests: 10,
      window_ms: 60_000
    }
  ]

Runtime API

alias PhoenixGenApi.RateLimiter

# Check rate limit for a request
case RateLimiter.check_rate_limit(request) do
  :ok ->
    # Proceed
    Executor.execute!(request)

  {:error, :rate_limited, details} ->
    # Reject
    %{error: "Rate limit exceeded", retry_after: details.retry_after_ms}
end

# Check global/API-specific limits directly
RateLimiter.check_rate_limit("user_123", :global, :user_id)
RateLimiter.check_rate_limit("user_123", {"my_service", "my_api"}, :user_id)

# Dynamic configuration
RateLimiter.add_global_limit(%{key: :ip_address, max_requests: 5000, window_ms: 60_000})
RateLimiter.update_config(%{enabled: true, global_limits: [...], api_limits: [...]})

Supported Keys

:user_id, :device_id, :ip_address, or any custom string key.

The rate limiter uses a sharded ETS architecture — multiple GenServer instances distribute the load. The routing strategy is :erlang.phash2/1 by default (configurable via routing_strategy). ETS tables (:rate_limiter_global and :rate_limiter_api) are shared across all instances with read_concurrency and write_concurrency enabled.
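
Routing a key to one of several limiter instances could look like the sketch below. ShardRouterSketch is a hypothetical module; it only illustrates the :auto instance count and the two routing_strategy values from the configuration above, using :erlang.phash2/2 and :rand.uniform/1 from OTP.

```elixir
defmodule ShardRouterSketch do
  # Hypothetical router: picks one of `count` limiter instances for a key.

  # :auto resolves to the number of online schedulers.
  def instance_count(:auto), do: System.schedulers_online()
  def instance_count(n) when is_integer(n) and n > 0, do: n

  # :hash sends a given key to the same instance every time.
  def route(key, count, :hash), do: :erlang.phash2(key, count)

  # :random spreads load evenly but gives no per-key affinity.
  def route(_key, count, :random), do: :rand.uniform(count) - 1
end

count = ShardRouterSketch.instance_count(:auto)
IO.inspect(ShardRouterSketch.route("user_123", count, :hash))
```

Both strategies return a zero-based index in the range 0 to count - 1.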

IEx Helpers

PhoenixGenApi.rl_status("user_123")   # Rate limit status for a user
PhoenixGenApi.rl_global()             # Show global limits
PhoenixGenApi.rl_config()             # Show full rate limiter config

Permission System

Four permission modes, configured per FunConfig via check_permission:

1. Disabled (false)

No check. Use for public endpoints.

%FunConfig{request_type: "search", check_permission: false}

2. Any Authenticated (:any_authenticated)

Requires a non-nil user_id.

%FunConfig{request_type: "get_profile", check_permission: :any_authenticated}

3. Argument-Based ({:arg, arg_name})

The specified argument must match user_id — users can only access their own data.

%FunConfig{request_type: "get_user_profile", check_permission: {:arg, "user_id"}}
# ✅ user_id: "user_123", args: %{"user_id" => "user_123"}
# ❌ user_id: "user_123", args: %{"user_id" => "user_999"}

4. Role-Based ({:role, allowed_roles})

User must have at least one of the specified roles.

%FunConfig{request_type: "delete_user", check_permission: {:role, ["admin", "moderator"]}}
# ✅ user_roles: ["admin"]
# ❌ user_roles: ["user"]
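
The four modes can be summarized as one function over the check_permission value. This is an illustrative sketch on plain maps, and the library's own checker may differ in detail. PermissionSketch is a hypothetical module, and treating a nil user_id as a failure in {:arg, name} mode is an assumption.

```elixir
defmodule PermissionSketch do
  # Illustrative check over plain maps; the library's own checker may differ in detail.

  # 1. Disabled: everything passes.
  def allowed?(false, _request), do: true

  # 2. Any authenticated user: user_id must be present.
  def allowed?(:any_authenticated, request), do: not is_nil(request.user_id)

  # 3. Argument-based: the named argument must equal the caller's user_id.
  #    Assumption: an unauthenticated request never matches.
  def allowed?({:arg, name}, request) do
    not is_nil(request.user_id) and Map.get(request.args, name) == request.user_id
  end

  # 4. Role-based: at least one of the user's roles must be allowed.
  def allowed?({:role, allowed}, request) do
    Enum.any?(request.user_roles || [], fn role -> role in allowed end)
  end
end

request = %{user_id: "user_123", user_roles: ["user"], args: %{"user_id" => "user_123"}}

IO.inspect(PermissionSketch.allowed?({:arg, "user_id"}, request))
IO.inspect(PermissionSketch.allowed?({:role, ["admin", "moderator"]}, request))
```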

Notes:


Relay Messages

Group-based message relaying: a user sends a message to a group, and all members (including the sender) receive it through their Phoenix Channel.

Group Types

Type             Join               Accept              Mute/Unmute  Who can send
:public          Immediate :active  N/A                 N/A          Any :active member
:private         :pending           Any :active member  N/A          Any :active member
:strict_private  :pending           Only :admin         Admin only   :active (not muted)

Muted members in :strict_private groups can receive messages but cannot send them.
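
The join and send rules above can be encoded as two small functions. This is a sketch over plain values, not PhoenixGenApi.Relay's code: RelayRulesSketch is a hypothetical module, and representing a member as a map with :status and :muted keys is an assumption made for illustration.

```elixir
defmodule RelayRulesSketch do
  # Encodes the group-type rules over plain values; not PhoenixGenApi.Relay's code.

  # Join status a new member receives for each group type.
  def join_status(:public), do: :active
  def join_status(type) when type in [:private, :strict_private], do: :pending

  # In :strict_private groups an active member may send only while not muted.
  def can_send?(:strict_private, %{status: :active} = member) do
    Map.get(member, :muted, false) == false
  end

  # In :public and :private groups any active member may send.
  def can_send?(type, %{status: :active}) when type in [:public, :private], do: true

  # Pending members and anything else are rejected.
  def can_send?(_type, _member), do: false
end

IO.inspect(RelayRulesSketch.join_status(:private))
IO.inspect(RelayRulesSketch.can_send?(:strict_private, %{status: :active, muted: true}))
```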

Architecture

Example FunConfig Setup

# Create a group (one-time setup, e.g. in an admin API)
PhoenixGenApi.Relay.create_group("room_1", :public, "admin_user", channel_pid)

# Join a group
PhoenixGenApi.Relay.join_group("room_1", "user_2", user2_channel_pid)

# Accept a pending member (private/strict_private)
PhoenixGenApi.Relay.accept_member("room_1", "admin_user", "user_2")

# Mute a member (strict_private only)
PhoenixGenApi.Relay.mute_member("room_1", "admin_user", "user_2")

# Send a relay message (configured as FunConfig, called via WebSocket)
%FunConfig{
  request_type: "relay_msg",
  service: "chat_service",
  nodes: :local,
  mfa: {PhoenixGenApi.Relay, :handle_relay, []},
  arg_types: %{"group_id" => :string, "message" => :string},
  arg_orders: ["group_id", "message"],
  check_permission: :any_authenticated,
  response_type: :sync
}

Client → Server → All Members Flow

Client A                    Gateway (Phoenix Channel)           Client B
  │  WebSocket: relay_msg                                      │
  │  {group_id, message}                                       │
  │──────────────────────────►│                                 │
  │                            │ handle_in()                     │
  │                            │  └─ Executor.execute!(request)  │
  │                            │     └─ Relay.handle_relay()     │
  │                            │        ├─ ETS: validate member  │
  │                            │        ├─ Registry: dispatch     │
  │                            │        └─ send(pid, {:relay_message, response})
  │                            │                                 │
  │                            │ handle_info({:relay_message})  │
  │                            │  └─ push(socket, result)───────►│
  │◄───────────────────────────│                                 │
  │  {status: "relayed",       │                                 │
  │   recipients_count: 2}     │                                 │

Client WebSocket Payload

{
  "service": "chat_service",
  "request_type": "relay_msg",
  "request_id": "req_123",
  "args": {
    "group_id": "room_1",
    "message": "Hello everyone!"
  }
}

Client Receives (all members)

{
  "request_id": "req_123",
  "success": true,
  "result": {
    "group_id": "room_1",
    "from_user_id": "user_1",
    "message": "Hello everyone!",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}

Direct API (outside WebSocket)

You can also manage groups programmatically:

# Create
:ok = PhoenixGenApi.Relay.create_group("room_1", :public, "admin", channel_pid)

# Join
{:ok, :active} = PhoenixGenApi.Relay.join_group("room_1", "user_2", pid)

# Leave
:ok = PhoenixGenApi.Relay.leave_group("room_1", "user_2")

# Accept (private/strict_private)
:ok = PhoenixGenApi.Relay.accept_member("room_1", "admin", "user_2")

# Mute / Unmute (strict_private only)
:ok = PhoenixGenApi.Relay.mute_member("room_1", "admin", "user_2")
:ok = PhoenixGenApi.Relay.unmute_member("room_1", "admin", "user_2")

# Inspect
{:ok, info} = PhoenixGenApi.Relay.get_group_info("room_1")

Retry

Configure retry behavior when execution fails:

Value            Meaning
nil              No retry (default)
3                Equivalent to {:all_nodes, 3}
{:same_node, 2}  Retry on the originally selected node(s)
{:all_nodes, 3}  Retry across all available nodes

%FunConfig{
  request_type: "get_data",
  service: "my_service",
  nodes: [:"node1@host", :"node2@host", :"node3@host"],
  mfa: {MyApp.Api, :get_data, []},
  retry: {:all_nodes, 3}  # Retry up to 3 times across all nodes
}

For nodes: :local, both :same_node and :all_nodes retry locally.

Retry attempts emit telemetry at [:phoenix_gen_api, :executor, :retry] with metadata including mode (:same_node or :all_nodes), type (:local or :remote), and the current attempt number in measurements.
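
The difference between the two retry modes is easiest to see in a small loop. The sketch below is illustrative and not the executor's implementation: RetrySketch is a hypothetical module, call is a caller-supplied function that takes a node and returns {:ok, result} or {:error, reason}, and two choices are assumptions: the first node in the list plays the role of the originally selected node, and :all_nodes walks the list in order.

```elixir
defmodule RetrySketch do
  # Illustrative retry loop; `call` is a hypothetical function taking a node
  # and returning {:ok, result} or {:error, reason}.

  # A bare integer is shorthand for {:all_nodes, n}.
  def normalize(nil), do: {:none, 0}
  def normalize(n) when is_integer(n) and n >= 0, do: {:all_nodes, n}
  def normalize({mode, n}) when mode in [:same_node, :all_nodes], do: {mode, n}

  def run([first | _] = nodes, retry, call) do
    {mode, retries} = normalize(retry)
    attempt(first, nodes, mode, retries, call)
  end

  defp attempt(node, nodes, mode, retries_left, call) do
    case call.(node) do
      {:ok, result} ->
        {:ok, result}

      {:error, _reason} when retries_left > 0 ->
        attempt(next_node(node, nodes, mode), nodes, mode, retries_left - 1, call)

      {:error, reason} ->
        {:error, reason}
    end
  end

  # :same_node keeps the node; :all_nodes moves to the next one, wrapping around.
  defp next_node(node, _nodes, :same_node), do: node

  defp next_node(node, nodes, :all_nodes) do
    index = Enum.find_index(nodes, fn candidate -> candidate == node end)
    Enum.at(nodes, rem(index + 1, length(nodes)))
  end
end

nodes = [:"node1@host", :"node2@host", :"node3@host"]

# Only node3 answers in this simulated cluster.
call = fn
  :"node3@host" -> {:ok, :done}
  node -> {:error, {:down, node}}
end

IO.inspect(RetrySketch.run(nodes, {:all_nodes, 3}, call))
IO.inspect(RetrySketch.run(nodes, {:same_node, 2}, call))
```

In this simulated run, :all_nodes reaches the healthy node on its second retry, while :same_node exhausts its retries against the node that is down.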


Telemetry

30 events across 5 categories:

Category      Events  Description
Executor      4       Request lifecycle (start/stop/exception) and retry
Rate Limiter  4       Check, exceeded, reset, cleanup
Hooks         6       Before/after hook start/stop/exception
Worker Pool   6       Task start/stop/exception/rejected, circuit breaker open/close
Config Cache  9       Pull/push, add, batch_add, delete, clear, disable, enable

Note: The [:phoenix_gen_api, :worker_pool, :task, :rejected] event is emitted when a task is rejected due to the circuit breaker being open.

Quick Start

# Logger macros need an explicit require
require Logger

# Attach to all events
PhoenixGenApi.Telemetry.attach_all("my-app", fn event, measurements, _metadata, _config ->
  Logger.info("[Telemetry] #{inspect(event)} #{inspect(measurements)}")
end)

# Attach to executor events only
PhoenixGenApi.Telemetry.attach_executor("my-app-exec", fn event, measurements, metadata, _config ->
  case event do
    [:phoenix_gen_api, :executor, :request, :stop] ->
      Logger.info("Request #{metadata.request_id} completed in #{measurements.duration_us}µs")
    _ ->
      :ok
  end
end)

# Built-in debug logger
PhoenixGenApi.Telemetry.attach_default_logger()

# List all available events
PhoenixGenApi.Telemetry.list_events()

# Detach when done
PhoenixGenApi.Telemetry.detach_all("my-app")

You can also use the convenience functions on the main module:

PhoenixGenApi.attach_telemetry("my-app", &MyHandler.handle_event/4)
PhoenixGenApi.detach_telemetry("my-app")

These attach to both executor and rate limiter events simultaneously.

Integration with Telemetry.Metrics

defmodule MyApp.Metrics do
  def metrics do
    [
      Telemetry.Metrics.distribution(
        "phoenix_gen_api.executor.request.duration_us",
        event_name: [:phoenix_gen_api, :executor, :request, :stop],
        measurement: :duration_us,
        tags: [:service, :request_type, :success]
      ),
      Telemetry.Metrics.counter(
        "phoenix_gen_api.executor.exceptions.count",
        event_name: [:phoenix_gen_api, :executor, :request, :exception],
        tags: [:service, :request_type]
      ),
      Telemetry.Metrics.counter(
        "phoenix_gen_api.rate_limiter.exceeded.count",
        event_name: [:phoenix_gen_api, :rate_limiter, :exceeded],
        tags: [:key, :scope]
      )
    ]
  end
end

📖 For the complete event reference, integration patterns, and best practices, see the Telemetry Guide.


IEx Helpers

PhoenixGenApi.rl_status("user_123")     # Rate limit status
PhoenixGenApi.rl_global()               # Global rate limits
PhoenixGenApi.rl_global(limits)         # Set global rate limits
PhoenixGenApi.rl_config()               # Rate limiter config
PhoenixGenApi.cache_status()            # Config cache status
PhoenixGenApi.pool_status()             # Worker pool status
PhoenixGenApi.pushed_services_status()  # Pushed services status

Related Packages

AI Agent Support

Update usage rules from dependencies:

mix usage_rules.sync AGENTS.md --all --link-to-folder deps --inline usage_rules:all

Start the Tidewave MCP server:

mix tidewave

Connect at http://localhost:4114/tidewave/mcp. See Tidewave for details.