RMQ - RabbitMQ tools.

Actions StatusHex version badge

A set of handy tools for working with RabbitMQ in Elixir projects. Based on AMQP library.

It includes:

  1. RMQ.Connection
  2. RMQ.Consumer
  3. RMQ.RPQ

Installation

The package can be installed by adding rmq to your list of dependencies in mix.exs:

def deps do
  [
    {:rmq, "~> 0.3.0"}
  ]
end

RMQ.Connection

A GenServer which provides a robust connection to the RabbitMQ server.

Usage

defmodule MyApp.RabbitConnection do
  use RMQ.Connection,
    otp_app: :my_app,
    uri: "amqp://localhost",
    name: to_string(__MODULE__)
end

Meant to be started under the application's supervision tree as follows:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.RabbitConnection
      # ...
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Options

RMQ.Consumer

RabbitMQ Consumer.

Usage

defmodule MyApp.Consumer do
  use RMQ.Consumer,
    queue: "my-app-consumer-queue",
    exchange: {"my-exchange", :direct, durable: true}

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    # do something with the payload
    ack(chan, meta.delivery_tag)
  end
end

# or with dynamic configuration
defmodule MyApp.Consumer2 do
  use RMQ.Consumer

  @impl RMQ.Consumer
  def config do
    [
      queue: System.fetch_env!("QUEUE_NAME"),
      reconnect_interval: fn attempt -> attempt * 1000 end,
    ]
  end

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    # do something with the payload
    ack(chan, meta.delivery_tag)
  end
end

Options

RMQ.RPC

RPC via RabbitMQ.

Usage

# Application 1:

defmodule MyApp.RemoteResource do
  use RMQ.RPC, publishing_options: [app_id: "MyApp"]

  def find_by_id(id) do
    call("remote-resource-finder", %{id: id})
  end
end

#  Application 2:

defmodule MyOtherApp.Consumer do
  use RMQ.Consumer, queue: "remote-resource-finder"

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    response =
      payload
      |> Jason.decode!()
      |> Map.fetch!("id")
      |> MyOtherApp.Resource.get()
      |> Jason.encode!()

    reply(chan, meta, response)
    ack(chan, meta)
  end
end

Options