RBMQ

Simple and easy creation of producers and consumers for RabbitMQ. Written over <a href=”https://github.com/pma/amqp” target=”_blank”>AMQP</a>

Installation

The package can be installed as:

  1. Add rbmq to your list of dependencies in mix.exs:
  def deps do
    [{:rbmq, "~> 0.2.2"}]
  end
  1. Ensure rbmq is started before your application:
  def application do
    [applications: [:rbmq]]
  end

Configuration

You can define connection configuration in your config.exs:

  config :my_app, MyAMQPConnection,
    host: {:system, "AMQP_HOST", "localhost"},
    port: {:system, "AMQP_PORT", 5672},
    username: {:system, "AMQP_USER", "guest"},
    password: {:system, "AMQP_PASSWORD", "guest"},
    virtual_host: {:system, "AMQP_VHOST", "/"},
    connection_timeout: {:system, "AMQP_TIMEOUT", 15_000},

RBMQ support linking to runtime environment conflagration via {:system, "ENV_VAR_NAME", "default_value"} and {:system, "ENV_VAR_NAME"} tuples. But are free to set raw values whenever you need.

By default RBMQ read environment configuration to establish AMQP connection:

Other connections settings can be found in AMQP client docs.

Usage

  1. Define your connection
  defmodule MyAMQPConnection do
    use RBMQ.Connection,
      otp_app: :my_app
      # Optionally you can define queue params right here,
      # but it&#39;s better to do so in producer and consumer separately
  end
  1. Define your Producer and/or Consumer
  defmodule MyProducer do
    use RBMQ.Producer,
      connection: MyAMQPConnection,

      # Queue params
      queue: [
        name: "prodcer_queue",
        error_name: "prodcer_queue_errors",
        routing_key: "prodcer_queue",
        durable: false
      ],
      exchange: [
        name: "prodcer_queue_exchange",
        type: :direct,
        durable: false
      ]
  end

  defmodule MyConsumer do
    use RBMQ.Consumer,
      connection: MyAMQPConnection,

      # Queue params
      queue: [
        name: "consomer_queue",
        durable: false
      ],
      qos: [
        prefetch_count: 10
      ]

    def consume(_payload, [tag: tag, redelivered?: _redelivered]) do
      ack(tag)
    end
  end

Pay attention to consume/2 method. Write your consuming logic there. We recommend to send async messages to GenServer that will consume them, so queue read wouldn’t be blocked by a single thread.

If your queue required acknowledgements, use ack\1 and nack\1 methods.

  1. Add everything to your application supervisor:
  defmodule MyApp do
    use Application

    # See http://elixir-lang.org/docs/stable/elixir/Application.html
    # for more information on OTP Applications
    def start(_type, _args) do
      import Supervisor.Spec, warn: false

      # Define workers and child supervisors to be supervised
      children = [
        # Start the AMQP connection
        supervisor(MyAMQPConnection, []),
        # Start producer and consumer
        worker(MyProducer, []),
        worker(MyConsumer, []),
      ]

      opts = [strategy: :one_for_one, name: AssetProcessor.API.Supervisor]
      Supervisor.start_link(children, opts)
    end
  end