Skip to content

Commit ff17ba2

Browse files
committed
add errors on broadcast handler
1 parent c41d0eb commit ff17ba2

File tree

4 files changed

+129
-11
lines changed

4 files changed

+129
-11
lines changed

lib/realtime/tenants.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,4 +501,19 @@ defmodule Realtime.Tenants do
501501
@spec region(Tenant.t()) :: String.t() | nil
502502
def region(%Tenant{extensions: [%{settings: settings}]}), do: Map.get(settings, "region")
503503
def region(_), do: nil
504+
505+
@doc """
506+
"""
507+
@spec validate_payload_size(%Tenant{} | binary(), map()) :: :ok | {:error, :payload_size_exceeded}
508+
def validate_payload_size(tenant_id, payload) when is_binary(tenant_id) do
509+
tenant_id
510+
|> Cache.get_tenant_by_external_id()
511+
|> validate_payload_size(payload)
512+
end
513+
514+
def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do
515+
max_payload_size = max_payload_size_in_kb * 1000
516+
payload_size = :erlang.external_size(payload)
517+
if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok
518+
end
504519
end

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,11 @@ defmodule Realtime.Tenants.BatchBroadcast do
116116

117117
defp validate_payload_size(changeset, tenant) do
118118
payload = get_change(changeset, :payload)
119-
payload_size = byte_size(Jason.encode!(payload))
120-
max_payload_size = tenant.max_payload_size_in_kb * 1000
121119

122-
if payload_size > max_payload_size,
123-
do: add_error(changeset, :payload, "Payload size exceeds tenant limit"),
124-
else: changeset
120+
case Tenants.validate_payload_size(tenant, payload) do
121+
:ok -> changeset
122+
_ -> add_error(changeset, :payload, "Payload size exceeds tenant limit")
123+
end
125124
end
126125

127126
@event_type "broadcast"

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
66

77
import Phoenix.Socket, only: [assign: 3]
88

9+
alias Realtime.Tenants
910
alias RealtimeWeb.RealtimeChannel
1011
alias RealtimeWeb.TenantBroadcaster
1112
alias Phoenix.Socket
@@ -38,8 +39,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
3839
|> increment_rate_counter()
3940

4041
%{ack_broadcast: ack_broadcast} = socket.assigns
41-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
42-
if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket}
42+
43+
res =
44+
case Tenants.validate_payload_size(tenant_id, payload) do
45+
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
46+
{:error, error} -> {:error, error}
47+
end
48+
49+
cond do
50+
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
51+
{:reply, {:error, :payload_size_exceeded}, socket}
52+
53+
ack_broadcast ->
54+
{:reply, :ok, socket}
55+
56+
true ->
57+
{:noreply, socket}
58+
end
4359

4460
{:ok, policies} ->
4561
{:noreply, assign(socket, :policies, policies)}
@@ -65,11 +81,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
6581
} = socket
6682

6783
socket = increment_rate_counter(socket)
68-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
6984

70-
if ack_broadcast,
71-
do: {:reply, :ok, socket},
72-
else: {:noreply, socket}
85+
res =
86+
case Tenants.validate_payload_size(tenant_id, payload) do
87+
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
88+
{:error, error} -> {:error, error}
89+
end
90+
91+
cond do
92+
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
93+
{:reply, {:error, :payload_size_exceeded}, socket}
94+
95+
ack_broadcast ->
96+
{:reply, :ok, socket}
97+
98+
true ->
99+
{:noreply, socket}
100+
end
73101
end
74102

75103
defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do

test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,82 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
312312
{:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
313313
assert avg == 0.0
314314
end
315+
316+
test "handle payload size excedding limits in private channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do
317+
socket =
318+
socket_fixture(tenant, topic,
319+
policies: %Policies{broadcast: %BroadcastPolicies{write: true}},
320+
ack_broadcast: false
321+
)
322+
323+
assert {:noreply, _} =
324+
BroadcastHandler.handle(
325+
%{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)},
326+
db_conn,
327+
socket
328+
)
329+
330+
Process.sleep(120)
331+
332+
refute_received {:socket_push, :text, _}
333+
end
334+
335+
test "handle payload size excedding limits in public channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do
336+
socket = socket_fixture(tenant, topic, ack_broadcast: false, private?: false)
337+
338+
assert {:noreply, _} =
339+
BroadcastHandler.handle(
340+
%{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)},
341+
db_conn,
342+
socket
343+
)
344+
345+
Process.sleep(120)
346+
347+
refute_received {:socket_push, :text, _}
348+
end
349+
350+
test "handle payload size excedding limits in private channel and if ack it will receive error", %{
351+
topic: topic,
352+
tenant: tenant,
353+
db_conn: db_conn
354+
} do
355+
socket =
356+
socket_fixture(tenant, topic,
357+
policies: %Policies{broadcast: %BroadcastPolicies{write: true}},
358+
ack_broadcast: true
359+
)
360+
361+
assert {:reply, {:error, :payload_size_exceeded}, _} =
362+
BroadcastHandler.handle(
363+
%{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)},
364+
db_conn,
365+
socket
366+
)
367+
368+
Process.sleep(120)
369+
370+
refute_received {:socket_push, :text, _}
371+
end
372+
373+
test "handle payload size excedding limits in public channels and if ack it will receive error", %{
374+
topic: topic,
375+
tenant: tenant,
376+
db_conn: db_conn
377+
} do
378+
socket = socket_fixture(tenant, topic, ack_broadcast: true, private?: false)
379+
380+
assert {:reply, {:error, :payload_size_exceeded}, _} =
381+
BroadcastHandler.handle(
382+
%{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)},
383+
db_conn,
384+
socket
385+
)
386+
387+
Process.sleep(120)
388+
389+
refute_received {:socket_push, :text, _}
390+
end
315391
end
316392

317393
defp initiate_tenant(context) do

0 commit comments

Comments
 (0)