Strom
Composable components for stream processing.
Strom provides a set of abstractions for creating, routing and modifying streams of data.
Notation
In the "mermaid" notation, I suggest the following shapes:
- circles for a sink and a source.
- diamonds for a mixer and a splitter.
- a simple rectangle for a transformer.
- a rounded rectangle for a composite.
See the example below.
graph LR;
source1(("source1")) --> mixer{{"mixer"}}
source2(("source2")) --> mixer{{"mixer"}}
mixer{{"mixer"}} --> transformer["transformer"]
transformer["transformer"] --> composite(["composite"])
composite(["composite"]) --> splitter{{"splitter"}}
splitter{{"splitter"}} --> sink1(("sink1"))
splitter{{"splitter"}} --> sink2(("sink2"))
Example
The problem
There are two streams of numbers. The task is to sum the numbers from the two streams pairwise (the first number of one stream with the first number of the other, and so on), then produce two streams: one with the odd sums and another with the even ones.
Solution
The flow chart for a possible solution:
graph LR;
source1(("numbers1")) --> round_robin(["round-robin mixer"])
source2(("numbers2")) --> round_robin(["round-robin mixer"])
round_robin(["round-robin mixer"]) --> sum["sum pairs"]
sum["sum pairs"] --> splitter{{"split odd-even"}}
splitter{{"split odd-even"}} --> sink_odd(("puts odd"))
splitter{{"split odd-even"}} --> sink_even(("puts even"))
The "flow" data-structure
Strom components operate on a "flow": a named set of streams. It is a map with stream names as keys and the streams as values.
For example:
flow = %{
  stream1: Stream.cycle([1, 2, 3]),
  stream2: ["a", "b", "c"]
}
A flow can be empty: %{}.
A source adds a new stream to the flow. A sink runs the stream with the given name and removes it from the flow.
A mixer mixes several streams into one. A splitter does the opposite.
A transformer modifies a stream (or streams).
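To make the data structure concrete, here is a small sketch in plain Elixir that does not use Strom at all. It only shows that a flow is an ordinary map of enumerables, and it imitates by hand what a sink is described as doing (run a stream, drop it from the flow). The variable names are mine, and this is not how Strom implements sinks internally.

```elixir
# A flow is an ordinary map: stream names as keys, enumerables as values.
flow = %{
  stream1: Stream.cycle([1, 2, 3]),
  stream2: ["a", "b", "c"]
}

# Streams are lazy, so even an infinite one is safe to keep in the map.
first_five = flow.stream1 |> Enum.take(5)
IO.inspect(first_five)
# prints [1, 2, 3, 1, 2]

# Hand-rolled imitation of a sink (an assumption for illustration only):
# take one stream out of the flow, run it, and keep the rest of the flow.
{stream, rest} = Map.pop(flow, :stream2)
consumed = Enum.to_list(stream)
IO.inspect(consumed)
# prints ["a", "b", "c"]
IO.inspect(Map.keys(rest))
# prints [:stream1]
```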
Components
The origins for sources here will be just simple lists of numbers. See sources for other examples of sources. It's easy to implement your own source.
source1 = Strom.Source.new(:numbers1, [1, 2, 3, 4, 5])
source2 = Strom.Source.new(:numbers2, [10, 20, 30, 40, 50])
Sinks will use the simple IOPuts origin. See more examples here: sinks
origin_odd = Strom.Sink.IOPuts.new("odd: ")
sink_odd = Strom.Sink.new(:odd, origin_odd)
origin_even = Strom.Sink.IOPuts.new("even: ")
sink_even = Strom.Sink.new(:even, origin_even)
Now comes the tricky part: the round-robin mixer. It's a composite component that has four components inside:
graph LR;
add_label1["add label :first"] --> mixer{{"mix"}}
add_label2["add label :second"] --> mixer{{"mix"}}
mixer{{"mix"}} --> emit_when_have_both["emit when have both"]
The round-robin mixer first adds a label to each event in order to know which stream a number comes from. Then it mixes the streams. The last transformer waits until it has numbers from both streams and then emits a pair of events.
defmodule RoundRobinMixer do
  alias Strom.{Mixer, Transformer}
  # Tags each event with the label kept in the accumulator.
  def add_label(event, label) do
    {[{event, label}], label}
  end
  # Emits a pair once a number from the other stream is already queued;
  # otherwise queues the number under its own label.
  def call({number, label}, acc) do
    [another] = Enum.reject(Map.keys(acc), &(&1 == label))
    case Map.fetch!(acc, another) do
      [hd | tl] ->
        {[hd, number], Map.put(acc, another, tl)}
      [] ->
        numbers = Map.fetch!(acc, label)
        {[], Map.put(acc, label, numbers ++ [number])}
    end
  end
  def components() do
    [
      Transformer.new(:first, &__MODULE__.add_label/2, :first),
      Transformer.new(:second, &__MODULE__.add_label/2, :second),
      Mixer.new([:first, :second], :numbers),
      Transformer.new(:numbers, &__MODULE__.call/2, %{first: [], second: []})
    ]
  end
end
round_robin = Strom.Composite.new(RoundRobinMixer.components())
The "sum pairs" transformer is simple. It saves the first number in the accumulator and waits for the second one to produce the sum.
function = fn number, acc ->
  if acc do
    {[number + acc], nil}
  else
    {[], number}
  end
end
sum_pairs = Strom.Transformer.new(:numbers, function, nil)
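Both the pairing step and the sum step are functions of the form `fn event, acc -> {events, acc}`, which is also the shape of reducer that the standard `Stream.transform/3` accepts. Assuming only that shape, the logic can be tried in isolation, without starting any Strom components. The sketch below copies the two functions into anonymous functions (`pair` and `sum` are my names) and feeds them a hand-written, unevenly interleaved list of labelled events. It says nothing about how Strom runs transformers internally.

```elixir
# Copy of the pairing step: emit two numbers once both streams have contributed,
# otherwise queue the number under its own label.
pair = fn {number, label}, acc ->
  [another] = Enum.reject(Map.keys(acc), &(&1 == label))

  case Map.fetch!(acc, another) do
    [head | tail] ->
      {[head, number], Map.put(acc, another, tail)}

    [] ->
      {[], Map.update!(acc, label, &(&1 ++ [number]))}
  end
end

# Copy of the "sum pairs" step: remember the first number, emit the sum on the second.
sum = fn number, acc ->
  if acc, do: {[number + acc], nil}, else: {[], number}
end

# Hypothetical arrival order after mixing: :first runs ahead of :second.
events = [{1, :first}, {2, :first}, {10, :second}, {3, :first}, {20, :second}, {30, :second}]

sums =
  events
  |> Stream.transform(%{first: [], second: []}, pair)
  |> Stream.transform(nil, sum)
  |> Enum.to_list()

IO.inspect(sums)
# prints [11, 22, 33]
```

Even though two numbers from :first arrive before anything from :second, the queues in the accumulator keep the pairing positional: 1 with 10, 2 with 20, 3 with 30.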
The splitter will split the :numbers stream into two streams: :odd and :even.
splitter = Strom.Splitter.new(:numbers, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
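A quick way to sanity-check a pair of odd/even predicates is to apply them to the expected sums with plain `Enum.filter/2`. This is only a check of the predicates themselves (remainder 1 for odd, remainder 0 for even); how Strom.Splitter routes events internally is not shown here, and the `partitions` and `routed` names are mine.

```elixir
# Predicates keyed by the name of the stream they should feed.
partitions = %{
  odd: &(rem(&1, 2) == 1),
  even: &(rem(&1, 2) == 0)
}

# The pairwise sums of [1, 2, 3, 4, 5] and [10, 20, 30, 40, 50].
sums = [11, 22, 33, 44, 55]

# For each name, keep the numbers its predicate accepts.
routed =
  Map.new(partitions, fn {name, predicate} ->
    {name, Enum.filter(sums, predicate)}
  end)

IO.inspect(routed)
# prints %{even: [22, 44], odd: [11, 33, 55]}
```

Every number should land in exactly one of the two lists; if a number shows up in both or in neither, one of the predicates is wrong.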
OK, it's almost done. One thing you may have noticed: the sources produce the :numbers1 and :numbers2 streams.
However, the round-robin composite operates on the :first and :second streams. One should simply rename the streams in the flow.
For consistency, there is the Renamer component:
renamer = Strom.Renamer.new(%{numbers1: :first, numbers2: :second})
OK. Now we are ready to combine all the components. There will be another composite.
final_composite = [
  source1,
  source2,
  renamer,
  round_robin,
  sum_pairs,
  splitter,
  sink_odd,
  sink_even
] |> Strom.Composite.new()
Now, just start it and call it on an empty flow:
final_composite = Strom.Composite.start(final_composite)
Strom.Composite.call(%{}, final_composite)
And see something like this in the console:
iex(18)> Strom.Composite.call(%{}, final_composite)
%{}
odd: 11
even: 22
odd: 33
even: 44
odd: 55
More info:
Read @moduledoc for components.
See examples in tests.