# Flurry
Scatter-gather batching for Elixir, built on GenStage. Individual requests are coalesced into a single bulk call and the results are correlated back to each caller.
## Installation

Add `:flurry` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:flurry, "~> 0.1.0"}
  ]
end
```

## Overview
A module that uses Flurry defines a bulk (list-in, list-out) function and decorates it with `@decorate batch(...)`. Flurry generates a single-item entry point, runs a GenStage producer/consumer pipeline, and correlates each caller's request with its result.
```elixir
defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo

  import Ecto.Query
  alias MyApp.{Repo, User}

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end
end
```
The decorator generates `MyApp.UserBatcher.get/1`. Under concurrency, N simultaneous calls collapse into one `get_many/1` invocation:

```elixir
MyApp.UserBatcher.get(42)
#=> %User{id: 42, ...}

MyApp.UserBatcher.get(999)
#=> nil
```

## Flush Policy
A batch is emitted when any of the following conditions is met:
- `batch_size` pending requests have accumulated, or
- the producer's mailbox is empty, meaning no further requests are immediately queued, or
- `max_wait` milliseconds have elapsed since the first pending request was enqueued.
The mailbox-empty check provides minimum latency under low load: a single request arriving at an idle producer flushes immediately as a batch of one. The `max_wait` timer (default 200 ms) caps worst-case latency under slow trickle conditions, where requests arrive one at a time: quickly enough to keep the mailbox non-empty but too slowly to reach `batch_size`.
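Taken together, the policy reduces to a single predicate over the producer's state. A minimal sketch under assumed names (`FlushPolicy`, `mailbox_len`, and the argument layout are illustrative, not Flurry's internals):

```elixir
# Illustrative sketch of the flush decision, not Flurry's actual internals.
defmodule FlushPolicy do
  def flush?(pending_count, first_enqueued_ms, now_ms, opts) do
    batch_size = Keyword.get(opts, :batch_size)
    max_wait = Keyword.get(opts, :max_wait, 200)
    # message_queue_len of the producer process; 0 means nothing else is queued
    mailbox_len = Keyword.get(opts, :mailbox_len, 0)

    cond do
      pending_count == 0 -> false
      batch_size != nil and pending_count >= batch_size -> true
      mailbox_len == 0 -> true
      max_wait != nil and now_ms - first_enqueued_ms >= max_wait -> true
      true -> false
    end
  end
end
```

In a real producer, `mailbox_len` would come from `Process.info(self(), :message_queue_len)` and the timestamps from `System.monotonic_time(:millisecond)`.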
## Starting the Batcher

`use Flurry` generates `start_link/1` and `child_spec/1` on the module. Add it to a supervision tree:

```elixir
children = [
  # ...
  MyApp.UserBatcher
]
```

## Options
### `batch_size:`

Caps the size of any single bulk call. This is necessary because databases such as PostgreSQL impose parameter limits on `WHERE id IN (...)` queries.
```elixir
# Module-wide default
children = [{MyApp.UserBatcher, batch_size: 500}]

# Per-decorated-function override
@decorate batch(get(id), batch_size: 500)
def get_many(ids), do: ...

@decorate batch(get_with_posts(id), batch_size: 50)
def get_many_with_posts(ids), do: ...
```
When more requests accumulate than `batch_size` allows, Flurry flushes `batch_size` entries at a time across successive cycles.
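The overflow behavior is equivalent to chunking the pending keys and issuing one bulk call per chunk. A sketch using plain `Enum` (not Flurry's code):

```elixir
# 1_200 pending ids with batch_size: 500 flush as three bulk calls.
pending_ids = Enum.to_list(1..1_200)

pending_ids
|> Enum.chunk_every(500)
|> Enum.map(&length/1)
#=> [500, 500, 200]
```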
### `max_wait:`

Maximum time in milliseconds that the first pending request waits before the producer forces a flush. Defaults to `200`. Set to `nil` to disable.
```elixir
# Module-wide default
children = [{MyApp.UserBatcher, max_wait: 500}]

# Per-decorated-function override
@decorate batch(get(id), max_wait: 100)
def get_many(ids), do: ...
```

### `overridable:`
By default, the generated singular entry point is not overridable. Defining a function with the same name and arity in the module produces a redefinition error. The `overridable:` option on `use Flurry` allows wrapping the generated function via `super/1`:
```elixir
defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo, overridable: [get: 1]

  import Ecto.Query
  alias MyApp.{Repo, User}

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end

  def get(id) do
    case super(id) do
      nil -> nil
      user -> %{user | display_name: "[#{user.id}] #{user.name}"}
    end
  end
end
```
The option accepts a keyword list of `name: arity` pairs. At compile time, Flurry validates that every `:overridable` entry has a matching `@decorate batch(...)` decoration with the same arity.
### `additive:`

Merges list-valued arguments across coalesced callers. This is useful when different callers specify different values that the batch should combine, such as Ecto preloads:
```elixir
@decorate batch(get(id, preloads), additive: [:preloads])
def get_many(ids, preloads) do
  Repo.all(from u in User, where: u.id in ^ids, preload: ^preloads)
end
```

```elixir
# Three concurrent callers with distinct preloads:
MyApp.UserBatcher.get(1, [:posts])
MyApp.UserBatcher.get(2, [:comments])
MyApp.UserBatcher.get(3, [:posts, :profile])

# All three coalesce into one bulk call:
# get_many([1, 2, 3], [:posts, :comments, :profile])
```
Arguments named in `additive:` are excluded from the producer's routing key, so callers that differ only on additive arguments share a batch. At flush time, the additive values are concatenated and deduplicated with `Enum.uniq/1`.
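The merge can be reproduced with a fold over the coalesced callers' additive values (illustrative, equivalent to `Enum.uniq/1` over the concatenation):

```elixir
caller_preloads = [[:posts], [:comments], [:posts, :profile]]

caller_preloads
|> Enum.reduce([], fn preloads, acc -> acc ++ preloads end)
|> Enum.uniq()
#=> [:posts, :comments, :profile]
```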
Restrictions:

- Values at additive positions must be lists.
- `additive:` and `batch_by:` cannot be combined on the same decoration.
- Every name in `additive:` must appear in the decorator's group arguments.
### `batch_by:`

Normalizes the non-batched arguments for coalescing purposes. Accepts a 1-arity function that receives the raw tuple of non-batched arguments and returns the canonical form used for both coalescing and the bulk function's argument positions:
```elixir
@decorate batch(
  get_post(slug, user),
  batch_by: fn {user} -> {user.id} end
)
def get_many(slugs, user_id) do
  Repo.all(from p in Post, where: p.slug in ^slugs and p.user_id == ^user_id)
end
```
With `batch_by:`, the bulk function signature takes the normalized values, not the raw decorator argument types. Valid values are closures and captures. `batch_by:` on a single-argument decoration raises at compile time.
### `correlate:`

Specifies how the match key is extracted from each returned record. By default, Flurry uses the first decorator argument's name as the record field. Two forms are supported:

- Atom -- names a top-level field on each returned record.
- Function -- a 1-arity function that extracts the key from each record.
```elixir
@decorate batch(get(id), correlate: :uuid)
def get_many(ids) do
  Repo.all(from r in Row, where: r.uuid in ^ids)
end
```

### `timeout:`
Sets the `GenServer.call/3` timeout for the generated entry point. Defaults to `5_000` (5 seconds).
```elixir
@decorate batch(get(id), timeout: 30_000)
def get_many(ids) do
  Repo.all(from u in User, where: u.id in ^ids, preload: [:posts, :comments])
end
```

### `returns: :one | :list`
Defaults to `:one`, where each caller's argument corresponds to at most one returned record. Use `:list` when the bulk function returns multiple records per key:
```elixir
@decorate batch(get_posts_by_user(user_id), returns: :list)
def get_many_posts_by_user(user_ids) do
  Repo.all(from p in Post, where: p.user_id in ^user_ids)
end
```
Using `:one` on a function that returns duplicate keys raises `Flurry.AmbiguousBatchError`.
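The distinction can be pictured with a plain `Enum.group_by/2` over a bulk result (illustrative; `user_id` stands in for the correlation key):

```elixir
records = [
  %{id: 10, user_id: 1},
  %{id: 11, user_id: 1},
  %{id: 12, user_id: 2}
]

by_key = Enum.group_by(records, & &1.user_id)

# returns: :list -> the caller for key 1 receives both records:
by_key[1]
#=> [%{id: 10, user_id: 1}, %{id: 11, user_id: 1}]

# returns: :one -> key 1 maps to two records, which is ambiguous
# and raises Flurry.AmbiguousBatchError.
```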
### `on_failure: :bisect | :fail_all`
Defaults to `:bisect`. When the bulk function raises or exits for a batch of N entries, Flurry splits the batch in half and retries each half. The recursion continues until a singleton failure isolates the problematic entry. Every other caller in the original batch still receives their correlated record.

Use `:fail_all` to surface a single failure as an error to every caller in the batch without retrying.
`:bisect` re-invokes the bulk function with smaller subsets of the same inputs. If the bulk function has non-idempotent side effects, use `on_failure: :fail_all` to avoid double-writes.
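The bisection can be sketched as a recursive wrapper around the bulk function (a simplification, not Flurry's implementation; real code would also cap retries and correlate results by key rather than by position):

```elixir
defmodule Bisect do
  # Returns one {:ok, result} or {:error, entry, exception} per entry.
  def run(bulk_fun, entries) do
    Enum.map(bulk_fun.(entries), &{:ok, &1})
  rescue
    error ->
      case entries do
        [entry] ->
          # Singleton failure: the problematic entry is isolated.
          [{:error, entry, error}]

        _ ->
          # Split in half and retry each half independently.
          {left, right} = Enum.split(entries, div(length(entries), 2))
          run(bulk_fun, left) ++ run(bulk_fun, right)
      end
  end
end
```

A bulk function that raises on one input leaves the other callers unharmed:

```elixir
bulk = fn ids ->
  Enum.map(ids, fn
    3 -> raise "boom"
    id -> id * 10
  end)
end

Bisect.run(bulk, [1, 2, 3, 4])
#=> [{:ok, 10}, {:ok, 20}, {:error, 3, %RuntimeError{message: "boom"}}, {:ok, 40}]
```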
## Multi-Argument Batching
When the decorated function takes more than one argument, the first argument is the batched variable and the remaining arguments determine which callers share a batch. Callers whose non-batched arguments are structurally equal coalesce into the same bulk call.
```elixir
@decorate batch(get_post(slug, user_id, active?))
def get_many_posts(slugs, user_id, active?) do
  Repo.all(
    from p in Post,
    where: p.slug in ^slugs and p.user_id == ^user_id and p.active == ^active?
  )
end
```
Each distinct combination of non-batched arguments has its own pending list, `batch_size` cap, and slot in the producer's LRU flush rotation.
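For example, under concurrency the following calls would split into two batches (the `MyApp.PostBatcher` module name is assumed for illustration):

```elixir
# Same non-batched args {7, true}: these coalesce into one bulk call,
# get_many_posts(["intro", "faq"], 7, true)
MyApp.PostBatcher.get_post("intro", 7, true)
MyApp.PostBatcher.get_post("faq", 7, true)

# Different user_id: routed to a separate batch,
# get_many_posts(["intro"], 8, true)
MyApp.PostBatcher.get_post("intro", 8, true)
```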
## Transactions
The bulk function runs in a background consumer process, not in the caller's process. This has consequences for database transactions:
- Writes performed by the bulk function do not participate in the caller's transaction. If the caller's transaction rolls back, those writes are not rolled back.
- Reads performed by the bulk function do not see the caller's uncommitted writes.
- Under `Ecto.Adapters.SQL.Sandbox`, the consumer has no ancestry link to the test process, so queries produce ownership errors under `:manual` mode.
### `use Flurry, repo: ...`

The `repo:` option is mandatory. Pass the Ecto repo module, or `:none` for batchers with no database involvement:

```elixir
use Flurry, repo: MyApp.Repo
use Flurry, repo: :none
```

### `in_transaction: :warn | :safe | :bypass`
Controls behavior when the generated entry point is called inside a transaction. With a real repo the default is `:warn`; with `repo: :none` the default is `:safe`.

- `:warn` -- Logs a warning when `Repo.checked_out?/0` returns true.
- `:safe` -- Suppresses the warning. Use for reads that do not require read-your-writes consistency.
- `:bypass` -- Runs the bulk function inline in the caller's process when inside a transaction, so writes participate in the transaction's commit/rollback semantics. Outside a transaction, batches normally.
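The `:bypass` mode amounts to a checkout test at call time. A sketch of what the generated entry point would effectively do (illustrative, not Flurry's generated code; `batch_call/1` is a stand-in for the batching pipeline, and taking the first result simplifies the real key-based correlation):

```elixir
def get(id) do
  if MyApp.Repo.checked_out?() do
    # Inside a transaction: run inline in the caller's process,
    # so writes share the transaction's commit/rollback.
    [id] |> get_many() |> List.first()
  else
    # Outside a transaction: enqueue into the batching pipeline.
    batch_call(id)
  end
end
```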
## Tests with Ecto SQL Sandbox

Enable global bypass in `test_helper.exs`:

```elixir
ExUnit.start()
Flurry.Testing.enable_bypass_globally()
```

With bypass enabled, every call to a decorated function runs the bulk function inline in the caller's process. The batching pipeline is not exercised in unit tests.
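With bypass in place, tests can call the generated entry point directly. A sketch (the `DataCase`, factory, and schema names are assumptions for illustration):

```elixir
defmodule MyApp.UserBatcherTest do
  use MyApp.DataCase, async: true

  test "get/1 returns the user for an existing id" do
    user = insert(:user)
    assert %MyApp.User{} = MyApp.UserBatcher.get(user.id)
  end

  test "get/1 returns nil for a missing id" do
    assert MyApp.UserBatcher.get(-1) == nil
  end
end
```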
## Limitations

- Calling the bulk function directly (e.g., `get_many/1`) runs in the caller's process and does not coalesce with concurrent singular callers.
- The default additive merge function (`Enum.uniq/1` over the concatenated lists) does not support nested preload trees.
## License
MIT -- see LICENSE.