Skip to content

Commit 8f4c4c7

Browse files
committed
pr feedback
1 parent 9205e20 commit 8f4c4c7

File tree

9 files changed

+35
-66
lines changed

9 files changed

+35
-66
lines changed

lib/realtime/tenants.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,9 @@ defmodule Realtime.Tenants do
511511
|> validate_payload_size(payload)
512512
end
513513

514+
@payload_size_padding 500
514515
def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do
515-
max_payload_size = max_payload_size_in_kb * 1000
516+
max_payload_size = max_payload_size_in_kb * 1000 + @payload_size_padding
516517
payload_size = :erlang.external_size(payload)
517518
if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok
518519
end

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
3333
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
3434
},
3535
super_user :: boolean()
36-
) :: :ok | {:error, atom()}
36+
) :: :ok | {:error, atom() | Ecto.Changeset.t()}
3737
def broadcast(auth_params, tenant, messages, super_user \\ false)
3838

3939
def broadcast(%Plug.Conn{} = conn, %Tenant{} = tenant, messages, super_user) do
@@ -71,15 +71,11 @@ defmodule Realtime.Tenants.BatchBroadcast do
7171
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
7272
|> Enum.each(fn {topic, events} ->
7373
if super_user do
74-
Enum.each(events, fn message ->
75-
send_message_and_count(tenant, events_per_second_rate, message, false)
76-
end)
74+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
7775
else
7876
case permissions_for_message(tenant, auth_params, topic) do
7977
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
80-
Enum.each(events, fn message ->
81-
send_message_and_count(tenant, events_per_second_rate, message, false)
82-
end)
78+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
8379

8480
_ ->
8581
nil
@@ -88,6 +84,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
8884
end)
8985

9086
:ok
87+
else
88+
%Ecto.Changeset{valid?: false} = changeset -> {:error, changeset}
89+
error -> error
9190
end
9291
end
9392

lib/realtime/tenants/replication_connection.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,12 @@ defmodule Realtime.Tenants.ReplicationConnection do
329329

330330
{:noreply, state}
331331
else
332-
{:error, error} ->
332+
{:error, %Ecto.Changeset{valid?: false} = changeset} ->
333+
error = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
333334
log_error("UnableToBroadcastChanges", error)
334335
{:noreply, state}
335336

336-
%Ecto.Changeset{valid?: false, changes: %{messages: [%{errors: [payload: {error, _}]} | _]}} ->
337+
{:error, error} ->
337338
log_error("UnableToBroadcastChanges", error)
338339
{:noreply, state}
339340

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,5 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
161161
end
162162
end
163163

164-
# Added due to the fact that JSON decoding adds some overhead and erlang term will be slighly larger
165-
@payload_size_padding 500
166-
defp validate_payload_size(tenant, payload) do
167-
if :erlang.external_size(payload) > tenant.max_payload_size_in_kb * 1000 + @payload_size_padding do
168-
{:error, :payload_size_exceeded}
169-
else
170-
:ok
171-
end
172-
end
164+
defp validate_payload_size(tenant, payload), do: Tenants.validate_payload_size(tenant, payload)
173165
end

lib/realtime_web/controllers/fallback_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule RealtimeWeb.FallbackController do
4545
|> render("error.json", message: message)
4646
end
4747

48-
def call(conn, %Ecto.Changeset{valid?: true} = changeset) do
48+
def call(conn, {:error, %Ecto.Changeset{valid?: false} = changeset}) do
4949
log_error(
5050
"UnprocessableEntity",
5151
Ecto.Changeset.traverse_errors(changeset, &translate_error/1)

test/realtime/tenants/batch_broadcast_test.exs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
372372
reject(&TenantBroadcaster.pubsub_broadcast/5)
373373

374374
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
375-
assert %Ecto.Changeset{valid?: false} = result
375+
assert {:error, %Ecto.Changeset{valid?: false}} = result
376376
end
377377

378378
test "returns changeset error when payload is missing", %{tenant: tenant} do
@@ -382,7 +382,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
382382
reject(&TenantBroadcaster.pubsub_broadcast/5)
383383

384384
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
385-
assert %Ecto.Changeset{valid?: false} = result
385+
assert {:error, %Ecto.Changeset{valid?: false}} = result
386386
end
387387

388388
test "returns changeset error when event is missing", %{tenant: tenant} do
@@ -391,14 +391,14 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
391391

392392
reject(&TenantBroadcaster.pubsub_broadcast/5)
393393
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
394-
assert %Ecto.Changeset{valid?: false} = result
394+
assert {:error, %Ecto.Changeset{valid?: false}} = result
395395
end
396396

397397
test "returns changeset error when messages array is empty", %{tenant: tenant} do
398398
messages = %{messages: []}
399399
reject(&TenantBroadcaster.pubsub_broadcast/5)
400400
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
401-
assert %Ecto.Changeset{valid?: false} = result
401+
assert {:error, %Ecto.Changeset{valid?: false}} = result
402402
end
403403
end
404404

@@ -410,9 +410,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
410410

411411
RateCounter
412412
|> stub(:new, fn _ -> {:ok, nil} end)
413-
|> stub(:get, fn ^events_per_second_rate ->
414-
{:ok, %RateCounter{avg: tenant.max_events_per_second + 1}}
415-
end)
413+
|> stub(:get, fn ^events_per_second_rate -> {:ok, %RateCounter{avg: tenant.max_events_per_second + 1}} end)
416414

417415
reject(&TenantBroadcaster.pubsub_broadcast/5)
418416

@@ -482,10 +480,11 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
482480

483481
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
484482

485-
assert %Ecto.Changeset{
486-
valid?: false,
487-
changes: %{messages: [%{errors: [payload: {"Payload size exceeds tenant limit", []}]}]}
488-
} = result
483+
assert {:error,
484+
%Ecto.Changeset{
485+
valid?: false,
486+
changes: %{messages: [%{errors: [payload: {"Payload size exceeds tenant limit", []}]}]}
487+
}} = result
489488
end
490489
end
491490

test/realtime/tenants/replication_connection_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
228228
refute_receive %Phoenix.Socket.Broadcast{}, 500
229229
end)
230230

231-
assert logs =~ "UnableToBroadcastChanges: Payload size exceeds tenant limit"
231+
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
232232
end
233233

234234
test "payload without id", %{tenant: tenant} do

test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
2727
socket
2828
end
2929

30-
Process.sleep(120)
31-
3230
for _ <- 1..100 do
3331
topic = "realtime:#{topic}"
3432
assert_receive {:socket_push, :text, data}
3533
message = data |> IO.iodata_to_binary() |> Jason.decode!()
3634
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
3735
end
3836

37+
Process.sleep(120)
38+
3939
{:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
4040
assert Enum.sum(buckets) == 100
4141
assert avg > 0
@@ -50,8 +50,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
5050
socket
5151
end
5252

53-
Process.sleep(120)
54-
5553
refute_received _any
5654

5755
{:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
@@ -68,15 +66,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
6866
socket
6967
end
7068

71-
Process.sleep(120)
72-
7369
for _ <- 1..100 do
7470
topic = "realtime:#{topic}"
7571
assert_received {:socket_push, :text, data}
7672
message = data |> IO.iodata_to_binary() |> Jason.decode!()
7773
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
7874
end
7975

76+
Process.sleep(120)
8077
{:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
8178
assert Enum.sum(buckets) == 100
8279
assert avg > 0.0
@@ -96,8 +93,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
9693
socket
9794
end
9895

99-
Process.sleep(120)
100-
10196
for _ <- 1..100 do
10297
topic = "realtime:#{topic}"
10398
assert_received {:socket_push, :text, data}
@@ -120,9 +115,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
120115
socket
121116
end
122117

123-
Process.sleep(120)
124-
125-
refute_received {:socket_push, :text, _}
118+
refute_received {:socket_push, :text, _}, 120
126119
end
127120

128121
@tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon"
@@ -139,8 +132,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
139132
socket
140133
end
141134

142-
Process.sleep(120)
143-
144135
for _ <- 1..100 do
145136
topic = "realtime:#{topic}"
146137
assert_received {:socket_push, :text, data}
@@ -163,9 +154,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
163154
socket
164155
end
165156

166-
Process.sleep(120)
167-
168-
refute_received {:socket_push, :text, _}
157+
refute_received {:socket_push, :text, _}, 120
169158
end
170159

171160
test "with nil policy and invalid user, won't send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
@@ -177,8 +166,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
177166
socket
178167
end
179168

180-
Process.sleep(120)
181-
182169
refute_received _any
183170

184171
{:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
@@ -259,15 +246,14 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
259246
socket
260247
end
261248

262-
Process.sleep(120)
263-
264249
for _ <- 1..100 do
265250
topic = "realtime:#{topic}"
266251
assert_received {:socket_push, :text, data}
267252
message = data |> IO.iodata_to_binary() |> Jason.decode!()
268253
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
269254
end
270255

256+
Process.sleep(120)
271257
{:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
272258
assert Enum.sum(buckets) == 100
273259
assert avg > 0.0
@@ -290,6 +276,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
290276
end
291277

292278
Process.sleep(120)
279+
293280
{:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
294281
assert Enum.sum(buckets) == 100
295282
assert avg > 0.0
@@ -327,9 +314,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
327314
socket
328315
)
329316

330-
Process.sleep(120)
331-
332-
refute_received {:socket_push, :text, _}
317+
refute_received {:socket_push, :text, _}, 120
333318
end
334319

335320
test "handle payload size excedding limits in public channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do
@@ -342,9 +327,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
342327
socket
343328
)
344329

345-
Process.sleep(120)
346-
347-
refute_received {:socket_push, :text, _}
330+
refute_received {:socket_push, :text, _}, 120
348331
end
349332

350333
test "handle payload size excedding limits in private channel and if ack it will receive error", %{
@@ -365,9 +348,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
365348
socket
366349
)
367350

368-
Process.sleep(120)
369-
370-
refute_received {:socket_push, :text, _}
351+
refute_received {:socket_push, :text, _}, 120
371352
end
372353

373354
test "handle payload size excedding limits in public channels and if ack it will receive error", %{
@@ -384,9 +365,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
384365
socket
385366
)
386367

387-
Process.sleep(120)
388-
389-
refute_received {:socket_push, :text, _}
368+
refute_received {:socket_push, :text, _}, 120
390369
end
391370
end
392371

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
158158
} do
159159
sub_topic_1 = "sub_topic_1"
160160
sub_topic_2 = "sub_topic_2"
161-
topic_1 = Tenants.tenant_topic(tenant, sub_topic_1)
162-
topic_2 = Tenants.tenant_topic(tenant, sub_topic_2)
163161

164162
payload_1 = %{"data" => "data"}
165163
payload_2 = %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 100)}

0 commit comments

Comments
 (0)