From 55ef68db27d8c6795fb50151a620e8d40f88f696 Mon Sep 17 00:00:00 2001 From: Hans Krutzer Date: Sun, 22 Sep 2024 23:13:41 +0200 Subject: [PATCH] Don't finish process init until connected --- lib/late.ex | 103 +++++++++++++----------------- test/late_test.exs | 56 +++++++++++----- test/support/websocket_handler.ex | 7 +- 3 files changed, 90 insertions(+), 76 deletions(-) diff --git a/lib/late.ex b/lib/late.ex index 3eb2d74..49af7fa 100644 --- a/lib/late.ex +++ b/lib/late.ex @@ -33,7 +33,6 @@ defmodule Late do :websocket, :request_ref, :resp_headers, - :connect_buffer, :state ] @@ -145,6 +144,7 @@ defmodule Late do case mod.init(args) do {:ok, mod_state} -> mint_opts = Keyword.get(opts, :mint_opts, []) + mint_opts = Keyword.put(mint_opts, :mode, :passive) mint_websocket_opts = Keyword.get(opts, :mint_opts, []) uri = URI.parse(Keyword.get(opts, :url)) headers = Keyword.get(opts, :headers, []) @@ -171,18 +171,37 @@ defmodule Late do # TODO Make HTTP1 configurable with {:ok, conn} <- Mint.HTTP1.connect(http_scheme, uri.host, uri.port, mint_opts), {:ok, conn, ref} <- - Mint.WebSocket.upgrade(ws_scheme, conn, path, headers, mint_websocket_opts) do + Mint.WebSocket.upgrade(ws_scheme, conn, path, headers, mint_websocket_opts), + {:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers} | rest]} <- + Mint.HTTP.recv(conn, 0, connect_timeout), + {:ok, conn} <- Mint.HTTP.set_mode(conn, :active), + {:ok, conn, websocket} <- Mint.WebSocket.new(conn, ref, status, resp_headers), + # In some cases the data from recv might already contain + # the initial frames, so we decode those. + # To similate this happening, add a delay after Mint.WebSocket.upgrade + {:ok, websocket, initial_data} <- maybe_decode_initial_data(websocket, rest) do + initial_frames = + Enum.map(initial_data, &{:next_event, :internal, {:handle_frame, &1}}) + state = %__MODULE__{ - request_ref: ref, conn: conn, + websocket: websocket, + resp_headers: resp_headers, + request_ref: ref, state: {mod, mod_state} } - {:ok, :connecting, state, {{:timeout, :connect_timeout}, connect_timeout, nil}} + {:ok, :connected, state, + [{:next_event, :internal, :maybe_handle_connect}] ++ initial_frames} else {:error, reason} -> {:error, reason} + {:error, conn, reason, _response} -> + # Mint.HTTP.recv error + Mint.HTTP.close(conn) + {:error, reason} + {:error, conn, reason} -> Mint.HTTP.close(conn) {:error, reason} @@ -190,62 +209,18 @@ defmodule Late do end end - ## State functions - def connecting({:timeout, :connect_timeout}, _from, state) do - Mint.HTTP.close(state.conn) - {:stop, :connect_timeout, state} - end - - def connecting(:info, message, %{conn: conn} = state) - when Mint.HTTP.is_connection_message(state.conn, message) do - ref = state.request_ref - - {:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers} | rest]} = - Mint.WebSocket.stream(conn, message) - - buffer = - case rest do - [{:data, ^ref, data}, {:done, ^ref}] -> data - [{:done, ^ref}] -> nil - end - - case Mint.WebSocket.new(conn, ref, status, resp_headers) do - {:ok, conn, websocket} -> - state = %{ - state - | conn: conn, - websocket: websocket, - resp_headers: resp_headers, - connect_buffer: buffer - } - - {:next_state, :connected, state, - [ - # Clear connection timeout first, then trigger event to call handle_connect - {{:timeout, :connect_timeout}, :cancel}, - {:next_event, :internal, :maybe_handle_connect} - ]} - - {:error, conn, reason} -> - {:stop, reason, %{state | conn: conn}} - end - end - - def connecting(:info, _message, _state), do: {:keep_state_and_data, :postpone} - def connecting({:call, _from}, _msg, _state), do: {:keep_state_and_data, :postpone} - - def maybe_prepend_buffer(%__MODULE__{connect_buffer: nil} = state, data), do: {:ok, state, data} + defp maybe_decode_initial_data(websocket, [{:done, _ref}]), do: {:ok, websocket, []} - def maybe_prepend_buffer(%__MODULE__{connect_buffer: buffer} = state, data) do - {:ok, %{state | connect_buffer: nil}, buffer <> data} + defp maybe_decode_initial_data(websocket, [{:data, ref, data}, {:done, ref}]) do + Mint.WebSocket.decode(websocket, data) end + ## State functions def connected(:info, message, state) when Mint.HTTP.is_connection_message(state.conn, message) do ref = state.request_ref with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(state.conn, message), - {:ok, state, data} <- maybe_prepend_buffer(state, data), {:ok, websocket, frames} <- Mint.WebSocket.decode(state.websocket, data) do # Send each frame as a new action actions = Enum.map(frames, &{:next_event, :internal, {:handle_frame, &1}}) @@ -257,9 +232,19 @@ defmodule Late do # Handle stream errors {:error, conn, %Mint.TransportError{reason: :closed} = reason, _responses} -> - # TODO handle_disconnect - Logger.warning("Connection closed #{inspect(reason)}") - {:stop, reason, %{state | conn: conn}} + {mod, mod_state} = state.state + state = %{state | conn: conn} + + # TODO Add reconnect + if function_exported?(mod, :handle_disconnect, 2) do + case apply(mod, :handle_disconnect, [reason, mod_state]) do + {:ok, mod_state} -> + state = %{state | state: {mod, mod_state}} + {:stop, reason, state} + end + else + {:stop, reason, state} + end {:error, conn, reason, _responses} -> {:stop, reason, %{state | conn: conn}} @@ -282,6 +267,10 @@ defmodule Late do {:keep_state, state} end + def connected(:internal, {:handle_frame, {:pong, _data}}, state) do + {:keep_state, state} + end + def connected(:internal, {:handle_frame, {op, text}}, state) when op in [:text, :binary] do {mod, mod_state} = state.state maybe_handle(mod, :handle_in, [{op, text}, mod_state], state) @@ -292,7 +281,7 @@ defmodule Late do end def connected(:internal, {:handle_frame, frame}, _state) do - Logger.error("Received unknown websocket frame #{frame}") + Logger.error("Received unknown websocket frame #{inspect(frame)}") :keep_state_and_data end @@ -323,8 +312,6 @@ defmodule Late do end defp send_frame(state, frame) do - Logger.debug("Sending #{inspect(frame)}") - with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame), state = put_in(state.websocket, websocket), {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do diff --git a/test/late_test.exs b/test/late_test.exs index 11dfbb8..f0f7868 100644 --- a/test/late_test.exs +++ b/test/late_test.exs @@ -38,20 +38,31 @@ defmodule LateTest do {:reply, {:text, msg}, state} end - @impl true + def handle_call(:kill_server_worker, from, state) do + {:reply, {:text, "kill"}, state |> Map.put(:from, from)} + end + def handle_call(:disconnect, from, state) do Late.reply(from, :ok) {:stop, state} end @impl true + def handle_in({:text, "Greetings!"} = msg, state) do + Process.send(state.test_pid, {:handle_in, msg}, []) + {:ok, state} + end + def handle_in({:text, "bye" <> _text}, state) do {:stop, state} end @impl true - def handle_info({:disconnect, :normal_close}, state), do: {:reply, {:text, "normal_close"}, state} - def handle_info({:disconnect, :error_close}, state), do: {:reply, {:text, "error_close"}, state} + def handle_info({:disconnect, :normal_close}, state), + do: {:reply, {:text, "normal_close"}, state} + + def handle_info({:disconnect, :error_close}, state), + do: {:reply, {:text, "error_close"}, state} def handle_info(message, state) do Logger.info("Handle in 2 #{inspect(message)}") @@ -59,7 +70,7 @@ defmodule LateTest do end end - test "connects to a server and receives a message" do + test "connects to a server and send and receive a message" do client_pid = :erlang.term_to_binary(self()) |> Base.encode64() url = @@ -74,6 +85,7 @@ defmodule LateTest do debug: [:trace] ) + assert_receive {:handle_in, {:text, "Greetings!"}} assert_receive {:server_msg, {:text, "hi"}} end @@ -95,8 +107,6 @@ defmodule LateTest do assert_receive {:server_msg, {:text, "hi"}} end - test "handles call" - test "can disconnect" do client_pid = :erlang.term_to_binary(self()) |> Base.encode64() @@ -173,23 +183,18 @@ defmodule LateTest do ) end - test "crashes when connecting to host that offers no websocket" do - Process.flag(:trap_exit, true) - - {:ok, pid} = + test "does not start when connecting to host that offers no websocket" do + {:error, %Mint.WebSocket.UpgradeFailureError{}} = Late.start_link( TestConnection, [test_pid: self()], url: "ws://localhost:8888/text", debug: [:trace] ) - assert_receive {:EXIT, ^pid, %Mint.WebSocket.UpgradeFailureError{}} end - test "crashes when connection times out" do - Process.flag(:trap_exit, true) - - {:ok, pid} = + test "does not start when connection times out" do + {:error, %Mint.TransportError{reason: :timeout}} = Late.start_link( TestConnection, [test_pid: self()], @@ -197,8 +202,25 @@ defmodule LateTest do connect_timeout: 100, debug: [:trace] ) - Process.sleep(200) - assert_receive {:EXIT, ^pid, :connect_timeout} + end + + test "exits when the connection is closed" do + client_pid = :erlang.term_to_binary(self()) |> Base.encode64() + + url = + URI.parse("ws://localhost:8888/websocket") + |> URI.append_query(URI.encode_query(%{test_pid: client_pid})) + + {:ok, pid} = + Late.start_link( + TestConnection, + [test_pid: self()], + url: URI.to_string(url), + debug: [:trace] + ) + + {%Mint.TransportError{reason: :closed}, _} = + catch_exit(Late.call(pid, :kill_server_worker)) |> dbg() end end end diff --git a/test/support/websocket_handler.ex b/test/support/websocket_handler.ex index 60335f2..b752f9f 100644 --- a/test/support/websocket_handler.ex +++ b/test/support/websocket_handler.ex @@ -2,13 +2,18 @@ defmodule Late.WebsocketHandler do @moduledoc false def init(data) do - {:ok, data} + {:push, [{:text, "Greetings!"}, {:ping, <<>>}], data} end def handle_control({message, [opcode: opcode]}, state) do {:push, [{opcode, message}], state} end + def handle_in({"kill", [opcode: :text]}, state) do + Process.exit(self(), :kill) + {:ok, state} + end + def handle_in({"normal_close", [opcode: :text]}, state) do {:stop, :normal, {1000, "Bye!"}, state} end