diff --git a/.tool-versions b/.tool-versions index 9ac56e91e..8184afb36 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,3 +1,3 @@ -elixir 1.18.4 +elixir 1.18.4-otp-27 nodejs 18.13.0 erlang 27 diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 9e53e18f1..6b60caf39 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -501,4 +501,20 @@ defmodule Realtime.Tenants do @spec region(Tenant.t()) :: String.t() | nil def region(%Tenant{extensions: [%{settings: settings}]}), do: Map.get(settings, "region") def region(_), do: nil + + @doc """ + """ + @spec validate_payload_size(Tenant.t() | binary(), map()) :: :ok | {:error, :payload_size_exceeded} + def validate_payload_size(tenant_id, payload) when is_binary(tenant_id) do + tenant_id + |> Cache.get_tenant_by_external_id() + |> validate_payload_size(payload) + end + + @payload_size_padding 500 + def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do + max_payload_size = max_payload_size_in_kb * 1000 + @payload_size_padding + payload_size = :erlang.external_size(payload) + if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok + end end diff --git a/lib/realtime/tenants/batch_broadcast.ex b/lib/realtime/tenants/batch_broadcast.ex index 9e4ed4c3c..18b5823f1 100644 --- a/lib/realtime/tenants/batch_broadcast.ex +++ b/lib/realtime/tenants/batch_broadcast.ex @@ -33,7 +33,7 @@ defmodule Realtime.Tenants.BatchBroadcast do messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()}) }, super_user :: boolean() - ) :: :ok | {:error, atom()} + ) :: :ok | {:error, atom() | Ecto.Changeset.t()} def broadcast(auth_params, tenant, messages, super_user \\ false) def broadcast(%Plug.Conn{} = conn, %Tenant{} = tenant, messages, super_user) do @@ -49,7 +49,7 @@ defmodule Realtime.Tenants.BatchBroadcast do end def broadcast(auth_params, %Tenant{} = tenant, messages, super_user) do - with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages), + with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages, tenant), %Ecto.Changeset{changes: %{messages: messages}} = changeset, events_per_second_rate = Tenants.events_per_second_rate(tenant), :ok <- check_rate_limit(events_per_second_rate, tenant, length(messages)) do @@ -71,15 +71,11 @@ defmodule Realtime.Tenants.BatchBroadcast do |> Enum.group_by(fn event -> Map.get(event, :topic) end) |> Enum.each(fn {topic, events} -> if super_user do - Enum.each(events, fn message -> - send_message_and_count(tenant, events_per_second_rate, message, false) - end) + Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end) else case permissions_for_message(tenant, auth_params, topic) do %Policies{broadcast: %BroadcastPolicies{write: true}} -> - Enum.each(events, fn message -> - send_message_and_count(tenant, events_per_second_rate, message, false) - end) + Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end) _ -> nil @@ -88,22 +84,26 @@ defmodule Realtime.Tenants.BatchBroadcast do end) :ok + else + %Ecto.Changeset{valid?: false} = changeset -> {:error, changeset} + error -> error end end def broadcast(_, nil, _, _), do: {:error, :tenant_not_found} - defp changeset(payload, attrs) do + defp changeset(payload, attrs, tenant) do payload |> cast(attrs, []) - |> cast_embed(:messages, required: true, with: &message_changeset/2) + |> cast_embed(:messages, required: true, with: fn message, attrs -> message_changeset(message, tenant, attrs) end) end - defp message_changeset(message, attrs) do + defp message_changeset(message, tenant, attrs) do message |> cast(attrs, [:id, :topic, :payload, :event, :private]) |> maybe_put_private_change() |> validate_required([:topic, :payload, :event]) + |> validate_payload_size(tenant) end defp maybe_put_private_change(changeset) do @@ -113,6 +113,15 @@ defmodule Realtime.Tenants.BatchBroadcast do end end + defp validate_payload_size(changeset, tenant) do + payload = get_change(changeset, :payload) + + case Tenants.validate_payload_size(tenant, payload) do + :ok -> changeset + _ -> add_error(changeset, :payload, "Payload size exceeds tenant limit") + end + end + @event_type "broadcast" defp send_message_and_count(tenant, events_per_second_rate, message, public?) do tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?) @@ -120,11 +129,9 @@ defmodule Realtime.Tenants.BatchBroadcast do payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"} payload = - if message[:id] do - Map.put(payload, "meta", %{"id" => message.id}) - else - payload - end + if message[:id], + do: Map.put(payload, "meta", %{"id" => message.id}), + else: payload broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload} diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex index 4ebb1f8e8..d5793a997 100644 --- a/lib/realtime/tenants/replication_connection.ex +++ b/lib/realtime/tenants/replication_connection.ex @@ -329,6 +329,11 @@ defmodule Realtime.Tenants.ReplicationConnection do {:noreply, state} else + {:error, %Ecto.Changeset{valid?: false} = changeset} -> + error = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0)) + log_error("UnableToBroadcastChanges", error) + {:noreply, state} + {:error, error} -> log_error("UnableToBroadcastChanges", error) {:noreply, state} diff --git a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex index 036ad9159..0bbc0c157 100644 --- a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex @@ -6,6 +6,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do import Phoenix.Socket, only: [assign: 3] + alias Realtime.Tenants alias RealtimeWeb.RealtimeChannel alias RealtimeWeb.TenantBroadcaster alias Phoenix.Socket @@ -38,8 +39,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do |> increment_rate_counter() %{ack_broadcast: ack_broadcast} = socket.assigns - send_message(tenant_id, self_broadcast, tenant_topic, payload) - if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket} + + res = + case Tenants.validate_payload_size(tenant_id, payload) do + :ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload) + {:error, error} -> {:error, error} + end + + cond do + ack_broadcast && match?({:error, :payload_size_exceeded}, res) -> + {:reply, {:error, :payload_size_exceeded}, socket} + + ack_broadcast -> + {:reply, :ok, socket} + + true -> + {:noreply, socket} + end {:ok, policies} -> {:noreply, assign(socket, :policies, policies)} @@ -65,11 +81,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do } = socket socket = increment_rate_counter(socket) - send_message(tenant_id, self_broadcast, tenant_topic, payload) - if ack_broadcast, - do: {:reply, :ok, socket}, - else: {:noreply, socket} + res = + case Tenants.validate_payload_size(tenant_id, payload) do + :ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload) + {:error, error} -> {:error, error} + end + + cond do + ack_broadcast && match?({:error, :payload_size_exceeded}, res) -> + {:reply, {:error, :payload_size_exceeded}, socket} + + ack_broadcast -> + {:reply, :ok, socket} + + true -> + {:noreply, socket} + end end defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do diff --git a/lib/realtime_web/channels/realtime_channel/presence_handler.ex b/lib/realtime_web/channels/realtime_channel/presence_handler.ex index d5a184caa..69a941c01 100644 --- a/lib/realtime_web/channels/realtime_channel/presence_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/presence_handler.ex @@ -161,13 +161,5 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do end end - # Added due to the fact that JSON decoding adds some overhead and erlang term will be slighly larger - @payload_size_padding 500 - defp validate_payload_size(tenant, payload) do - if :erlang.external_size(payload) > tenant.max_payload_size_in_kb * 1000 + @payload_size_padding do - {:error, :payload_size_exceeded} - else - :ok - end - end + defp validate_payload_size(tenant, payload), do: Tenants.validate_payload_size(tenant, payload) end diff --git a/lib/realtime_web/controllers/fallback_controller.ex b/lib/realtime_web/controllers/fallback_controller.ex index d83d1d681..3a76fb49f 100644 --- a/lib/realtime_web/controllers/fallback_controller.ex +++ b/lib/realtime_web/controllers/fallback_controller.ex @@ -45,7 +45,7 @@ defmodule RealtimeWeb.FallbackController do |> render("error.json", message: message) end - def call(conn, %Ecto.Changeset{valid?: true} = changeset) do + def call(conn, {:error, %Ecto.Changeset{valid?: false} = changeset}) do log_error( "UnprocessableEntity", Ecto.Changeset.traverse_errors(changeset, &translate_error/1) diff --git a/mix.exs b/mix.exs index b3b20e5a4..b67066539 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.56.5", + version: "2.57.0", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/e2e/tests.ts b/test/e2e/tests.ts index 2711a959e..de55640a7 100644 --- a/test/e2e/tests.ts +++ b/test/e2e/tests.ts @@ -1,8 +1,5 @@ import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts"; -import { - createClient, - SupabaseClient, -} from "npm:@supabase/supabase-js@2.49.5-next.5"; +import { createClient, SupabaseClient } from "npm:@supabase/supabase-js@latest"; import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts"; import { describe, diff --git a/test/realtime/tenants/batch_broadcast_test.exs b/test/realtime/tenants/batch_broadcast_test.exs new file mode 100644 index 000000000..2071aac64 --- /dev/null +++ b/test/realtime/tenants/batch_broadcast_test.exs @@ -0,0 +1,529 @@ +defmodule Realtime.Tenants.BatchBroadcastTest do + use RealtimeWeb.ConnCase, async: true + use Mimic + + alias Realtime.Database + alias Realtime.GenCounter + alias Realtime.RateCounter + alias Realtime.Tenants + alias Realtime.Tenants.BatchBroadcast + alias Realtime.Tenants.Authorization + alias Realtime.Tenants.Authorization.Policies + alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies + alias Realtime.Tenants.Connect + + alias RealtimeWeb.TenantBroadcaster + + setup do + tenant = Containers.checkout_tenant(run_migrations: true) + Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant}) + {:ok, tenant: tenant} + end + + describe "public message broadcasting" do + test "broadcasts multiple public messages successfully", %{tenant: tenant} do + broadcast_events_key = Tenants.events_per_second_key(tenant) + topic1 = random_string() + topic2 = random_string() + + messages = %{ + messages: [ + %{topic: topic1, payload: %{"data" => "test1"}, event: "event1"}, + %{topic: topic2, payload: %{"data" => "test2"}, event: "event2"}, + %{topic: topic1, payload: %{"data" => "test3"}, event: "event3"} + ] + } + + expect(GenCounter, :add, 3, fn ^broadcast_events_key -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 3, fn _, _, _, _, _ -> :ok end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, false) + end + + test "public messages do not have private prefix in topic", %{tenant: tenant} do + broadcast_events_key = Tenants.events_per_second_key(tenant) + topic = random_string() + + messages = %{ + messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1"}] + } + + expect(GenCounter, :add, fn ^broadcast_events_key -> :ok end) + + expect(TenantBroadcaster, :pubsub_broadcast, fn _, topic, _, _, _ -> + refute String.contains?(topic, "-private") + end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, false) + end + end + + describe "message ID metadata" do + test "includes message ID in metadata when provided", %{tenant: tenant} do + broadcast_events_key = Tenants.events_per_second_key(tenant) + topic = random_string() + + messages = %{ + messages: [%{id: "msg-123", topic: topic, payload: %{"data" => "test"}, event: "event1"}] + } + + expect(GenCounter, :add, fn ^broadcast_events_key -> :ok end) + + expect(TenantBroadcaster, :pubsub_broadcast, fn _, _, broadcast, _, _ -> + assert %Phoenix.Socket.Broadcast{ + payload: %{ + "payload" => %{"data" => "test"}, + "event" => "event1", + "type" => "broadcast", + "meta" => %{"id" => "msg-123"} + } + } = broadcast + end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, false) + end + end + + describe "super user broadcasting" do + test "bypasses authorization for private messages with super_user flag", %{tenant: tenant} do + broadcast_events_key = Tenants.events_per_second_key(tenant) + topic1 = random_string() + topic2 = random_string() + + messages = %{ + messages: [ + %{topic: topic1, payload: %{"data" => "test1"}, event: "event1", private: true}, + %{topic: topic2, payload: %{"data" => "test2"}, event: "event2", private: true} + ] + } + + expect(GenCounter, :add, 2, fn ^broadcast_events_key -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 2, fn _, _, _, _, _ -> :ok end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, true) + end + + test "private messages have private prefix in topic", %{tenant: tenant} do + broadcast_events_key = Tenants.events_per_second_key(tenant) + topic = random_string() + + messages = %{ + messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1", private: true}] + } + + expect(GenCounter, :add, fn ^broadcast_events_key -> :ok end) + + expect(TenantBroadcaster, :pubsub_broadcast, fn _, topic, _, _, _ -> + assert String.contains?(topic, "-private") + end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, true) + end + end + + describe "private message authorization" do + test "broadcasts private messages with valid authorization", %{tenant: tenant} do + topic = random_string() + sub = random_string() + role = "authenticated" + + auth_params = %{ + tenant_id: tenant.external_id, + topic: topic, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + messages = %{messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1", private: true}]} + + broadcast_events_key = Tenants.events_per_second_key(tenant) + + expect(GenCounter, :add, 1, fn ^broadcast_events_key -> :ok end) + + Authorization + |> expect(:build_authorization_params, fn params -> params end) + |> expect(:get_write_authorizations, fn _, _ -> {:ok, %Policies{broadcast: %BroadcastPolicies{write: true}}} end) + + expect(TenantBroadcaster, :pubsub_broadcast, 1, fn _, _, _, _, _ -> :ok end) + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + end + + test "skips private messages without authorization", %{tenant: tenant} do + topic = random_string() + sub = random_string() + role = "anon" + + auth_params = %{ + tenant_id: tenant.external_id, + topic: topic, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + Authorization + |> expect(:build_authorization_params, 1, fn params -> params end) + |> expect(:get_write_authorizations, 1, fn _, _ -> + {:ok, %Policies{broadcast: %BroadcastPolicies{write: false}}} + end) + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + messages = %{ + messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1", private: true}] + } + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + + assert calls(&TenantBroadcaster.pubsub_broadcast/5) == [] + end + + test "broadcasts only authorized topics in mixed authorization batch", %{tenant: tenant} do + topic = random_string() + sub = random_string() + role = "authenticated" + + auth_params = %{ + tenant_id: tenant.external_id, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + messages = %{ + messages: [ + %{topic: topic, payload: %{"data" => "test1"}, event: "event1", private: true}, + %{topic: random_string(), payload: %{"data" => "test2"}, event: "event2", private: true} + ] + } + + broadcast_events_key = Tenants.events_per_second_key(tenant) + + expect(GenCounter, :add, fn ^broadcast_events_key -> :ok end) + + Authorization + |> expect(:build_authorization_params, 2, fn params -> params end) + |> expect(:get_write_authorizations, 2, fn + _, %{topic: ^topic} -> %Policies{broadcast: %BroadcastPolicies{write: true}} + _, _ -> %Policies{broadcast: %BroadcastPolicies{write: false}} + end) + + # Only one topic will actually be broadcasted + expect(TenantBroadcaster, :pubsub_broadcast, 1, fn _, _, %Phoenix.Socket.Broadcast{topic: ^topic}, _, _ -> + :ok + end) + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + end + + test "groups messages by topic and checks authorization once per topic", %{tenant: tenant} do + topic_1 = random_string() + topic_2 = random_string() + sub = random_string() + role = "authenticated" + + auth_params = %{ + tenant_id: tenant.external_id, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + messages = %{ + messages: [ + %{topic: topic_1, payload: %{"data" => "test1"}, event: "event1", private: true}, + %{topic: topic_2, payload: %{"data" => "test2"}, event: "event2", private: true}, + %{topic: topic_1, payload: %{"data" => "test3"}, event: "event3", private: true} + ] + } + + broadcast_events_key = Tenants.events_per_second_key(tenant) + + expect(GenCounter, :add, 3, fn ^broadcast_events_key -> :ok end) + + Authorization + |> expect(:build_authorization_params, 2, fn params -> params end) + |> expect(:get_write_authorizations, 2, fn _, _ -> + {:ok, %Policies{broadcast: %BroadcastPolicies{write: true}}} + end) + + expect(TenantBroadcaster, :pubsub_broadcast, 3, fn _, _, _, _, _ -> :ok end) + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + end + + test "handles missing auth params for private messages", %{tenant: tenant} do + events_per_second_rate = Tenants.events_per_second_rate(tenant) + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn ^events_per_second_rate -> {:ok, %RateCounter{avg: 0}} end) + + reject(&TenantBroadcaster.pubsub_broadcast/5) + reject(&Connect.lookup_or_start_connection/1) + + messages = %{ + messages: [%{topic: "topic1", payload: %{"data" => "test"}, event: "event1", private: true}] + } + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, false) + + assert calls(&TenantBroadcaster.pubsub_broadcast/5) == [] + end + end + + describe "mixed public and private messages" do + setup %{tenant: tenant} do + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) + %{db_conn: db_conn} + end + + test "broadcasts both public and private messages together", %{tenant: tenant, db_conn: db_conn} do + topic = random_string() + sub = random_string() + role = "authenticated" + + create_rls_policies(db_conn, [:authenticated_write_broadcast], %{topic: topic}) + + auth_params = %{ + tenant_id: tenant.external_id, + topic: topic, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + events_per_second_rate = Tenants.events_per_second_rate(tenant) + broadcast_events_key = Tenants.events_per_second_key(tenant) + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn + ^events_per_second_rate -> + {:ok, %RateCounter{avg: 0}} + + _ -> + {:ok, + %RateCounter{ + avg: 0, + limit: %{log: true, value: 10, measurement: :sum, triggered: false, log_fn: fn -> :ok end} + }} + end) + + expect(GenCounter, :add, 3, fn ^broadcast_events_key -> :ok end) + expect(Connect, :lookup_or_start_connection, fn _ -> {:ok, db_conn} end) + + Authorization + |> expect(:build_authorization_params, fn params -> params end) + |> expect(:get_write_authorizations, fn _, _ -> + {:ok, %Policies{broadcast: %BroadcastPolicies{write: true}}} + end) + + expect(TenantBroadcaster, :pubsub_broadcast, 3, fn _, _, _, _, _ -> :ok end) + + messages = %{ + messages: [ + %{topic: "public1", payload: %{"data" => "public"}, event: "event1", private: false}, + %{topic: topic, payload: %{"data" => "private"}, event: "event2", private: true}, + %{topic: "public2", payload: %{"data" => "public2"}, event: "event3"} + ] + } + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + + broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5) + assert length(broadcast_calls) == 3 + end + end + + describe "Plug.Conn integration" do + test "accepts and converts Plug.Conn to auth params", %{tenant: tenant} do + topic = random_string() + broadcast_events_key = Tenants.events_per_second_key(tenant) + messages = %{messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1"}]} + + expect(GenCounter, :add, fn ^broadcast_events_key -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 1, fn _, _, _, _, _ -> :ok end) + + conn = + build_conn() + |> Map.put(:assigns, %{ + claims: %{"sub" => "user123", "role" => "authenticated"}, + role: "authenticated", + sub: "user123" + }) + |> Map.put(:req_headers, [{"authorization", "Bearer token"}]) + + assert :ok = BatchBroadcast.broadcast(conn, tenant, messages, false) + end + end + + describe "message validation" do + test "returns changeset error when topic is missing", %{tenant: tenant} do + messages = %{messages: [%{payload: %{"data" => "test"}, event: "event1"}]} + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + assert {:error, %Ecto.Changeset{valid?: false}} = result + end + + test "returns changeset error when payload is missing", %{tenant: tenant} do + topic = random_string() + messages = %{messages: [%{topic: topic, event: "event1"}]} + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + assert {:error, %Ecto.Changeset{valid?: false}} = result + end + + test "returns changeset error when event is missing", %{tenant: tenant} do + topic = random_string() + messages = %{messages: [%{topic: topic, payload: %{"data" => "test"}}]} + + reject(&TenantBroadcaster.pubsub_broadcast/5) + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + assert {:error, %Ecto.Changeset{valid?: false}} = result + end + + test "returns changeset error when messages array is empty", %{tenant: tenant} do + messages = %{messages: []} + reject(&TenantBroadcaster.pubsub_broadcast/5) + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + assert {:error, %Ecto.Changeset{valid?: false}} = result + end + end + + describe "rate limiting" do + test "rejects broadcast when rate limit is exceeded", %{tenant: tenant} do + events_per_second_rate = Tenants.events_per_second_rate(tenant) + topic = random_string() + messages = %{messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1"}]} + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn ^events_per_second_rate -> {:ok, %RateCounter{avg: tenant.max_events_per_second + 1}} end) + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + assert {:error, :too_many_requests, "You have exceeded your rate limit"} = result + end + + test "rejects broadcast when batch would exceed rate limit", %{tenant: tenant} do + events_per_second_rate = Tenants.events_per_second_rate(tenant) + + messages = %{ + messages: + Enum.map(1..10, fn _ -> + %{topic: random_string(), payload: %{"data" => "test"}, event: random_string()} + end) + } + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn ^events_per_second_rate -> + {:ok, %RateCounter{avg: tenant.max_events_per_second - 5}} + end) + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + + assert {:error, :too_many_requests, "Too many messages to broadcast, please reduce the batch size"} = result + end + + test "allows broadcast at rate limit boundary", %{tenant: tenant} do + events_per_second_rate = Tenants.events_per_second_rate(tenant) + broadcast_events_key = Tenants.events_per_second_key(tenant) + current_rate = tenant.max_events_per_second - 2 + + messages = %{ + messages: [ + %{topic: random_string(), payload: %{"data" => "test1"}, event: "event1"}, + %{topic: random_string(), payload: %{"data" => "test2"}, event: "event2"} + ] + } + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn ^events_per_second_rate -> + {:ok, %RateCounter{avg: current_rate}} + end) + + expect(GenCounter, :add, 2, fn ^broadcast_events_key -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 2, fn _, _, _, _, _ -> :ok end) + + assert :ok = BatchBroadcast.broadcast(nil, tenant, messages, false) + end + + test "rejects broadcast when payload size exceeds tenant limit", %{tenant: tenant} do + messages = %{ + messages: [ + %{ + topic: random_string(), + payload: %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}, + event: "event1" + } + ] + } + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + result = BatchBroadcast.broadcast(nil, tenant, messages, false) + + assert {:error, + %Ecto.Changeset{ + valid?: false, + changes: %{messages: [%{errors: [payload: {"Payload size exceeds tenant limit", []}]}]} + }} = result + end + end + + describe "error handling" do + test "returns error when tenant is nil" do + messages = %{messages: [%{topic: "topic1", payload: %{"data" => "test"}, event: "event1"}]} + assert {:error, :tenant_not_found} = BatchBroadcast.broadcast(nil, nil, messages, false) + end + + test "gracefully handles database connection errors for private messages", %{tenant: tenant} do + topic = random_string() + sub = random_string() + role = "authenticated" + + auth_params = %{ + tenant_id: tenant.external_id, + headers: [{"header-1", "value-1"}], + claims: %{"sub" => sub, "role" => role, "exp" => Joken.current_time() + 1_000}, + role: role, + sub: sub + } + + events_per_second_rate = Tenants.events_per_second_rate(tenant) + + RateCounter + |> stub(:new, fn _ -> {:ok, nil} end) + |> stub(:get, fn ^events_per_second_rate -> {:ok, %RateCounter{avg: 0}} end) + + expect(Connect, :lookup_or_start_connection, fn _ -> {:error, :connection_failed} end) + + reject(&TenantBroadcaster.pubsub_broadcast/5) + + messages = %{ + messages: [%{topic: topic, payload: %{"data" => "test"}, event: "event1", private: true}] + } + + assert :ok = BatchBroadcast.broadcast(auth_params, tenant, messages, false) + + assert calls(&TenantBroadcaster.pubsub_broadcast/5) == [] + end + end +end diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs index b28a23988..03b922fa4 100644 --- a/test/realtime/tenants/replication_connection_test.exs +++ b/test/realtime/tenants/replication_connection_test.exs @@ -206,6 +206,31 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do assert logs =~ "UnableToBroadcastChanges" end + test "message that exceeds payload size logs error", %{tenant: tenant} do + logs = + capture_log(fn -> + start_supervised!( + {ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}}, + restart: :transient + ) + + topic = random_string() + tenant_topic = Tenants.tenant_topic(tenant.external_id, topic, false) + assert :ok = Endpoint.subscribe(tenant_topic) + + message_fixture(tenant, %{ + "event" => random_string(), + "topic" => random_string(), + "private" => true, + "payload" => %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)} + }) + + refute_receive %Phoenix.Socket.Broadcast{}, 500 + end) + + assert logs =~ "UnableToBroadcastChanges: %{messages: [%{payload: [\"Payload size exceeds tenant limit\"]}]}" + end + test "payload without id", %{tenant: tenant} do start_link_supervised!( {ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}}, diff --git a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs index 2cd7005df..83a239b46 100644 --- a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs @@ -27,8 +27,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - for _ <- 1..100 do topic = "realtime:#{topic}" assert_receive {:socket_push, :text, data} @@ -36,6 +34,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} end + Process.sleep(120) + {:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) assert Enum.sum(buckets) == 100 assert avg > 0 @@ -50,8 +50,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - refute_received _any {:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) @@ -68,8 +66,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} @@ -77,6 +73,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} end + Process.sleep(120) {:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) assert Enum.sum(buckets) == 100 assert avg > 0.0 @@ -96,8 +93,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} @@ -120,9 +115,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - - refute_received {:socket_push, :text, _} + refute_received {:socket_push, :text, _}, 120 end @tag policies: [:read_matching_user_role, :write_matching_user_role], role: "anon" @@ -139,8 +132,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} @@ -163,9 +154,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - - refute_received {:socket_push, :text, _} + refute_received {:socket_push, :text, _}, 120 end test "with nil policy and invalid user, won't send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do @@ -177,8 +166,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - refute_received _any {:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) @@ -259,8 +246,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(120) - for _ <- 1..100 do topic = "realtime:#{topic}" assert_received {:socket_push, :text, data} @@ -268,6 +253,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do assert message == %{"event" => "broadcast", "payload" => %{"a" => "b"}, "ref" => nil, "topic" => topic} end + Process.sleep(120) {:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) assert Enum.sum(buckets) == 100 assert avg > 0.0 @@ -290,6 +276,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end Process.sleep(120) + {:ok, %{avg: avg, bucket: buckets}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) assert Enum.sum(buckets) == 100 assert avg > 0.0 @@ -312,6 +299,74 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do {:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_rate(tenant)) assert avg == 0.0 end + + test "handle payload size excedding limits in private channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do + socket = + socket_fixture(tenant, topic, + policies: %Policies{broadcast: %BroadcastPolicies{write: true}}, + ack_broadcast: false + ) + + assert {:noreply, _} = + BroadcastHandler.handle( + %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}, + db_conn, + socket + ) + + refute_received {:socket_push, :text, _}, 120 + end + + test "handle payload size excedding limits in public channels", %{topic: topic, tenant: tenant, db_conn: db_conn} do + socket = socket_fixture(tenant, topic, ack_broadcast: false, private?: false) + + assert {:noreply, _} = + BroadcastHandler.handle( + %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}, + db_conn, + socket + ) + + refute_received {:socket_push, :text, _}, 120 + end + + test "handle payload size excedding limits in private channel and if ack it will receive error", %{ + topic: topic, + tenant: tenant, + db_conn: db_conn + } do + socket = + socket_fixture(tenant, topic, + policies: %Policies{broadcast: %BroadcastPolicies{write: true}}, + ack_broadcast: true + ) + + assert {:reply, {:error, :payload_size_exceeded}, _} = + BroadcastHandler.handle( + %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}, + db_conn, + socket + ) + + refute_received {:socket_push, :text, _}, 120 + end + + test "handle payload size excedding limits in public channels and if ack it will receive error", %{ + topic: topic, + tenant: tenant, + db_conn: db_conn + } do + socket = socket_fixture(tenant, topic, ack_broadcast: true, private?: false) + + assert {:reply, {:error, :payload_size_exceeded}, _} = + BroadcastHandler.handle( + %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 1)}, + db_conn, + socket + ) + + refute_received {:socket_push, :text, _}, 120 + end end defp initiate_tenant(context) do diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs index 209c405de..22ee74261 100644 --- a/test/realtime_web/controllers/broadcast_controller_test.exs +++ b/test/realtime_web/controllers/broadcast_controller_test.exs @@ -151,6 +151,30 @@ defmodule RealtimeWeb.BroadcastControllerTest do refute_receive {:socket_push, _, _} end + + test "returns 422 when batch of messages includes a message that exceeds the tenant payload size", %{ + conn: conn, + tenant: tenant + } do + sub_topic_1 = "sub_topic_1" + sub_topic_2 = "sub_topic_2" + + payload_1 = %{"data" => "data"} + payload_2 = %{"data" => random_string(tenant.max_payload_size_in_kb * 1000 + 100)} + event_1 = "event_1" + event_2 = "event_2" + + conn = + post(conn, Routes.broadcast_path(conn, :broadcast), %{ + "messages" => [ + %{"topic" => sub_topic_1, "payload" => payload_1, "event" => event_1}, + %{"topic" => sub_topic_1, "payload" => payload_1, "event" => event_1}, + %{"topic" => sub_topic_2, "payload" => payload_2, "event" => event_2} + ] + }) + + assert conn.status == 422 + end end describe "too many requests" do