Strom
Flow-based Programming Framework
Strom provides a set of abstractions for creating, routing and modifying streams of data.
Data
The data abstractions are:
Event
Any piece of data - number, string, list, map, struct, etc.
Stream
A sequence (can be infinite) of events made available over time.
See Elixir Stream.
Flow
Flow - is a named set of streams.
For example:
flow = %{stream1: Stream.cycle([1, 2, 3]), stream2: ["a", "b", "c"]}
Flow can be empty - %{}.
Operators (functions)
There are several operators (functions) that can be applied to flows. Each operator accept flow as input and return a modified flow.
Source (source)
Adds a stream of "external data" to a flow.
%{} -> source(Src, :foo) -> %{foo: sfoo}
%{bar: Sbar} -> source(Src, :foo) -> %{foo: sfoo, bar: sbar}
Sink (sink)
Writes a stream data back to somewhere.
%{foo: sfoo} -> sink(Snk, :foo) -> %{}
%{foo: sfoo, bar: sbar} -> sink(Snk, :foo) -> %{bar: sbar}
Mixer (mix)
Mixes several streams.
%{foo: sfoo, bar: sbar} -> mix([:foo, :bar], :mixed) -> %{mixed: smixed}
Splitter (split)
Split a stream into several streams.
%{foo: sfoo} -> split(:foo, [:bar, :baz]) -> %{bar: sbar, baz: sbaz}
Transformer (transform)
Applies a function to each event of a stream or streams.
%{foo: sfoo, bar: sbar} -> transform(:foo, F) -> %{foo: F(sfoo)}
%{foo: sfoo, bar: sbar} -> transform([:foo, :bar], F) -> %{foo: F(sfoo), bar: F(sbar}
A function gets an event as input and must return a modified event. So, it's the map operation. Think about &Stream.map/2, which is used under the hood.
Symbolic representation
Implementation details and interface
Under the hood, each operation is performed inside "components". Component is a separate process - GenServer.
A component can be:
- started -
start/0,start/1 - stopped -
stop/1 - and called -
call/3andcall/4
Example
Let's say one wants to stream a file:
alias Strom.Source
alias Strom.Source.ReadLines
source = Source.start(%ReadLines{path: "input.txt"})
# returns a struct %Source{pid: pid, origin: %ReadLines{}}
%{lines: stream} = Source.call(%{}, source, :lines)
# adds the :lines stream to the empty flow (%{})
Enum.to_list(stream)
# runs the stream and returns a list of strings
Source.stop(source)
# stops the source process
Here the Strom.Source.ReadLines module is used to read line from file.
To specify a custom source, one can implement a module with the Strom.Source behaviour.
Strom provides a couple of simple sources, see sources.
The same for sinks.
Then, for example, one wants to split the stream into two streams, one with short lines, another - with long ones:
alias Strom.Splitter
splitter = Splitter.start()
# starts the splitter process
parts = %{
long: fn event -> String.length(event) > 100 end,
short: fn event -> String.length(event) <= 100 end
}
%{long: long, short: short} =
Splitter.call(%{lines: stream}, splitter, :lines, parts)
# Splits the :lines stream into the :long and :short streams based on rules defined in parts
And then, one wants to save the streams into two files:
alias Strom.Sink
sink_short = Sink.start(%WriteLines{path: "short.txt"})
sink_long = Sink.start(%WriteLines{path: "long.txt"})
%{} =
%{long: long, short: short}
|> Sink.call(sink_short, :short)
|> Sink.call(sink_long, :long, true)
# the first sink will run the stream aynchronously (using the Elixir Task)
# the second sink (see `true` as the last argument) runs the stream synchronously
Transformer
With the Function component everything is straightforward. Let's calculate the length of each string and produce a stream of numbers:
alias Strom.Transformer
transformer = Transformer.start()
function = &String.length(&1)
%{short: short} =
%{short: short}
|> Transformer.call(transformer, :short, function)
# now the stream is the stream of numbers
The function can be applied to several steams simultaneously:
%{short: short, long: long} =
%{short: short, long: long}
|> Transformer.call(transformer, [:short, :long], function)
Transformer can operate 2-arity functions with accumulator.
The function must return 2-elements tuple.
{list(event), acc}
The first element is a list of events that will be returned from the component. The second is a new accumulator.
alias Strom.Transformer
function = fn event, acc ->
{[event * acc, :new_event], acc + 1}
end
transformer = Transformer.start()
%{events: stream} = Transformer.call(%{events: [1, 2, 3]}, transformer, :events, {function, 0})
Enum.to_list(stream)
# returns
[0, :new_event, 2, :new_event, 6, :new_event]
Let's consider the "Telegram problem".
The program accepts a stream of strings and should produce another stream of string with the length less then a specified value.
The solution requires two components - decomposer and recomposer. The first will split strings into words. The second will "recompose" words into new strings
The decomposer module is quite simple
defmodule Decompose do
def call(event, nil) do
{String.split(event, ","), nil}
end
end
The recomposer will store incoming words and when a line is ready, it will produce an event.
defmodule Recompose do
@length 100
def call(event, words) do
line = Enum.join(words, " ")
new_line = line <> " " <> event
if String.length(new_line) > @length do
{[new_line], [event]}
else
{[], words ++ [event]}
end
end
end
It's also possible to parameterize the Transformer component by passing opts to its start/1 function:
alias Strom.Transformer
function = fn event, acc, opts ->
{[event * acc], acc + opts[:inc]}
end
transformer = Transformer.start(opts: %{inc: 1})
%{events: stream} = Transformer.call(%{events: [1, 2, 3]}, transformer, :events, {function, 0})
Enum.to_list(stream)
# returns
[0, 2, 6]
Strom.DSL
Since the operations have a similar interface and behaviour, it's possible to define the topology of calculation in a simple declarative way.
Each component has a corresponding macro, see the Strom.DSL module and its tests.
The topology form the first examples (with long and short strings) can be defined like that:
defmodule MyFlow do
use Strom.DSL
alias Strom.Source.ReadLines
alias Strom.Sink.WriteLines
def topology(_opts) do
parts = %{
long: fn event -> String.length(event) > 100 end,
short: fn event -> String.length(event) <= 100 end
}
[
source(:lines, %ReadLines{path: "input.txt"}),
split(:lines, parts),
sink(:short, %WriteLines{path: "short.txt"}),
sink(:long, %WriteLines{path: "long.txt"})
]
end
end
See more examples in tests.