AMQP

Build StatusModule VersionHex DocsTotal DownloadLast UpdatedLicense

A simple Elixir wrapper for the Erlang RabbitMQ 3/4 client (AMQP 0.9.1).

The API is based on Langohr, a Clojure client for RabbitMQ.

Upgrade guides

To upgrade from an older version, please read the relevant upgrade guide:

Usage

Add AMQP as a dependency in your mix.exs file.

def deps do
[
{:amqp, "~> 4.1"}
]
end

Elixir will start amqp automatically if you use Elixir 1.6+.

If that is not the case (use Application.started_applications/0 to check), try adding :amqp to applications or extra_applications in your mix.exs, or call Application.ensure_started(:amqp) at the start.

After that, run mix deps.get in your shell to fetch and compile AMQP. Then start an interactive Elixir shell with iex -S mix.

iex> {:ok, conn} = AMQP.Connection.open()
# {:ok, %AMQP.Connection{pid: #PID<0.165.0>}}
iex> {:ok, chan} = AMQP.Channel.open(conn)
# {:ok, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.165.0>}, pid: #PID<0.177.0>}
iex> AMQP.Queue.declare(chan, "test_queue")
# {:ok, %{consumer_count: 0, message_count: 0, queue: "test_queue"}}
iex> AMQP.Exchange.declare(chan, "test_exchange")
# :ok
iex> AMQP.Queue.bind(chan, "test_queue", "test_exchange")
# :ok
iex> AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok
iex> {:ok, payload, meta} = AMQP.Basic.get(chan, "test_queue")
iex> payload
# "Hello, World!"
iex> AMQP.Queue.subscribe(chan, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
# {:ok, "amq.ctag-5L8U-n0HU5doEsNTQpaXWg"}
iex> AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok
# Received: Hello, World!

Set up a consumer GenServer

defmodule Consumer do
use GenServer
use AMQP
def start_link do
GenServer.start_link(__MODULE__, [], [])
end
@exchange "gen_server_test_exchange"
@queue "gen_server_test_queue"
@queue_error "#{@queue}_error"
def init(_opts) do
{:ok, conn} = Connection.open("amqp://guest:guest@localhost")
{:ok, chan} = Channel.open(conn)
setup_queue(chan)
# Limit unacknowledged messages to 10.
:ok = Basic.qos(chan, prefetch_count: 10)
# Register the GenServer process as a consumer.
{:ok, _consumer_tag} = Basic.consume(chan, @queue)
{:ok, chan}
end
# Confirmation sent by the broker after registering this process as a consumer.
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end
# Sent by the broker when the consumer is unexpectedly cancelled, such as
# after a queue deletion.
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do
{:stop, :normal, chan}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel.
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
# You might want to run payload consumption in separate Tasks in production
consume(chan, tag, redelivered, payload)
{:noreply, chan}
end
defp setup_queue(chan) do
{:ok, _} = Queue.declare(chan, @queue_error, durable: true)
# Messages that cannot be delivered to any consumer in the main queue are
# routed to the error queue.
{:ok, _} = Queue.declare(chan, @queue,
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, @queue_error}
]
)
:ok = Exchange.fanout(chan, @exchange, durable: true)
:ok = Queue.bind(chan, @queue, @exchange)
end
defp consume(channel, tag, redelivered, payload) do
number = String.to_integer(payload)
if number <= 10 do
:ok = Basic.ack channel, tag
IO.puts "Consumed a #{number}."
else
:ok = Basic.reject channel, tag, requeue: false
IO.puts "#{number} is too big and was rejected."
end
rescue
# Requeue unless it is a redelivered message.
# This retries a message once in case of an exception before giving up and
# moving it to the error queue.
#
# You might also want to catch :exit signals in production code.
# Make sure you call ack, nack, or reject; otherwise, the consumer will stop
# receiving messages.
exception ->
:ok = Basic.reject channel, tag, requeue: not redelivered
IO.puts "Error converting #{payload} to integer"
end
end
iex> Consumer.start_link
{:ok, #PID<0.261.0>}
iex> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.165.0>}}
iex> {:ok, chan} = AMQP.Channel.open(conn)
{:ok, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.165.0>}, pid: #PID<0.177.0>}
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "5"
:ok
Consumed a 5.
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "42"
:ok
42 is too big and was rejected.
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "Hello, World!"
:ok
Error converting Hello, World! to integer
Error converting Hello, World! to integer

Configuration

Connections and channels

You can define a connection and channel in your config, and AMQP will automatically:

config :amqp,
connections: [
myconn: [url: "amqp://guest:guest@myhost:12345"],
],
channels: [
mychan: [connection: :myconn]
]

You can access the connection or channel via AMQP.Application.

iex> {:ok, chan} = AMQP.Application.get_channel(:mychan)
iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello")

When a channel goes down and reconnects, you have to ensure your consumer subscribes to the channel again.

See the documentation for AMQP.Application.get_connection/1 and AMQP.Application.get_channel/1 for more details.

Types of arguments and headers

The arguments parameter in Queue.declare, Exchange.declare, and Basic.consume, and the headers parameter in Basic.publish, are lists of tuples in the form {name, type, value}, where name is a binary containing the argument or header name, type is an atom describing the AMQP field type, and value is a term compatible with the AMQP field type.

The valid AMQP field types are:

:longstr | :signedint | :decimal | :timestamp | :table | :byte | :double | :float | :long | :short | :bool | :binary | :void | :array

Valid argument names in Queue.declare include:

Valid argument names in Basic.consume include:

Valid argument names in Exchange.declare include:

Troubleshooting / FAQ

Is amqp 4.x compatible with RabbitMQ 3.x?

Yes, it is.

This library uses the official Erlang RabbitMQ client under the hood. As long as the client works with an older RabbitMQ version, this library should support that version too.

Here is the comment from the RabbitMQ team.

Why does RabbitMQ 4 reject Queue.declare(chan) with transient_nonexcl_queues?

RabbitMQ 4 can deny the deprecated transient_nonexcl_queues feature by default. This feature covers queues that are both non-durable and non-exclusive.

Queue.declare(chan) uses AMQP's historical defaults:

Queue.declare(chan, "", durable: false, exclusive: false, auto_delete: false)

That means the queue is transient (durable: false) and non-exclusive (exclusive: false), so RabbitMQ 4 may close the connection with an error like:

Feature `transient_nonexcl_queues` is deprecated.

To avoid the deprecated feature, declare the queue as either durable:

Queue.declare(chan, "jobs", durable: true)

or exclusive when you need a private temporary queue:

Queue.declare(chan, "", exclusive: true)

If you need to keep using non-durable, non-exclusive queues while migrating to a newer RabbitMQ version, permit the deprecated feature in the broker configuration:

deprecated_features.permit.transient_nonexcl_queues = true

Does the library support AMQP 1.0?

No, it does not. This library supports only AMQP 0.9.1, and we have no plans to support 1.0 at this time.

RabbitMQ 4 now officially supports AMQP 1.0 along with 0.9.1. You might get some benefits from using this protocol.

Since the AMQP 1.0 protocol design is significantly different from 0.9.1, we think it is better to start from scratch instead of building on top of this library.

Consumer stops receiving messages

It usually happens when your code does not send an acknowledgement (ack, nack, or reject) after receiving a message.

If you use a GenServer for your consumer, try storing the number of messages the server is currently processing in the GenServer state.

If the number equals prefetch_count, those messages were left without acknowledgements, which is why the consumer has stopped receiving more messages.

Also, review the following points:

Also, make sure that the consumer monitors the channel pid. When the channel is gone, you have to reopen it and subscribe to the new channel again.

Version compatibility

Check out this article to learn about compatibility with Elixir, OTP, and RabbitMQ.

Heartbeats

If the connection is dropped automatically, consider enabling heartbeats.

You can set the heartbeat option when you open a connection.

For more details, read this article

Copyright (c) 2014 Paulo Almeida

This library is MIT licensed. See the LICENSE for details.