Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Kinesis Avro Event Consumption #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore local config.
/config/config.local.exs
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,35 @@ consume. Events are published as Apache Avro serialized OCF files.
This library is a common library that can be used by our Elixir applications
to both publish to and read from the event stream.

## Usage

Add it as a dependency (and ensure the application is started if you are on Elixir < 1.4):

```elixir
{:ello_event_stream, github: "ello/ello-event-stream"},
```

### Reading/Consuming an event stream

By default no consumers are started. Starting the Reader will start a
supervision tree for the given stream name in the `:ello_event_stream` app.

Each shard is processed serially, and each of its events is passed to the consumer function in turn.

The consumer function is passed in as a `{Module, :function}` tuple and must
be a function which receives a single argument which is an
`%Ello.EventStream.Event{}`. The consumer function should return `:ok` if
the event was successfully processed or skipped, otherwise the shard will stop
processing events.

The consumer may choose to spawn more processes to handle each event if desired.

```elixir
Ello.EventStream.read("stream-name", {MyApp, :process}, opts)
```



## License
Released under the [MIT License](/LICENSE.txt)

Expand Down
31 changes: 3 additions & 28 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,30 +1,5 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

# You can configure for your application as:
#
# config :ello_event_stream, key: :value
#
# And access this configuration in your application as:
#
# Application.get_env(:ello_event_stream, :key)
#
# Or configure a 3rd-party app:
#
# config :logger, level: :info
#

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"
# Load developer-specific overrides when present (the file is git-ignored).
# The existence check is anchored to this file's directory so that it agrees
# with `import_config`, which resolves its path relative to this file rather
# than to the current working directory.
if File.exists?(Path.expand("config.local.exs", __DIR__)) do
  import_config "config.local.exs"
end
26 changes: 14 additions & 12 deletions lib/ello_event_stream.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
defmodule Ello.EventStream do
@moduledoc """
Documentation for Ello.EventStream.
"""
alias __MODULE__.{
Kinesis.StreamSupervisor
}

@doc """
Hello world.

## Examples
@doc """
Starts consuming `stream`, passing every event to `callback`.

`callback` is a `{module, function}` tuple; it is forwarded untouched, together
with `opts`, to `StreamSupervisor.start_stream/3`, whose result is returned.

`opts` defaults to an empty keyword list, matching the `Keyword.t` spec of
`StreamSupervisor.start_stream/3`. Callers that pass a map keep working because
the options are only read through the `opts[:key]` access syntax.
"""
def read(stream, callback, opts \\ []) do
  StreamSupervisor.start_stream(stream, callback, opts)
end

iex> Ello.EventStream.hello
:world
# Ello.EventStream.tmp
#
# Temporary helper: reads the staging stream and hands every event to
# `inspect/1` in this module.
def tmp do
  callback = {__MODULE__, :inspect}
  read("ello-staging-stream", callback)
end

"""
def hello do
:world
# Consumer callback used by `tmp/0`: prints the event to standard output and
# reports success so the shard keeps processing.
def inspect(event) do
  IO.inspect(event)
  :ok
end
end
8 changes: 1 addition & 7 deletions lib/ello_event_stream/application.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
defmodule Ello.EventStream.Application do
# See http://elixir-lang.org/docs/stable/elixir/Application.html
# for more information on OTP Applications
@moduledoc false

use Application

# Application callback: boots the top-level supervisor. Its only child is the
# stream supervisor, which is started without any arguments.
def start(_type, _args) do
  import Supervisor.Spec, warn: false

  stream_supervisor = supervisor(Ello.EventStream.Kinesis.StreamSupervisor, [])

  Supervisor.start_link(
    [stream_supervisor],
    strategy: :one_for_one,
    name: Ello.EventStream.Supervisor
  )
end
Expand Down
22 changes: 22 additions & 0 deletions lib/ello_event_stream/avro.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Ello.EventStream.Avro do
  @moduledoc """
  Helpers for decoding Avro Object Container File (OCF) binaries.
  """

  @doc """
  Parse an Avro OCF format binary.

  Returns `{type, data}`:

    * `type` is the second element of the `:avro_record_type` schema tuple
      returned by the decoder (presumably the record name - confirm against the
      erlavro version in use).
    * `data` is the decoded payload with lists of `{key, value}` tuples
      converted into maps.

  Raises `MatchError` when the decoder output or the schema does not have the
  expected shape.
  """
  def parse_ocf(binary) do
    {_header, schema, data} = :avro_ocf.decode_binary(binary)
    {:avro_record_type, type, _, _, _, _definition, _type, _} = schema
    {type, deep_into(data)}
  end

  # Convert lists of tuples into maps.
  #
  # A list holding exactly one `{key, value}` pair is a one-field record. It has
  # to be matched before the single-element unwrapping clause below, otherwise
  # the pair would be returned as a bare tuple instead of a one-entry map.
  defp deep_into([{_key, _value}] = input), do: deep_into(input, %{})
  # Any other single-element list is unwrapped.
  defp deep_into([input]), do: deep_into(input)
  defp deep_into(input) when is_list(input), do: deep_into(input, %{})
  defp deep_into(other), do: other

  defp deep_into([], output), do: output
  defp deep_into([{k, v} | rest], output) do
    deep_into(rest, Map.put(output, k, deep_into(v)))
  end
  # Anything that is not a list of pairs is returned as-is.
  defp deep_into(other, _output), do: other
end
19 changes: 19 additions & 0 deletions lib/ello_event_stream/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Ello.EventStream.Event do
  @moduledoc """
  A single event read from the stream.

  `raw` holds the record's `"Data"` value exactly as received; `type` and
  `data` are filled in by decoding it.
  """

  alias Ello.EventStream.Avro

  defstruct type: "", data: nil, shard: nil, sequence_number: nil, raw: ""

  @doc """
  Builds an event from a Kinesis record map.

  Reads the `"Data"`, `"SequenceNumber"` and `"PartitionKey"` keys of `record`,
  then base64-decodes and Avro-parses the data. There is no clause for a record
  without `"Data"`, so such a record raises `FunctionClauseError`.
  """
  def from_kinesis(record) do
    event = %__MODULE__{
      raw: record["Data"],
      sequence_number: record["SequenceNumber"],
      shard: record["PartitionKey"]
    }

    parse_raw(event)
  end

  # Decodes the base64 payload and stores the resulting type and data.
  defp parse_raw(%__MODULE__{raw: raw} = event) when not is_nil(raw) do
    decoded = Base.decode64!(raw)
    {type, data} = Avro.parse_ocf(decoded)
    %{event | data: data, type: type}
  end
end
62 changes: 62 additions & 0 deletions lib/ello_event_stream/kinesis.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule Ello.EventStream.Kinesis do
  @moduledoc """
  Thin wrapper around the Kinesis client: shard discovery, shard iterators and
  event retrieval.

  The module used to build requests and the `{module, function}` used to
  execute them are read from the `:ello_event_stream` application environment
  (`:client` and `:client_execute`), defaulting to `ExAws.Kinesis` and
  `{ExAws, :request}`.
  """

  alias Ello.EventStream.Event

  # Upper bound, in milliseconds, for the retry delay used by `events/3`.
  # Without a cap the doubling delay grows without limit on a persistent error.
  @max_backoff 30_000

  @doc """
  Get the shard ids for a given stream.

  Raises a `RuntimeError` when the stream cannot be described or the response
  carries no shard list.
  """
  def shard_ids(stream) do
    with {:ok, resp} <- describe_stream(stream),
         shards when is_list(shards) <- resp["StreamDescription"]["Shards"] do
      Enum.map(shards, &(&1["ShardId"]))
    else
      _ -> raise "Stream #{stream} not found"
    end
  end

  @doc """
  Get an iterator for the shard and known sequence_number.

  With a `nil` sequence number the iterator is requested with `:trim_horizon`;
  otherwise it is requested with `:after_sequence_number` for the given
  sequence number. Returns the `"ShardIterator"` value of the response and
  raises `MatchError` if the request does not succeed.
  """
  def get_iterator(stream, shard, nil) do
    stream
    |> client().get_shard_iterator(shard, :trim_horizon)
    |> fetch_iterator()
  end
  def get_iterator(stream, shard, sequence_number) do
    stream
    |> client().get_shard_iterator(shard, :after_sequence_number,
         starting_sequence_number: sequence_number)
    |> fetch_iterator()
  end

  @doc """
  Retrieves events starting at the given iterator.

  Returns `{events, next_iterator, ms_behind_latest}` on success and
  `{:error, :iterator_expired}` when the iterator has expired.

  Any other error is retried after sleeping `backoff` milliseconds; the delay
  doubles on each attempt up to #{@max_backoff}ms.
  """
  def events(iterator, limit, backoff \\ 1000) do
    req = client().get_records(iterator, limit: limit)
    case client_execute(req) do
      {:ok, %{"Records" => records, "NextShardIterator" => next, "MillisBehindLatest" => ms_behind}} ->
        {Enum.map(records, &Event.from_kinesis/1), next, ms_behind}
      {:error, {:http_error, 400, %{"__type" => "ExpiredIteratorException"}}} ->
        {:error, :iterator_expired}
      {:error, _} ->
        :timer.sleep(backoff)
        events(iterator, limit, min(backoff * 2, @max_backoff))
    end
  end

  # Executes a shard-iterator request and extracts the iterator from the reply.
  defp fetch_iterator(req) do
    {:ok, resp} = client_execute(req)
    resp["ShardIterator"]
  end

  defp describe_stream(stream) do
    client_execute(client().describe_stream(stream))
  end

  defp client do
    Application.get_env(:ello_event_stream, :client, ExAws.Kinesis)
  end

  defp client_execute(request) do
    {mod, fun} = Application.get_env(:ello_event_stream, :client_execute, {ExAws, :request})
    apply(mod, fun, [request])
  end
end
113 changes: 113 additions & 0 deletions lib/ello_event_stream/kinesis/shard_processor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule Ello.EventStream.Kinesis.ShardProcessor do
  @moduledoc """
  Started by Stream Supervisor, processes events for a given stream's shard.

  Not typically started manually.

  Processing is a loop over a `State` struct: obtain a shard iterator when none
  is held, throttle, fetch a batch of events, hand each event to the callback,
  then repeat. The loop ends once no further iterator is available.
  """

  require Logger
  alias Ello.EventStream.Kinesis

  defmodule State do
    @moduledoc false
    defstruct [
      stream: nil,
      shard: nil,
      iterator: nil,
      callback: nil,
      limit: 100,
      events: [],
      last_fetch: nil,
      exit: false,
      last_sequence_number: nil,
    ]
  end

  @doc """
  Starts the processing loop for `shard` of `stream`.

  `callback` is a `{module, function}` tuple invoked with each event and must
  return `:ok`. `opts[:limit]` sets the batch size (default 100).
  """
  def process(stream, shard, callback, opts) do
    process(%State{
      stream: stream,
      shard: shard,
      callback: callback,
      limit: opts[:limit] || 100,
    })
  end

  @doc """
  Runs one iteration of the processing loop and recurses with the new state.

  A state flagged with `exit: true` logs that the shard is closed and exits
  the calling process with reason `:normal`.
  """
  def process(%State{exit: true, shard: shard}) do
    Logger.info("Shard #{shard} has been closed, all records are processed")
    Process.exit(self(), :normal)
  end

  def process(%State{} = state) do
    state
    |> get_last_sequence_number
    |> get_new_iterator
    |> rate_limit
    |> get_events
    |> process_events
    |> process
  end

  defp get_last_sequence_number(%{last_sequence_number: nil} = state) do
    #TODO: Get from redis
    state
  end
  defp get_last_sequence_number(state), do: state

  # Only requests an iterator when none is held; a nil reply flags the state
  # for exit.
  defp get_new_iterator(%{iterator: nil} = state) do
    case kinesis().get_iterator(state.stream, state.shard, state.last_sequence_number) do
      nil -> %{state | iterator: nil, exit: true}
      other -> %{state | iterator: other}
    end
  end
  defp get_new_iterator(state), do: state

  # Throttles fetches: sleeps for a second when fewer than `rate_limit/0`
  # seconds have elapsed since the previous fetch.
  defp rate_limit(%{last_fetch: nil} = state),
    do: %{state | last_fetch: System.system_time(:second)}
  defp rate_limit(%{last_fetch: last_fetch} = state) do
    # Elapsed time is "now - then"; the reverse is never positive and would
    # make the throttle sleep on every iteration.
    elapsed = System.system_time(:second) - last_fetch
    if elapsed < rate_limit() do
      :timer.sleep(1000)
    end
    %{state | last_fetch: System.system_time(:second)}
  end

  defp get_events(%{exit: true} = state), do: state
  defp get_events(state) do
    case kinesis().events(state.iterator, state.limit) do
      {:error, :iterator_expired} ->
        # Drop the expired iterator so the next loop requests a fresh one from
        # the last processed sequence number.
        %{state | events: [], iterator: nil}
      {events, nil, _ms_behind} ->
        # No next iterator: process this final batch, then exit.
        %{state | events: events, iterator: nil, exit: true}
      {events, next_iterator, ms_behind} ->
        Logger.info "Got batch of #{length(events)} records, #{ms_behind}ms behind latest"
        %{state | events: events, iterator: next_iterator}
    end
  end

  # Feeds events to the callback one at a time, recording the sequence number
  # of each event that was handled. Any return other than `:ok` raises.
  defp process_events(%{events: []} = state), do: state
  defp process_events(%{events: [event | rest]} = state) do
    Logger.debug "Processing #{event.type} event ##{event.sequence_number}."
    {mod, fun} = state.callback
    case apply(mod, fun, [event]) do
      :ok ->
        state = set_last_sequence_number(state, event)
        process_events(%{state | events: rest})
      _ -> raise "Processing failed"
    end
  end

  defp set_last_sequence_number(state, %{sequence_number: sequence_number}) do
    #TODO: Put in redis.
    %{state | last_sequence_number: sequence_number}
  end

  defp kinesis do
    Application.get_env(:ello_event_stream, :kinesis_client, Kinesis)
  end

  # Minimum number of seconds between two fetches (default 1).
  defp rate_limit do
    Application.get_env(:ello_event_stream, :kinesis_read_request_min_rate, 1)
  end
end
32 changes: 32 additions & 0 deletions lib/ello_event_stream/kinesis/stream_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Ello.EventStream.Kinesis.StreamSupervisor do
  @moduledoc """
  For each stream, starts and supervises each shard to be read.
  """
  use Supervisor
  alias Ello.EventStream.{
    Kinesis,
    Kinesis.ShardProcessor,
  }

  @doc false
  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc """
  Starts processing a stream.

  Gets each shard id from the stream and starts one supervised `Task` per
  shard, each running `ShardProcessor.process/4` with the stream, the shard id,
  `callback` and `opts`.

  Returns the list of `Supervisor.start_child/2` results, one per shard.
  """
  @spec start_stream(stream :: String.t, callback :: {module, atom}, opts :: Keyword.t | map) ::
          [Supervisor.on_start_child]
  def start_stream(stream, callback, opts) do
    stream
    |> Kinesis.shard_ids
    |> Enum.map(fn shard ->
      Supervisor.start_child(__MODULE__, [ShardProcessor, :process, [stream, shard, callback, opts]])
    end)
  end

  # Children are `Task`s added on demand by `start_stream/3`. `:transient`
  # restarts a task only when it terminates abnormally.
  def init(:ok) do
    children = [worker(Task, [], restart: :transient)]
    supervise(children, strategy: :simple_one_for_one)
  end
end
Loading