Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
elixir 1.18.4
elixir 1.18.4-otp-27
nodejs 18.13.0
erlang 27
16 changes: 16 additions & 0 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -501,4 +501,20 @@ defmodule Realtime.Tenants do
@spec region(Tenant.t()) :: String.t() | nil
def region(%Tenant{extensions: [%{settings: settings}]}), do: Map.get(settings, "region")
def region(_), do: nil

@doc """
"""
@spec validate_payload_size(Tenant.t() | binary(), map()) :: :ok | {:error, :payload_size_exceeded}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update PresenceHandler to also use this function?

def validate_payload_size(tenant_id, payload) when is_binary(tenant_id) do
tenant_id
|> Cache.get_tenant_by_external_id()
|> validate_payload_size(payload)
end

@payload_size_padding 500
def validate_payload_size(%Tenant{max_payload_size_in_kb: max_payload_size_in_kb}, payload) do
max_payload_size = max_payload_size_in_kb * 1000 + @payload_size_padding
payload_size = :erlang.external_size(payload)
if payload_size > max_payload_size, do: {:error, :payload_size_exceeded}, else: :ok
end
end
39 changes: 23 additions & 16 deletions lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
},
super_user :: boolean()
) :: :ok | {:error, atom()}
) :: :ok | {:error, atom() | Ecto.Changeset.t()}
def broadcast(auth_params, tenant, messages, super_user \\ false)

def broadcast(%Plug.Conn{} = conn, %Tenant{} = tenant, messages, super_user) do
Expand All @@ -49,7 +49,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
end

def broadcast(auth_params, %Tenant{} = tenant, messages, super_user) do
with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages),
with %Ecto.Changeset{valid?: true} = changeset <- changeset(%__MODULE__{}, messages, tenant),
%Ecto.Changeset{changes: %{messages: messages}} = changeset,
events_per_second_rate = Tenants.events_per_second_rate(tenant),
:ok <- check_rate_limit(events_per_second_rate, tenant, length(messages)) do
Expand All @@ -71,15 +71,11 @@ defmodule Realtime.Tenants.BatchBroadcast do
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
|> Enum.each(fn {topic, events} ->
if super_user do
Enum.each(events, fn message ->
send_message_and_count(tenant, events_per_second_rate, message, false)
end)
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)
else
case permissions_for_message(tenant, auth_params, topic) do
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
Enum.each(events, fn message ->
send_message_and_count(tenant, events_per_second_rate, message, false)
end)
Enum.each(events, fn message -> send_message_and_count(tenant, events_per_second_rate, message, false) end)

_ ->
nil
Expand All @@ -88,22 +84,26 @@ defmodule Realtime.Tenants.BatchBroadcast do
end)

:ok
else
%Ecto.Changeset{valid?: false} = changeset -> {:error, changeset}
error -> error
end
end

def broadcast(_, nil, _, _), do: {:error, :tenant_not_found}

defp changeset(payload, attrs) do
defp changeset(payload, attrs, tenant) do
payload
|> cast(attrs, [])
|> cast_embed(:messages, required: true, with: &message_changeset/2)
|> cast_embed(:messages, required: true, with: fn message, attrs -> message_changeset(message, tenant, attrs) end)
end

defp message_changeset(message, attrs) do
defp message_changeset(message, tenant, attrs) do
message
|> cast(attrs, [:id, :topic, :payload, :event, :private])
|> maybe_put_private_change()
|> validate_required([:topic, :payload, :event])
|> validate_payload_size(tenant)
end

defp maybe_put_private_change(changeset) do
Expand All @@ -113,18 +113,25 @@ defmodule Realtime.Tenants.BatchBroadcast do
end
end

defp validate_payload_size(changeset, tenant) do
payload = get_change(changeset, :payload)

case Tenants.validate_payload_size(tenant, payload) do
:ok -> changeset
_ -> add_error(changeset, :payload, "Payload size exceeds tenant limit")
end
end

@event_type "broadcast"
defp send_message_and_count(tenant, events_per_second_rate, message, public?) do
tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?)

payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}

payload =
if message[:id] do
Map.put(payload, "meta", %{"id" => message.id})
else
payload
end
if message[:id],
do: Map.put(payload, "meta", %{"id" => message.id}),
else: payload

broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}

Expand Down
5 changes: 5 additions & 0 deletions lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ defmodule Realtime.Tenants.ReplicationConnection do

{:noreply, state}
else
{:error, %Ecto.Changeset{valid?: false} = changeset} ->
error = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

log_error("UnableToBroadcastChanges", error)
{:noreply, state}

{:error, error} ->
log_error("UnableToBroadcastChanges", error)
{:noreply, state}
Expand Down
40 changes: 34 additions & 6 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do

import Phoenix.Socket, only: [assign: 3]

alias Realtime.Tenants
alias RealtimeWeb.RealtimeChannel
alias RealtimeWeb.TenantBroadcaster
alias Phoenix.Socket
Expand Down Expand Up @@ -38,8 +39,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
|> increment_rate_counter()

%{ack_broadcast: ack_broadcast} = socket.assigns
send_message(tenant_id, self_broadcast, tenant_topic, payload)
if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket}

res =
case Tenants.validate_payload_size(tenant_id, payload) do
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
{:error, error} -> {:error, error}
end

cond do
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
{:reply, {:error, :payload_size_exceeded}, socket}

ack_broadcast ->
{:reply, :ok, socket}

true ->
{:noreply, socket}
end

{:ok, policies} ->
{:noreply, assign(socket, :policies, policies)}
Expand All @@ -65,11 +81,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
} = socket

socket = increment_rate_counter(socket)
send_message(tenant_id, self_broadcast, tenant_topic, payload)

if ack_broadcast,
do: {:reply, :ok, socket},
else: {:noreply, socket}
res =
case Tenants.validate_payload_size(tenant_id, payload) do
:ok -> send_message(tenant_id, self_broadcast, tenant_topic, payload)
{:error, error} -> {:error, error}
end

cond do
ack_broadcast && match?({:error, :payload_size_exceeded}, res) ->
{:reply, {:error, :payload_size_exceeded}, socket}

ack_broadcast ->
{:reply, :ok, socket}

true ->
{:noreply, socket}
end
end

defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do
Expand Down
10 changes: 1 addition & 9 deletions lib/realtime_web/channels/realtime_channel/presence_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,5 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
end
end

# Added due to the fact that JSON decoding adds some overhead and erlang term will be slighly larger
@payload_size_padding 500
defp validate_payload_size(tenant, payload) do
if :erlang.external_size(payload) > tenant.max_payload_size_in_kb * 1000 + @payload_size_padding do
{:error, :payload_size_exceeded}
else
:ok
end
end
defp validate_payload_size(tenant, payload), do: Tenants.validate_payload_size(tenant, payload)
end
2 changes: 1 addition & 1 deletion lib/realtime_web/controllers/fallback_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule RealtimeWeb.FallbackController do
|> render("error.json", message: message)
end

def call(conn, %Ecto.Changeset{valid?: true} = changeset) do
def call(conn, {:error, %Ecto.Changeset{valid?: false} = changeset}) do
log_error(
"UnprocessableEntity",
Ecto.Changeset.traverse_errors(changeset, &translate_error/1)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.56.5",
version: "2.57.0",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
5 changes: 1 addition & 4 deletions test/e2e/tests.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { load } from "https://deno.land/[email protected]/dotenv/mod.ts";
import {
createClient,
SupabaseClient,
} from "npm:@supabase/[email protected]";
import { createClient, SupabaseClient } from "npm:@supabase/supabase-js@latest";
import { assertEquals } from "https://deno.land/[email protected]/assert/mod.ts";
import {
describe,
Expand Down
Loading