kyu - Simplified Erlang AMQP client

kyu is an AMQP client application that provides a simplified abstraction layer above the official amqp_client. This of course also means that the module isn't suited to handle every possible use case (check out the features section below).

kyu is heavily inspired by the great turtle application. In some areas it provides more functionality over turtle, in others it lacks. The one feature that it definitely lacks is built in rpc support. If you don't want to implement that yourself I suggest taking a look at turtle.

Features

kyu...

Build

kyu can be built using rebar3:

rebar3 compile

Installation

Simply add kyu to your rebar3 dependecies and to the applications list (for example in \<yourapp>.app.src).

%% rebar.config
{deps, [
    kyu,
    %% or
    {kyu, "~> 2.0"},
    %% or
    {kyu, {git, "git://github.com/bajankristof/kyu.git"}}
]}
%% <yourapp>.app.src
{application, yourapp, [
    {mod, {yourapp_app, []}},
    {applications, [
        kernel,
        stdlib,
        kyu
    ]}
]}.

Usage

One way to use the application is by providing connection configurations in environment variables (for example in your sys.config or sys.config.src file). This will use kyu's supervisor to start the connections.

Another way is to start your application and then attach connections to your supervision tree using kyu_connection:child_spec/1 (check out the docs).

%% example sys.config
[
    {kyu, [
        {connections, [#{
            name => <<"rabbitmq_cluster">>, %% required
            url => "amqp://user:password@rabbitmq.cluster", %% optional
            host => "rabbitmq.cluster", %% optional - default: "localhost"
            port => 5672, %% optional - default: 5672
            username => <<"user">>, %% optional - default: <<"guest">>
            password => <<"password">>, %% optional - default: <<"guest">>
            retry_delay => 5000, %% (ms) optional - default: 1000
            retry_attempts => 99, %% optional - default: infinity
            management_host => "rabbitmq.management", %% optional - default: connection host
            management_port => 443 %% optional - default: 15672
        }]}
    ]}
].

If the retry_attempts option is 0 or below the server will try to connect infinitely.

For the full set of configuration options check out the docs.

Publishers

-include_lib("kyu/include/amqp.hrl"). %% amqp commands

kyu_publisher:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_publisher">>, %% required
    confirms => true, %% optional - default: true
    commands => [ %% optional
        #&#39;exchange.declare&#39;{
            exchange = <<"my_exchange">>,
            type = <<"topic">>,
            durable = false
        }
    ]
}).

After introducing the returned child spec to your supervision tree, you can start publishing messages.

Publisher = <<"my_publisher">>,
Message = #{
    routing_key => <<"my.routing.key">>,
    exchange => <<"my_exchange">>,
    payload => <<"hello world">>,
    execution => async %% check out the explanation below
},
kyu:publish(Publisher, Message).

This will try to publish the provided message.

For a full set of possible message properties, check out the docs.

Execution

This setting will tell the publisher how to act on messages:

Consumers

-include_lib("amqp.hrl").
-include_lib("kyu.hrl").

kyu_consumer:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_consumer">>, %% required
    queue => <<"queue_to_consume">>, %% required
    module => my_worker, %% required
    args => #{}, %% optional - default: undefined
    prefetch_count => 2, %% required - will start a channel with global prefetch count of 2, and 2 workers with local prefetch count of 1
    commands => [ %% optional
        #&#39;queue.declare&#39;{queue = <<"queue_to_consume">>},
        #&#39;kyu.queue.bind&#39;{ %% this is one of the special commands introduced by kyu
            routing_key = <<"my.routing.key">>,
            exchange = <<"my_exchange">>,
            queue = <<"queue_to_consume">>,
            exclusive = true
        }
    ]
}).

After introducing the consumer to your supervision tree, it will start to consume messages from the provided queue and make calls to the specified worker module.

-module(my_worker).

-behaviour(kyu_worker).

-exports([
    init/1,
    handle_message/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2
]).

%% this callback is optional and works much like gen_server:init/1
-spec init(Args :: term()) -> {ok, State :: term()} | {stop, Reason :: term()}.
init(Args) ->
    %% initialization
    {ok, Args}.

-spec handle_message(Message :: kyu:message(), State :: term()) ->
    {ack, NewState :: term()}
    | {reject, NewState :: term()}
    | {remove, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_message(Message, State) ->
    %% consumption
    {ack, State}.

%% this callback is optional and works like gen_server:handle_call/3
-spec handle_call(Request :: term(), From :: gen_server:from(), State :: term()) ->
    {reply, Reply :: term(), NewState :: term()}
    | {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_call(_Request, _From, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_cast/2
-spec handle_cast(Request :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_cast(_Request, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_info/2
-spec handle_info(Info :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_info(_Info, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:terminate/2
-spec terminate(Reason :: term(), State :: term()) -> term().
terminate(_Reason, _State) -> ok.

Commands

kyu supports commands in publisher and consumer configurations. This allows you to declare exchanges, queues and bindings before publishing or consuming.

AMQP commands

The out of the box supported AMQP commands are:

Use these in a module:

-include_lib("kyu/include/amqp.hrl").

Kyu commands

There are two custom commands at the moment:

%% extends the standard &#39;queue.bind&#39; command
#&#39;kyu.queue.bind&#39;{
    routing_key :: binary(),
    exchange :: binary(),
    queue :: binary(),
    arguments :: list(),

    exclusive :: boolean()
    %% when set to false the command is equivalent to 
    %% a standard &#39;queue.bind&#39; command

    %% when set to true it will unbind any other routing key
    %% with the same arguments (above)
}
%% provides an alternative to the standard &#39;queue.unbind&#39; command
#&#39;kyu.queue.unbind&#39;{
    except = <<>> :: binary(), %% an exception routing key
    pattern = <<>> :: binary(), %% a regex pattern
    %% if a routing key bound to the queue
    %% matches the pattern and the arguments (below)
    %% it will be unbound
    %% (except and pattern may not be used together)

    exchange :: binary(),
    queue :: binary(),
    arguments :: list()
}

Use these in a module:

-include_lib("kyu/include/kyu.hrl").

Documentation

Read the full documentation here.