Skip to content

Commit

Permalink
Don't finish process init until connected
Browse files Browse the repository at this point in the history
  • Loading branch information
hkrutzer committed Sep 22, 2024
1 parent 5504ef9 commit 55ef68d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 76 deletions.
103 changes: 45 additions & 58 deletions lib/late.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ defmodule Late do
:websocket,
:request_ref,
:resp_headers,
:connect_buffer,
:state
]

Expand Down Expand Up @@ -145,6 +144,7 @@ defmodule Late do
case mod.init(args) do
{:ok, mod_state} ->
mint_opts = Keyword.get(opts, :mint_opts, [])
mint_opts = Keyword.put(mint_opts, :mode, :passive)
mint_websocket_opts = Keyword.get(opts, :mint_opts, [])
uri = URI.parse(Keyword.get(opts, :url))
headers = Keyword.get(opts, :headers, [])
Expand All @@ -171,81 +171,56 @@ defmodule Late do
# TODO Make HTTP1 configurable
with {:ok, conn} <- Mint.HTTP1.connect(http_scheme, uri.host, uri.port, mint_opts),
{:ok, conn, ref} <-
Mint.WebSocket.upgrade(ws_scheme, conn, path, headers, mint_websocket_opts) do
Mint.WebSocket.upgrade(ws_scheme, conn, path, headers, mint_websocket_opts),
{:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers} | rest]} <-
Mint.HTTP.recv(conn, 0, connect_timeout),
{:ok, conn} <- Mint.HTTP.set_mode(conn, :active),
{:ok, conn, websocket} <- Mint.WebSocket.new(conn, ref, status, resp_headers),
# In some cases the data from recv might already contain
# the initial frames, so we decode those.
# To similate this happening, add a delay after Mint.WebSocket.upgrade
{:ok, websocket, initial_data} <- maybe_decode_initial_data(websocket, rest) do
initial_frames =
Enum.map(initial_data, &{:next_event, :internal, {:handle_frame, &1}})

state = %__MODULE__{
request_ref: ref,
conn: conn,
websocket: websocket,
resp_headers: resp_headers,
request_ref: ref,
state: {mod, mod_state}
}

{:ok, :connecting, state, {{:timeout, :connect_timeout}, connect_timeout, nil}}
{:ok, :connected, state,
[{:next_event, :internal, :maybe_handle_connect}] ++ initial_frames}
else
{:error, reason} ->
{:error, reason}

{:error, conn, reason, _response} ->
# Mint.HTTP.recv error
Mint.HTTP.close(conn)
{:error, reason}

{:error, conn, reason} ->
Mint.HTTP.close(conn)
{:error, reason}
end
end
end

## State functions
def connecting({:timeout, :connect_timeout}, _from, state) do
Mint.HTTP.close(state.conn)
{:stop, :connect_timeout, state}
end

def connecting(:info, message, %{conn: conn} = state)
when Mint.HTTP.is_connection_message(state.conn, message) do
ref = state.request_ref

{:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers} | rest]} =
Mint.WebSocket.stream(conn, message)

buffer =
case rest do
[{:data, ^ref, data}, {:done, ^ref}] -> data
[{:done, ^ref}] -> nil
end

case Mint.WebSocket.new(conn, ref, status, resp_headers) do
{:ok, conn, websocket} ->
state = %{
state
| conn: conn,
websocket: websocket,
resp_headers: resp_headers,
connect_buffer: buffer
}

{:next_state, :connected, state,
[
# Clear connection timeout first, then trigger event to call handle_connect
{{:timeout, :connect_timeout}, :cancel},
{:next_event, :internal, :maybe_handle_connect}
]}

{:error, conn, reason} ->
{:stop, reason, %{state | conn: conn}}
end
end

def connecting(:info, _message, _state), do: {:keep_state_and_data, :postpone}
def connecting({:call, _from}, _msg, _state), do: {:keep_state_and_data, :postpone}

def maybe_prepend_buffer(%__MODULE__{connect_buffer: nil} = state, data), do: {:ok, state, data}
defp maybe_decode_initial_data(websocket, [{:done, _ref}]), do: {:ok, websocket, []}

def maybe_prepend_buffer(%__MODULE__{connect_buffer: buffer} = state, data) do
{:ok, %{state | connect_buffer: nil}, buffer <> data}
defp maybe_decode_initial_data(websocket, [{:data, ref, data}, {:done, ref}]) do
Mint.WebSocket.decode(websocket, data)
end

## State functions
def connected(:info, message, state)
when Mint.HTTP.is_connection_message(state.conn, message) do
ref = state.request_ref

with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(state.conn, message),
{:ok, state, data} <- maybe_prepend_buffer(state, data),
{:ok, websocket, frames} <- Mint.WebSocket.decode(state.websocket, data) do
# Send each frame as a new action
actions = Enum.map(frames, &{:next_event, :internal, {:handle_frame, &1}})
Expand All @@ -257,9 +232,19 @@ defmodule Late do

# Handle stream errors
{:error, conn, %Mint.TransportError{reason: :closed} = reason, _responses} ->
# TODO handle_disconnect
Logger.warning("Connection closed #{inspect(reason)}")
{:stop, reason, %{state | conn: conn}}
{mod, mod_state} = state.state
state = %{state | conn: conn}

# TODO Add reconnect
if function_exported?(mod, :handle_disconnect, 2) do
case apply(mod, :handle_disconnect, [reason, mod_state]) do
{:ok, mod_state} ->
state = %{state | state: {mod, mod_state}}
{:stop, reason, state}
end
else
{:stop, reason, state}
end

{:error, conn, reason, _responses} ->
{:stop, reason, %{state | conn: conn}}
Expand All @@ -282,6 +267,10 @@ defmodule Late do
{:keep_state, state}
end

def connected(:internal, {:handle_frame, {:pong, _data}}, state) do
{:keep_state, state}
end

def connected(:internal, {:handle_frame, {op, text}}, state) when op in [:text, :binary] do
{mod, mod_state} = state.state
maybe_handle(mod, :handle_in, [{op, text}, mod_state], state)
Expand All @@ -292,7 +281,7 @@ defmodule Late do
end

def connected(:internal, {:handle_frame, frame}, _state) do
Logger.error("Received unknown websocket frame #{frame}")
Logger.error("Received unknown websocket frame #{inspect(frame)}")
:keep_state_and_data
end

Expand Down Expand Up @@ -323,8 +312,6 @@ defmodule Late do
end

defp send_frame(state, frame) do
Logger.debug("Sending #{inspect(frame)}")

with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame),
state = put_in(state.websocket, websocket),
{:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do
Expand Down
56 changes: 39 additions & 17 deletions test/late_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,39 @@ defmodule LateTest do
{:reply, {:text, msg}, state}
end

@impl true
def handle_call(:kill_server_worker, from, state) do
{:reply, {:text, "kill"}, state |> Map.put(:from, from)}
end

def handle_call(:disconnect, from, state) do
Late.reply(from, :ok)
{:stop, state}
end

@impl true
def handle_in({:text, "Greetings!"} = msg, state) do
Process.send(state.test_pid, {:handle_in, msg}, [])
{:ok, state}
end

def handle_in({:text, "bye" <> _text}, state) do
{:stop, state}
end

@impl true
def handle_info({:disconnect, :normal_close}, state), do: {:reply, {:text, "normal_close"}, state}
def handle_info({:disconnect, :error_close}, state), do: {:reply, {:text, "error_close"}, state}
def handle_info({:disconnect, :normal_close}, state),
do: {:reply, {:text, "normal_close"}, state}

def handle_info({:disconnect, :error_close}, state),
do: {:reply, {:text, "error_close"}, state}

def handle_info(message, state) do
Logger.info("Handle in 2 #{inspect(message)}")
{:reply, [{:text, "message one"}, {:text, message}], state}
end
end

test "connects to a server and receives a message" do
test "connects to a server and send and receive a message" do
client_pid = :erlang.term_to_binary(self()) |> Base.encode64()

url =
Expand All @@ -74,6 +85,7 @@ defmodule LateTest do
debug: [:trace]
)

assert_receive {:handle_in, {:text, "Greetings!"}}
assert_receive {:server_msg, {:text, "hi"}}
end

Expand All @@ -95,8 +107,6 @@ defmodule LateTest do
assert_receive {:server_msg, {:text, "hi"}}
end

test "handles call"

test "can disconnect" do
client_pid = :erlang.term_to_binary(self()) |> Base.encode64()

Expand Down Expand Up @@ -173,32 +183,44 @@ defmodule LateTest do
)
end

test "crashes when connecting to host that offers no websocket" do
Process.flag(:trap_exit, true)

{:ok, pid} =
test "does not start when connecting to host that offers no websocket" do
{:error, %Mint.WebSocket.UpgradeFailureError{}} =
Late.start_link(
TestConnection,
[test_pid: self()],
url: "ws://localhost:8888/text",
debug: [:trace]
)
assert_receive {:EXIT, ^pid, %Mint.WebSocket.UpgradeFailureError{}}
end

test "crashes when connection times out" do
Process.flag(:trap_exit, true)

{:ok, pid} =
test "does not start when connection times out" do
{:error, %Mint.TransportError{reason: :timeout}} =
Late.start_link(
TestConnection,
[test_pid: self()],
url: "ws://localhost:8888/sleep",
connect_timeout: 100,
debug: [:trace]
)
Process.sleep(200)
assert_receive {:EXIT, ^pid, :connect_timeout}
end

test "exits when the connection is closed" do
client_pid = :erlang.term_to_binary(self()) |> Base.encode64()

url =
URI.parse("ws://localhost:8888/websocket")
|> URI.append_query(URI.encode_query(%{test_pid: client_pid}))

{:ok, pid} =
Late.start_link(
TestConnection,
[test_pid: self()],
url: URI.to_string(url),
debug: [:trace]
)

{%Mint.TransportError{reason: :closed}, _} =
catch_exit(Late.call(pid, :kill_server_worker)) |> dbg()
end
end
end
7 changes: 6 additions & 1 deletion test/support/websocket_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ defmodule Late.WebsocketHandler do
@moduledoc false

def init(data) do
{:ok, data}
{:push, [{:text, "Greetings!"}, {:ping, <<>>}], data}
end

def handle_control({message, [opcode: opcode]}, state) do
{:push, [{opcode, message}], state}
end

def handle_in({"kill", [opcode: :text]}, state) do
Process.exit(self(), :kill)
{:ok, state}
end

def handle_in({"normal_close", [opcode: :text]}, state) do
{:stop, :normal, {1000, "Bye!"}, state}
end
Expand Down

0 comments on commit 55ef68d

Please sign in to comment.