Skip to content

Commit

Permalink
Merge branch 'master' into elixir-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
vasuadari committed May 7, 2024
2 parents c212ef6 + 26f7ef0 commit 40a184e
Show file tree
Hide file tree
Showing 19 changed files with 161 additions and 43 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: Install OTP and Elixir
uses: actions/setup-elixir@v1.2.0
uses: erlef/setup-beam@v1
with:
otp-version: 22.1
elixir-version: 1.7.4
elixir-version: 1.8.2
- name: Install dependencies
run: mix deps.get --only test
- name: Check mix format
Expand All @@ -30,10 +30,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: Install OTP and Elixir
uses: actions/setup-elixir@v1.2.0
uses: erlef/setup-beam@v1
with:
otp-version: 22.1
elixir-version: 1.7.4
elixir-version: 1.8.2
- name: Install dependencies
run: |
mix deps.get --only test
Expand Down
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.7.4
elixir 1.8.2
erlang 21.3
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
redis:
image: redis
ports:
- '6379'
- '6379:6379'

flume:
image: bitwalker/alpine-elixir:1.6.6
Expand Down
1 change: 1 addition & 0 deletions lib/flume/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Flume.Config do
dequeue_lock_ttl: 30_000,
dequeue_process_timeout: 10_000,
dequeue_lock_poll_interval: 500,
debug_log: false,
# In seconds
visibility_timeout: 600,
instrumentation: [
Expand Down
12 changes: 10 additions & 2 deletions lib/flume/default_logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ defmodule Flume.DefaultLogger do

require Logger

def debug(message, %{}), do: Logger.debug(message)
def debug(message, opts), do: Logger.debug("#{message} - #{inspect(opts)}")
def debug(message, %{}) do
if Flume.Config.debug_log(), do: Logger.debug(message)
end

def debug(message, opts) do
if Flume.Config.debug_log(), do: Logger.debug("#{message} - #{inspect(opts)}")
end

def error(message, %{}), do: Logger.error(message)
def error(message, opts), do: Logger.error("#{message} - #{inspect(opts)}")

def info(message, %{}), do: Logger.info(message)
def info(message, opts), do: Logger.info("#{message} - #{inspect(opts)}")

def warn(message, %{}), do: Logger.warn(message)
def warn(message, opts), do: Logger.warn("#{message} - #{inspect(opts)}")
end
4 changes: 4 additions & 0 deletions lib/flume/instrumentation/default_event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ defmodule Flume.Instrumentation.DefaultEventHandler do
Logger.info("#{app_name}/#{Instrumentation.format_event_name(event_name)} - #{value}")
end

def handle(_, _, _, _) do
:ok
end

defp metric_path(event_name, nil), do: Instrumentation.format_event_name(event_name)

defp metric_path(event_name, module) do
Expand Down
13 changes: 13 additions & 0 deletions lib/flume/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Flume.Logger do
@callback debug(String.t(), map()) :: :ok | :error
@callback error(String.t(), map()) :: :ok | :error
@callback info(String.t(), map()) :: :ok | :error
@callback warn(String.t(), map()) :: :ok | :error

defmacro debug(message) do
quote location: :keep do
Expand All @@ -26,6 +27,12 @@ defmodule Flume.Logger do
end
end

defmacro warn(message) do
quote location: :keep do
apply(Flume.Config.logger(), :warn, [unquote(message), %{}])
end
end

defmacro debug(message, opts) do
quote location: :keep do
apply(Flume.Config.logger(), :debug, [unquote(message), unquote(opts)])
Expand All @@ -43,4 +50,10 @@ defmodule Flume.Logger do
apply(Flume.Config.logger(), :info, [unquote(message), unquote(opts)])
end
end

defmacro warn(message, opts) do
quote location: :keep do
apply(Flume.Config.logger(), :warn, [unquote(message), unquote(opts)])
end
end
end
17 changes: 11 additions & 6 deletions lib/flume/pipeline/event/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule Flume.Pipeline.Event.Producer do
state =
state
|> Map.put(:paused, EventPipeline.paused_state(name))
|> Map.put(:fetch_events_scheduled, false)
|> Map.put(:demand, 0)

{:producer, state}
Expand All @@ -63,6 +64,7 @@ defmodule Flume.Pipeline.Event.Producer do

def handle_info(:fetch_events, state) do
# This callback is invoked by the Process.send_after/3 message below.
state = Map.put(state, :fetch_events_scheduled, false)
dispatch_events(state)
end

Expand Down Expand Up @@ -107,7 +109,7 @@ defmodule Flume.Pipeline.Event.Producer do
end

defp dispatch_events(%{paused: true} = state) do
schedule_fetch_events(state)
state = schedule_fetch_events(state)

{:noreply, [], state}
end
Expand Down Expand Up @@ -144,7 +146,7 @@ defmodule Flume.Pipeline.Event.Producer do
end

defp dispatch_events(state) do
schedule_fetch_events(state)
state = schedule_fetch_events(state)

{:noreply, [], state}
end
Expand All @@ -164,26 +166,29 @@ defmodule Flume.Pipeline.Event.Producer do
new_demand = state.demand - round(count / batch_size)
state = Map.put(state, :demand, new_demand)

schedule_fetch_events(state)
state = schedule_fetch_events(state)

{:noreply, events, state}
end

defp handle_locked_fetch(state) do
schedule_fetch_events(state, @lock_poll_interval)
state = schedule_fetch_events(state, @lock_poll_interval)

{:noreply, [], state}
end

defp schedule_fetch_events(state), do: schedule_fetch_events(state, @default_interval)

defp schedule_fetch_events(%{demand: demand} = _state, interval)
defp schedule_fetch_events(%{fetch_events_scheduled: true} = state, _), do: state

defp schedule_fetch_events(%{demand: demand} = state, interval)
when demand > 0 do
# Schedule the next request
Process.send_after(self(), :fetch_events, interval)
Map.put(state, :fetch_events_scheduled, true)
end

defp schedule_fetch_events(_state, _interval), do: nil
defp schedule_fetch_events(state, _interval), do: state

# For regular pipelines
defp take(demand, %{rate_limit_count: nil, rate_limit_scale: nil} = state) do
Expand Down
6 changes: 6 additions & 0 deletions lib/flume/queue/processing_scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule Flume.Queue.ProcessingScheduler do
{:noreply, state}
end

def handle_info(msg, state) do
Logger.warn("#{__MODULE__}: Unknown message - #{inspect(msg)}")

{:noreply, state}
end

defp work(state) do
Manager.enqueue_processing_jobs(
state.namespace,
Expand Down
6 changes: 6 additions & 0 deletions lib/flume/queue/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ defmodule Flume.Queue.Scheduler do
{:noreply, state}
end

def handle_info(msg, state) do
Logger.warn("#{__MODULE__}: Unknown message - #{inspect(msg)}")

{:noreply, state}
end

defp work(state) do
Manager.remove_and_enqueue_scheduled_jobs(
state.namespace,
Expand Down
5 changes: 5 additions & 0 deletions lib/flume/redis/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Flume.Redis.Client do
@decrby "DECRBY"
@del "DEL"
@evalsha "EVALSHA"
@eval "EVAL"
@get "GET"
@hgetall "HGETALL"
@incr "INCR"
Expand Down Expand Up @@ -363,6 +364,10 @@ defmodule Flume.Redis.Client do
[@evalsha] ++ args
end

def eval_command(args) do
[@eval] ++ args
end

def hmget(hash_key_list) when hash_key_list |> is_list(),
do: hash_key_list |> Command.hmget() |> pipeline()

Expand Down
6 changes: 2 additions & 4 deletions lib/flume/redis/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Flume.Redis.Job do
alias Flume.Support.Time
alias Flume.Redis.{Client, Script, SortedSet, BulkDequeue}

@enqueue_processing_jobs_sha Script.sha(:enqueue_processing_jobs)
@enqueue_processing_jobs_script Script.compile(:enqueue_processing_jobs)

def enqueue(queue_key, job) do
try do
Expand Down Expand Up @@ -51,15 +51,13 @@ defmodule Flume.Redis.Job do
as: :exec_rate_limited

def enqueue_processing_jobs(sorted_set_key, queue_key, current_score, limit) do
Client.evalsha_command([
@enqueue_processing_jobs_sha,
Script.eval(@enqueue_processing_jobs_script, [
_num_of_keys = 2,
sorted_set_key,
queue_key,
current_score,
limit
])
|> Client.query()
|> case do
{:error, reason} ->
{:error, reason}
Expand Down
6 changes: 2 additions & 4 deletions lib/flume/redis/lock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Flume.Redis.Lock do

alias Flume.Redis.{Client, Script}

@release_lock_sha Script.sha(:release_lock)
@release_lock_script Script.compile(:release_lock)

def acquire(
lock_key,
Expand All @@ -25,13 +25,11 @@ defmodule Flume.Redis.Lock do

def release(lock_key, token) do
response =
Client.evalsha_command([
@release_lock_sha,
Script.eval(@release_lock_script, [
_num_of_keys = 1,
lock_key,
token
])
|> Client.query()

case response do
{:ok, _count} ->
Expand Down
29 changes: 24 additions & 5 deletions lib/flume/redis/script.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,30 @@ defmodule Flume.Redis.Script do
:ok
end

@spec sha(binary) :: binary
def sha(script_name) do
script = Path.join(scripts_dir(), "#{script_name}.lua")
hash_sha = :crypto.hash(:sha, File.read!(script))
Base.encode16(hash_sha, case: :lower)
@spec compile(binary) :: {binary, binary}
def compile(script_name) do
script =
Path.join(scripts_dir(), "#{script_name}.lua")
|> File.read!()

hash_sha = :crypto.hash(:sha, script)
{script, Base.encode16(hash_sha, case: :lower)}
end

@spec eval({binary, binary}, List.t()) :: {:ok, term} | {:error, term}
def eval({script, sha}, arguments) do
result =
Client.evalsha_command([sha | arguments])
|> Client.query()

case result do
{:error, %Redix.Error{message: "NOSCRIPT" <> _}} ->
Client.eval_command([script | arguments])
|> Client.query()

result ->
result
end
end

@spec scripts_dir() :: binary
Expand Down
12 changes: 6 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Flume.Mixfile do
[
app: :flume,
version: "0.2.0",
elixir: "~> 1.7",
elixir: "~> 1.8",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
deps: deps(),
Expand Down Expand Up @@ -42,15 +42,15 @@ defmodule Flume.Mixfile do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:redix, "~> 0.10.0"},
{:redix, "~> 1.0"},
{:gen_stage, "~> 0.14.0"},
{:jason, "~> 1.1.0"},
{:poolboy, "~> 1.5.1"},
{:uuid, "~> 1.1.8"},
{:logger_file_backend, "~> 0.0.11"},
{:retry, "~> 0.11.0"},
{:elixir_uuid, "~> 1.2"},
{:logger_file_backend, "~> 0.0.10"},
{:retry, "0.8.2"},
{:benchee, "~> 1.0"},
{:telemetry, "~> 0.4.0"},
{:telemetry, "~> 1.0"},
{:excoveralls, "~> 0.10.6", only: :test}
]
end
Expand Down
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"excoveralls": {:hex, :excoveralls, "0.10.6", "e2b9718c9d8e3ef90bc22278c3f76c850a9f9116faf4ebe9678063310742edc2", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "b06c73492aa9940c4c29cfc1356bcf5540ae318f17b423749a0615a66ee3e049"},
"gen_stage": {:hex, :gen_stage, "0.14.0", "65ae78509f85b59d360690ce3378d5096c3130a0694bab95b0c4ae66f3008fad", [:mix], [], "hexpm", "095d38418e538af99ac82043985d26724164b78736a1d0f137c308332ad46250"},
"hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "c2790c9f0f7205f4a362512192dee8179097394400e745e4d20bab7226a8eaad"},
Expand All @@ -14,10 +15,9 @@
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm", "8f7168911120e13419e086e78d20e4d1a6776f1eee2411ac9f790af10813389f"},
"redix": {:hex, :redix, "0.10.0", "886cfbb14f9b78b82f38963695be2c6ed54b4f5cf911acbf70278ba09144f55d", [:mix], [{:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d8266f5a13291f1d8d7003ad48820e80d34c48348dd762e6564a806cb4b5bc0f"},
"retry": {:hex, :retry, "0.11.2", "29f9ab8e7d78878307f4653adc286d8a8baa6b66b6bcb67925c07a1386ef7867", [:mix], [], "hexpm", "0b5f405edab04fd618253564fd964a0ac64eea5ceffd690a83831efc6e5bdfa6"},
"redix": {:hex, :redix, "1.1.5", "6fc460d66a5c2287e83e6d73dddc8d527ff59cb4d4f298b41e03a4db8c3b2bd5", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "679afdd4c14502fe9c11387ff1cdcb33065a1cf511097da1eee407f17c7a418b"},
"retry": {:hex, :retry, "0.8.2", "7b57bd5e1e7efeca04dd740cabdc3930c472cfa7e0186949de180c64417e9c35", [:mix], [], "hexpm", "ae8969bfea65bf2adf9e7cc6e59bc2d1b6d9e04bcb7d2c53b996d9b8a023fe78"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm", "603561dc0fd62f4f2ea9b890f4e20e1a0d388746d6e20557cafb1b16950de88c"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
"telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}
}
Loading

0 comments on commit 40a184e

Please sign in to comment.