Skip to content

Commit

Permalink
Adding ttl to messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew committed Aug 6, 2024
1 parent 201fcb1 commit c53dc53
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 46 deletions.
35 changes: 27 additions & 8 deletions lib/turn_junebug_expressway/agent.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
defmodule TurnJunebugExpressway.TurnAgent do
use Agent
use GenServer

def start_link(_) do
Agent.start_link(fn -> %{} end, name: __MODULE__)
def start_link(options \\ []) do
{name, options} = Keyword.pop(options, :name, __MODULE__)
GenServer.start_link(__MODULE__, options, name: name)
end

def put(key, value) do
Agent.update(__MODULE__, &Map.put(&1, key, value))
def put(pid, key, value, ttl \\ 2_000) do
GenServer.call(pid, {:put, key, value, ttl})
end

def get(key) do
Agent.get(__MODULE__, &Map.get(&1, key))
def get(pid, key) do
GenServer.call(pid, {:get, key})
end

# TODO: Add a way to clear the cache after a set time
# GenServer callbacks

def init(_) do
state = %{}
{:ok, state}
end

def handle_call({:put, key, value, ttl}, _from, state) do
Process.send_after(self(), {:expire, key}, ttl)
{:reply, :ok, Map.put(state, key, value)}
end

def handle_call({:get, key}, _from, state) do
{:reply, Map.get(state, key), state}
end

def handle_info({:expire, key}, state) do
{:noreply, Map.delete(state, key)}
end
end
2 changes: 1 addition & 1 deletion lib/turn_junebug_expressway/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule TurnJunebugExpressway.Application do
# Define workers and child supervisors to be supervised
children = [
# Start Agent
TurnJunebugExpressway.TurnAgent,
{TurnJunebugExpressway.TurnAgent, name: :my_cache},
# Start the endpoint when the application starts
TurnJunebugExpresswayWeb.Endpoint,
# Start your own worker by calling: TurnJunebugExpressway.Worker.start_link(arg1, arg2, arg3)
Expand Down
8 changes: 4 additions & 4 deletions lib/turn_junebug_expressway_web/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule TurnJunebugExpresswayWeb.Utils do
value = Map.get(message, "recipient_id")
# IO.puts("#{message}")
# IO.puts("#{inspect(key)}, #{inspect(value)}")
TurnJunebugExpressway.TurnAgent.put(key, value)
TurnJunebugExpressway.TurnAgent.put(:my_cache, key, value)
TurnJunebugExpressway.MessageEngine.publish_message(message)
end

Expand Down Expand Up @@ -119,21 +119,21 @@ defmodule TurnJunebugExpresswayWeb.Utils do

def forward_event(event) do
# IO.puts("#{inspect(event)}")
IO.puts("#{inspect(TurnAgent.get(Map.get(event, "user_message_id")))}")
IO.puts("#{inspect(TurnAgent.get(:my_cache, Map.get(event, "user_message_id")))}")

case event |> get_event_status do
{:ignore, _} ->
:ok

{:ok, status} ->
if recipient_id = TurnAgent.get(Map.get(event, "user_message_id")) != nil do
if recipient_id = TurnAgent.get(:my_cache, Map.get(event, "user_message_id")) != nil do
@turn_client.client()
|> @turn_client.post_event(%{
"statuses" => [
%{
"id" => Map.get(event, "user_message_id"),
# "recipient_id" => nil,
"recipient_id" => TurnAgent.get(Map.get(event, "user_message_id")),
"recipient_id" => TurnAgent.get(:my_cache, Map.get(event, "user_message_id")),
"status" => status,
"timestamp" => get_event_timestamp(event, :second)
}
Expand Down
50 changes: 17 additions & 33 deletions test/turn_junebug_expressway_web/utils_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
import Mox

alias TurnJunebugExpresswayWeb.Utils
alias TurnJunebugExpressway.TurnAgent

describe "format_urn" do
test "format_urn/1 with + for turn" do
Expand Down Expand Up @@ -34,38 +35,6 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end

describe "handle_incoming_event" do
# test "sends event back to turn, recipient_id not found", %{} do
# body = %{
# "statuses" => [
# %{
# "id" => "f74c4e6108d8418ab53dbcfd628242f3",
# "recipient_id" => nil,
# "status" => "sent",
# "timestamp" => "1572525144"
# }
# ]
# }

# TurnJunebugExpressway.Backends.ClientMock
# |> expect(:client, fn -> :client end)
# |> expect(:post_event, fn :client, ^body -> :not_called end)

# event = %{
# "transport_name" => "d49d3569-47d5-47a0-8074-5a7ffa684832",
# "event_type" => "ack",
# "event_id" => "b3db4f670d4c4e2297c58a6dc5b72980",
# "sent_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
# "helper_metadata" => %{},
# "routing_metadata" => %{},
# "message_version" => "20110921",
# "timestamp" => "2019-10-31 12:32:24.930687",
# "transport_metadata" => %{},
# "user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3",
# "message_type" => "event"
# }
# assert Utils.handle_incoming_event(Jason.encode!(event)) == nil

# end
test "sends event back to turn, recipient_id not found", %{} do
TurnJunebugExpressway.Backends.ClientMock
|> expect(:client, fn -> :client end)
Expand All @@ -85,7 +54,7 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
"message_type" => "event"
}

# assert Utils.handle_incoming_event(Jason.encode!(event)) == nil
assert Utils.handle_incoming_event(Jason.encode!(event)) == nil
end

test "sends event back to turn", %{} do
Expand Down Expand Up @@ -233,6 +202,21 @@ defmodule TurnJunebugExpresswayWeb.UtilsTest do
end
end

describe "ttl" do
test "checking if key is deleated after 3 seconds" do
message = %{
"content" => "something",
"recipient_id" => "1234",
"user_message_id" => "f74c4e6108d8418ab53dbcfd628242f3"
}

Utils.send_message(message)
assert TurnAgent.get(:my_cache, Map.get(message, "user_message_id")) == "1234"
:timer.sleep(3_000)
assert TurnAgent.get(:my_cache, Map.get(message, "user_message_id")) == nil
end
end

describe "queue_stuck?" do
test "true if rate is 0 and there is messages", %{} do
assert Utils.queue_stuck?(0, 1) == true
Expand Down

0 comments on commit c53dc53

Please sign in to comment.