Skip to content

Commit 6991704

Browse files
committed
fix: Disconnect when tenant has no users
To ensure that we limit the amount of connections to a tenant database, we will check if that tenant has any users and if not, we kill the database connection.
1 parent 0c6328c commit 6991704

File tree

4 files changed

+198
-74
lines changed

4 files changed

+198
-74
lines changed

lib/realtime/tenants/connect.ex

+88-15
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,40 @@ defmodule Realtime.Tenants.Connect do
22
@moduledoc """
33
This module is responsible for attempting to connect to a tenant's database and store the DBConnection in a Syn registry.
44
"""
5-
use GenServer
5+
use GenServer, restart: :transient
66

77
require Logger
88

99
alias Realtime.Helpers
1010
alias Realtime.Tenants
11+
alias Realtime.UsersCounter
1112

12-
defstruct tenant_id: nil, db_conn_reference: nil
13+
@erpc_timeout_default 5000
14+
@check_connected_user_interval_default 1000
15+
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
16+
17+
defstruct tenant_id: nil,
18+
db_conn_reference: nil,
19+
db_conn_pid: nil,
20+
check_connected_user_interval: nil,
21+
connected_users_bucket: [1]
1322

1423
@doc """
1524
Returns the database connection for a tenant. If the tenant is not connected, it will attempt to connect to the tenant's database.
1625
"""
17-
@spec lookup_or_start_connection(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
18-
def lookup_or_start_connection(tenant_id) do
26+
@spec lookup_or_start_connection(binary(), keyword()) ::
27+
{:ok, DBConnection.t()} | {:error, term()}
28+
def lookup_or_start_connection(tenant_id, opts \\ []) do
1929
case get_status(tenant_id) do
2030
{:ok, conn} -> {:ok, conn}
21-
{:error, :tenant_database_unavailable} -> call_external_node(tenant_id)
31+
{:error, :tenant_database_unavailable} -> call_external_node(tenant_id, opts)
2232
{:error, :initializing} -> {:error, :tenant_database_unavailable}
2333
end
2434
end
2535

2636
@doc """
2737
Returns the database connection pid from :syn if it exists.
2838
"""
29-
3039
@spec get_status(binary()) ::
3140
{:ok, DBConnection.t()} | {:error, :tenant_database_unavailable | :initializing}
3241
def get_status(tenant_id) do
@@ -40,10 +49,10 @@ defmodule Realtime.Tenants.Connect do
4049
@doc """
4150
Connects to a tenant's database and stores the DBConnection in the process :syn metadata
4251
"""
43-
@spec connect(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
44-
def connect(tenant_id) do
52+
@spec connect(binary(), keyword()) :: {:ok, DBConnection.t()} | {:error, term()}
53+
def connect(tenant_id, opts \\ []) do
4554
supervisor = {:via, PartitionSupervisor, {Realtime.Tenants.Connect.DynamicSupervisor, self()}}
46-
spec = {__MODULE__, tenant_id: tenant_id}
55+
spec = {__MODULE__, [tenant_id: tenant_id] ++ opts}
4756

4857
case DynamicSupervisor.start_child(supervisor, spec) do
4958
{:ok, _} -> get_status(tenant_id)
@@ -52,9 +61,20 @@ defmodule Realtime.Tenants.Connect do
5261
end
5362
end
5463

55-
def start_link(tenant_id: tenant_id) do
64+
def start_link(opts) do
65+
tenant_id = Keyword.get(opts, :tenant_id)
66+
67+
check_connected_user_interval =
68+
Keyword.get(opts, :check_connected_user_interval, @check_connected_user_interval_default)
69+
5670
name = {__MODULE__, tenant_id, %{conn: nil}}
57-
GenServer.start_link(__MODULE__, %__MODULE__{tenant_id: tenant_id}, name: {:via, :syn, name})
71+
72+
state = %__MODULE__{
73+
tenant_id: tenant_id,
74+
check_connected_user_interval: check_connected_user_interval
75+
}
76+
77+
GenServer.start_link(__MODULE__, state, name: {:via, :syn, name})
5878
end
5979

6080
## GenServer callbacks
@@ -66,9 +86,9 @@ defmodule Realtime.Tenants.Connect do
6686
case res do
6787
{:ok, conn} ->
6888
:syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | conn: conn} end)
69-
state = %{state | db_conn_reference: Process.monitor(conn)}
89+
state = %{state | db_conn_reference: Process.monitor(conn), db_conn_pid: conn}
7090

71-
{:ok, state}
91+
{:ok, state, {:continue, :setup_connected_users}}
7292

7393
{:error, error} ->
7494
Logger.error("Error connecting to tenant database: #{inspect(error)}")
@@ -77,6 +97,41 @@ defmodule Realtime.Tenants.Connect do
7797
end
7898
end
7999

100+
@impl GenServer
101+
def handle_continue(
102+
:setup_connected_users,
103+
%{
104+
check_connected_user_interval: check_connected_user_interval,
105+
connected_users_bucket: connected_users_bucket
106+
} = state
107+
) do
108+
send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)
109+
{:noreply, state}
110+
end
111+
112+
@impl GenServer
113+
def handle_info(
114+
:check_connected_users,
115+
%{
116+
tenant_id: tenant_id,
117+
check_connected_user_interval: check_connected_user_interval,
118+
connected_users_bucket: connected_users_bucket
119+
} = state
120+
) do
121+
connected_users_bucket =
122+
tenant_id
123+
|> update_connected_users_bucket(connected_users_bucket)
124+
|> send_connected_user_check_message(check_connected_user_interval)
125+
126+
{:noreply, %{state | connected_users_bucket: connected_users_bucket}}
127+
end
128+
129+
def handle_info(:shutdown, %{db_conn_pid: db_conn_pid} = state) do
130+
Logger.info("Tenant has no connected users, database connection will be terminated")
131+
:ok = GenServer.stop(db_conn_pid)
132+
{:stop, :normal, state}
133+
end
134+
80135
@impl GenServer
81136
def handle_info(
82137
{:DOWN, db_conn_reference, _, _, _},
@@ -88,10 +143,28 @@ defmodule Realtime.Tenants.Connect do
88143

89144
## Private functions
90145

91-
defp call_external_node(tenant_id) do
146+
defp call_external_node(tenant_id, opts) do
92147
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
93148
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
94-
:erpc.call(node, __MODULE__, :connect, [tenant_id], 5000)
149+
:erpc.call(node, __MODULE__, :connect, [tenant_id, opts], @erpc_timeout_default)
95150
end
96151
end
152+
153+
defp update_connected_users_bucket(tenant_id, connected_users_bucket) do
154+
connected_users_bucket
155+
|> then(&(&1 ++ [UsersCounter.tenant_users(tenant_id)]))
156+
|> Enum.take(-6)
157+
end
158+
159+
defp send_connected_user_check_message(
160+
@connected_users_bucket_shutdown,
161+
check_connected_user_interval
162+
) do
163+
Process.send_after(self(), :shutdown, check_connected_user_interval)
164+
end
165+
166+
defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do
167+
Process.send_after(self(), :check_connected_users, check_connected_user_interval)
168+
connected_users_bucket
169+
end
97170
end

mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.25.4",
7+
version: "2.25.5",
88
elixir: "~> 1.14.0",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

+61-58
Original file line numberDiff line numberDiff line change
@@ -112,72 +112,75 @@ defmodule Realtime.Integration.RtChannelTest do
112112
P.query!(conn, "insert into test (details) values ('test')", [])
113113

114114
assert_receive %Message{
115-
event: "postgres_changes",
116-
payload: %{
117-
"data" => %{
118-
"columns" => [
119-
%{"name" => "id", "type" => "int4"},
120-
%{"name" => "details", "type" => "text"}
121-
],
122-
"commit_timestamp" => _ts,
123-
"errors" => nil,
124-
"record" => %{"details" => "test", "id" => id},
125-
"schema" => "public",
126-
"table" => "test",
127-
"type" => "INSERT"
128-
},
129-
"ids" => [^sub_id]
130-
},
131-
ref: nil,
132-
topic: "realtime:any"
133-
}
115+
event: "postgres_changes",
116+
payload: %{
117+
"data" => %{
118+
"columns" => [
119+
%{"name" => "id", "type" => "int4"},
120+
%{"name" => "details", "type" => "text"}
121+
],
122+
"commit_timestamp" => _ts,
123+
"errors" => nil,
124+
"record" => %{"details" => "test", "id" => id},
125+
"schema" => "public",
126+
"table" => "test",
127+
"type" => "INSERT"
128+
},
129+
"ids" => [^sub_id]
130+
},
131+
ref: nil,
132+
topic: "realtime:any"
133+
},
134+
2000
134135

135136
P.query!(conn, "update test set details = 'test' where id = #{id}", [])
136137

137138
assert_receive %Message{
138-
event: "postgres_changes",
139-
payload: %{
140-
"data" => %{
141-
"columns" => [
142-
%{"name" => "id", "type" => "int4"},
143-
%{"name" => "details", "type" => "text"}
144-
],
145-
"commit_timestamp" => _ts,
146-
"errors" => nil,
147-
"old_record" => %{"id" => ^id},
148-
"record" => %{"details" => "test", "id" => ^id},
149-
"schema" => "public",
150-
"table" => "test",
151-
"type" => "UPDATE"
152-
},
153-
"ids" => [^sub_id]
154-
},
155-
ref: nil,
156-
topic: "realtime:any"
157-
}
139+
event: "postgres_changes",
140+
payload: %{
141+
"data" => %{
142+
"columns" => [
143+
%{"name" => "id", "type" => "int4"},
144+
%{"name" => "details", "type" => "text"}
145+
],
146+
"commit_timestamp" => _ts,
147+
"errors" => nil,
148+
"old_record" => %{"id" => ^id},
149+
"record" => %{"details" => "test", "id" => ^id},
150+
"schema" => "public",
151+
"table" => "test",
152+
"type" => "UPDATE"
153+
},
154+
"ids" => [^sub_id]
155+
},
156+
ref: nil,
157+
topic: "realtime:any"
158+
},
159+
2000
158160

159161
P.query!(conn, "delete from test where id = #{id}", [])
160162

161163
assert_receive %Message{
162-
event: "postgres_changes",
163-
payload: %{
164-
"data" => %{
165-
"columns" => [
166-
%{"name" => "id", "type" => "int4"},
167-
%{"name" => "details", "type" => "text"}
168-
],
169-
"commit_timestamp" => _ts,
170-
"errors" => nil,
171-
"old_record" => %{"id" => ^id},
172-
"schema" => "public",
173-
"table" => "test",
174-
"type" => "DELETE"
175-
},
176-
"ids" => [^sub_id]
177-
},
178-
ref: nil,
179-
topic: "realtime:any"
180-
}
164+
event: "postgres_changes",
165+
payload: %{
166+
"data" => %{
167+
"columns" => [
168+
%{"name" => "id", "type" => "int4"},
169+
%{"name" => "details", "type" => "text"}
170+
],
171+
"commit_timestamp" => _ts,
172+
"errors" => nil,
173+
"old_record" => %{"id" => ^id},
174+
"schema" => "public",
175+
"table" => "test",
176+
"type" => "DELETE"
177+
},
178+
"ids" => [^sub_id]
179+
},
180+
ref: nil,
181+
topic: "realtime:any"
182+
},
183+
2000
181184
end
182185

183186
test "broadcast" do

test/realtime/tenants/connect_test.exs

+48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
defmodule Realtime.Tenants.ConnectTest do
22
use Realtime.DataCase, async: false
3+
34
alias Realtime.Tenants.Connect
5+
alias Realtime.UsersCounter
46

57
describe "lookup_or_start_connection/1" do
68
setup do
@@ -50,5 +52,51 @@ defmodule Realtime.Tenants.ConnectTest do
5052
test "if tenant does not exist, returns error" do
5153
assert Connect.lookup_or_start_connection("none") == {:error, :tenant_not_found}
5254
end
55+
56+
test "if no users are connected to a tenant channel, stop the connection", %{
57+
tenant: %{external_id: tenant_id}
58+
} do
59+
{:ok, db_conn} =
60+
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 50)
61+
62+
# Not enough time has passed, connection still alive
63+
:timer.sleep(100)
64+
assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)
65+
66+
# Enough time has passed, connection stopped
67+
:timer.sleep(1000)
68+
assert :undefined = :syn.lookup(Connect, tenant_id)
69+
refute Process.alive?(db_conn)
70+
end
71+
72+
test "if users are connected to a tenant channel, keep the connection", %{
73+
tenant: %{external_id: tenant_id}
74+
} do
75+
UsersCounter.add(self(), tenant_id)
76+
77+
{:ok, db_conn} =
78+
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10)
79+
80+
assert {pid, %{conn: conn_pid}} = :syn.lookup(Connect, tenant_id)
81+
:timer.sleep(300)
82+
assert {^pid, %{conn: ^conn_pid}} = :syn.lookup(Connect, tenant_id)
83+
assert Process.alive?(db_conn)
84+
end
85+
86+
test "connection is killed after user leaving", %{
87+
tenant: %{external_id: tenant_id}
88+
} do
89+
UsersCounter.add(self(), tenant_id)
90+
91+
{:ok, db_conn} =
92+
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10)
93+
94+
assert {_pid, %{conn: _conn_pid}} = :syn.lookup(Connect, tenant_id)
95+
:timer.sleep(300)
96+
:syn.leave(:users, tenant_id, self())
97+
:timer.sleep(300)
98+
assert :undefined = :syn.lookup(Connect, tenant_id)
99+
refute Process.alive?(db_conn)
100+
end
53101
end
54102
end

0 commit comments

Comments
 (0)