diff --git a/meetup.livemd b/meetup.livemd index 18abf14..6818d44 100644 --- a/meetup.livemd +++ b/meetup.livemd @@ -45,7 +45,7 @@ Let's quickly go through the article. ## The problem -## How Elixir engineer see the problem +## How an Elixir engineer see the problem In Elixir we have the Stream abstraction - composable, lazy enumerables. @@ -63,7 +63,7 @@ Stream.run(stream) Enum.to_list(stream) ``` -Using Streams one can see the problem in quite straightforward way: +Using Streams one can see the problem in a quite straightforward way: * One need a "source" that produces a stream of the "orders" events (file, db, Kafka, Rabbit, etc). * One need a "source" that produces a stream of the "parcels" events. @@ -317,6 +317,99 @@ end See: https://github.com/antonmi/Strom/blob/main/test/dsl_test.exs +## RoundRobin Mixer + +Create a mixer that will smartly mix several streams, taking one event from each of the streams. + +Let's consider a simple example with 2 streams. It can be easily generalized for any number of streams. + + + + + +```mermaid +graph LR; + source1-->add_label1; + source2-->add_label2; + add_label1-->mix; + add_label2-->mix; + mix --> do_smart_mix +``` + +```elixir +defmodule RoundRobin do + use Strom.DSL + + def add_label(event, label) do + {[{event, label}], label} + end + + def call({number, label}, acc) do + [another] = Enum.reject(Map.keys(acc), &(&1 == label)) + + case Map.fetch!(acc, another) do + [hd | tl] -> + {[hd, number], Map.put(acc, another, tl)} + + [] -> + numbers = Map.fetch!(acc, label) + {[], Map.put(acc, label, numbers ++ [number])} + end + end + + def topology(_opts) do + [ + transform(:first, &__MODULE__.add_label/2, :first), + transform(:second, &__MODULE__.add_label/2, :second), + mix([:first, :second], :mixed), + transform(:mixed, &__MODULE__.call/2, %{first: [], second: []}) + ] + end +end +``` + +```elixir +RoundRobin.start() +%{mixed: mixed} = RoundRobin.call(%{first: [1, 2, 3], second: [10, 20, 30]}) +result = Enum.to_list(mixed) +RoundRobin.stop() +result +``` + +The RoundRobin module returns a flow data-structure, so it can be composed with other components and other flow modules. + +```elixir +# reads two streams of numbers and produces a stream of sums of elements from both streams + +transformer = Strom.Transformer.start() + +function = fn event, acc -> + if acc do + {[acc + event], nil} + else + {[], event} + end +end + +RoundRobin.start() +%{mixed: mixed} = RoundRobin.call(%{first: [1, 2, 3], second: [10, 20, 30]}) +%{mixed: mixed} = Strom.Transformer.call(%{mixed: mixed}, transformer, :mixed, {function, nil}) +result = Enum.to_list(mixed) +RoundRobin.stop() +result +``` + +```elixir +RoundRobinMany.start([:first, :second, :third]) + +%{mixed: mixed} = + RoundRobinMany.call(%{first: [1, 2, 3], second: [10, 20, 30], third: [100, 200, 300]}) + +result = Enum.to_list(mixed) +RoundRobinMany.stop() +result +``` + ## Finally back to the "parcels" problem @@ -447,4 +540,29 @@ def decide(event, memo) do end ``` - +See: https://github.com/antonmi/Strom/blob/main/test/examples/parcels/parcels_test.exs + + + +### Performance + +On my simple machine (13-inch MacBook Pro, 4-cores, 2.3 GHz) it takes around 8 seconds to process 100_000 orders with `:rand.uniform(5)` parcels. +So it's around 12k order events per second. + +## Conclusion + +#### - Default Elixir/Erlang abstractions (GenServer, Stream) allows to do a complex streaming computations with minimal effort. + + + +#### - The Strom library is on early state of development currently. Right now it's just a small set of basic components that can be composed in any imaginable way. I'm pretty sure that other high-level concepts data-processing concepts (bounding, windowing, sessioning, etc) can be implemented with these components. + + + +#### - Hope we will be able to democratize data streams processing, so one can create a beatiful and simple streams processing logic without relying on monopolized solutions. + + + +#### + +