meetup.livemd — commit ae34a6a by antonmi, Jan 11, 2024
Let's quickly go through the article.

## The problem

## How an Elixir engineer sees the problem

In Elixir we have the Stream abstraction - composable, lazy enumerables.

```elixir
# ...
Stream.run(stream)
Enum.to_list(stream)
```

Using Streams, one can see the problem in quite a straightforward way:

* One needs a "source" that produces a stream of the "orders" events (file, DB, Kafka, RabbitMQ, etc.).
* One needs a "source" that produces a stream of the "parcels" events.
See:
https://github.com/antonmi/Strom/blob/main/test/dsl_test.exs

## RoundRobin Mixer

Let's create a mixer that smartly mixes several streams, taking one event from each stream in turn.

Consider a simple example with two streams; the approach generalizes easily to any number of streams.

<!-- livebook:{"break_markdown":true} -->

<!-- Learn more at https://mermaid-js.github.io/mermaid -->

```mermaid
graph LR;
source1-->add_label1;
source2-->add_label2;
add_label1-->mix;
add_label2-->mix;
mix --> do_smart_mix
```

```elixir
defmodule RoundRobin do
  use Strom.DSL

  def add_label(event, label) do
    # Tag each event with the name of the stream it came from.
    {[{event, label}], label}
  end

  def call({number, label}, acc) do
    # The accumulator buffers events waiting for a partner from the
    # other stream, e.g. %{first: [1, 2], second: []}.
    [another] = Enum.reject(Map.keys(acc), &(&1 == label))

    case Map.fetch!(acc, another) do
      # The other stream has a buffered event: emit it, then the current one.
      [hd | tl] ->
        {[hd, number], Map.put(acc, another, tl)}

      # Nothing buffered on the other side: buffer the current event.
      [] ->
        numbers = Map.fetch!(acc, label)
        {[], Map.put(acc, label, numbers ++ [number])}
    end
  end

  def topology(_opts) do
    [
      transform(:first, &__MODULE__.add_label/2, :first),
      transform(:second, &__MODULE__.add_label/2, :second),
      mix([:first, :second], :mixed),
      transform(:mixed, &__MODULE__.call/2, %{first: [], second: []})
    ]
  end
end
```

```elixir
RoundRobin.start()
%{mixed: mixed} = RoundRobin.call(%{first: [1, 2, 3], second: [10, 20, 30]})
result = Enum.to_list(mixed)
RoundRobin.stop()
result
```
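For intuition, the interleaving above can be sketched with plain Streams (a simplified sketch, not the Strom API; unlike the buffering `call/2` above, `Stream.zip/2` simply stops at the shorter stream):

```elixir
# Round-robin interleaving of two enumerables with plain Streams.
# Note: Stream.zip/2 truncates to the shorter of the two inputs.
round_robin = fn s1, s2 ->
  Stream.zip(s1, s2)
  |> Stream.flat_map(fn {a, b} -> [a, b] end)
end

round_robin.([1, 2, 3], [10, 20, 30]) |> Enum.to_list()
# => [1, 10, 2, 20, 3, 30]
```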

The RoundRobin module returns a flow data structure, so it can be composed with other components and other flow modules.

```elixir
# Reads two streams of numbers and produces a stream of sums of
# elements from both streams.

transformer = Strom.Transformer.start()

# Accumulate the first event of each pair; when the second arrives,
# emit their sum and reset the accumulator.
function = fn event, acc ->
  if acc do
    {[acc + event], nil}
  else
    {[], event}
  end
end

RoundRobin.start()
%{mixed: mixed} = RoundRobin.call(%{first: [1, 2, 3], second: [10, 20, 30]})
%{mixed: mixed} = Strom.Transformer.call(%{mixed: mixed}, transformer, :mixed, {function, nil})
result = Enum.to_list(mixed)
RoundRobin.stop()
result
```
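For intuition, the pairwise summation can also be expressed over an already-mixed list with plain Streams (a sketch that assumes events strictly alternate, which the round-robin mixer guarantees):

```elixir
# Sum consecutive pairs of an alternating stream:
# [a1, b1, a2, b2, ...] becomes [a1 + b1, a2 + b2, ...].
pair_sums = fn mixed ->
  mixed
  |> Stream.chunk_every(2)
  |> Stream.map(fn [a, b] -> a + b end)
end

pair_sums.([1, 10, 2, 20, 3, 30]) |> Enum.to_list()
# => [11, 22, 33]
```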

A generalized `RoundRobinMany` version of the module (defined analogously, over a list of stream names) handles any number of streams:

```elixir
RoundRobinMany.start([:first, :second, :third])

%{mixed: mixed} =
RoundRobinMany.call(%{first: [1, 2, 3], second: [10, 20, 30], third: [100, 200, 300]})

result = Enum.to_list(mixed)
RoundRobinMany.stop()
result
```
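The same idea extends to any number of equal-length enumerables; here is a plain-Stream sketch (not the Strom implementation, which mixes lazily and buffers uneven streams):

```elixir
# Round-robin over a list of enumerables: zip them positionally into
# tuples, then flatten each tuple of simultaneous events.
round_robin_many = fn streams ->
  streams
  |> Stream.zip()
  |> Stream.flat_map(&Tuple.to_list/1)
end

round_robin_many.([[1, 2, 3], [10, 20, 30], [100, 200, 300]])
|> Enum.to_list()
# => [1, 10, 100, 2, 20, 200, 3, 30, 300]
```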

## Finally, back to the "parcels" problem

<!-- Learn more at https://mermaid-js.github.io/mermaid -->
```elixir
def decide(event, memo) do
  # ...
end
```

See: https://github.com/antonmi/Strom/blob/main/test/examples/parcels/parcels_test.exs

<!-- livebook:{"break_markdown":true} -->

### Performance

On my modest machine (13-inch MacBook Pro, 4 cores, 2.3 GHz) it takes around 8 seconds to process 100_000 orders with `:rand.uniform(5)` parcels each.
So that's around 12.5k orders per second.

## Conclusion

#### - Default Elixir/Erlang abstractions (GenServer, Stream) allow complex streaming computations with minimal effort.

<!-- livebook:{"break_markdown":true} -->

#### - The Strom library is currently at an early stage of development. Right now it's just a small set of basic components that can be composed in any imaginable way. I'm pretty sure that other high-level data-processing concepts (bounding, windowing, sessioning, etc.) can be implemented with these components.

<!-- livebook:{"break_markdown":true} -->

#### - Hope we will be able to democratize data-stream processing, so one can create beautiful and simple stream-processing logic without relying on monopolized solutions.

<!-- livebook:{"break_markdown":true} -->


<!-- livebook:{"offset":14295,"stamp":{"token":"XCP.4LVC-at5SPTcQQ_Ev1hOZYmArmVov0FyRXehEBaGlFkD-XRXwgXKcBZ0fuIhC9y4PGrt2rXb8hKdL-YPWVExh3ZD67oJiIwN5-U7aA","version":2}} -->
