Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions lib/ex_ice/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule ExICE.ICEAgent do
For exact meaning refer to the W3C WebRTC standard, sec. 5.6.4.
"""
@type connection_state_change() ::
{:connection_state_change, :checking | :connected | :completed | :failed}
{:connection_state_change, :checking | :connected | :completed | :failed | :closed}

@typedoc """
Messages sent by the ExICE.
Expand Down Expand Up @@ -277,7 +277,21 @@ defmodule ExICE.ICEAgent do
end

@doc """
Stops ICE agent and all of its sockets.
Irreversibly closes ICE agent and all of its sockets but does not terminate its process.

The only practical thing that you can do after closing an agent,
is to get its stats using `get_stats/1`.
Most of the other functions have no effect.

To terminate ICE agent process, see `stop/1`.
"""
@spec close(pid()) :: :ok
def close(ice_agent) do
GenServer.call(ice_agent, :close)
end

@doc """
Stops ICE agent process.
"""
@spec stop(pid()) :: :ok
def stop(ice_agent) do
Expand Down Expand Up @@ -346,6 +360,12 @@ defmodule ExICE.ICEAgent do
{:reply, stats, state}
end

@impl true
def handle_call(:close, _from, state) do
ice_agent = ExICE.Priv.ICEAgent.close(state.ice_agent)
{:reply, :ok, %{state | ice_agent: ice_agent}}
end

@impl true
def handle_cast({:set_role, role}, state) do
ice_agent = ExICE.Priv.ICEAgent.set_role(state.ice_agent, role)
Expand Down
132 changes: 108 additions & 24 deletions lib/ex_ice/priv/ice_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec set_role(t(), ExICE.ICEAgent.role()) :: t()
def set_role(%__MODULE__{state: :closed} = ice_agent, _) do
Logger.debug("Tried to set role in closed state. Ignoring.")
ice_agent
end

def set_role(%__MODULE__{role: nil} = ice_agent, role) do
%__MODULE__{ice_agent | role: role}
end
Expand All @@ -250,8 +255,9 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec set_remote_credentials(t(), binary(), binary()) :: t()
def set_remote_credentials(%__MODULE__{state: :failed} = ice_agent, _, _) do
Logger.debug("Tried to set remote credentials in failed state. ICE restart needed. Ignoring.")
def set_remote_credentials(%__MODULE__{state: state} = ice_agent, _, _)
when state in [:failed, :closed] do
Logger.debug("Tried to set remote credentials in state #{state}. Ignoring.")
ice_agent
end

Expand Down Expand Up @@ -286,8 +292,8 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec gather_candidates(t()) :: t()
def gather_candidates(%__MODULE__{state: :failed} = ice_agent) do
Logger.warning("Can't gather candidates in state failed. ICE restart needed. Ignoring.")
def gather_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do
Logger.warning("Can't gather candidates in state #{state}. Ignoring.")
ice_agent
end

Expand Down Expand Up @@ -364,9 +370,10 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec add_remote_candidate(t(), Candidate.t()) :: t()
def add_remote_candidate(%__MODULE__{state: :failed} = ice_agent, _) do
def add_remote_candidate(%__MODULE__{state: state} = ice_agent, _)
when state in [:failed, :closed] do
# Completed state will be caught by the next clause
Logger.debug("Can't add remote candidate in state failed. ICE restart needed. Ignoring.")
Logger.debug("Can't add remote candidate in state #{state}. Ignoring.")
ice_agent
end

Expand Down Expand Up @@ -455,8 +462,8 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec end_of_candidates(t()) :: t()
def end_of_candidates(%__MODULE__{state: :failed} = ice_agent) do
Logger.debug("Can't set end-of-candidates flag in state failed. Ignoring.")
def end_of_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do
Logger.debug("Can't set end-of-candidates flag in state #{state}. Ignoring.")
ice_agent
end

Expand Down Expand Up @@ -537,12 +544,22 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec restart(t()) :: t()
def restart(%__MODULE__{state: :closed} = ice_agent) do
Logger.debug("Can't restart ICE in state closed. Ignoring.")
ice_agent
end

def restart(ice_agent) do
Logger.debug("Restarting ICE")
do_restart(ice_agent)
end

@spec handle_ta_timeout(t()) :: t()
def handle_ta_timeout(%__MODULE__{state: :closed} = ice_agent) do
Logger.debug("Ta timer fired in closed state. Ignoring.")
ice_agent
end

def handle_ta_timeout(%__MODULE__{state: state} = ice_agent)
when state in [:completed, :failed] do
Logger.warning("""
Expand Down Expand Up @@ -694,6 +711,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_tr_rtx_timeout(t(), integer()) :: t()
def handle_tr_rtx_timeout(%__MODULE__{state: :closed} = ice_agent, _) do
Logger.debug("Transaction rtx timer fired in state closed. Ignoring.")
ice_agent
end

def handle_tr_rtx_timeout(ice_agent, t_id) when is_map_key(ice_agent.conn_checks, t_id) do
# Mark transaction id as ready to be retransmitted.
# We will do this in handle_ta_timeout as it has to be paced.
Expand Down Expand Up @@ -725,8 +747,9 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_eoc_timeout(t()) :: t()
def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do
Logger.debug("EOC timer fired but we are in the failed state. Ignoring.")
def handle_eoc_timeout(%__MODULE__{state: state} = ice_agent)
when state in [:failed, :closed] do
Logger.debug("EOC timer fired but we are in the #{state} state. Ignoring.")
%{ice_agent | eoc_timer: nil}
end

Expand All @@ -742,6 +765,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_pair_timeout(t()) :: t()
def handle_pair_timeout(%__MODULE__{state: :closed} = ice_agent) do
Logger.debug("Pair timer fired in closed state. Ignoring.")
ice_agent
end

def handle_pair_timeout(ice_agent) do
start_pair_timer()

Expand Down Expand Up @@ -792,6 +820,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_keepalive_timeout(t(), integer()) :: t()
def handle_keepalive_timeout(%__MODULE__{state: :closed} = ice_agent, _) do
Logger.debug("Keepalive timer fired in closed state. Ignoring.")
ice_agent
end

def handle_keepalive_timeout(%__MODULE__{selected_pair_id: id} = ice_agent, id) do
# if pair was selected, send keepalives only on that pair
s_pair = Map.fetch!(ice_agent.checklist, id)
Expand Down Expand Up @@ -842,7 +875,8 @@ defmodule ExICE.Priv.ICEAgent do
:inet.port_number(),
binary()
) :: t()
def handle_udp(%{state: :failed} = ice_agent, _socket, _src_ip, _src_port, _packet) do
def handle_udp(%{state: state} = ice_agent, _socket, _src_ip, _src_port, _packet)
when state in [:failed, :closed] do
ice_agent
end

Expand All @@ -868,6 +902,11 @@ defmodule ExICE.Priv.ICEAgent do
end

@spec handle_ex_turn_msg(t(), reference(), ExTURN.Client.notification_message()) :: t()
def handle_ex_turn_msg(%__MODULE__{state: :closed} = ice_agent, _, _) do
Logger.debug("Received ex_turn message in closed state. Ignoring.")
ice_agent
end

def handle_ex_turn_msg(ice_agent, client_ref, msg) do
tr_id_tr = find_gathering_transaction(ice_agent.gathering_transactions, client_ref)

Expand Down Expand Up @@ -919,6 +958,18 @@ defmodule ExICE.Priv.ICEAgent do
end
end

@spec close(t()) :: t()
def close(%__MODULE__{state: :closed} = ice_agent) do
ice_agent
end

def close(%__MODULE__{} = ice_agent) do
ice_agent.sockets
|> Enum.reduce(ice_agent, fn socket, ice_agent -> close_socket(ice_agent, socket) end)
|> change_gathering_state(:complete, notify: false)
|> change_connection_state(:closed, notify: false)
end

## PRIV API

defp create_srflx_gathering_transactions(stun_servers, sockets) do
Expand Down Expand Up @@ -2290,6 +2341,17 @@ defmodule ExICE.Priv.ICEAgent do
end

defp close_socket(ice_agent, socket) do
# Use sockname/1 to determine if a socket is still open.
# Alternatively, we could create a callback for `:inet.info/1`,
# but it's return type is not standardized - sometimes it's %{states: [:closed]},
# some other time %{rstates: [:closed], wstates: [:closed]}.
case ice_agent.transport_module.sockname(socket) do
{:error, :closed} -> ice_agent
_ -> do_close_socket(ice_agent, socket)
end
end

defp do_close_socket(ice_agent, socket) do
Logger.debug("Closing socket: #{inspect(socket)}")

ice_agent =
Expand All @@ -2308,9 +2370,21 @@ defmodule ExICE.Priv.ICEAgent do

tr_rtx = ice_agent.tr_rtx -- Map.keys(removed_gathering_transactions)

:ok = ice_agent.transport_module.close(socket)
:ok = flush_socket_msg(socket)

%{ice_agent | tr_rtx: tr_rtx, gathering_transactions: gathering_transactions}
end

defp flush_socket_msg(socket) do
receive do
{:udp, ^socket, _src_ip, _src_port, _packet} ->
flush_socket_msg(socket)
after
0 -> :ok
end
end

defp maybe_nominate(ice_agent) do
if time_to_nominate?(ice_agent) do
Logger.debug("Time to nominate a pair! Looking for a best valid pair...")
Expand Down Expand Up @@ -2367,6 +2441,7 @@ defmodule ExICE.Priv.ICEAgent do
# clearing the whole state anyway, we can close the socket manually
Logger.debug("Closing socket: #{inspect(ip)}:#{port}.")
:ok = ice_agent.transport_module.close(socket)
:ok = flush_socket_msg(socket)

{:error, :closed} ->
# socket already closed
Expand Down Expand Up @@ -2497,7 +2572,6 @@ defmodule ExICE.Priv.ICEAgent do
end

defp generate_credentials() do
# TODO am I using Base.encode64 correctly?
ufrag = :crypto.strong_rand_bytes(3) |> Base.encode64()
pwd = :crypto.strong_rand_bytes(16) |> Base.encode64()
{ufrag, pwd}
Expand All @@ -2523,9 +2597,13 @@ defmodule ExICE.Priv.ICEAgent do
end
end

defp change_gathering_state(ice_agent, new_gathering_state) do
Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}")
notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state})
defp change_gathering_state(ice_agent, new_gathering_state, opts \\ []) do
Logger.debug("Gatering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}")

if opts[:notify] != false do
notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state})
end

%__MODULE__{ice_agent | gathering_state: new_gathering_state}
end

Expand All @@ -2550,8 +2628,10 @@ defmodule ExICE.Priv.ICEAgent do
end

@doc false
@spec change_connection_state(t(), atom()) :: t()
def change_connection_state(ice_agent, :failed) do
@spec change_connection_state(t(), atom(), Keyword.t()) :: t()
def change_connection_state(ice_agent, new_state, opts \\ [])

def change_connection_state(ice_agent, :failed, opts) do
ice_agent =
Enum.reduce(ice_agent.sockets, ice_agent, fn socket, ice_agent ->
close_socket(ice_agent, socket)
Expand Down Expand Up @@ -2599,10 +2679,10 @@ defmodule ExICE.Priv.ICEAgent do
nominating?: {false, nil}
}
|> disable_timer()
|> do_change_connection_state(:failed)
|> do_change_connection_state(:failed, opts)
end

def change_connection_state(ice_agent, :completed) do
def change_connection_state(ice_agent, :completed, opts) do
selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id)
succeeded_pair = Map.fetch!(ice_agent.checklist, selected_pair.succeeded_pair_id)

Expand Down Expand Up @@ -2632,16 +2712,20 @@ defmodule ExICE.Priv.ICEAgent do
end
end)

do_change_connection_state(ice_agent, :completed)
do_change_connection_state(ice_agent, :completed, opts)
end

def change_connection_state(ice_agent, new_conn_state) do
do_change_connection_state(ice_agent, new_conn_state)
def change_connection_state(ice_agent, new_conn_state, opts) do
do_change_connection_state(ice_agent, new_conn_state, opts)
end

defp do_change_connection_state(ice_agent, new_conn_state) do
defp do_change_connection_state(ice_agent, new_conn_state, opts) do
Logger.debug("Connection state change: #{ice_agent.state} -> #{new_conn_state}")
notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state})

if opts[:notify] != false do
notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state})
end

%__MODULE__{ice_agent | state: new_conn_state}
end

Expand Down
6 changes: 6 additions & 0 deletions test/integration/p2p_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ defmodule ExICE.Integration.P2PTest do

assert File.read!(Path.join([tmp_dir, "a2_restart_recv_data"])) ==
File.read!("./test/fixtures/lotr.txt")

assert :ok = ICEAgent.close(agent1)
assert :ok = ICEAgent.close(agent2)

assert :ok = ICEAgent.stop(agent1)
assert :ok = ICEAgent.stop(agent2)
end

@tag :tmp_dir
Expand Down
Loading
Loading