AMQPChannelPool
A lightweight Elixir library for managing named AMQP channel pools with NimblePool.
Documentation
HexDocs includes the API reference and focused guides:
- Getting Started
- Configuration
- Checkout and Failure Semantics
- Recovery and Worker Lifecycle
- Telemetry
- Integration Testing
Installation
Add amqp_channel_pool to your list of dependencies in mix.exs:
def deps do
[
{:amqp_channel_pool, "~> 0.2.1"}
]
endThen fetch dependencies:
mix deps.getConfiguration
Pool startup requires:
:name:connection
Optional startup options:
:pool_sizewith a default of10:channel_setupto configure a freshly opened channel before it enters service:idto override the default child specification id
The :connection value is passed to AMQP.Connection.open/1.
The :opts key is not supported.
Usage
Starting named pools
children = [
{AMQPChannelPool,
name: MyApp.PrimaryPool,
connection: [
host: "localhost",
port: 5672,
username: "guest",
password: "guest"
],
pool_size: 5,
channel_setup: &declare_topology/1},
{AMQPChannelPool,
name: MyApp.SecondaryPool,
connection: [
host: "localhost",
port: 5672,
username: "guest",
password: "guest"
]}
]
Supervisor.start_link(children, strategy: :one_for_one)Checking out a channel
{:ok, :ok} =
AMQPChannelPool.checkout(MyApp.PrimaryPool, fn channel ->
AMQP.Queue.declare(channel, "service.health", durable: true)
end, timeout: 5_000)
AMQPChannelPool.checkout!(MyApp.SecondaryPool, fn channel ->
AMQP.Queue.declare(channel, "service.audit", durable: true)
end, timeout: 5_000)Stopping a named pool
:ok = AMQPChannelPool.stop(MyApp.PrimaryPool)Channel setup callback
defp declare_topology(channel) do
with {:ok, _} <- AMQP.Exchange.declare(channel, "service.events", :topic, durable: true),
{:ok, _} <- AMQP.Queue.declare(channel, "service.events.primary", durable: true) do
:ok
else
{:error, reason} -> {:error, reason}
end
endchannel_setup can enable confirm mode and topology, but confirm mode alone is not
sufficient to guarantee end-to-end reliable delivery.
For complete setup constraints, see Configuration.
Stale worker recovery
Workers are monitored for connection and channel process exits. If a worker is stale at checkout
time, the pool performs one immediate recovery attempt by reopening the connection and channel,
reapplying :channel_setup, and reinstalling monitors. If recovery fails, checkout returns a
pool-layer error and the failed worker is discarded for replacement.
For lifecycle details, see Recovery and Worker Lifecycle.
Borrower failure semantics
checkout/3 and checkout!/3 preserve normal Elixir callback failure behavior:
raisere-raises to the callerexitexits the callerthrowthrows to the caller
These abnormal callback outcomes are not converted into pool-layer {:error, reason} tuples.
The borrowed worker is discarded and replaced before reuse to avoid channel contamination.
Borrowers are responsible for channel hygiene when callbacks succeed. If callback code may leave channel state uncertain, fail the callback so the worker is discarded.
For full failure behavior, see Checkout and Failure Semantics.
Telemetry
Event namespace:
[:amqp_channel_pool, ...]
Event families:
[:amqp_channel_pool, :checkout, :start | :stop | :exception][:amqp_channel_pool, :worker, :init, :start | :stop | :exception][:amqp_channel_pool, :worker, :recover, :start | :stop | :exception][:amqp_channel_pool, :worker, :discard][:amqp_channel_pool, :worker, :terminate]
Measurements:
-
start events include
system_time -
stop and exception events include
duration
Stable metadata keys:
poolworker_pidworker_staterecovery_kindreasontimeoutresult
Failure semantics in telemetry:
-
checkout callback failures (
raise/throw/exit) emit checkout:exceptionwithresult: :callback_exception -
pool-layer checkout failures that return
{:error, reason}emit checkout:stopwithresult: :pool_error -
worker recovery failures emit worker recover
:exception
For event shapes and usage examples, see Telemetry.
Pool Boundary
The pool library is intentionally generic infrastructure. It owns:
- channel pool lifecycle and checkout behavior
- stale detection, one-shot recovery, and worker replacement
- pool-layer telemetry and failure reporting
Publisher applications own:
- message routing policy
- serialization and headers
- confirm waiting/ack handling
- retry and idempotency strategy
The pool does not provide publish convenience APIs and does not guarantee delivery semantics on behalf of the application.
Running Tests
mix test
Unit tests run by default. Integration tests are tagged with :integration and
are excluded unless RUN_INTEGRATION=true is set.
Running Integration Tests With RabbitMQ
docker network create amqp-channel-pool-test-net
docker run -d --rm --name amqp-test-rabbit --network amqp-channel-pool-test-net rabbitmq:3.13-management
docker run --rm --network amqp-channel-pool-test-net \
-e RUN_INTEGRATION=true \
-e AMQP_TEST_HOST=amqp-test-rabbit \
-e AMQP_TEST_PORT=5672 \
-v "$(pwd)":/app -w /app elixir:1.16 \
sh -lc 'mix local.hex --force && mix local.rebar --force && mix deps.get && mix test --only integration'
docker stop amqp-test-rabbit
docker network rm amqp-channel-pool-test-netRunning Dialyzer
mix dialyzer --plt
mix dialyzerContributing
Contributions are welcome. See CONTRIBUTING.
License
This project is licensed under the Apache License, Version 2.0. See LICENSE.