Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update pg strategy test #673

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.22.15",
version: "2.22.16",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
236 changes: 34 additions & 202 deletions test/realtime/cluster_strategy/postgres_test.exs
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
defmodule Realtime.Cluster.Strategy.PostgresTest do
use ExUnit.Case

import Mock

alias Cluster.Strategy.State
alias Ecto.Adapters.SQL
alias Postgrex.Notifications, as: PN
alias Realtime.Cluster.Strategy.Postgres

def connect(caller, result \\ true, node) do
send(caller, {:connect, node})
result
end

def disconnect(caller, result \\ true, node) do
send(caller, {:disconnect, node})
result
setup do
Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :auto)

Check warning on line 9 in test/realtime/cluster_strategy/postgres_test.exs

View workflow job for this annotation

GitHub Actions / Formatting Checks

Nested modules could be aliased at the top of the invoking module.
end

def list_nodes(nodes) do
nodes
test "handle_event/4, :internal, :connect is successful" do
assert {:noreply, state} = Postgres.handle_continue(:connect, libcluster_state())
assert is_pid(state.meta.conn)
assert is_pid(state.meta.conn_notif)
end

test "notify cluster after start" do
state = libcluster_state()
Postgres.start_link([state])

{:ok, conn_notif} = PN.start_link(state.meta.opts.())
PN.listen(conn_notif, "realtime_cluster")
node = "#{node()}"
assert_receive {:notification, _, _, "realtime_cluster", ^node}
end

defp libcluster_state() do
%State{
topology: [],
connect: {__MODULE__, :connect, [self()]},
disconnect: {__MODULE__, :disconnect, [self()]},
list_nodes: {__MODULE__, :list_nodes, [[]]},
config: opts(),
meta: %{
opts: fn -> opts() end,
conn: nil,
conn_notif: nil,
heartbeat_ref: nil
}
}
end

@test_node :testhost@testnode
@state :no_state
@libcluster_state %State{
topology: [],
connect: {__MODULE__, :connect, [self()]},
disconnect: {__MODULE__, :disconnect, [self()]},
list_nodes: {__MODULE__, :list_nodes, [[]]},
config: [
defp opts() do
[
hostname: "localhost",
username: "postgres",
password: "postgres",
Expand All @@ -40,187 +53,6 @@
],
heartbeat_interval: 5_000,
node_timeout: 15_000
],
meta: %Postgres.MetaState{conn: nil, listen_ref: nil, inactive_nodes: MapSet.new()}
}

setup do
Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :auto)
end

test "handle_event/4, :internal, :connect is successful" do
assert {:keep_state, %State{} = state, {{:timeout, :listen}, 0, nil}} =
Postgres.handle_event(:internal, :connect, @state, @libcluster_state)

assert is_pid(state.meta.conn)
end

test "handle_event/4, :internal, :connect is unsuccessful" do
with_mock PN, start_link: fn _ -> {:error, :error} end do
assert {:keep_state, %State{} = state, {{:timeout, :connect}, timeout, nil}} =
Postgres.handle_event(:internal, :connect, @state, @libcluster_state)

assert is_nil(state.meta.conn)
assert timeout <= 1_000
end
end

test "handle_event/4, :internal, :listen is successful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

assert {:keep_state, %State{} = state, {{:timeout, {:notify, :sync}}, 0, nil}} =
Postgres.handle_event(:internal, :listen, @state, test_state)

assert is_reference(state.meta.listen_ref)
end

test "handle_event/4, :internal, :listen is unsuccessful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

with_mock PN, listen: fn _, _ -> {:eventually, make_ref()} end do
assert {:keep_state, %State{} = state, {{:timeout, :listen}, timeout, nil}} =
Postgres.handle_event(:internal, :listen, @state, test_state)

assert is_nil(state.meta.listen_ref)
assert timeout <= 1_000
end
end

test "handle_event/4, :internal, {:notify, :sync} is successful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

assert {:keep_state, %State{}, {{:timeout, {:notify, :heartbeat}}, timeout, nil}} =
Postgres.handle_event(:internal, {:notify, :sync}, @state, test_state)

assert timeout >= 4_000
assert timeout <= 6_000
end

test "handle_event/4, :internal, {:notify, :sync} is unsuccessful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

with_mock SQL, query: fn _, _, _ -> {:error, :error} end do
assert {:keep_state, %State{}, {{:timeout, {:notify, :heartbeat}}, timeout, nil}} =
Postgres.handle_event(:internal, {:notify, :sync}, @state, test_state)

assert timeout <= 1_000
end
end

test "handle_event/4, :internal, {:notify, :heartbeat} is successful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

assert {:keep_state, %State{}, {{:timeout, {:notify, :heartbeat}}, timeout, nil}} =
Postgres.handle_event(:internal, {:notify, :heartbeat}, @state, test_state)

assert timeout >= 4_000
assert timeout <= 6_000
end

test "handle_event/4, :internal, {:notify, :heartbeat} is unsuccessful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

with_mock SQL, query: fn _, _, _ -> {:error, :error} end do
assert {:keep_state, %State{}, {{:timeout, {:notify, :heartbeat}}, timeout, nil}} =
Postgres.handle_event(:internal, {:notify, :heartbeat}, @state, test_state)

assert timeout <= 1_000
end
end

test "handle_event/4, {:timeout, {:listen, node}} is successful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

assert {:keep_state, %State{}} =
Postgres.handle_event({:timeout, {:listen, @test_node}}, nil, @state, test_state)
end

test "handle_event/4, {:timeout, {:listen, node}} is unsuccessful" do
{:ok, conn} = PN.start_link(@libcluster_state.config)
test_state = %{@libcluster_state | meta: %{@libcluster_state.meta | conn: conn}}

expected_inactive_nodes = MapSet.new([@test_node])

with_mock Cluster.Strategy,
disconnect_nodes: fn _, _, _, _ -> {:error, [{@test_node, "reason"}]} end do
assert {:keep_state,
%Cluster.Strategy.State{
meta: %Postgres.MetaState{inactive_nodes: ^expected_inactive_nodes}
}} =
Postgres.handle_event({:timeout, {:listen, @test_node}}, nil, @state, test_state)
end
end

test "notify cluster of sync" do
[
%State{
topology: [],
connect: {__MODULE__, :connect, [self()]},
disconnect: {__MODULE__, :disconnect, [self()]},
list_nodes: {__MODULE__, :list_nodes, [[]]},
config: [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "realtime_test",
port: 5432,
parameters: [
application_name: "#{node()}"
],
heartbeat_interval: 5_000,
node_timeout: 15_000
],
meta: %Postgres.MetaState{conn: nil, listen_ref: nil, inactive_nodes: MapSet.new([])}
}
]
|> Postgres.start_link()

{:ok, conn} = Postgrex.start_link(@libcluster_state.config)

Enum.map(1..100, fn _ ->
Postgrex.query!(conn, "NOTIFY cluster, 'sync::#{@test_node}'", [])
end)

assert_receive {:connect, @test_node}
end

test "notify cluster of heartbeat" do
[
%State{
topology: [],
connect: {__MODULE__, :connect, [self()]},
disconnect: {__MODULE__, :disconnect, [self()]},
list_nodes: {__MODULE__, :list_nodes, [[]]},
config: [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "realtime_test",
port: 5432,
parameters: [
application_name: "#{node()}"
],
heartbeat_interval: 5_000,
node_timeout: 15_000
],
meta: %Postgres.MetaState{conn: nil, listen_ref: nil, inactive_nodes: MapSet.new([])}
}
]
|> Postgres.start_link()

{:ok, conn} = Postgrex.start_link(@libcluster_state.config)

Enum.map(1..100, fn _ ->
Postgrex.query!(conn, "NOTIFY cluster, 'heartbeat::#{@test_node}'", [])
end)

assert_receive {:connect, @test_node}
end
end
Loading