Errol
An opinionated framework to run and orchestrate RabbitMQ consumers.
Index
Installation
The package can be installed by adding errol to your list of dependencies in mix.exs:
def deps do
[{:errol, "~> 0.2.0"}]
end
You should also update your application list to include :errol:
def application do
[applications: [:errol]]
endDocumentation can be found at https://hexdocs.pm/errol.
Usage
To bind consumers to queue, you can use the Errol.Wiring module:
defmodule Sample.Wiring do
use Wiring
connection "amqp://guest:guest@localhost"
@exchange "/users"
@exchange_type :topic
# You can pass a reference to a function with arity of 1
consume("account_created", "users.account.created", &UsersConsumer.account_created/1)
# or even an anonymous function
consume("account_updated", "users.account.updated", fn message -> ... end)
endFor more complex setups, you can add middleware and group different consumers for more granularity.
defmodule Sample.Wiring do
use Wiring
connection "amqp://guest:guest@localhost"
@exchange "/users"
@exchange_type :topic
# Use pipe_before/1, pipe_after/1 or pipe_error/1 to run middleware functions
# middlewares declared outside of a group will run for every consumer
pipe_after(&Sample.StatisticsMiddleware.track/2)
# Use the `group` macro to group consumers with specific middleware
group :account do
# This middlewares will run only for consumers in the :account group
pipe_before(&Errol.Middleware.Json.parse/2)
pipe_error(&Errol.Middleware.Retry.basic_retry/2)
consume("account_created", "users.account.created", &UsersConsumer.account_created/1)
consume("account_updated", "users.account.updated", fn message -> ... end)
end
group :photos do
pipe_before(&Sample.ImagesMiddleware.uncompress/2)
consume("profile_photo_uploaded", "users.profile.photo.uploaded", fn message -> ... end)
end
end
At this point, the only thing left is to run Sample.Wiring as a supervisor in your application.ex file.
This is important because if RabbitMQ goes down, all wiring supervisors will be killed, so if they are not in the supervision tree they will not be restarted.
defmodule Sample.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(Sample.Wiring, []),
...
]
opts = [strategy: :one_for_one, name: Sample.Supervisor]
Supervisor.start_link(children, opts)
end
endVoilà! This will spin up the following supervision tree:
--------------------
| Sample.Application |
--------------------
|
|
---------------
| Sample.Wiring |
---------------
|
_____________________________|_____________________________
| | |
| | |
--------------------------- --------------------------- -------------------------
| :account_created_consumer | | :account_updated_consumer | | :profile_photo_uploaded |
--------------------------- --------------------------- -------------------------
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
New monitored process per each message received
Compatibility with other AMQP implementations exists but is not guaranteed (at least for now 😁).
Running locally
Clone the repository
git clone git@github.com:uesteibar/errol.gitInstall dependencies
cd errol
mix deps.getTo run the tests (you will need docker installed)
./scripts/test_prepare.sh
mix testRoadmap
-
Allow to retry messages from
pipe_errormiddleware. This would enable users to handle retries and requeuing to dead letter exchange. -
Allow to reject messages from
pipe_beforemiddleware. - Handle RabbitMQ outages, following the great explanation in the amqp hex documentation.
- Allow to specify number of workers per consumer. Poolboy would come handy here.
- Publish messages.
Contributing
Pull requests are always welcome =)
The project uses standard-changelog to update the Changelog with each commit message and upgrade the package version.
For that reason every contribution must have a title and body that follows the conventional commits standard conventions (e.g. feat(consumer): Consume it all).
To make this process easier, you can do the following:
Install commitizen and cz-conventional-changelog globally
npm i -g commitizen cz-conventional-changelog
Save cz-conventional-changelog as default
echo '{ "path": "cz-conventional-changelog" }' > ~/.czrc
Instead of git commit, you can now run
git czand follow the instructions to generate the commit message.
Credit
🎉 Special thanks to @pma for the amazing work on the amqp hex.