BatchServing

BatchServing is a fork of Nx.Serving focused on generic concurrent batch work.

It lets many callers submit work independently, transparently groups calls that arrive within the same time window into one batch, executes that batch once, and returns to each caller its own slice of the results.

This is useful when you have high fan-in workloads (for example from web/API requests, jobs, or pipelines) where per-call execution is expensive but batched execution is efficient.

Installation

Add batch_serving to your dependencies:

def deps do
  [
    {:batch_serving, "~> 1.0.0"}
  ]
end

Core idea

  1. Define a serving function that receives a list of values.
  2. Start a serving process with batch_size and batch_timeout.
  3. Use BatchServing.dispatch/2 for single items and BatchServing.dispatch_many/2 for explicit batches.
  4. Calls close together in time are merged and executed once.
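The merge-and-reply mechanism behind steps 1–4 can be sketched with a plain GenServer. This is an illustration of the idea only, not BatchServing's implementation; the module name BatchSketch and its options are made up:

```elixir
# Illustrative sketch of time-window batching, NOT BatchServing's code.
# Callers block in dispatch/2; the server runs the batch once and
# replies to each caller with its own result.
defmodule BatchSketch do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
  end

  def dispatch(server, value), do: GenServer.call(server, {:dispatch, value})

  @impl true
  def init(opts) do
    {:ok,
     %{
       fun: Keyword.fetch!(opts, :fun),
       batch_size: Keyword.get(opts, :batch_size, 10),
       batch_timeout: Keyword.get(opts, :batch_timeout, 100),
       pending: []
     }}
  end

  @impl true
  def handle_call({:dispatch, value}, from, state) do
    pending = [{from, value} | state.pending]

    cond do
      length(pending) >= state.batch_size ->
        # Batch is full: run it immediately.
        flush(pending, state.fun)
        {:noreply, %{state | pending: []}}

      state.pending == [] ->
        # First call in a window: arm the timeout.
        Process.send_after(self(), :flush, state.batch_timeout)
        {:noreply, %{state | pending: pending}}

      true ->
        {:noreply, %{state | pending: pending}}
    end
  end

  @impl true
  def handle_info(:flush, %{pending: []} = state), do: {:noreply, state}

  def handle_info(:flush, state) do
    # Timeout expired: run whatever accumulated, even a partial batch.
    flush(state.pending, state.fun)
    {:noreply, %{state | pending: []}}
  end

  defp flush(pending, fun) do
    {froms, values} = pending |> Enum.reverse() |> Enum.unzip()
    results = fun.(values)
    Enum.zip(froms, results) |> Enum.each(fn {from, r} -> GenServer.reply(from, r) end)
  end
end
```

Each caller blocks inside dispatch/2 until the shared batch has run; the server then replies to every caller individually.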

Quick start

1) Define and start a serving

children = [
  BatchServing.create_serving_process_group_spec(),
  {BatchServing,
   serving: BatchServing.new(fn values ->
     Enum.map(values, &(&1 * &1))
   end),
   name: MyServing,
   batch_size: 10,
   batch_timeout: 100}
]

Supervisor.start_link(children, strategy: :one_for_one)

batch_size is the maximum number of items combined into a single execution. batch_timeout (in milliseconds) is how long the serving waits for additional calls before executing a partial batch.
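For intuition, here is a hypothetical oversized request, assuming requests larger than batch_size are split into multiple executions (as in Nx.Serving, which this library forks):

```elixir
# With batch_size: 10, a 25-item request would run as three executions
# (10 + 10 + 5) behind the scenes, while the caller still receives one
# flat result list in the original order.
BatchServing.dispatch_many!(MyServing, Enum.to_list(1..25))
```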

2) Submit work

BatchServing.dispatch_many!(MyServing, [2, 3])
#=> [4, 9]
BatchServing.dispatch!(MyServing, 2)
#=> 4

BatchServing.dispatch_many!(MyServing, ["2"])
#=> exits the caller (the serving function raises on non-numeric input)
BatchServing.dispatch!(MyServing, "2")
#=> exits the caller (the serving function raises on non-numeric input)

BatchServing.dispatch_many(MyServing, [2, 3])
#=> {:ok, [4, 9]}
BatchServing.dispatch(MyServing, 2)
#=> {:ok, 4}

BatchServing.dispatch_many(MyServing, ["2"])
#=> {:error, _}
BatchServing.dispatch(MyServing, "2")
#=> {:error, _}

From multiple concurrent callers:

Task.async_stream(1..20, fn n ->
  BatchServing.dispatch(MyServing, n)
end, max_concurrency: 20)
|> Enum.to_list()

Each caller gets its own result, but execution is internally batched.
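One way to observe the batching from the outside is a serving whose output includes the batch size it saw. This is illustrative; the exact grouping depends on timing and load:

```elixir
# Tag every result with how many values ran in the same execution.
serving =
  BatchServing.new(fn values ->
    n = length(values)
    Enum.map(values, fn v -> {v, n} end)
  end)
```

Under concurrent dispatch, most callers should see n > 1, confirming that their calls were merged into shared executions.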

API overview

Advanced options

Hooks (Streaming Runtime Events)

Hooks are useful when you need live runtime signals in addition to final batch output.

When streaming is enabled, dispatch calls return events (hook payloads and the final batch output) rather than a single plain result.

Example:

serving =
  BatchServing.new(MyServingModule, :ok)
  |> BatchServing.streaming(hooks: [:progress])

# Start this serving under a name (as in the quick start), then
# consume the events it emits:
BatchServing.dispatch_many!(MyServing, inputs)
|> Enum.each(fn
  {:progress, meta} -> IO.inspect(meta, label: "progress")
  {:batch, output} -> IO.inspect(output, label: "batch")
end)

Practical use case:

Multi-user note:

See detailed guide and examples:

Notes