diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c623ef8e..4c15ae64 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,6 +27,9 @@ jobs: - name: Use OCaml ${{ matrix.ocaml-compiler }} uses: ocaml/setup-ocaml@v2 with: + opam-repositories: | + opam-repository-mingw: https://github.com/ocaml-opam/opam-repository-mingw.git#sunset + default: https://github.com/ocaml/opam-repository.git ocaml-compiler: ${{ matrix.ocaml-compiler }} - run: opam install . --deps-only --with-test diff --git a/src/stack-unix/tcp_socket.ml b/src/stack-unix/tcp_socket.ml index 4d3adda0..70d3b5a3 100644 --- a/src/stack-unix/tcp_socket.ml +++ b/src/stack-unix/tcp_socket.ml @@ -65,4 +65,12 @@ let close fd = | Unix.Unix_error (Unix.EBADF, _, _) -> Lwt.return_unit | e -> Lwt.fail e) +let shutdown fd mode = + let cmd = match mode with + | `read -> Lwt_unix.SHUTDOWN_RECEIVE + | `write -> Lwt_unix.SHUTDOWN_SEND + | `read_write -> Lwt_unix.SHUTDOWN_ALL + in + Lwt.return (Lwt_unix.shutdown fd cmd) + let input _t ~src:_ ~dst:_ _buf = Lwt.return_unit diff --git a/src/tcp/flow.ml b/src/tcp/flow.ml index 02b3932c..10107ba6 100644 --- a/src/tcp/flow.ml +++ b/src/tcp/flow.ml @@ -125,18 +125,22 @@ struct (Cstruct.create 0) (* Queue up an immediate close segment *) - let close pcb = - Log.debug (fun f -> f "Closing connection %a" WIRE.pp pcb.id); + let shutdown ctx pcb = + Log.debug (fun f -> f "%s connection %a" (match ctx with `Close -> "Closing" | `Shutdown -> "Shutting down") WIRE.pp pcb.id); match State.state pcb.state with | State.Established | State.Close_wait -> UTX.wait_for_flushed pcb.utx >>= fun () -> (let { wnd; _ } = pcb in STATE.tick pcb.state (State.Send_fin (Window.tx_nxt wnd)); - TXS.output ~flags:Segment.Fin pcb.txq (Cstruct.create 0) + TXS.output ~flags:Segment.Fin pcb.txq Cstruct.empty ) + | State.Closed | State.Syn_rcvd _ | State.Syn_sent _ when ctx = `Close -> + State.on_close pcb.state; + Lwt.return_unit | _ -> Log.debug (fun fmt -> - fmt "TX.close: close requested but no action needed, state=%a" State.pp pcb.state); + let msg = match ctx with `Close -> "close" | `Shutdown -> "shutdown" in + fmt "TX.%s: %s requested but no action needed, state=%a" msg msg State.pp pcb.state); Lwt.return_unit (* Thread that transmits ACKs in response to received packets, @@ -179,6 +183,10 @@ struct (* Coalesce any outstanding segments and retrieve ready segments *) RXS.input rxq parsed + let shutdown pcb = + User_buffer.Rx.remove_all pcb.urx; + User_buffer.Rx.add_r pcb.urx None + (* Thread that spools the data into an application receive buffer, and notifies the ACK subsystem that new data is here *) let thread pcb ~rx_data = @@ -199,8 +207,7 @@ struct | None -> (* don't send an ACK in this case; this already happened *) STATE.tick pcb.state State.Recv_fin; - User_buffer.Rx.add_r urx None >>= fun () -> - Lwt.return_unit + User_buffer.Rx.add_r urx None | Some data -> signal_ack winadv >>= fun () -> let rec queue = function @@ -632,8 +639,13 @@ struct let write_nodelay pcb data = writefn pcb (UTX.write_nodelay pcb.utx) data |> cast let writev_nodelay pcb data = iter_s (write_nodelay pcb) data |> cast - (* Close - no more will be written *) - let close pcb = Tx.close pcb + (* Close *) + let close pcb = Tx.shutdown `Close pcb + + let shutdown pcb mode = + let wr, rd = match mode with | `read -> false, true | `write -> true, false | `read_write -> true, true in + (if wr then Tx.shutdown `Shutdown pcb else Lwt.return_unit) >>= fun () -> + (if rd then Rx.shutdown pcb else Lwt.return_unit) let dst pcb = WIRE.dst pcb.id, WIRE.dst_port pcb.id diff --git a/src/tcp/state.ml b/src/tcp/state.ml index 829f58f8..95eb48d7 100644 --- a/src/tcp/state.ml +++ b/src/tcp/state.ml @@ -57,6 +57,8 @@ type t = { let t ~id ~on_close = { on_close; id; state=Closed } +let on_close t = t.on_close () + let state t = t.state let pf = Format.fprintf @@ -174,5 +176,4 @@ module Make(Time:Mirage_time.S) = struct Log.debug (fun fmt -> fmt "%d %a - %a -> %a" t.id pp_tcpstate old_state pp_action i pp_tcpstate new_state); t.state <- new_state; - end diff --git a/src/tcp/state.mli b/src/tcp/state.mli index 8f495c90..b82610be 100644 --- a/src/tcp/state.mli +++ b/src/tcp/state.mli @@ -52,6 +52,8 @@ type t val state : t -> tcpstate val t : id:int -> on_close:close_cb -> t +val on_close : t -> unit + val pp: Format.formatter -> t -> unit module Make(Time : Mirage_time.S) : sig diff --git a/src/tcp/user_buffer.ml b/src/tcp/user_buffer.ml index cc95d25c..7e6d5749 100644 --- a/src/tcp/user_buffer.ml +++ b/src/tcp/user_buffer.ml @@ -59,6 +59,13 @@ module Rx = struct | None -> 0 | Some b -> Cstruct.length b + let remove_all t = + let rec rm = function + | 0 -> () + | n -> ignore (Lwt_dllist.take_l t.q); rm (pred n) + in + rm (Lwt_dllist.length t.q) + let add_r t s = if t.cur_size > t.max_size then let th,u = Lwt.wait () in diff --git a/src/tcp/user_buffer.mli b/src/tcp/user_buffer.mli index 63f984d3..c0d0ec19 100644 --- a/src/tcp/user_buffer.mli +++ b/src/tcp/user_buffer.mli @@ -19,6 +19,7 @@ module Rx : sig type t val create : max_size:int32 -> wnd:Window.t -> t + val remove_all : t -> unit val add_r : t -> Cstruct.t option -> unit Lwt.t val take_l : t -> Cstruct.t option Lwt.t val cur_size : t -> int32 diff --git a/tcpip.opam b/tcpip.opam index ac97bb05..a272b7ae 100644 --- a/tcpip.opam +++ b/tcpip.opam @@ -40,10 +40,10 @@ depends: [ "lwt-dllist" "logs" {>= "0.6.0"} "duration" - "randomconv" + "randomconv" {< "0.2.0"} "ethernet" {>= "3.0.0"} "arp" {>= "3.0.0"} - "mirage-flow" {>= "2.0.0"} + "mirage-flow" {>= "4.0.0"} "mirage-vnetif" {with-test & >= "0.5.0"} "alcotest" {with-test & >="1.5.0"} "pcap-format" {with-test}