Event Queues

This library provides Elixir helpers for creating event queues and handlers based on GenStage broadcast dispatching.

This library is built to work on Elixir 1.4 or later.

Installation

If available in Hex, the package can be installed by adding event_queues to your list of dependencies in mix.exs:

def deps do
  [{:event_queues, "~> 1.0"}]
end

Event Message

The event queues and handlers created by this library use a common struct to pass events around.

EventQueues.Event

EventQueues.Event.new category: :car,
                      name: :sold,
                      data: %{
                        id: 45,
                        purchaser: %{
                          name: "Bob Nobody"
                        },
                        purchased_on: Date.utc_today
                      }
%EventQueues.Event{category: :car, created: ~N[2017-01-14 22:50:30.989000],
 data: %{id: 45, purchased_on: ~D[2017-01-14],
   purchaser: %{name: "Bob Nobody"}},
 id: "b06fec0a-3cda-41f1-a089-7a6c30fbe7a4", name: :sold, source: #PID<0.151.0>}

Queues

An event queue is a GenStage producer module that uses BroadcastDispatcher. You can subscribe to it either with a handler generated by this library or by using GenStage directly and subscribing a consumer to the module. You can create multiple queues, and doing so is encouraged for applications with large groups of handlers that listen to different groups of events. Because events are dispatched only once all handlers (GenStage consumers) are listening, placing handlers that belong to different business flows on separate queues helps minimize the wait time.

defmodule VehicleInventoryQueue do
  use EventQueues, type: :queue
end

defmodule BoatInventoryQueue do
  use EventQueues, type: :queue
end
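The second subscription option described above, using GenStage directly, might look like the following sketch. It assumes the queue process is registered under its module name (so it can be referenced in subscribe_to), that the queue is started before the consumer, and that gen_stage is available as a dependency. AuditConsumer is a hypothetical name used only for illustration.

```elixir
defmodule AuditConsumer do
  # Hypothetical consumer; not part of this library.
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # Assumption: the queue is registered under its module name.
    {:consumer, :ok, subscribe_to: [VehicleInventoryQueue]}
  end

  def handle_events(events, _from, state) do
    # Each element is expected to be an %EventQueues.Event{} struct.
    Enum.each(events, &IO.inspect/1)

    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], state}
  end
end
```

Unlike the generated handlers, a consumer written this way processes events inside its own process unless it spawns work itself.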

Announce Events

Queue Functions:

Both queue functions, announce and announce_sync, take either an EventQueues.Event struct or a keyword list of the values for the event. See EventQueues.Event.new/1.

VehicleInventoryQueue.announce category: :car, name: :sold, data: %{}
VehicleInventoryQueue.announce_sync category: :car, name: :sold, data: %{}
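The calls above pass a keyword list. As a minimal sketch of the other accepted form, an event struct can be built first and then announced. This assumes VehicleInventoryQueue has already been started; the data map is illustrative only.

```elixir
# Build the event struct up front, then hand it to the queue.
event = EventQueues.Event.new(category: :car, name: :sold, data: %{id: 45})

VehicleInventoryQueue.announce(event)
```

Building the struct first can be convenient when the same event needs to be logged or inspected before it is announced.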

Handlers

An event handler is a GenStage consumer that spawns an asynchronous handler process for each event broadcast from a queue.

defmodule RegistrationVehicleHandler do
  use EventQueues, type: :handler, subscribe: VehicleInventoryQueue

  def handle(%EventQueues.Event{category: :car, name: :sold, data: data}) do
    # Custom logic here that would register the vehicle electronically with a government agency.
  end

  # Ignore every other event.
  def handle(_event), do: nil
end

defmodule LoanVehicleHandler do
  use EventQueues, type: :handler, subscribe: VehicleInventoryQueue

  def handle(%EventQueues.Event{category: :car, name: :sold, data: data}) do
    # Custom logic to finalize a loan process when a car is sold.
  end

  # Ignore every other event.
  def handle(_event), do: nil
end

defmodule RegistrationBoatHandler do
  use EventQueues, type: :handler, subscribe: BoatInventoryQueue

  def handle(%EventQueues.Event{category: :jet_ski, name: :sold, data: data}) do
    # Custom logic here that would register the vehicle electronically.
  end

  # Ignore every other event.
  def handle(_event), do: nil
end

Starting

Each module created is an ordinary GenServer that must be started with your application.

VehicleInventoryQueue.start_link()
BoatInventoryQueue.start_link()
RegistrationVehicleHandler.start_link()
LoanVehicleHandler.start_link()
RegistrationBoatHandler.start_link()

Or the modules can be added to a supervisor:

defmodule MyApp do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    # Queues are listed before the handlers that subscribe to them.
    children = [
      worker(VehicleInventoryQueue, []),
      worker(BoatInventoryQueue, []),
      worker(RegistrationVehicleHandler, []),
      worker(LoanVehicleHandler, []),
      worker(RegistrationBoatHandler, [])
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
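Supervisor.Spec is deprecated in newer Elixir releases. On Elixir 1.5 or later, the same supervision tree can be sketched with explicit child specification maps. This assumes only that each module exposes start_link/0, as shown in the Starting section; whether the generated modules also define child_spec/1 is not assumed here.

```elixir
defmodule MyApp do
  use Application

  def start(_type, _args) do
    # Children start in list order, so queues come before their handlers.
    modules = [
      VehicleInventoryQueue,
      BoatInventoryQueue,
      RegistrationVehicleHandler,
      LoanVehicleHandler,
      RegistrationBoatHandler
    ]

    # Build an explicit child spec map for each module (Elixir 1.5+).
    children =
      for module <- modules do
        %{id: module, start: {module, :start_link, []}}
      end

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

Explicit maps avoid the deprecated import while keeping the start order and supervision strategy unchanged.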