From c53dc53e61d11e2a778def82c1460a42a68a0727 Mon Sep 17 00:00:00 2001 From: Matthew Date: Tue, 6 Aug 2024 16:00:50 +0200 Subject: [PATCH] Adding ttl to messages --- lib/turn_junebug_expressway/agent.ex | 35 ++++++++++--- lib/turn_junebug_expressway/application.ex | 2 +- lib/turn_junebug_expressway_web/utils.ex | 8 +-- .../utils_test.exs | 50 +++++++------------ 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/lib/turn_junebug_expressway/agent.ex b/lib/turn_junebug_expressway/agent.ex index ce34caf..06398fd 100644 --- a/lib/turn_junebug_expressway/agent.ex +++ b/lib/turn_junebug_expressway/agent.ex @@ -1,17 +1,36 @@ defmodule TurnJunebugExpressway.TurnAgent do - use Agent + use GenServer - def start_link(_) do - Agent.start_link(fn -> %{} end, name: __MODULE__) + def start_link(options \\ []) do + {name, options} = Keyword.pop(options, :name, __MODULE__) + GenServer.start_link(__MODULE__, options, name: name) end - def put(key, value) do - Agent.update(__MODULE__, &Map.put(&1, key, value)) + def put(pid, key, value, ttl \\ 2_000) do + GenServer.call(pid, {:put, key, value, ttl}) end - def get(key) do - Agent.get(__MODULE__, &Map.get(&1, key)) + def get(pid, key) do + GenServer.call(pid, {:get, key}) end - # TODO: Add a way to clear the cache after a set time + # GenServer callbacks + + def init(_) do + state = %{} + {:ok, state} + end + + def handle_call({:put, key, value, ttl}, _from, state) do + Process.send_after(self(), {:expire, key}, ttl) + {:reply, :ok, Map.put(state, key, value)} + end + + def handle_call({:get, key}, _from, state) do + {:reply, Map.get(state, key), state} + end + + def handle_info({:expire, key}, state) do + {:noreply, Map.delete(state, key)} + end end diff --git a/lib/turn_junebug_expressway/application.ex b/lib/turn_junebug_expressway/application.ex index 10f0225..1cb5759 100644 --- a/lib/turn_junebug_expressway/application.ex +++ b/lib/turn_junebug_expressway/application.ex @@ -12,7 +12,7 @@ defmodule TurnJunebugExpressway.Application do # Define workers and child supervisors to be supervised children = [ # Start Agent - TurnJunebugExpressway.TurnAgent, + {TurnJunebugExpressway.TurnAgent, name: :my_cache}, # Start the endpoint when the application starts TurnJunebugExpresswayWeb.Endpoint, # Start your own worker by calling: TurnJunebugExpressway.Worker.start_link(arg1, arg2, arg3) diff --git a/lib/turn_junebug_expressway_web/utils.ex b/lib/turn_junebug_expressway_web/utils.ex index 8cdd3e2..17a98c9 100644 --- a/lib/turn_junebug_expressway_web/utils.ex +++ b/lib/turn_junebug_expressway_web/utils.ex @@ -66,7 +66,7 @@ defmodule TurnJunebugExpresswayWeb.Utils do value = Map.get(message, "recipient_id") # IO.puts("#{message}") # IO.puts("#{inspect(key)}, #{inspect(value)}") - TurnJunebugExpressway.TurnAgent.put(key, value) + TurnJunebugExpressway.TurnAgent.put(:my_cache, key, value) TurnJunebugExpressway.MessageEngine.publish_message(message) end @@ -119,21 +119,21 @@ defmodule TurnJunebugExpresswayWeb.Utils do def forward_event(event) do # IO.puts("#{inspect(event)}") - IO.puts("#{inspect(TurnAgent.get(Map.get(event, "user_message_id")))}") + IO.puts("#{inspect(TurnAgent.get(:my_cache, Map.get(event, "user_message_id")))}") case event |> get_event_status do {:ignore, _} -> :ok {:ok, status} -> - if recipient_id = TurnAgent.get(Map.get(event, "user_message_id")) != nil do + if recipient_id = TurnAgent.get(:my_cache, Map.get(event, "user_message_id")) != nil do @turn_client.client() |> @turn_client.post_event(%{ "statuses" => [ %{ "id" => Map.get(event, "user_message_id"), # "recipient_id" => nil, - "recipient_id" => TurnAgent.get(Map.get(event, "user_message_id")), + "recipient_id" => TurnAgent.get(:my_cache, Map.get(event, "user_message_id")), "status" => status, "timestamp" => get_event_timestamp(event, :second) } diff --git a/test/turn_junebug_expressway_web/utils_test.exs b/test/turn_junebug_expressway_web/utils_test.exs index 0ad09df..49d7093 100644 --- a/test/turn_junebug_expressway_web/utils_test.exs +++ b/test/turn_junebug_expressway_web/utils_test.exs @@ -4,6 +4,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do import Mox alias TurnJunebugExpresswayWeb.Utils + alias TurnJunebugExpressway.TurnAgent describe "format_urn" do test "format_urn/1 with + for turn" do @@ -34,38 +35,6 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do end describe "handle_incoming_event" do - # test "sends event back to turn, recipient_id not found", %{} do - # body = %{ - # "statuses" => [ - # %{ - # "id" => "f74c4e6108d8418ab53dbcfd628242f3", - # "recipient_id" => nil, - # "status" => "sent", - # "timestamp" => "1572525144" - # } - # ] - # } - - # TurnJunebugExpressway.Backends.ClientMock - # |> expect(:client, fn -> :client end) - # |> expect(:post_event, fn :client, ^body -> :not_called end) - - # event = %{ - # "transport_name" => "d49d3569-47d5-47a0-8074-5a7ffa684832", - # "event_type" => "ack", - # "event_id" => "b3db4f670d4c4e2297c58a6dc5b72980", - # "sent_message_id" => "f74c4e6108d8418ab53dbcfd628242f3", - # "helper_metadata" => %{}, - # "routing_metadata" => %{}, - # "message_version" => "20110921", - # "timestamp" => "2019-10-31 12:32:24.930687", - # "transport_metadata" => %{}, - # "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3", - # "message_type" => "event" - # } - # assert Utils.handle_incoming_event(Jason.encode!(event)) == nil - - # end test "sends event back to turn, recipient_id not found", %{} do TurnJunebugExpressway.Backends.ClientMock |> expect(:client, fn -> :client end) @@ -85,7 +54,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do "message_type" => "event" } - # assert Utils.handle_incoming_event(Jason.encode!(event)) == nil + assert Utils.handle_incoming_event(Jason.encode!(event)) == nil end test "sends event back to turn", %{} do @@ -233,6 +202,21 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do end end + describe "ttl" do + test "checking if key is deleated after 3 seconds" do + message = %{ + "content" => "something", + "recipient_id" => "1234", + "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3" + } + + Utils.send_message(message) + assert TurnAgent.get(:my_cache, Map.get(message, "user_message_id")) == "1234" + :timer.sleep(3_000) + assert TurnAgent.get(:my_cache, Map.get(message, "user_message_id")) == nil + end + end + describe "queue_stuck?" do test "true if rate is 0 and there is messages", %{} do assert Utils.queue_stuck?(0, 1) == true