Skip to content

Commit ba1e60c

Browse files
committed
Verify durable slot handover and deactivation.
1 parent 24f03ac commit ba1e60c

File tree

5 files changed

+122
-57
lines changed

5 files changed

+122
-57
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ by adding `walex` to your list of dependencies in `mix.exs`:
2929
```elixir
3030
def deps do
3131
[
32-
{:walex, "~> 4.2.0"}
32+
{:walex, "~> 4.4.0"}
3333
]
3434
end
3535
```

lib/walex/replication/query_builder.ex

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
defmodule WalEx.Replication.QueryBuilder do
2-
def publication_exists(state) do
3-
"SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication}' LIMIT 1;"
2+
def publication_exists(%{publication: publication}) do
3+
"SELECT 1 FROM pg_publication WHERE pubname = '#{publication}' LIMIT 1;"
44
end
55

6-
def slot_exists(state) do
7-
"SELECT active FROM pg_replication_slots WHERE slot_name = '#{state.slot_name}' LIMIT 1;"
6+
def slot_exists(%{slot_name: slot_name}) do
7+
"SELECT active FROM pg_replication_slots WHERE slot_name = '#{slot_name}' LIMIT 1;"
88
end
99

10-
def create_temporary_slot(state) do
11-
"CREATE_REPLICATION_SLOT #{state.slot_name} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
10+
def create_temporary_slot(%{slot_name: slot_name}) do
11+
"CREATE_REPLICATION_SLOT #{slot_name} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
1212
end
1313

14-
def create_durable_slot(state) do
15-
"CREATE_REPLICATION_SLOT #{state.slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
14+
def create_durable_slot(%{slot_name: slot_name}) do
15+
"CREATE_REPLICATION_SLOT #{slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT;"
1616
end
1717

18-
def start_replication_slot(state) do
19-
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication}')"
18+
def start_replication_slot(%{slot_name: slot_name, publication: publication}) do
19+
"START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"
2020
end
2121
end

lib/walex/replication/server.ex

+82-29
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,18 @@ defmodule WalEx.Replication.Server do
1111
alias WalEx.Decoder
1212
alias WalEx.Replication.QueryBuilder
1313

14+
require Logger
15+
16+
@max_retries 10
17+
@initial_backoff 1000
18+
1419
def start_link(opts) do
1520
app_name = Keyword.get(opts, :app_name)
1621
opts = set_pgx_replication_conn_opts(app_name)
1722

1823
Postgrex.ReplicationConnection.start_link(__MODULE__, [app_name: app_name], opts)
1924
end
2025

21-
defp set_pgx_replication_conn_opts(app_name) do
22-
database_configs_keys = [
23-
:hostname,
24-
:username,
25-
:password,
26-
:port,
27-
:database,
28-
:ssl,
29-
:ssl_opts,
30-
:socket_options
31-
]
32-
33-
extra_opts = [auto_reconnect: true]
34-
database_configs = WalEx.Config.get_configs(app_name, database_configs_keys)
35-
36-
replications_name = [
37-
name: WalExRegistry.set_name(:set_gen_server, __MODULE__, app_name)
38-
]
39-
40-
extra_opts ++ database_configs ++ replications_name
41-
end
42-
4326
@impl true
4427
def init(opts) do
4528
app_name = Keyword.get(opts, :app_name)
@@ -90,7 +73,7 @@ defmodule WalEx.Replication.Server do
9073
def handle_result(results, %{step: :publication_exists} = state) do
9174
case results do
9275
[%Postgrex.Result{num_rows: 0}] ->
93-
raise "Publication doesn't exists. publication: #{inspect(state.publication)}"
76+
raise "Publication doesn't exist. publication: #{inspect(state.publication)}"
9477

9578
_ ->
9679
raise "Unexpected result when checking if publication exists. #{inspect(results)}"
@@ -110,11 +93,17 @@ defmodule WalEx.Replication.Server do
11093
) do
11194
case active do
11295
"f" ->
113-
query = QueryBuilder.start_replication_slot(state)
114-
{:stream, query, [], %{state | step: :streaming}}
96+
Logger.info("Activating inactive replication slot: #{state.slot_name}")
97+
start_replication_with_retry(state, 0, @initial_backoff)
11598

11699
"t" ->
117-
raise "Durable slot already active"
100+
Logger.info(
101+
"Replication slot #{state.slot_name} is active. Waiting for it to become inactive."
102+
)
103+
104+
schedule_slot_check()
105+
106+
{:noreply, state}
118107
end
119108
end
120109

@@ -125,8 +114,7 @@ defmodule WalEx.Replication.Server do
125114

126115
@impl true
127116
def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
128-
query = QueryBuilder.start_replication_slot(state)
129-
{:stream, query, [], %{state | step: :streaming}}
117+
start_replication_with_retry(state, 0, @initial_backoff)
130118
end
131119

132120
@impl true
@@ -136,7 +124,22 @@ defmodule WalEx.Replication.Server do
136124
end
137125

138126
@impl true
139-
# https://www.postgresql.org/docs/14/protocol-replication.html
127+
def handle_result(
128+
%Postgrex.Error{postgres: %{code: :object_in_use}},
129+
state = %{step: {:start_replication, retry_count, backoff}}
130+
) do
131+
Logger.warning("Replication slot in use, retrying... (attempt #{retry_count + 1})")
132+
Process.sleep(backoff)
133+
start_replication_with_retry(state, retry_count + 1, backoff * 2)
134+
end
135+
136+
@impl true
137+
def handle_result(_, state = %{step: {:start_replication, _retry_count, _backoff}}) do
138+
Logger.info("Successfully started replication slot: #{state.slot_name}")
139+
{:noreply, %{state | step: :streaming}}
140+
end
141+
142+
@impl true
140143
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
141144
rest
142145
|> Decoder.decode_message()
@@ -145,6 +148,7 @@ defmodule WalEx.Replication.Server do
145148
{:noreply, state}
146149
end
147150

151+
@impl true
148152
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
149153
messages =
150154
case reply do
@@ -155,6 +159,55 @@ defmodule WalEx.Replication.Server do
155159
{:noreply, messages, state}
156160
end
157161

162+
@impl true
163+
def handle_info(:check_slot_status, state) do
164+
query = QueryBuilder.slot_exists(state)
165+
{:query, query, %{state | step: :slot_exists}}
166+
end
167+
168+
defp set_pgx_replication_conn_opts(app_name) do
169+
database_configs_keys = [
170+
:hostname,
171+
:username,
172+
:password,
173+
:port,
174+
:database,
175+
:ssl,
176+
:ssl_opts,
177+
:socket_options
178+
]
179+
180+
extra_opts = [auto_reconnect: true]
181+
database_configs = WalEx.Config.get_configs(app_name, database_configs_keys)
182+
183+
replications_name = [
184+
name: WalExRegistry.set_name(:set_gen_server, __MODULE__, app_name)
185+
]
186+
187+
extra_opts ++ database_configs ++ replications_name
188+
end
189+
190+
defp start_replication_with_retry(state, retry_count, backoff)
191+
when retry_count < @max_retries do
192+
query = QueryBuilder.start_replication_slot(state)
193+
{:stream, query, [], %{state | step: {:start_replication, retry_count, backoff}}}
194+
end
195+
196+
defp start_replication_with_retry(state, _retry_count, _backoff) do
197+
Logger.warning(
198+
"Failed to start replication slot after maximum retries. Scheduling another check."
199+
)
200+
201+
schedule_slot_check()
202+
203+
{:noreply, state}
204+
end
205+
206+
defp schedule_slot_check() do
207+
# Check again after 5 seconds
208+
Process.send_after(self(), :check_slot_status, 5000)
209+
end
210+
158211
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
159212
defp current_time, do: System.os_time(:microsecond) - @epoch
160213
end

mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule WalEx.MixProject do
44
def project do
55
[
66
app: :walex,
7-
version: "4.3.0",
7+
version: "4.4.0",
88
elixir: "~> 1.15",
99
build_embedded: Mix.env() == :prod,
1010
start_permanent: Mix.env() == :prod,

test/walex/database_test.exs

+28-16
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ defmodule WalEx.DatabaseTest do
163163
"temporary" => false
164164
}
165165

166-
stopped_slot = Map.replace(durable_slot, "active", false)
166+
inactive_slot = %{durable_slot | "active" => false}
167167

168168
start_supervised!({WalExSupervisor, durable_opts},
169169
restart: :temporary,
@@ -174,27 +174,39 @@ defmodule WalEx.DatabaseTest do
174174

175175
other_app_opts = Keyword.replace!(durable_opts, :name, :other_app)
176176

177-
assert {:error, {{:shutdown, error}, _}} =
178-
start_supervised({WalExSupervisor, other_app_opts},
179-
restart: :temporary,
180-
id: :other_app_supervisor
181-
)
177+
# Start another supervisor with the same slot name
178+
{:ok, pid} =
179+
start_supervised({WalExSupervisor, other_app_opts},
180+
restart: :temporary,
181+
id: :other_app_supervisor
182+
)
182183

183-
assert {:failed_to_start_child, WalEx.Replication.Supervisor, {:shutdown, error}} = error
184-
assert {:failed_to_start_child, WalEx.Replication.Server, error} = error
185-
assert %RuntimeError{message: "Durable slot already active"} = error
184+
# Wait for the retry mechanism to complete
185+
Process.sleep(10_000)
186186

187+
# Check that the other app is still running and waiting
188+
assert Process.alive?(pid)
189+
190+
# The original slot should still be active
191+
assert [^durable_slot] = pg_replication_slots(database_pid)
192+
193+
# Stop the first supervisor
187194
stop_supervised(:ok_supervisor)
188-
# sleep to make sure that Postgres detect that the connection is closed
189-
Process.sleep(1_000)
190-
assert [^stopped_slot] = pg_replication_slots(database_pid)
191195

192-
start_supervised!({WalExSupervisor, durable_opts},
193-
restart: :temporary,
194-
id: :ok_supervisor
195-
)
196+
# Sleep to make sure that Postgres detects that the connection is closed
197+
# and the second supervisor has time to activate the slot
198+
Process.sleep(10_000)
196199

200+
# The slot should now be active under the second supervisor
197201
assert [^durable_slot] = pg_replication_slots(database_pid)
202+
203+
# Clean up
204+
stop_supervised(:other_app_supervisor)
205+
206+
# Wait for the slot to become inactive after stopping all supervisors
207+
Process.sleep(5_000)
208+
[slot] = pg_replication_slots(database_pid)
209+
assert slot == inactive_slot
198210
end
199211
end
200212

0 commit comments

Comments
 (0)