Flume
This library is meant to help with managing control flow, when there are lots of steps and some may go wrong. You have probably seen or tried something complex along these lines with with statements. Most of the time, they work great.
Let's use a hypothetical example that's not too far from the real world. Let's say you are processing an order from a customer in another country. You need to retrieve some information from a database, get an fx rate, get tax rates for that country, process the order, and finally store the result. Anything could fail too, so you have to annotate operations so you know which one failed. Using with, it may look something like this:
defmodule MyApp.Orders do
def process(order) do
with
{:customer, {:ok, customer}} <- {:customer, MyApp.Customers.get(order)},
{:tax, {:ok, tax}} <- {:tax, MyApp.Tax.rate(order)},
{:fx_rate, {:ok, fx_rate}} <- {:fx_rate, MyApp.Fx.rate(order)},
{:payment, {:ok, items}} <- {:payment, MyApp.Payments.process(order, tax, fx_rate)},
{:order, {:ok, order}} <- {:order, create(order, customer, tax, fx_rate)} do
order.id
else
{:customer, {:error, error}} -> handle_error(:customer, error)
{:tax, {:error, error}} -> handle_error(:tax, error)
{:fx_rate, {:error, error}} -> handle_error(:fx_rate, error)
{:payment, {:error, error}} -> handle_error(:payment, error)
{:order, {:error, error}} -> handle_error(:order, error)
end
end
defp handle_error(step, error) do
# do something
end
def create(order, customer, tax, fx_rate) do
# do something
end
end
That can easily get out of hand and become hard to reason about. flume allows you to do something which is arguably clearer and more succinct:
defmodule MyApp.Orders do
def process(order) do
Flume.new(on_error: &handle_error/2)
|> Flume.run_async(:customer, fn -> MyApp.Customers.get(order) end)
|> Flume.run_async(:tax, fn -> MyApp.Tax.rate(order) end)
|> Flume.run_async(:fx_rate, fn -> MyApp.Fx.rate(order) end)
|> Flume.run(:payment, &handle_payment(&1, order), wait_for: [:customer, :tax, :fx_rate])
|> Flume.run(:order, &handle_order(&1, order), on_success: &Map.fetch!(&1, :id))
|> Flume.result()
end
defp handle_payment(%{fx_rate: fx_rate, tax: tax}, order) do
MyApp.Payments.process(order, tax, fx_rate)
end
defp handle_order(%{fx_rate: fx_rate, tax: tax, customer: customer}, order) do
create(order, customer, tax, fx_rate)
end
end
All that it asks is that your callbacks return predictable tuples, either {:ok, result} or {:error, reason}. It will stop at the first error and nothing later will be executed - unless run_async is used. See the docs for more info.
Installation
If available in Hex, the package can be installed
by adding flume to your list of dependencies in mix.exs:
def deps do
[
{:flume, "~> 0.1.0"}
]
endDocumentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/flume.