PhoenixGenApi

A framework for rapidly building backend APIs on top of Phoenix Channels and Erlang clustering. Define your business logic on any node — the framework handles routing, validation, permissions, retries, and observability. No HTTP endpoints to write, no routes to configure, no restarts to deploy.

Version: 2.18.0 | Elixir: ~> 1.18 | OTP: ~> 27 | License: MPL-2.0

Why PhoenixGenApi?

Traditional Phoenix backends require you to define routes, write controllers, serialize responses, handle errors, and restart to deploy changes. PhoenixGenApi removes those steps.

How It Works

┌──────────┐    WebSocket     ┌──────────────────┐     RPC      ┌──────────────┐
│  Client  │ ◄──────────────► │ Phoenix Gateway  │ ◄──────────► │ Service Node │
└──────────┘  (Phoenix Ch.)   │ (uses this lib)  │   (Erlang)   │  (your app)  │
                              └──────────────────┘              └──────────────┘

  1. Clients send requests through a Phoenix Channel.
  2. The Gateway looks up the matching FunConfig, selects a node, validates arguments & permissions, then executes the remote function.
  3. The Service Node runs the function and returns the result.

Service nodes can register new APIs at any time — the gateway picks them up automatically (pull) or receives them immediately (push).
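
The three steps above can be sketched without the library. The following is a minimal, self-contained illustration and not PhoenixGenApi's implementation: the module `GatewaySketch`, the plain-map config with its `arg_names` field, and `local_call/3` (standing in for the Erlang RPC) are all hypothetical.

```elixir
defmodule GatewaySketch do
  # Hypothetical stand-in for the gateway's dispatch path; not the library's code.

  # Step 2a: look up the config for {service, request_type}.
  def lookup(configs, service, request_type) do
    case Map.fetch(configs, {service, request_type}) do
      {:ok, config} -> {:ok, config}
      :error -> {:error, :unknown_api}
    end
  end

  # Step 2b: pick a node. :random picks any node; {:hash, key} pins a key to one node.
  def select_node(nodes, :random), do: Enum.random(nodes)

  def select_node(nodes, {:hash, key}) do
    Enum.at(nodes, :erlang.phash2(key, length(nodes)))
  end

  # Steps 2c and 3: check that the declared args are present, then run the function.
  def dispatch(configs, %{"service" => s, "request_type" => t, "args" => args}) do
    with {:ok, config} <- lookup(configs, s, t),
         :ok <- validate(args, config.arg_names) do
      node = select_node(config.nodes, config.choose_node_mode)
      {m, f, extra} = config.mfa
      ordered = Enum.map(config.arg_names, &Map.fetch!(args, &1))
      {:ok, node, local_call(m, f, ordered ++ extra)}
    end
  end

  defp validate(args, names) do
    if Enum.all?(names, &Map.has_key?(args, &1)), do: :ok, else: {:error, :invalid_args}
  end

  # A real gateway would make a remote call to `node`; this sketch calls locally.
  defp local_call(m, f, a), do: apply(m, f, a)
end
```

In this sketch an unknown `{service, request_type}` pair short-circuits with `{:error, :unknown_api}` before any node is selected, which mirrors the lookup-first ordering of step 2.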

Features

Installation

Requires Elixir ~> 1.18 and OTP ~> 27.

def deps do
  [
    {:phoenix_gen_api, "~> 2.16"}
  ]
end

Use :libcluster to form the Erlang cluster.
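
As a rough starting point, a static libcluster topology might look like the fragment below. The topology name `gen_api_cluster` and both node names are placeholders, and `Cluster.Strategy.Epmd` is only one of the strategies libcluster ships; pick whichever fits your deployment.

```elixir
# config/runtime.exs (node names are placeholders for your own nodes)
import Config

config :libcluster,
  topologies: [
    gen_api_cluster: [
      # Connects to a fixed list of nodes; other strategies discover nodes dynamically.
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"gateway@host", :"app@host"]]
    ]
  ]
```

libcluster's `Cluster.Supervisor` then has to be started in each application's supervision tree with these topologies.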

Quick Start

1. Define your API on the service node

defmodule MyApp.Api do
  def get_user(user_id) do
    %{id: user_id, name: "Alice"}
  end
end

2. Create a FunConfig

A FunConfig tells the gateway how to call your function:

alias PhoenixGenApi.Structs.FunConfig

%FunConfig{
  request_type: "get_user",
  service: "user_service",
  nodes: [:"app@host"],
  choose_node_mode: :random,
  timeout: 5_000,
  mfa: {MyApp.Api, :get_user, []},
  arg_types: %{"user_id" => :string},
  response_type: :sync,
  version: "1.0.0"
}

3. Register the config on the gateway

Option A — Pull mode (gateway fetches from service node on startup):

On the service node, define a supporter module:

defmodule MyApp.GenApi.Supporter do
  alias PhoenixGenApi.Structs.FunConfig

  def get_config(_arg) do
    {:ok,
     [
       %FunConfig{
         request_type: "get_user",
         service: "user_service",
         nodes: [Node.self()],
         choose_node_mode: :random,
         timeout: 5_000,
         mfa: {MyApp.Api, :get_user, []},
         arg_types: %{"user_id" => :string},
         response_type: :sync,
         version: "1.0.0"
       }
     ]}
  end
end

Then configure the gateway to pull from it (see Gateway Configuration below).

Option B — Push mode (service node pushes to gateway on startup):

alias PhoenixGenApi.ConfigPusher
alias PhoenixGenApi.Structs.FunConfig

fun_configs = [%FunConfig{ ... }]

push_config =
  ConfigPusher.from_service_config(
    :user_service,
    [Node.self()],
    fun_configs,
    config_version: "1.0.0"
  )

ConfigPusher.push_on_startup(:"gateway@host", push_config)

Option C — Add directly on the gateway node at runtime:

PhoenixGenApi.ConfigDb.add(%FunConfig{ ... })

4. Add PhoenixGenApi to your Channel

defmodule MyAppWeb.ApiChannel do
  use Phoenix.Channel
  use PhoenixGenApi, event: "api"

  # handle_in, handle_info for :push, :async_call, :stream_response
  # are automatically injected by `use PhoenixGenApi`
end

5. Call from the client

Send a JSON message over the channel:

{
  "service": "user_service",
  "request_type": "get_user",
  "request_id": "req_1",
  "args": { "user_id": "123" }
}

Receive the response:

{
  "request_id": "req_1",
  "result": { "id": "123", "name": "Alice" },
  "success": true,
  "async": false,
  "has_more": false
}

That's it — you have a working API gateway.


Core Concepts

FunConfig

FunConfig is the central data structure. It maps a {service, request_type} pair to a function call.

Field                Type                                                        Description
request_type         String.t()                                                  API endpoint name (e.g. "get_user")
service              atom | String.t()                                           Service group name (e.g. "user_service")
nodes                [atom] | {m, f, a} | :local                                 Target nodes or :local for same-node execution
choose_node_mode     :random | :hash | {:hash, key} | :round_robin               Node selection strategy
timeout              integer | :infinity                                         Execution timeout in ms (100–300 000 or :infinity)
mfa                  {module, function, args}                                    The function to call. Args are prepended with converted request args
arg_types            %{name => type}                                             Argument type declarations for validation
arg_orders           [String.t()] | :map                                         Argument ordering (or :map to pass a map)
response_type        :sync | :async | :stream | :none                            How the result is delivered
check_permission     false | :any_authenticated | {:arg, name} | {:role, roles}  Permission mode
permission_callback  {m, f, a} | nil                                             Custom permission check
request_info         boolean()                                                   Pass the full %Request{} as first arg to the MFA (legacy field, currently unused in execution)
version              String.t() | nil                                            API version (default nil). "0.0.0" is reserved as a sentinel and cannot be explicitly registered
disabled             boolean()                                                   Disable this version without removing it
retry                nil | number | {:same_node, n} | {:all_nodes, n}            Retry configuration
before_execute       {m, f} | {m, f, a} | nil                                    Hook called before execution
after_execute        {m, f} | {m, f, a} | nil                                    Hook called after execution
hook_timeout         positive_integer()                                          Per-hook timeout in ms (default 5_000). Prevents misbehaving hooks from blocking requests
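
To make the interplay of mfa, arg_types, and arg_orders concrete, here is a small sketch. It assumes one reading of the table above, namely that the converted request args come first and the static args from the MFA tuple follow. `ArgSketch` and its functions are hypothetical and cover only two of the declared types.

```elixir
defmodule ArgSketch do
  # Hypothetical illustration of arg_types / arg_orders / mfa; not library code.

  # With :map the whole args map is passed as one argument;
  # with a list, values are pulled out in the declared order.
  def build_args(request_args, :map, extra), do: [request_args | extra]

  def build_args(request_args, arg_orders, extra) when is_list(arg_orders) do
    Enum.map(arg_orders, &Map.fetch!(request_args, &1)) ++ extra
  end

  # Minimal type check for two of the declared types.
  def valid?(value, :string), do: is_binary(value)
  def valid?(value, :list_string), do: is_list(value) and Enum.all?(value, &is_binary/1)

  # Validate every declared argument, then apply the MFA.
  def call({m, f, extra}, request_args, arg_types, arg_orders) do
    if Enum.all?(arg_types, fn {name, type} -> valid?(Map.get(request_args, name), type) end) do
      {:ok, apply(m, f, build_args(request_args, arg_orders, extra))}
    else
      {:error, :invalid_args}
    end
  end
end
```

For example, `ArgSketch.call({Enum, :join, ["-"]}, %{"items" => ["a", "b"]}, %{"items" => :list_string}, ["items"])` ends up calling `Enum.join(["a", "b"], "-")`.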

Request

The client payload is decoded into a %Request{}:

Field         Type                Description
user_id       String.t() | nil    Authenticated user ID (can be set from socket assigns)
device_id     String.t() | nil    Device identifier
request_type  String.t()          Which API to call
request_id    String.t()          Client-generated ID to match responses
service       String.t()          Target service
args          map()               Request arguments
user_roles    [String.t()] | nil  User roles for RBAC
version       String.t() | nil    Requested API version (nil means no version specified)

Request.decode!/1 validates the payload before deserialization and raises on invalid input.

The rescue block of the use PhoenixGenApi channel macro distinguishes these decode errors, which are client errors, from internal errors: it returns "Invalid request: ..." for decode errors and "Request processing failed" for unexpected exceptions.
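
The split between client errors and internal errors can be sketched as follows. This is an illustration only: `DecodeSketch`, the choice of `ArgumentError`, and the exact set of required fields are assumptions, not the library's actual checks.

```elixir
defmodule DecodeSketch do
  # Hypothetical payload check; the field names come from the Request table above.
  @required ["service", "request_type", "request_id"]

  def decode!(payload) when is_map(payload) do
    missing = Enum.reject(@required, &is_binary(payload[&1]))

    cond do
      missing != [] ->
        raise ArgumentError, "missing or non-string fields: #{Enum.join(missing, ", ")}"

      not is_map(Map.get(payload, "args", %{})) ->
        raise ArgumentError, "args must be a map"

      true ->
        %{
          service: payload["service"],
          request_type: payload["request_type"],
          request_id: payload["request_id"],
          args: Map.get(payload, "args", %{}),
          version: payload["version"]
        }
    end
  end

  def decode!(_payload), do: raise(ArgumentError, "payload must be a map")

  # Decode errors become "Invalid request: ..."; anything else gets a generic message.
  def safe_decode(payload) do
    {:ok, decode!(payload)}
  rescue
    e in ArgumentError -> {:error, "Invalid request: " <> Exception.message(e)}
    _ -> {:error, "Request processing failed"}
  end
end
```

Keeping the generic message for unexpected exceptions avoids leaking internal details to clients, while decode errors stay specific enough for the client to fix its payload.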

Response

The gateway replies with a %Response{}:

Field       Type              Description
request_id  String.t()        Matches the request
result      any()             Returned data
success     boolean()         Whether the call succeeded
error       String.t() | nil  Error message on failure
async       boolean()         true if more messages will follow
has_more    boolean()         true for intermediate stream chunks
can_retry   boolean()         true if the client may retry

Configuration

Gateway Configuration

In the gateway node's config.exs:

config :phoenix_gen_api, :gen_api,
  pull_timeout: 5_000,
  pull_interval: 30_000,
  detail_error: false,
  service_configs: [
    %{
      service: "user_service",
      nodes: [:"app@host"],
      module: MyApp.GenApi.Supporter,
      function: :get_config,
      args: []
    }
  ]

Option                     Type     Default  Description
pull_timeout               integer  5_000    Timeout in ms for each pull RPC call
pull_interval              integer  30_000   Interval in ms between automatic pulls
detail_error               boolean  false    Include detailed error messages in responses
service_configs            [map()]  []       List of service configurations for pull mode
circuit_breaker_threshold  integer  10       Pool-level consecutive failures before circuit opens (worker default: 5)
circuit_breaker_cooldown   integer  60_000   Cooldown in ms before circuit closes

Each service_config map supports:

Key               Type                  Description
service           String.t() | atom()   Service name (used as lookup key)
nodes             [atom()] | {m, f, a}  Target nodes or MFA that resolves to node list
module            module()              Remote module implementing the config function
function          atom()                Function on the remote module
args              list()                Extra arguments passed to the config function
version_module    module() | nil        Module for lightweight version check RPC
version_function  atom() | nil          Function returning current config version
version_args      list()                Args for the version check function

Remote Node (Pull Mode)

Mark the node as a client so it doesn't start gateway-only services (RateLimiter, WorkerPool, ConfigDb, etc.):

config :phoenix_gen_api, :client_mode, true

When client_mode: true, the application starts with an empty supervision tree and no ETS tables. This is the standard setting for service/worker nodes that push configs to the gateway.

Pull startup behavior: On application start, the ConfigPuller schedules an initial pull after 1 second. If the initial pull fails (service nodes unreachable), it logs a warning and retries with exponential backoff (up to 300s). The gateway starts even if service nodes are down — APIs register on first successful pull.
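
The retry schedule described above can be pictured with a short sketch. Only the 300s cap comes from the text; the 1-second base delay and the doubling factor are assumptions, and `BackoffSketch` is a hypothetical name.

```elixir
defmodule BackoffSketch do
  # Hypothetical backoff schedule; base delay and doubling factor are assumptions.
  @base_ms 1_000
  @max_ms 300_000

  # Delay before retry number `attempt` (0-based): doubled each time, then capped.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 0 do
    min(@base_ms * Integer.pow(2, attempt), @max_ms)
  end
end
```

Under these assumptions the delays run 1s, 2s, 4s, and so on until the ninth retry, after which every retry waits the full 300s.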

Define a supporter module that returns FunConfig lists:

defmodule MyApp.GenApi.Supporter do
  alias PhoenixGenApi.Structs.FunConfig

  def get_config(_arg) do
    {:ok, my_fun_configs()}
  end

  def my_fun_configs do
    [
      %FunConfig{
        request_type: "get_user",
        service: "user_service",
        nodes: [Node.self()],
        choose_node_mode: :random,
        timeout: 5_000,
        mfa: {MyApp.Api, :get_user, []},
        arg_types: %{"user_id" => :string},
        response_type: :sync,
        version: "1.0.0"
      }
    ]
  end
end

Active Push (Remote → Gateway)

Remote nodes can push configs immediately on startup instead of waiting for a pull:

alias PhoenixGenApi.ConfigPusher
alias PhoenixGenApi.Structs.{FunConfig, PushConfig}

fun_configs = [%FunConfig{ ... }]

push_config = %PushConfig{
  service: :user_service,
  nodes: [Node.self()],
  config_version: "1.0.0",
  fun_configs: fun_configs,
  # Optional: enable periodic pull after initial push
  module: MyApp.GenApi.Supporter,
  function: :get_config,
  args: []
}

ConfigPusher.push_on_startup(:"gateway@host", push_config)

Push is idempotent — if config_version matches, the push is skipped. Use force: true to override:

ConfigPusher.push(:"gateway@host", push_config, force: true)
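
The version-based skip can be illustrated with a tiny state machine. This is a sketch, not the gateway's code: `PushSketch` and its plain map of service name to last accepted version are hypothetical; only the return tuples follow the ones shown in this document.

```elixir
defmodule PushSketch do
  # Hypothetical gateway-side bookkeeping: service name => last accepted version.

  # Returns {reply, updated_versions}.
  def push(versions, service, config_version, opts \\ []) do
    force? = Keyword.get(opts, :force, false)

    if not force? and Map.get(versions, service) == config_version do
      # Same version already registered: nothing to do.
      {{:ok, :skipped, :version_matches}, versions}
    else
      {{:ok, :accepted}, Map.put(versions, service, config_version)}
    end
  end
end
```

Because a repeated push with the same version leaves the state untouched, a service node can push on every restart without churning the gateway's config.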

Verify a service's config version before pushing:

case ConfigPusher.verify(:"gateway@host", :user_service, "1.0.0") do
  {:ok, :matched} -> :already_registered
  {:ok, :mismatch, _} -> ConfigPusher.push(:"gateway@host", push_config)
  {:error, :not_found} -> ConfigPusher.push(:"gateway@host", push_config)
end

Gateway-side API:

# Receive a push (called via RPC from remote node)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config)
{:ok, :skipped, :version_matches} = PhoenixGenApi.push_config(push_config)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config, force: true)
# Verify a service's config version
{:ok, :matched} = PhoenixGenApi.verify_config("user_service", "1.0.0")
{:ok, :mismatch, "0.9.0"} = PhoenixGenApi.verify_config("user_service", "1.0.0")
{:error, :not_found} = PhoenixGenApi.verify_config("unknown", "1.0.0")
# Check pushed services (IEx helper)
PhoenixGenApi.pushed_services_status()

Channel Integration

defmodule MyAppWeb.ApiChannel do
  use Phoenix.Channel
  use PhoenixGenApi, event: "api"
end

Options:

Option             Default            Description
:event             "phoenix_gen_api"  Channel event name for incoming requests and outgoing pushes
:override_user_id  true               Override user_id from socket.assigns.user_id (only if it's a non-empty string)

The use macro injects handle_in for the configured event, plus handle_info handlers for :push, :async_call, and :stream_response.

It also derives JSON.Encoder for Response using the configured JSON library.


Function Versioning

Run multiple versions of the same API side-by-side:

# Version 1.0.0
%FunConfig{
  request_type: "get_user",
  service: "user_service",
  version: "1.0.0",
  mfa: {MyApp.Users, :get_user_v1, []},
  arg_types: %{"id" => :string},
  response_type: :sync
}

# Version 2.0.0: adds a "fields" argument
%FunConfig{
  request_type: "get_user",
  service: "user_service",
  version: "2.0.0",
  mfa: {MyApp.Users, :get_user_v2, []},
  arg_types: %{"id" => :string, "fields" => :list_string},
  response_type: :sync
}

Clients request a specific version:

{
  "service": "user_service",
  "request_type": "get_user",
  "request_id": "req_1",
  "version": "2.0.0",
  "args": { "id": "123", "fields": ["name", "email"] }
}

If no version is sent, nil is used. The value "0.0.0" is reserved as a sentinel meaning "no version specified" and cannot be explicitly registered as a version. Use ConfigDb.get_latest/2 to resolve the highest enabled version.
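
Resolving "the highest enabled version" could work along these lines. The sketch assumes versions are compared as semantic versions (via Elixir's `Version` module) rather than as strings; whether ConfigDb does exactly this is not stated here, and `VersionSketch` is a hypothetical name.

```elixir
defmodule VersionSketch do
  # Hypothetical resolver: pick the highest enabled version from a list of configs.
  def latest(configs) do
    configs
    |> Enum.reject(& &1.disabled)
    # Skip entries without a parseable version (e.g. nil).
    |> Enum.filter(&match?({:ok, _}, Version.parse(&1.version || "")))
    # Passing the Version module as sorter compares with Version.compare/2.
    |> Enum.max_by(&Version.parse!(&1.version), Version, fn -> nil end)
    |> case do
      nil -> {:error, :not_found}
      config -> {:ok, config}
    end
  end
end
```

Semantic comparison matters once minor versions reach two digits: "1.10.0" is newer than "1.9.0", although a plain string comparison would order them the other way.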

Managing Versions

alias PhoenixGenApi.ConfigDb
# Get a specific version
{:ok, config} = ConfigDb.get("user_service", "get_user", "1.0.0")
# Get the latest enabled version
{:ok, latest} = ConfigDb.get_latest("user_service", "get_user")
# Disable a version (requests for it then return {:error, :disabled})
:ok = ConfigDb.disable("user_service", "get_user", "1.0.0")
# Re-enable
:ok = ConfigDb.enable("user_service", "get_user", "1.0.0")
# Delete
:ok = ConfigDb.delete("user_service", "get_user", "1.0.0")
# List all functions with their versions
ConfigDb.get_all_functions()

Behavior:


Rate Limiter

Sliding-window rate limiter backed by ETS. Uses a multi-instance architecture for high concurrency — instances are supervised under a :one_for_one supervisor and requests are routed via consistent hashing (:erlang.phash2) by default.
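
The sliding-window idea itself fits in a few lines. The sketch below is purely functional and keeps timestamps in a list, whereas the library stores its state in ETS; `SlidingWindowSketch` is a hypothetical name, and the error tuple only imitates the shape used in the Runtime API section.

```elixir
defmodule SlidingWindowSketch do
  # Hypothetical pure-functional sliding window; the library keeps its state in ETS.

  # `hits` holds the timestamps (ms) of previously accepted requests for one key.
  def check(hits, now_ms, max_requests, window_ms) when max_requests > 0 do
    # Only hits newer than (now - window) still count against the limit.
    recent = Enum.filter(hits, &(&1 > now_ms - window_ms))

    if length(recent) < max_requests do
      {:ok, [now_ms | recent]}
    else
      # The oldest hit still inside the window decides when a slot frees up.
      retry_after_ms = Enum.min(recent) + window_ms - now_ms
      {:error, :rate_limited, %{retry_after_ms: retry_after_ms}}
    end
  end
end
```

Unlike a fixed window that resets on a clock boundary, this variant never admits more than `max_requests` within any `window_ms` span.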

Configuration

config :phoenix_gen_api, :rate_limiter,
  enabled: true,
  fail_open: true,
  # Multi-instance sharding (default: number of online schedulers)
  instance_count: :auto, # :auto or positive integer
  routing_strategy: :hash, # :hash (consistent) or :random
  global_limits: [
    # 2 000 requests per minute per user
    %{key: :user_id, max_requests: 2000, window_ms: 60_000},
    # 10 000 requests per minute per device
    %{key: :device_id, max_requests: 10000, window_ms: 60_000}
  ],
  api_limits: [
    # Expensive operation: 10 requests per minute per user
    %{
      service: "data_service",
      request_type: "export_data",
      key: :user_id,
      max_requests: 10,
      window_ms: 60_000
    }
  ]

Runtime API

alias PhoenixGenApi.RateLimiter

# Check rate limit for a request
case RateLimiter.check_rate_limit(request) do
  :ok ->
    # Proceed
    Executor.execute!(request)

  {:error, :rate_limited, details} ->
    # Reject
    %{error: "Rate limit exceeded", retry_after: details.retry_after_ms}
end

# Check global/API-specific limits directly
RateLimiter.check_rate_limit("user_123", :global, :user_id)
RateLimiter.check_rate_limit("user_123", {"my_service", "my_api"}, :user_id)

# Dynamic configuration
RateLimiter.add_global_limit(%{key: :ip_address, max_requests: 5000, window_ms: 60_000})
RateLimiter.update_config(%{enabled: true, global_limits: [...], api_limits: [...]})

Supported Keys

:user_id, :device_id, :ip_address, or any custom string key.

The rate limiter uses a sharded ETS architecture — multiple GenServer instances distribute the load. The routing strategy is :erlang.phash2/1 by default (configurable via routing_strategy). ETS tables (:rate_limiter_global and :rate_limiter_api) are shared across all instances with read_concurrency and write_concurrency enabled.
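
Routing a key to one of several limiter instances might look like the sketch below. It uses `:erlang.phash2/2` for the :hash strategy and `:rand.uniform/1` for :random; `ShardSketch` and its 0-based instance index are assumptions made for illustration.

```elixir
defmodule ShardSketch do
  # Hypothetical router: map a rate-limit key to one of `count` limiter instances.

  # :hash always sends the same key to the same instance (index 0..count-1).
  def instance_for(key, count, :hash), do: :erlang.phash2(key, count)

  # :random spreads load evenly but gives up per-key affinity.
  def instance_for(_key, count, :random), do: :rand.uniform(count) - 1

  # :auto resolves to the number of online schedulers, as the config comment states.
  def instance_count(:auto), do: System.schedulers_online()
  def instance_count(n) when is_integer(n) and n > 0, do: n
end
```

With :hash, all checks for one user are serialized through a single instance, which keeps per-key bookkeeping simple; :random trades that affinity for a more even spread.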

IEx Helpers

PhoenixGenApi.rl_status("user_123") # Rate limit status for a user
PhoenixGenApi.rl_global() # Show global limits
PhoenixGenApi.rl_config() # Show full rate limiter config

Permission System

Four permission modes, configured per FunConfig via check_permission:

1. Disabled (false)

No check. Use for public endpoints.

%FunConfig{request_type: "search", check_permission: false}

2. Any Authenticated (:any_authenticated)

Requires a non-nil user_id.

%FunConfig{request_type: "get_profile", check_permission: :any_authenticated}

3. Argument-Based ({:arg, arg_name})

The specified argument must match user_id — users can only access their own data.

%FunConfig{request_type: "get_user_profile", check_permission: {:arg, "user_id"}}
# ✅ user_id: "user_123", args: %{"user_id" => "user_123"}
# ❌ user_id: "user_123", args: %{"user_id" => "user_999"}

4. Role-Based ({:role, allowed_roles})

User must have at least one of the specified roles.

%FunConfig{request_type: "delete_user", check_permission: {:role, ["admin", "moderator"]}}
# ✅ user_roles: ["admin"]
# ❌ user_roles: ["user"]
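
The four modes reduce to four function clauses. The sketch below works on plain maps whose keys mirror the Request fields; `PermissionSketch` is a hypothetical module and makes no claim about how the library's own check is structured.

```elixir
defmodule PermissionSketch do
  # Hypothetical check over plain maps; field names follow the Request table.

  # 1. Disabled: always allowed.
  def allowed?(false, _request), do: true

  # 2. Any authenticated: a non-nil user_id is enough.
  def allowed?(:any_authenticated, %{user_id: user_id}), do: not is_nil(user_id)

  # 3. Argument-based: the named argument must equal the caller's user_id.
  def allowed?({:arg, name}, %{user_id: user_id, args: args}) do
    not is_nil(user_id) and Map.get(args, name) == user_id
  end

  # 4. Role-based: at least one of the caller's roles must be in the allowed list.
  def allowed?({:role, allowed}, %{user_roles: roles}) do
    Enum.any?(roles || [], &(&1 in allowed))
  end
end
```

For instance, `PermissionSketch.allowed?({:arg, "user_id"}, %{user_id: "user_123", args: %{"user_id" => "user_999"}})` evaluates to `false`, matching the rejected case shown for mode 3.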

Notes:


Relay Messages

Group-based message relaying: a user sends a message to a group, and all members (including the sender) receive it through their Phoenix Channel.

Group Types

Type             Join               Accept              Mute/Unmute  Who can send
:public          Immediate :active  N/A                 N/A          Any :active member
:private         :pending           Any :active member  N/A          Any :active member
:strict_private  :pending           Only :admin         Admin only   :active (not muted)

Muted members in :strict_private groups can receive messages but cannot send them.
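
The rules for the three group types can be restated as a few pattern-matched functions. The member map with `status`, `role`, and `muted` keys is a hypothetical shape chosen for this sketch, as is the module name `GroupSketch`; the relay module's real data model is not shown in this document.

```elixir
defmodule GroupSketch do
  # Hypothetical restatement of the group-type rules; not the Relay module's code.

  # Public groups activate members immediately; the others start as :pending.
  def join_status(:public), do: :active
  def join_status(type) when type in [:private, :strict_private], do: :pending

  # Who may accept a pending member.
  def can_accept?(:private, %{status: :active}), do: true
  def can_accept?(:strict_private, %{status: :active, role: :admin}), do: true
  def can_accept?(_type, _member), do: false

  # Who may send: active members, minus muted ones in :strict_private groups.
  def can_send?(:strict_private, %{status: :active, muted: muted}), do: not muted
  def can_send?(_type, %{status: status}), do: status == :active
end
```

Note that muting only affects sending; nothing in these rules stops a muted member from receiving relayed messages.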

Architecture

Example FunConfig Setup

# Create a group (one-time setup, e.g. in an admin API)
PhoenixGenApi.Relay.create_group("room_1", :public, "admin_user", channel_pid)
# Join a group
PhoenixGenApi.Relay.join_group("room_1", "user_2", user2_channel_pid)
# Accept a pending member (private/strict_private)
PhoenixGenApi.Relay.accept_member("room_1", "admin_user", "user_2")
# Mute a member (strict_private only)
PhoenixGenApi.Relay.mute_member("room_1", "admin_user", "user_2")
# Send a relay message (configured as FunConfig, called via WebSocket)
%FunConfig{
  request_type: "relay_msg",
  service: "chat_service",
  nodes: :local,
  mfa: {PhoenixGenApi.Relay, :handle_relay, []},
  arg_types: %{"group_id" => :string, "message" => :string},
  arg_orders: ["group_id", "message"],
  check_permission: :any_authenticated,
  response_type: :sync
}

Client → Server → All Members Flow

Client A            Gateway (Phoenix Channel)                          Client B
   │                            │                                          │
   │  WebSocket: relay_msg      │                                          │
   │  {group_id, message}       │                                          │
   │───────────────────────────►│                                          │
   │                            │ handle_in()                              │
   │                            │  └─ Executor.execute!(request)           │
   │                            │      └─ Relay.handle_relay()             │
   │                            │          ├─ ETS: validate member         │
   │                            │          ├─ Registry: dispatch           │
   │                            │          └─ send(pid, {:relay_message, response})
   │                            │                                          │
   │                            │ handle_info({:relay_message})            │
   │                            │  └─ push(socket, result) ───────────────►│
   │◄───────────────────────────│                                          │
   │  {status: "relayed",       │                                          │
   │   recipients_count: 2}     │                                          │

Client WebSocket Payload

{
  "service": "chat_service",
  "request_type": "relay_msg",
  "request_id": "req_123",
  "args": {
    "group_id": "room_1",
    "message": "Hello everyone!"
  }
}

Client Receives (all members)

{
  "request_id": "req_123",
  "success": true,
  "result": {
    "group_id": "room_1",
    "from_user_id": "user_1",
    "message": "Hello everyone!",
    "timestamp": "2025-01-15T10:30:00Z"
  }
}

Direct API (outside WebSocket)

You can also manage groups programmatically:

# Create
:ok = PhoenixGenApi.Relay.create_group("room_1", :public, "admin", channel_pid)
# Join
{:ok, :active} = PhoenixGenApi.Relay.join_group("room_1", "user_2", pid)
# Leave
:ok = PhoenixGenApi.Relay.leave_group("room_1", "user_2")
# Accept (private/strict_private)
:ok = PhoenixGenApi.Relay.accept_member("room_1", "admin", "user_2")
# Mute / Unmute (strict_private only)
:ok = PhoenixGenApi.Relay.mute_member("room_1", "admin", "user_2")
:ok = PhoenixGenApi.Relay.unmute_member("room_1", "admin", "user_2")
# Inspect
{:ok, info} = PhoenixGenApi.Relay.get_group_info("room_1")

Retry

Configure retry behavior when execution fails:

Value            Meaning
nil              No retry (default)
3                Equivalent to {:all_nodes, 3}
{:same_node, 2}  Retry on the originally selected node(s)
{:all_nodes, 3}  Retry across all available nodes

%FunConfig{
  request_type: "get_data",
  service: "my_service",
  nodes: [:"node1@host", :"node2@host", :"node3@host"],
  mfa: {MyApp.Api, :get_data, []},
  retry: {:all_nodes, 3} # Retry up to 3 times across all nodes
}

For nodes: :local, both :same_node and :all_nodes retry locally.
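
The difference between the two retry modes can be sketched as follows. This is an illustration under assumptions, not the executor's code: `RetrySketch` is hypothetical, "retry n times" is read as one initial attempt plus up to n retries, and :all_nodes is modeled as cycling through the node list in order.

```elixir
defmodule RetrySketch do
  # Hypothetical retry loop; `fun` receives a node and returns {:ok, _} or {:error, _}.

  # A bare number is shorthand for {:all_nodes, n}.
  def normalize(nil), do: nil
  def normalize(n) when is_integer(n) and n > 0, do: {:all_nodes, n}
  def normalize({mode, n}) when mode in [:same_node, :all_nodes], do: {mode, n}

  def run(fun, [first | _] = nodes, retry) do
    # One initial attempt plus up to n retries.
    candidates =
      case normalize(retry) do
        nil -> [first]
        {:same_node, n} -> List.duplicate(first, n + 1)
        {:all_nodes, n} -> nodes |> Stream.cycle() |> Enum.take(n + 1)
      end

    # Stop at the first success; otherwise keep the last error.
    Enum.reduce_while(candidates, {:error, :not_attempted}, fn node, _last ->
      case fun.(node) do
        {:ok, _} = ok -> {:halt, ok}
        {:error, _} = error -> {:cont, error}
      end
    end)
  end
end
```

With a single-element node list the two modes behave identically, which is consistent with the note above that both retry locally for nodes: :local.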

Retry attempts emit telemetry at [:phoenix_gen_api, :executor, :retry] with metadata including mode (:same_node or :all_nodes), type (:local or :remote), and the current attempt number in measurements.

When all retry attempts are exhausted, a [:phoenix_gen_api, :executor, :retry, :exhausted] event is emitted with the original retry configuration and the final error. This event is emitted for both local and remote retries.


Telemetry

Events are grouped into 5 categories (PhoenixGenApi.Telemetry.list_events() returns the full list):

Category      Events  Description
Executor      4       Request lifecycle (start/stop/exception) and retry
Rate Limiter  4       Check, exceeded, reset, cleanup
Hooks         6       Before/after hook start/stop/exception
Worker Pool   6       Task start/stop/exception/rejected, circuit breaker open/close
Config Cache  9       Pull/push, add, batch_add, delete, clear, disable, enable

Note: The [:phoenix_gen_api, :worker_pool, :task, :rejected] event is emitted when a task is rejected due to the circuit breaker being open.

Quick Start

# Logger macros require the module to be required first
require Logger

# Attach to all events
PhoenixGenApi.Telemetry.attach_all("my-app", fn event, measurements, _metadata, _config ->
  Logger.info("[Telemetry] #{inspect(event)} #{inspect(measurements)}")
end)

# Attach to executor events only
PhoenixGenApi.Telemetry.attach_executor("my-app-exec", fn event, measurements, metadata, _config ->
  case event do
    [:phoenix_gen_api, :executor, :request, :stop] ->
      Logger.info("Request #{metadata.request_id} completed in #{measurements.duration_us}µs")

    _ ->
      :ok
  end
end)

# Built-in debug logger
PhoenixGenApi.Telemetry.attach_default_logger()

# List all available events
PhoenixGenApi.Telemetry.list_events()

# Detach when done
PhoenixGenApi.Telemetry.detach_all("my-app")

You can also use the convenience functions on the main module:

PhoenixGenApi.attach_telemetry("my-app", &MyHandler.handle_event/4)
PhoenixGenApi.detach_telemetry("my-app")

These attach to both executor and rate limiter events simultaneously.

Integration with Telemetry.Metrics

defmodule MyApp.Metrics do
  def metrics do
    [
      Telemetry.Metrics.distribution(
        "phoenix_gen_api.executor.request.duration_us",
        event_name: [:phoenix_gen_api, :executor, :request, :stop],
        measurement: :duration_us,
        tags: [:service, :request_type, :success]
      ),
      Telemetry.Metrics.counter(
        "phoenix_gen_api.executor.exceptions.count",
        event_name: [:phoenix_gen_api, :executor, :request, :exception],
        tags: [:service, :request_type]
      ),
      Telemetry.Metrics.counter(
        "phoenix_gen_api.rate_limiter.exceeded.count",
        event_name: [:phoenix_gen_api, :rate_limiter, :exceeded],
        tags: [:key, :scope]
      )
    ]
  end
end

📖 For the complete event reference, integration patterns, and best practices, see the Telemetry Guide.


IEx Helpers

PhoenixGenApi.rl_status("user_123") # Rate limit status
PhoenixGenApi.rl_global() # Global rate limits
PhoenixGenApi.rl_global(limits) # Set global rate limits
PhoenixGenApi.rl_config() # Rate limiter config
PhoenixGenApi.cache_status() # Config cache status
PhoenixGenApi.pool_status() # Worker pool status
PhoenixGenApi.pushed_services_status() # Pushed services status

All packages in the PhoenixGenApi ecosystem:

Package           Description
PhoenixGenApi     Core library: dynamic API gateway over Phoenix Channels
EasyRpc           Wrap RPC calls from remote nodes so they can be used like local functions. Simplifies exposing remote functions as local APIs
ClusterHelper     Dynamic cluster node discovery. Map nodes to roles or IDs and select nodes by role/ID
ToonEx            TOON encoder/decoder for Elixir, a compact binary protocol for Phoenix Channels. TOON-to-JSON converter for efficient WebSocket transport
AshPhoenixGenApi  Ash extension that auto-generates FunConfig definitions from Ash resources

AI Agent Support

Update usage rules from dependencies:

mix usage_rules.sync AGENTS.md --all --link-to-folder deps --inline usage_rules:all

Start the Tidewave MCP server:

mix tidewave

Connect at http://localhost:4114/tidewave/mcp. See Tidewave for details.