diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index 1d88956..1f8fb09 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -30,7 +30,7 @@ defmodule ExICE.ICEAgent do For exact meaning refer to the W3C WebRTC standard, sec. 5.6.4. """ @type connection_state_change() :: - {:connection_state_change, :checking | :connected | :completed | :failed} + {:connection_state_change, :checking | :connected | :completed | :failed | :closed} @typedoc """ Messages sent by the ExICE. @@ -277,7 +277,21 @@ defmodule ExICE.ICEAgent do end @doc """ - Stops ICE agent and all of its sockets. + Irreversibly closes ICE agent and all of its sockets but does not terminate its process. + + The only practical thing that you can do after closing an agent, + is to get its stats using `get_stats/1`. + Most of the other functions have no effect. + + To terminate ICE agent process, see `stop/1`. + """ + @spec close(pid()) :: :ok + def close(ice_agent) do + GenServer.call(ice_agent, :close) + end + + @doc """ + Stops ICE agent process. """ @spec stop(pid()) :: :ok def stop(ice_agent) do @@ -346,6 +360,12 @@ defmodule ExICE.ICEAgent do {:reply, stats, state} end + @impl true + def handle_call(:close, _from, state) do + ice_agent = ExICE.Priv.ICEAgent.close(state.ice_agent) + {:reply, :ok, %{state | ice_agent: ice_agent}} + end + @impl true def handle_cast({:set_role, role}, state) do ice_agent = ExICE.Priv.ICEAgent.set_role(state.ice_agent, role) diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index a37284d..a7e214b 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -240,6 +240,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec set_role(t(), ExICE.ICEAgent.role()) :: t() + def set_role(%__MODULE__{state: :closed} = ice_agent, _) do + Logger.debug("Tried to set role in closed state. Ignoring.") + ice_agent + end + def set_role(%__MODULE__{role: nil} = ice_agent, role) do %__MODULE__{ice_agent | role: role} end @@ -250,8 +255,9 @@ defmodule ExICE.Priv.ICEAgent do end @spec set_remote_credentials(t(), binary(), binary()) :: t() - def set_remote_credentials(%__MODULE__{state: :failed} = ice_agent, _, _) do - Logger.debug("Tried to set remote credentials in failed state. ICE restart needed. Ignoring.") + def set_remote_credentials(%__MODULE__{state: state} = ice_agent, _, _) + when state in [:failed, :closed] do + Logger.debug("Tried to set remote credentials in state #{state}. Ignoring.") ice_agent end @@ -286,8 +292,8 @@ defmodule ExICE.Priv.ICEAgent do end @spec gather_candidates(t()) :: t() - def gather_candidates(%__MODULE__{state: :failed} = ice_agent) do - Logger.warning("Can't gather candidates in state failed. ICE restart needed. Ignoring.") + def gather_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do + Logger.warning("Can't gather candidates in state #{state}. Ignoring.") ice_agent end @@ -364,9 +370,10 @@ defmodule ExICE.Priv.ICEAgent do end @spec add_remote_candidate(t(), Candidate.t()) :: t() - def add_remote_candidate(%__MODULE__{state: :failed} = ice_agent, _) do + def add_remote_candidate(%__MODULE__{state: state} = ice_agent, _) + when state in [:failed, :closed] do # Completed state will be caught by the next clause - Logger.debug("Can't add remote candidate in state failed. ICE restart needed. Ignoring.") + Logger.debug("Can't add remote candidate in state #{state}. Ignoring.") ice_agent end @@ -455,8 +462,8 @@ defmodule ExICE.Priv.ICEAgent do end @spec end_of_candidates(t()) :: t() - def end_of_candidates(%__MODULE__{state: :failed} = ice_agent) do - Logger.debug("Can't set end-of-candidates flag in state failed. Ignoring.") + def end_of_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do + Logger.debug("Can't set end-of-candidates flag in state #{state}. Ignoring.") ice_agent end @@ -537,12 +544,22 @@ defmodule ExICE.Priv.ICEAgent do end @spec restart(t()) :: t() + def restart(%__MODULE__{state: :closed} = ice_agent) do + Logger.debug("Can't restart ICE in state closed. Ignoring.") + ice_agent + end + def restart(ice_agent) do Logger.debug("Restarting ICE") do_restart(ice_agent) end @spec handle_ta_timeout(t()) :: t() + def handle_ta_timeout(%__MODULE__{state: :closed} = ice_agent) do + Logger.debug("Ta timer fired in closed state. Ignoring.") + ice_agent + end + def handle_ta_timeout(%__MODULE__{state: state} = ice_agent) when state in [:completed, :failed] do Logger.warning(""" @@ -694,6 +711,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_tr_rtx_timeout(t(), integer()) :: t() + def handle_tr_rtx_timeout(%__MODULE__{state: :closed} = ice_agent, _) do + Logger.debug("Transaction rtx timer fired in state closed. Ignoring.") + ice_agent + end + def handle_tr_rtx_timeout(ice_agent, t_id) when is_map_key(ice_agent.conn_checks, t_id) do # Mark transaction id as ready to be retransmitted. # We will do this in handle_ta_timeout as it has to be paced. @@ -725,8 +747,9 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_eoc_timeout(t()) :: t() - def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do - Logger.debug("EOC timer fired but we are in the failed state. Ignoring.") + def handle_eoc_timeout(%__MODULE__{state: state} = ice_agent) + when state in [:failed, :closed] do + Logger.debug("EOC timer fired but we are in the #{state} state. Ignoring.") %{ice_agent | eoc_timer: nil} end @@ -742,6 +765,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_pair_timeout(t()) :: t() + def handle_pair_timeout(%__MODULE__{state: :closed} = ice_agent) do + Logger.debug("Pair timer fired in closed state. Ignoring.") + ice_agent + end + def handle_pair_timeout(ice_agent) do start_pair_timer() @@ -792,6 +820,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_keepalive_timeout(t(), integer()) :: t() + def handle_keepalive_timeout(%__MODULE__{state: :closed} = ice_agent, _) do + Logger.debug("Keepalive timer fired in closed state. Ignoring.") + ice_agent + end + def handle_keepalive_timeout(%__MODULE__{selected_pair_id: id} = ice_agent, id) do # if pair was selected, send keepalives only on that pair s_pair = Map.fetch!(ice_agent.checklist, id) @@ -842,7 +875,8 @@ defmodule ExICE.Priv.ICEAgent do :inet.port_number(), binary() ) :: t() - def handle_udp(%{state: :failed} = ice_agent, _socket, _src_ip, _src_port, _packet) do + def handle_udp(%{state: state} = ice_agent, _socket, _src_ip, _src_port, _packet) + when state in [:failed, :closed] do ice_agent end @@ -868,6 +902,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_ex_turn_msg(t(), reference(), ExTURN.Client.notification_message()) :: t() + def handle_ex_turn_msg(%__MODULE__{state: :closed} = ice_agent, _, _) do + Logger.debug("Received ex_turn message in closed state. Ignoring.") + ice_agent + end + def handle_ex_turn_msg(ice_agent, client_ref, msg) do tr_id_tr = find_gathering_transaction(ice_agent.gathering_transactions, client_ref) @@ -919,6 +958,18 @@ defmodule ExICE.Priv.ICEAgent do end end + @spec close(t()) :: t() + def close(%__MODULE__{state: :closed} = ice_agent) do + ice_agent + end + + def close(%__MODULE__{} = ice_agent) do + ice_agent.sockets + |> Enum.reduce(ice_agent, fn socket, ice_agent -> close_socket(ice_agent, socket) end) + |> change_gathering_state(:complete, notify: false) + |> change_connection_state(:closed, notify: false) + end + ## PRIV API defp create_srflx_gathering_transactions(stun_servers, sockets) do @@ -2290,6 +2341,17 @@ defmodule ExICE.Priv.ICEAgent do end defp close_socket(ice_agent, socket) do + # Use sockname/1 to determine if a socket is still open. + # Alternatively, we could create a callback for `:inet.info/1`, + # but it's return type is not standardized - sometimes it's %{states: [:closed]}, + # some other time %{rstates: [:closed], wstates: [:closed]}. + case ice_agent.transport_module.sockname(socket) do + {:error, :closed} -> ice_agent + _ -> do_close_socket(ice_agent, socket) + end + end + + defp do_close_socket(ice_agent, socket) do Logger.debug("Closing socket: #{inspect(socket)}") ice_agent = @@ -2308,9 +2370,21 @@ defmodule ExICE.Priv.ICEAgent do tr_rtx = ice_agent.tr_rtx -- Map.keys(removed_gathering_transactions) + :ok = ice_agent.transport_module.close(socket) + :ok = flush_socket_msg(socket) + %{ice_agent | tr_rtx: tr_rtx, gathering_transactions: gathering_transactions} end + defp flush_socket_msg(socket) do + receive do + {:udp, ^socket, _src_ip, _src_port, _packet} -> + flush_socket_msg(socket) + after + 0 -> :ok + end + end + defp maybe_nominate(ice_agent) do if time_to_nominate?(ice_agent) do Logger.debug("Time to nominate a pair! Looking for a best valid pair...") @@ -2367,6 +2441,7 @@ defmodule ExICE.Priv.ICEAgent do # clearing the whole state anyway, we can close the socket manually Logger.debug("Closing socket: #{inspect(ip)}:#{port}.") :ok = ice_agent.transport_module.close(socket) + :ok = flush_socket_msg(socket) {:error, :closed} -> # socket already closed @@ -2497,7 +2572,6 @@ defmodule ExICE.Priv.ICEAgent do end defp generate_credentials() do - # TODO am I using Base.encode64 correctly? ufrag = :crypto.strong_rand_bytes(3) |> Base.encode64() pwd = :crypto.strong_rand_bytes(16) |> Base.encode64() {ufrag, pwd} @@ -2523,9 +2597,13 @@ defmodule ExICE.Priv.ICEAgent do end end - defp change_gathering_state(ice_agent, new_gathering_state) do - Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}") - notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state}) + defp change_gathering_state(ice_agent, new_gathering_state, opts \\ []) do + Logger.debug("Gatering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}") + + if opts[:notify] != false do + notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state}) + end + %__MODULE__{ice_agent | gathering_state: new_gathering_state} end @@ -2550,8 +2628,10 @@ defmodule ExICE.Priv.ICEAgent do end @doc false - @spec change_connection_state(t(), atom()) :: t() - def change_connection_state(ice_agent, :failed) do + @spec change_connection_state(t(), atom(), Keyword.t()) :: t() + def change_connection_state(ice_agent, new_state, opts \\ []) + + def change_connection_state(ice_agent, :failed, opts) do ice_agent = Enum.reduce(ice_agent.sockets, ice_agent, fn socket, ice_agent -> close_socket(ice_agent, socket) @@ -2599,10 +2679,10 @@ defmodule ExICE.Priv.ICEAgent do nominating?: {false, nil} } |> disable_timer() - |> do_change_connection_state(:failed) + |> do_change_connection_state(:failed, opts) end - def change_connection_state(ice_agent, :completed) do + def change_connection_state(ice_agent, :completed, opts) do selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id) succeeded_pair = Map.fetch!(ice_agent.checklist, selected_pair.succeeded_pair_id) @@ -2632,16 +2712,20 @@ defmodule ExICE.Priv.ICEAgent do end end) - do_change_connection_state(ice_agent, :completed) + do_change_connection_state(ice_agent, :completed, opts) end - def change_connection_state(ice_agent, new_conn_state) do - do_change_connection_state(ice_agent, new_conn_state) + def change_connection_state(ice_agent, new_conn_state, opts) do + do_change_connection_state(ice_agent, new_conn_state, opts) end - defp do_change_connection_state(ice_agent, new_conn_state) do + defp do_change_connection_state(ice_agent, new_conn_state, opts) do Logger.debug("Connection state change: #{ice_agent.state} -> #{new_conn_state}") - notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state}) + + if opts[:notify] != false do + notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state}) + end + %__MODULE__{ice_agent | state: new_conn_state} end diff --git a/test/integration/p2p_test.exs b/test/integration/p2p_test.exs index a4e7035..d1cd217 100644 --- a/test/integration/p2p_test.exs +++ b/test/integration/p2p_test.exs @@ -79,6 +79,12 @@ defmodule ExICE.Integration.P2PTest do assert File.read!(Path.join([tmp_dir, "a2_restart_recv_data"])) == File.read!("./test/fixtures/lotr.txt") + + assert :ok = ICEAgent.close(agent1) + assert :ok = ICEAgent.close(agent2) + + assert :ok = ICEAgent.stop(agent1) + assert :ok = ICEAgent.stop(agent2) end @tag :tmp_dir diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index 9b60a4d..9fb8863 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -345,6 +345,64 @@ defmodule ExICE.Priv.ICEAgentTest do assert new_ice_agent == ice_agent end + test "close/1" do + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + if_discovery_module: IfDiscovery.Mock, + transport_module: Transport.Mock + ) + |> ICEAgent.set_remote_credentials("remoteufrag", "remotepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(@remote_cand) + + assert_receive {:ex_ice, _pid, {:gathering_state_change, :complete}} + + ice_agent = ICEAgent.close(ice_agent) + + assert ice_agent.state == :closed + assert ice_agent.gathering_state == :complete + assert [%{state: :failed} = pair] = Map.values(ice_agent.checklist) + assert [%{base: %{closed?: true}}] = Map.values(ice_agent.local_cands) + # make sure that sockets and remote cands were not cleared + assert [_remote_cand] = Map.values(ice_agent.remote_cands) + assert [socket] = ice_agent.sockets + + # check stats + stats = ICEAgent.get_stats(ice_agent) + assert stats.local_candidates != %{} + assert stats.remote_candidates != %{} + assert stats.candidate_pairs != %{} + assert stats.state == :closed + + refute_received {:ex_ice, _pid, {:connection_state_change, :closed}} + refute_received {:ex_ice, _pid, {:gathering_state_change, :complete}} + + # assert these functions are ignored + assert ice_agent == ICEAgent.set_role(ice_agent, :controlled) + assert ice_agent == ICEAgent.set_remote_credentials(ice_agent, "remoteufrag2", "remotepwd2") + assert ice_agent == ICEAgent.gather_candidates(ice_agent) + assert ice_agent == ICEAgent.add_remote_candidate(ice_agent, @remote_cand2) + assert ice_agent == ICEAgent.end_of_candidates(ice_agent) + assert ice_agent == ICEAgent.send_data(ice_agent, <<0, 1, 2>>) + assert ice_agent == ICEAgent.restart(ice_agent) + assert ice_agent == ICEAgent.handle_ta_timeout(ice_agent) + # only eoc_timer should change to nil + assert %{ice_agent | eoc_timer: nil} == ICEAgent.handle_eoc_timeout(ice_agent) + assert ice_agent == ICEAgent.handle_pair_timeout(ice_agent) + assert ice_agent == ICEAgent.handle_keepalive_timeout(ice_agent, pair.id) + + assert ice_agent == + ICEAgent.handle_udp( + ice_agent, + socket, + @remote_cand.address, + @remote_cand.port, + "some data" + ) + end + test "doesn't add pairs with srflx local candidate to the checklist" do ice_agent = ICEAgent.new(