Verk is a job processing system backed by Redis. It uses the same job definition of Sidekiq/Resque.
The goal is to be able to isolate the execution of a queue of jobs as much as possible.
Every queue has its own supervision tree:
- A pool of workers;
-
A
QueueManagerthat interacts with Redis to get jobs and enqueue them back to be retried if necessary; -
A
WorkersManagerthat will interact with theQueueManagerand the pool to execute jobs.
Verk will hold 1 connection to Redis per queue plus 1 dedicated to the ScheduleManager.
The ScheduleManager fetches jobs from the retry set to be enqueued back to the original queue when it’s ready to be retried.
The image below is an overview of Verk’s supervision tree running with a queue named default having 5 workers.
Feature set:
- Retry mechanism
- Dynamic addition/removal of queues
- Reliable job processing (RPOPLPUSH and Lua scripts to the rescue)
- Error tracking
TODO:
- Metrics (GenEvent?)
- Scheduled jobs
- Store dead jobs (too many retries)
- JSON API (external library?)
Installation
First, add Verk to your mix.exs dependencies:
def deps do
[{:verk, "~> 0.9.1"}]
end
and run $ mix deps.get. Now, list the :verk application as your
application dependency:
def application do
[applications: [:verk]]
endVerk was tested using Redis 2.8+
Workers
A job is defined by a module and arguments:
defmodule ExampleWorker do
def perform(arg1, arg2) do
arg1 + arg2
end
end
This job can be enqueued using Verk.enqueue/1:
Verk.enqueue(%Verk.Job{queue: :default, class: "ExampleWorker", args: [1,2]})Configuration
Example configuration for verk having 2 queues: default and priority
The queue default will have a maximum of 25 jobs being processed at a time and priority just 10.
config :verk, queues: [default: 25, priority: 10],
poll_interval: 5000,
node_id: "1",
redis_url: "redis://127.0.0.1:6379"The configuration for releases is still a work in progress.
Queues
It’s possible to dynamically add and remove queues from Verk.
Verk.add_queue(:new, 10) # Adds a queue named `new` with 10 workersVerk.remove_queue(:new) # Terminate and delete the queue named `new`Reliability
Verk’s goal is to never have a job that exists only in memory. It uses Redis as the single source of truth to retry and track jobs that were being processed if some crash happened.
Verk will re-enqueue jobs if the application crashed while jobs were running. It will also retry jobs that failed keeping track of the errors that happened.
The jobs that will run on top of Verk should be idempotent as they may run more than once.
Error tracking
One can track when jobs start and finish or fail. Verk has an Event Manager that notify the following events:
Verk.Events.JobStartedVerk.Events.JobFinishedVerk.Events.JobFailed
Here is an example of a GenEvent handler to print any event:
defmodule PrintHandler do
use GenEvent
def handle_event(event, state) do
IO.puts "Event received: #{inspect event}"
{ :ok, state }
end
endOne can define an error tracking handler like this:
defmodule TrackingErrorHandler do
use GenEvent
def handle_event(%Verk.Events.JobFailed{job: job, failed_at: failed_at, stacktrace: trace}, state) do
MyTrackingExceptionSystem.track(stacktrace: trace, name: job.module)
{ :ok, state }
end
def handle_event(_, state) do
# Ignore other events
{ :ok, state }
end
endYou also need to add the handler to connect with the event manager:
GenEvent.add_mon_handler(Verk.EventManager, TrackingErrorHandler, [])
More info about GenEvent.add_mon_handler/3here.
Sponsorship
Initial development sponsored by Carnival.io