Skip to content

Commit d7faa75

Browse files
committed
add new tests and handling in replication connection and broadcast enpoint
1 parent ff17ba2 commit d7faa75

File tree

4 files changed

+58
-5
lines changed

4 files changed

+58
-5
lines changed

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
130130
payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}
131131

132132
payload =
133-
if message[:id] do
134-
Map.put(payload, "meta", %{"id" => message.id})
135-
else
136-
payload
137-
end
133+
if message[:id],
134+
do: Map.put(payload, "meta", %{"id" => message.id}),
135+
else: payload
138136

139137
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
140138

lib/realtime/tenants/replication_connection.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ defmodule Realtime.Tenants.ReplicationConnection do
333333
log_error("UnableToBroadcastChanges", error)
334334
{:noreply, state}
335335

336+
%Ecto.Changeset{valid?: false, changes: %{messages: [%{errors: [payload: {error, _}]} | _]}} ->
337+
log_error("UnableToBroadcastChanges", error)
338+
{:noreply, state}
339+
336340
_ ->
337341
{:noreply, state}
338342
end

test/realtime/tenants/replication_connection_test.exs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,31 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
206206
assert logs =~ "UnableToBroadcastChanges"
207207
end
208208

209+
test "message that exceeds payload size logs error", %{tenant: tenant} do
210+
logs =
211+
capture_log(fn ->
212+
start_supervised!(
213+
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
214+
restart: :transient
215+
)
216+
217+
topic = random_string()
218+
tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false)
219+
assert :ok = Endpoint.subscribe(tenant_topic)
220+
221+
message_fixture(tenant, %{
222+
"event" => random_string(),
223+
"topic" => random_string(),
224+
"private" => true,
225+
"payload" => %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}
226+
})
227+
228+
refute_receive %Phoenix.Socket.Broadcast{}, 500
229+
end)
230+
231+
assert logs =~ "UnableToBroadcastChanges: Payload size exceeds tenant limit"
232+
end
233+
209234
test "payload without id", %{tenant: tenant} do
210235
start_link_supervised!(
211236
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,32 @@ defmodule RealtimeWeb.BroadcastControllerTest do
151151

152152
refute_receive {:socket_push, _, _}
153153
end
154+
155+
test "returns 422 when batch of messages includes a message that exceeds the tenant payload size", %{
156+
conn: conn,
157+
tenant: tenant
158+
} do
159+
sub_topic_1 = "sub_topic_1"
160+
sub_topic_2 = "sub_topic_2"
161+
topic_1 = Tenants.tenant_topic(tenant, sub_topic_1)
162+
topic_2 = Tenants.tenant_topic(tenant, sub_topic_2)
163+
164+
payload_1 = %{"data" => "data"}
165+
payload_2 = %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 100)}
166+
event_1 = "event_1"
167+
event_2 = "event_2"
168+
169+
conn =
170+
post(conn, Routes.broadcast_path(conn, :broadcast), %{
171+
"messages" => [
172+
%{"topic" => sub_topic_1, "payload" => payload_1, "event" => event_1},
173+
%{"topic" => sub_topic_1, "payload" => payload_1, "event" => event_1},
174+
%{"topic" => sub_topic_2, "payload" => payload_2, "event" => event_2}
175+
]
176+
})
177+
178+
assert conn.status == 422
179+
end
154180
end
155181

156182
describe "too many requests" do

0 commit comments

Comments
 (0)