Skip to content

Commit ffe3480

Browse files
committed
pr feedback
1 parent 9205e20 commit ffe3480

File tree

9 files changed

+29
-64
lines changed

9 files changed

+29
-64
lines changed

lib/realtime/tenants.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,9 @@ defmodule Realtime.Tenants do
511511
|> validate_payload_size(payload)
512512
end
513513

514+
@payload_size_padding 500
514515
def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do
515-
max_payload_size = max_payload_size_in_kb * 1000
516+
max_payload_size = max_payload_size_in_kb * 1000 + @payload_size_padding
516517
payload_size = :erlang.external_size(payload)
517518
if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok
518519
end

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
3333
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
3434
},
3535
super_user :: boolean()
36-
) :: :ok | {:error, atom()}
36+
) :: :ok | {:error, atom() | %Ecto.Changeset{}}
3737
def broadcast(auth_params, tenant, messages, super_user \\ false)
3838

3939
def broadcast(%Plug.Conn{} = conn, %Tenant{} = tenant, messages, super_user) do
@@ -71,15 +71,11 @@ defmodule Realtime.Tenants.BatchBroadcast do
7171
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
7272
|> Enum.each(fn {topic, events} ->
7373
if super_user do
74-
Enum.each(events, fn message ->
75-
send_message_and_count(tenant, events_per_second_rate, message, false)
76-
end)
74+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
7775
else
7876
case permissions_for_message(tenant, auth_params, topic) do
7977
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
80-
Enum.each(events, fn message ->
81-
send_message_and_count(tenant, events_per_second_rate, message, false)
82-
end)
78+
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
8379

8480
_ ->
8581
nil
@@ -88,6 +84,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
8884
end)
8985

9086
:ok
87+
else
88+
%Ecto.Changeset{valid?: false} = changeset -> {:error, changeset}
89+
{:error, error} -> {:error, error}
9190
end
9291
end
9392

lib/realtime/tenants/replication_connection.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,12 @@ defmodule Realtime.Tenants.ReplicationConnection do
329329

330330
{:noreply, state}
331331
else
332-
{:error, error} ->
332+
{:error, %Ecto.Changeset{valid?: false} = changeset} ->
333+
error = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
333334
log_error("UnableToBroadcastChanges", error)
334335
{:noreply, state}
335336

336-
%Ecto.Changeset{valid?: false, changes: %{messages: [%{errors: [payload: {error, _}]} | _]}} ->
337+
{:error, error} ->
337338
log_error("UnableToBroadcastChanges", error)
338339
{:noreply, state}
339340

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,5 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
161161
end
162162
end
163163

164-
# Added due to the fact that JSON decoding adds some overhead and erlang term will be slighly larger
165-
@payload_size_padding 500
166-
defp validate_payload_size(tenant, payload) do
167-
if :erlang.external_size(payload) > tenant.max_payload_size_in_kb * 1000 + @payload_size_padding do
168-
{:error, :payload_size_exceeded}
169-
else
170-
:ok
171-
end
172-
end
164+
defp validate_payload_size(tenant, payload), do: Tenants.validate_payload_size(tenant, payload)
173165
end

lib/realtime_web/controllers/fallback_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule RealtimeWeb.FallbackController do
4545
|> render("error.json", message: message)
4646
end
4747

48-
def call(conn, %Ecto.Changeset{valid?: true} = changeset) do
48+
def call(conn, {:error, %Ecto.Changeset{valid?: false} = changeset}) do
4949
log_error(
5050
"UnprocessableEntity",
5151
Ecto.Changeset.traverse_errors(changeset, &translate_error/1)

test/realtime/tenants/batch_broadcast_test.exs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
372372
reject(&TenantBroadcaster.pubsub_broadcast/5)
373373

374374
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
375-
assert %Ecto.Changeset{valid?: false} = result
375+
assert {:error, %Ecto.Changeset{valid?: false}} = result
376376
end
377377

378378
test "returns changeset error when payload is missing", %{tenant: tenant} do
@@ -382,7 +382,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
382382
reject(&TenantBroadcaster.pubsub_broadcast/5)
383383

384384
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
385-
assert %Ecto.Changeset{valid?: false} = result
385+
assert {:error, %Ecto.Changeset{valid?: false}} = result
386386
end
387387

388388
test "returns changeset error when event is missing", %{tenant: tenant} do
@@ -391,14 +391,14 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
391391

392392
reject(&TenantBroadcaster.pubsub_broadcast/5)
393393
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
394-
assert %Ecto.Changeset{valid?: false} = result
394+
assert {:error, %Ecto.Changeset{valid?: false}} = result
395395
end
396396

397397
test "returns changeset error when messages array is empty", %{tenant: tenant} do
398398
messages = %{messages: []}
399399
reject(&TenantBroadcaster.pubsub_broadcast/5)
400400
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
401-
assert %Ecto.Changeset{valid?: false} = result
401+
assert {:error, %Ecto.Changeset{valid?: false}} = result
402402
end
403403
end
404404

@@ -482,10 +482,11 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
482482

483483
result = BatchBroadcast.broadcast(nil, tenant, messages, false)
484484

485-
assert %Ecto.Changeset{
486-
valid?: false,
487-
changes: %{messages: [%{errors: [payload: {"Payload size exceeds tenant limit", []}]}]}
488-
} = result
485+
assert {:error,
486+
%Ecto.Changeset{
487+
valid?: false,
488+
changes: %{messages: [%{errors: [payload: {"Payload size exceeds tenant limit", []}]}]}
489+
}} = result
489490
end
490491
end
491492

test/realtime/tenants/replication_connection_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
228228
refute_receive %Phoenix.Socket.Broadcast{}, 500
229229
end)
230230

231-
assert logs =~ "UnableToBroadcastChanges: Payload size exceeds tenant limit"
231+
assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}"
232232
end
233233

234234
test "payload without id", %{tenant: tenant} do

test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
2727
socket
2828
end
2929

30-
Process.sleep(120)
31-
3230
for _ <- 1..100 do
3331
topic = "realtime:#{topic}"
3432
assert_receive {:socket_push, :text, data}
@@ -50,8 +48,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
5048
socket
5149
end
5250

53-
Process.sleep(120)
54-
5551
refute_received _any
5652

5753
{:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
@@ -68,8 +64,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
6864
socket
6965
end
7066

71-
Process.sleep(120)
72-
7367
for _ <- 1..100 do
7468
topic = "realtime:#{topic}"
7569
assert_received {:socket_push, :text, data}
@@ -96,8 +90,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
9690
socket
9791
end
9892

99-
Process.sleep(120)
100-
10193
for _ <- 1..100 do
10294
topic = "realtime:#{topic}"
10395
assert_received {:socket_push, :text, data}
@@ -120,9 +112,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
120112
socket
121113
end
122114

123-
Process.sleep(120)
124-
125-
refute_received {:socket_push, :text, _}
115+
refute_received {:socket_push, :text, _}, 120
126116
end
127117

128118
@tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon"
@@ -139,8 +129,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
139129
socket
140130
end
141131

142-
Process.sleep(120)
143-
144132
for _ <- 1..100 do
145133
topic = "realtime:#{topic}"
146134
assert_received {:socket_push, :text, data}
@@ -163,9 +151,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
163151
socket
164152
end
165153

166-
Process.sleep(120)
167-
168-
refute_received {:socket_push, :text, _}
154+
refute_received {:socket_push, :text, _}, 120
169155
end
170156

171157
test "with nil policy and invalid user, won't send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do
@@ -177,8 +163,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
177163
socket
178164
end
179165

180-
Process.sleep(120)
181-
182166
refute_received _any
183167

184168
{:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
@@ -259,8 +243,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
259243
socket
260244
end
261245

262-
Process.sleep(120)
263-
264246
for _ <- 1..100 do
265247
topic = "realtime:#{topic}"
266248
assert_received {:socket_push, :text, data}
@@ -289,7 +271,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
289271
assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic}
290272
end
291273

292-
Process.sleep(120)
293274
{:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant))
294275
assert Enum.sum(buckets) == 100
295276
assert avg > 0.0
@@ -327,9 +308,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
327308
socket
328309
)
329310

330-
Process.sleep(120)
331-
332-
refute_received {:socket_push, :text, _}
311+
refute_received {:socket_push, :text, _}, 120
333312
end
334313

335314
test "handle payload size excedding limits in public channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do
@@ -342,9 +321,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
342321
socket
343322
)
344323

345-
Process.sleep(120)
346-
347-
refute_received {:socket_push, :text, _}
324+
refute_received {:socket_push, :text, _}, 120
348325
end
349326

350327
test "handle payload size excedding limits in private channel and if ack it will receive error", %{
@@ -365,9 +342,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
365342
socket
366343
)
367344

368-
Process.sleep(120)
369-
370-
refute_received {:socket_push, :text, _}
345+
refute_received {:socket_push, :text, _}, 120
371346
end
372347

373348
test "handle payload size excedding limits in public channels and if ack it will receive error", %{
@@ -384,9 +359,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do
384359
socket
385360
)
386361

387-
Process.sleep(120)
388-
389-
refute_received {:socket_push, :text, _}
362+
refute_received {:socket_push, :text, _}, 120
390363
end
391364
end
392365

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
158158
} do
159159
sub_topic_1 = "sub_topic_1"
160160
sub_topic_2 = "sub_topic_2"
161-
topic_1 = Tenants.tenant_topic(tenant, sub_topic_1)
162-
topic_2 = Tenants.tenant_topic(tenant, sub_topic_2)
163161

164162
payload_1 = %{"data" => "data"}
165163
payload_2 = %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 100)}

0 commit comments

Comments
 (0)