AMQPChannelPool

A lightweight Elixir library for managing named AMQP channel pools with NimblePool.

Documentation

HexDocs includes the API reference and focused guides:

Installation

Add amqp_channel_pool to your list of dependencies in mix.exs:

def deps do
  [
    {:amqp_channel_pool, "~> 0.2.1"}
  ]
end

Then fetch dependencies:

mix deps.get

Configuration

Pool startup requires:

Optional startup options:

The :connection value is passed to AMQP.Connection.open/1. The :opts key is not supported.
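
As an illustration of assembling the :connection value, here is a minimal sketch that builds the keyword list from environment variables. The module name MyApp.AMQPConfig and the AMQP_* variable names are hypothetical and not part of this library; the defaults mirror the values used in the Usage examples.

```elixir
defmodule MyApp.AMQPConfig do
  @moduledoc false

  # Builds a keyword list suitable for the pool's :connection option.
  # `env` defaults to the process environment; passing a map makes the
  # function easy to exercise in tests.
  # The AMQP_* variable names are assumptions, not something the pool reads.
  def connection_opts(env \\ System.get_env()) do
    [
      host: Map.get(env, "AMQP_HOST", "localhost"),
      port: env |> Map.get("AMQP_PORT", "5672") |> String.to_integer(),
      username: Map.get(env, "AMQP_USERNAME", "guest"),
      password: Map.get(env, "AMQP_PASSWORD", "guest")
    ]
  end
end
```

The result can then be passed as connection: MyApp.AMQPConfig.connection_opts() in a child spec.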

Usage

Starting named pools

children = [
  {AMQPChannelPool,
   name: MyApp.PrimaryPool,
   connection: [
     host: "localhost",
     port: 5672,
     username: "guest",
     password: "guest"
   ],
   pool_size: 5,
   channel_setup: &declare_topology/1},
  {AMQPChannelPool,
   name: MyApp.SecondaryPool,
   connection: [
     host: "localhost",
     port: 5672,
     username: "guest",
     password: "guest"
   ]}
]

Supervisor.start_link(children, strategy: :one_for_one)

Checking out a channel

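{:ok, :ok} =
  AMQPChannelPool.checkout(MyApp.PrimaryPool, fn channel ->
    # AMQP.Queue.declare/3 returns {:ok, info}; match it and return :ok from the callback.
    {:ok, _info} = AMQP.Queue.declare(channel, "service.health", durable: true)
    :ok
  end, timeout: 5_000)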

AMQPChannelPool.checkout!(MyApp.SecondaryPool, fn channel ->
  AMQP.Queue.declare(channel, "service.audit", durable: true)
end, timeout: 5_000)

Stopping a named pool

:ok = AMQPChannelPool.stop(MyApp.PrimaryPool)

Channel setup callback

defp declare_topology(channel) do
  # AMQP.Exchange.declare/4 returns :ok, while AMQP.Queue.declare/3 returns {:ok, info}.
  # Any {:error, reason} falls through `with` unchanged.
  with :ok <- AMQP.Exchange.declare(channel, "service.events", :topic, durable: true),
       {:ok, _info} <- AMQP.Queue.declare(channel, "service.events.primary", durable: true) do
    :ok
  end
end

channel_setup can enable confirm mode and declare topology, but confirm mode alone is not sufficient to guarantee end-to-end reliable delivery.

For complete setup constraints, see Configuration.
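
To make the confirm-mode caveat concrete, the sketch below pairs a channel_setup callback that selects confirm mode with an application-side publish that waits for the broker's confirm. MyApp.ConfirmedPublisher is a hypothetical module, not part of this library. It assumes the amqp package's AMQP.Confirm.select/1, AMQP.Basic.publish/5, and AMQP.Confirm.wait_for_confirms/1, and it assumes the pool accepts :ok or {:error, reason} from channel_setup, as in the callback above.

```elixir
defmodule MyApp.ConfirmedPublisher do
  @moduledoc false

  # channel_setup callback: puts each pooled channel into confirm mode.
  # Wire it up with `channel_setup: &MyApp.ConfirmedPublisher.enable_confirms/1`.
  def enable_confirms(channel) do
    AMQP.Confirm.select(channel)
  end

  # Publishes one message and blocks until the broker acks or nacks it.
  # Meant to be called inside a checkout callback by the publisher application.
  def publish(channel, exchange, routing_key, payload) do
    with :ok <- AMQP.Basic.publish(channel, exchange, routing_key, payload, persistent: true) do
      case AMQP.Confirm.wait_for_confirms(channel) do
        true -> :ok
        false -> {:error, :nacked}
        :timeout -> {:error, :confirm_timeout}
      end
    end
  end
end
```

Even with this in place, durability of exchanges and queues, routing, and consumer acknowledgement remain the application's concern.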

Stale worker recovery

Workers are monitored for connection and channel process exits. If a worker is stale at checkout time, the pool performs one immediate recovery attempt by reopening the connection and channel, reapplying :channel_setup, and reinstalling monitors. If recovery fails, checkout returns a pool-layer error and the failed worker is discarded for replacement.

For lifecycle details, see Recovery and Worker Lifecycle.
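
Because the pool makes only one immediate recovery attempt, any further retry policy is left to the caller. The sketch below shows one possible application-side wrapper. MyApp.PoolRetry is a hypothetical helper, and it assumes only that a failed checkout surfaces as an {:error, reason} tuple; the shape of reason is not specified here.

```elixir
defmodule MyApp.PoolRetry do
  @moduledoc false

  # Runs `checkout_fun` (a zero-arity function wrapping a pool checkout) and
  # retries on {:error, _} until `attempts` total calls have been made.
  # Any other result, including the last error, is returned unchanged.
  def with_retry(checkout_fun, attempts \\ 3, backoff_ms \\ 100)

  def with_retry(checkout_fun, attempts, backoff_ms)
      when is_function(checkout_fun, 0) and attempts >= 1 do
    case checkout_fun.() do
      {:error, _reason} when attempts > 1 ->
        Process.sleep(backoff_ms)
        with_retry(checkout_fun, attempts - 1, backoff_ms)

      result ->
        result
    end
  end
end

# Usage sketch:
#
#   MyApp.PoolRetry.with_retry(fn ->
#     AMQPChannelPool.checkout(MyApp.PrimaryPool, fn channel ->
#       AMQP.Queue.declare(channel, "service.health", durable: true)
#     end, timeout: 5_000)
#   end)
```

Retrying is only safe when the work inside the callback is idempotent, since an earlier attempt may have partially succeeded.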

Borrower failure semantics

checkout/3 and checkout!/3 preserve normal Elixir callback failure behavior:

These abnormal callback outcomes are not converted into pool-layer {:error, reason} tuples. The borrowed worker is discarded and replaced before reuse to avoid channel contamination.

Borrowers are responsible for channel hygiene when callbacks succeed. If callback code may leave channel state uncertain, fail the callback so the worker is discarded.
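
One way to follow the advice above is to turn unexpected AMQP results into a raised exception inside the callback, so the worker is discarded instead of being returned to the pool. MyApp.ChannelHygiene is a hypothetical helper shown only as a sketch; it is not provided by this library.

```elixir
defmodule MyApp.ChannelHygiene do
  @moduledoc false

  # Passes :ok and {:ok, value} through unchanged.
  def ensure_ok!(:ok), do: :ok
  def ensure_ok!({:ok, _value} = ok), do: ok

  # Anything else raises, which fails the enclosing checkout callback.
  def ensure_ok!(other) do
    raise RuntimeError, "AMQP operation failed: #{inspect(other)}"
  end
end

# Usage sketch inside a checkout callback:
#
#   AMQPChannelPool.checkout!(MyApp.PrimaryPool, fn channel ->
#     channel
#     |> AMQP.Basic.publish("service.events", "health.ping", "ok")
#     |> MyApp.ChannelHygiene.ensure_ok!()
#   end, timeout: 5_000)
```

The caller still sees the raised exception, so it should be rescued or allowed to crash according to the application's own supervision strategy.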

For full failure behavior, see Checkout and Failure Semantics.

Telemetry

Event namespace:

Event families:

Measurements:

Stable metadata keys:

Failure semantics in telemetry:

For event shapes and usage examples, see Telemetry.

Pool Boundary

The pool library is intentionally generic infrastructure. It owns:

Publisher applications own:

The pool does not provide publish convenience APIs and does not guarantee delivery semantics on behalf of the application.

Running Tests

mix test

Unit tests run by default. Integration tests are tagged with :integration and are excluded unless RUN_INTEGRATION=true is set.

Running Integration Tests With RabbitMQ

docker network create amqp-channel-pool-test-net
docker run -d --rm --name amqp-test-rabbit --network amqp-channel-pool-test-net rabbitmq:3.13-management

docker run --rm --network amqp-channel-pool-test-net \
  -e RUN_INTEGRATION=true \
  -e AMQP_TEST_HOST=amqp-test-rabbit \
  -e AMQP_TEST_PORT=5672 \
  -v "$(pwd)":/app -w /app elixir:1.16 \
  sh -lc 'mix local.hex --force && mix local.rebar --force && mix deps.get && mix test --only integration'

docker stop amqp-test-rabbit
docker network rm amqp-channel-pool-test-net

Running Dialyzer

mix dialyzer --plt
mix dialyzer

Contributing

Contributions are welcome. See CONTRIBUTING.

License

This project is licensed under the Apache License, Version 2.0. See LICENSE.