Flurry

CIHex.pmDocs

Scatter-gather batching for Elixir, built on GenStage. Individual requests are coalesced into a single bulk call and the results are correlated back to each caller.

Installation

Add :flurry to your list of dependencies in mix.exs:

def deps do
  [
    {:flurry, "~> 0.1.0"}
  ]
end

Overview

A module that uses Flurry defines a bulk (list-in, list-out) function and decorates it with @decorate batch(...). Flurry generates a single-item entry point, runs a GenStage producer/consumer pipeline, and correlates each caller's request with its result.

defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end
end

The decorator generates MyApp.UserBatcher.get/1. Under concurrency, N simultaneous calls collapse into one get_many/1 invocation:

MyApp.UserBatcher.get(42)
#=> %User{id: 42, ...}

MyApp.UserBatcher.get(999)
#=> nil

Flush Policy

A batch is emitted when any of the following conditions is met:

The mailbox-empty check provides minimum latency under low load: a single request arriving at an idle producer flushes immediately as a batch of one. The max_wait timer (default 200ms) caps worst-case latency under slow trickle conditions where requests arrive one at a time, fast enough to keep the mailbox non-empty but too slowly to reach batch_size.

Starting the Batcher

use Flurry generates start_link/1 and child_spec/1 on the module. Add it to a supervision tree:

children = [
  # ...
  MyApp.UserBatcher
]

Options

batch_size

Caps the size of any single bulk call. This is necessary because databases such as PostgreSQL impose parameter limits on WHERE id IN (...) queries.

# Module-wide default
children = [{MyApp.UserBatcher, batch_size: 500}]

# Per-decorated-function override
@decorate batch(get(id), batch_size: 500)
def get_many(ids), do: ...

@decorate batch(get_with_posts(id), batch_size: 50)
def get_many_with_posts(ids), do: ...

When more requests accumulate than batch_size allows, Flurry flushes batch_size entries at a time across successive cycles.

max_wait:

Maximum time in milliseconds that the first pending request waits before the producer forces a flush. Defaults to 200. Set to nil to disable.

# Module-wide default
children = [{MyApp.UserBatcher, max_wait: 500}]

# Per-decorated-function override
@decorate batch(get(id), max_wait: 100)
def get_many(ids), do: ...

overridable:

By default, the generated singular entry point is not overridable. Defining a function with the same name and arity in the module produces a redefinition error. The overridable: option on use Flurry allows wrapping the generated function via super/1:

defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo, overridable: [get: 1]

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end

  def get(id) do
    case super(id) do
      nil -> nil
      user -> %{user | display_name: "[#{user.id}] #{user.name}"}
    end
  end
end

The option accepts a keyword list of name: arity pairs. At compile time, Flurry validates that every :overridable entry has a matching @decorate batch(...) decoration with the same arity.

additive:

Merges list-valued arguments across coalesced callers. This is useful when different callers specify different values that the batch should combine, such as Ecto preloads:

@decorate batch(get(id, preloads), additive: [:preloads])
def get_many(ids, preloads) do
  Repo.all(from u in User, where: u.id in ^ids, preload: ^preloads)
end

# Three concurrent callers with distinct preloads:
MyApp.UserBatcher.get(1, [:posts])
MyApp.UserBatcher.get(2, [:comments])
MyApp.UserBatcher.get(3, [:posts, :profile])

# All three coalesce into one bulk call:
# get_many([1, 2, 3], [:posts, :comments, :profile])

Named arguments are excluded from the producer's routing key, so callers that differ only on additive arguments share a batch. At flush time, the additive values are merged using list ++ list |> Enum.uniq/1.

Restrictions:

batch_by:

Normalizes the non-batched arguments for coalescing purposes. Accepts a 1-arity function that receives the raw tuple of non-batched arguments and returns the canonical form used for both coalescing and the bulk function's argument positions:

@decorate batch(
  get_post(slug, user),
  batch_by: fn {user} -> {user.id} end
)
def get_many(slugs, user_id) do
  Repo.all(from p in Post, where: p.slug in ^slugs and p.user_id == ^user_id)
end

With batch_by:, the bulk function signature takes the normalized values, not the raw decorator argument types. Valid values are closures and captures. batch_by: on a single-arg decoration raises at compile time.

correlate:

Specifies how the match key is extracted from each returned record. By default, Flurry uses the first decorator argument's name as the record field. Two forms are supported:

@decorate batch(get(id), correlate: :uuid)
def get_many(ids) do
  Repo.all(from r in Row, where: r.uuid in ^ids)
end

timeout:

Sets the GenServer.call/3 timeout for the generated entry point. Defaults to 5_000 (5 seconds).

@decorate batch(get(id), timeout: 30_000)
def get_many(ids) do
  Repo.all(from u in User, where: u.id in ^ids, preload: [:posts, :comments])
end

returns: :one | :list

Defaults to :one, where each caller's argument corresponds to at most one returned record. Use :list when the bulk function returns multiple records per key:

@decorate batch(get_posts_by_user(user_id), returns: :list)
def get_many_posts_by_user(user_ids) do
  Repo.all(from p in Post, where: p.user_id in ^user_ids)
end

Using :one on a function that returns duplicate keys raises Flurry.AmbiguousBatchError.

on_failure: :bisect | :fail_all

Defaults to :bisect. When the bulk function raises or exits for a batch of N entries, Flurry splits the batch in half and retries each half. The recursion continues until a singleton failure isolates the problematic entry. Every other caller in the original batch still receives their correlated record.

Use :fail_all to surface a single failure as an error to every caller in the batch without retrying.

:bisect re-invokes the bulk function with smaller subsets of the same inputs. If the bulk function has non-idempotent side effects, use on_failure: :fail_all to avoid double-writes.

Multi-Argument Batching

When the decorated function takes more than one argument, the first argument is the batched variable and the remaining arguments determine which callers share a batch. Callers whose non-batched arguments are structurally equal coalesce into the same bulk call.

@decorate batch(get_post(slug, user_id, active?))
def get_many_posts(slugs, user_id, active?) do
  Repo.all(
    from p in Post,
      where: p.slug in ^slugs and p.user_id == ^user_id and p.active == ^active?
  )
end

Each distinct combination of non-batched arguments has its own pending list, batch_size cap, and slot in the producer's LRU flush rotation.

Transactions

The bulk function runs in a background consumer process, not in the caller's process. This has consequences for database transactions:

  1. Writes performed by the bulk function do not participate in the caller's transaction. If the caller's transaction rolls back, those writes are not rolled back.
  2. Reads performed by the bulk function do not see the caller's uncommitted writes.
  3. Under Ecto.Adapters.SQL.Sandbox, the consumer has no ancestry link to the test process, so queries produce ownership errors under :manual mode.

use Flurry, repo: ...

The repo: option is mandatory. Pass the Ecto repo module, or :none for batchers with no database involvement:

use Flurry, repo: MyApp.Repo
use Flurry, repo: :none

in_transaction: :warn | :safe | :bypass

Controls behavior when the generated entry point is called inside a transaction. With a real repo the default is :warn; with repo: :none the default is :safe.

Tests with Ecto SQL Sandbox

Enable global bypass in test_helper.exs:

ExUnit.start()
Flurry.Testing.enable_bypass_globally()

With bypass enabled, every call to a decorated function runs the bulk function inline in the caller's process. The batching pipeline is not exercised in unit tests.

Limitations

License

MIT -- see LICENSE.