From 296fa324c6c94416c5f936c73b19fdc03412e362 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Mon, 7 May 2018 11:39:50 -0500 Subject: [PATCH 01/10] Lwt support --- .travis.yml | 5 +- benchmarks/jbuild | 9 +- benchmarks/wrk_lwt_benchmark.ml | 59 +++++ examples/{ => async}/async_echo_post.ml | 0 examples/{ => async}/async_get.ml | 0 examples/{ => async}/async_post.ml | 0 examples/{ => async}/jbuild | 0 examples/lwt/jbuild | 5 + examples/lwt/lwt_echo_server.ml | 125 +++++++++++ examples/lwt/lwt_get.ml | 73 ++++++ examples/lwt/lwt_post.ml | 81 +++++++ httpaf-lwt.opam | 23 ++ lwt/httpaf_lwt.ml | 285 ++++++++++++++++++++++++ lwt/httpaf_lwt.mli | 24 ++ lwt/jbuild | 7 + 15 files changed, 693 insertions(+), 3 deletions(-) create mode 100644 benchmarks/wrk_lwt_benchmark.ml rename examples/{ => async}/async_echo_post.ml (100%) rename examples/{ => async}/async_get.ml (100%) rename examples/{ => async}/async_post.ml (100%) rename examples/{ => async}/jbuild (100%) create mode 100644 examples/lwt/jbuild create mode 100644 examples/lwt/lwt_echo_server.ml create mode 100644 examples/lwt/lwt_get.ml create mode 100644 examples/lwt/lwt_post.ml create mode 100644 httpaf-lwt.opam create mode 100644 lwt/httpaf_lwt.ml create mode 100644 lwt/httpaf_lwt.mli create mode 100644 lwt/jbuild diff --git a/.travis.yml b/.travis.yml index 190e3e8..98db4ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,11 +6,14 @@ install: wget https://raw.githubusercontent.com/ocaml/ocaml-travisci-skeleton/ma script: bash -ex ./.travis-docker.sh env: global: - - PINS="httpaf-async:. httpaf:." + - PINS="httpaf-async:. httpaf-lwt:. httpaf:." matrix: - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" + - PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" + - PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" - PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-lwt" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" diff --git a/benchmarks/jbuild b/benchmarks/jbuild index a5d2138..87b0897 100644 --- a/benchmarks/jbuild +++ b/benchmarks/jbuild @@ -1,6 +1,11 @@ (jbuild_version 1) -(executables +(executable ((libraries (httpaf httpaf-async async core)) (modules (wrk_async_benchmark)) - (names (wrk_async_benchmark)))) + (name wrk_async_benchmark))) + +(executable + ((name wrk_lwt_benchmark) + (modules (Wrk_lwt_benchmark)) + (libraries (httpaf httpaf-lwt lwt.unix)))) diff --git a/benchmarks/wrk_lwt_benchmark.ml b/benchmarks/wrk_lwt_benchmark.ml new file mode 100644 index 0000000..f49ed00 --- /dev/null +++ b/benchmarks/wrk_lwt_benchmark.ml @@ -0,0 +1,59 @@ +(* TODO Cleanup *) +(* TODO Organize like the echo server? *) + +module Body = Httpaf.Body +module Headers = Httpaf.Headers +module Reqd = Httpaf.Reqd +module Response = Httpaf.Response +module Status = Httpaf.Status + +let text = "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, thought Alice So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her. There was nothing so very remarkable in that; nor did Alice think it so very much out of the way to hear the Rabbit say to itself, (when she thought it over afterwards, it occurred to her that she ought to have wondered at this, but at the time it all seemed quite natural); but when the Rabbit actually took a watch out of its waistcoat-pocket, and looked at it, and then hurried on, Alice started to her feet, for it flashed across her mind that she had never before seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and burning with curiosity, she ran across the field after it, and fortunately was just in time to see it pop down a large rabbit-hole under the hedge. In another moment down went Alice after it, never once considering how in the world she was to get out again. The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well. Either the well was very deep, or she fell very slowly, for she had plenty of time as she went down to look about her and to wonder what was going to happen next. First, she tried to look down and make out what she was coming to, but it was too dark to see anything; then she looked at the sides of the well, and noticed that they were filled with cupboards......" +let text = Lwt_bytes.of_string text + +let headers = + Headers.of_list [ + "Content-Length", string_of_int (Lwt_bytes.length text) + ] + +let error_handler _ ?request error start_response = + let response_body = start_response Headers.empty in + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + Body.close_writer response_body + +let request_handler _ reqd = + let { Httpaf.Request.target } = Reqd.request reqd in + let request_body = Reqd.request_body reqd in + Body.close_reader request_body; + match target with + | "/" -> Reqd.respond_with_bigstring reqd (Response.create ~headers `OK) text; + | _ -> Reqd.respond_with_string reqd (Response.create `Not_found) "Route not found" + +let connection_handler = + Httpaf_lwt.Server.create_connection_handler ?config:None ~request_handler ~error_handler + +let () = + let open Lwt.Infix in + + let port = ref 8080 in + Arg.parse + ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] + ignore + "Echoes POST requests. Runs forever."; + + let listen_address = Unix.(ADDR_INET (inet_addr_loopback, !port)) in + + Lwt.async begin fun () -> + Lwt_io.establish_server_with_client_socket + ~backlog:11_000 listen_address connection_handler + >>= fun _server -> + Lwt.return_unit + end; + + let forever, _ = Lwt.wait () in + Lwt_main.run forever diff --git a/examples/async_echo_post.ml b/examples/async/async_echo_post.ml similarity index 100% rename from examples/async_echo_post.ml rename to examples/async/async_echo_post.ml diff --git a/examples/async_get.ml b/examples/async/async_get.ml similarity index 100% rename from examples/async_get.ml rename to examples/async/async_get.ml diff --git a/examples/async_post.ml b/examples/async/async_post.ml similarity index 100% rename from examples/async_post.ml rename to examples/async/async_post.ml diff --git a/examples/jbuild b/examples/async/jbuild similarity index 100% rename from examples/jbuild rename to examples/async/jbuild diff --git a/examples/lwt/jbuild b/examples/lwt/jbuild new file mode 100644 index 0000000..b21fe05 --- /dev/null +++ b/examples/lwt/jbuild @@ -0,0 +1,5 @@ +(jbuild_version 1) + +(executables + ((names (lwt_get lwt_post lwt_echo_server)) + (libraries (httpaf httpaf-lwt lwt lwt.unix)))) diff --git a/examples/lwt/lwt_echo_server.ml b/examples/lwt/lwt_echo_server.ml new file mode 100644 index 0000000..0c87ca8 --- /dev/null +++ b/examples/lwt/lwt_echo_server.ml @@ -0,0 +1,125 @@ +(* TODO This needs to be paired with the requester example. *) +(* TODO Usage to comment. *) + +let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = + let module Body = Httpaf.Body in + let module Headers = Httpaf.Headers in + let module Reqd = Httpaf.Reqd in + let module Response = Httpaf.Response in + let module Status = Httpaf.Status in + + let request_handler : Unix.sockaddr -> _ Reqd.t -> unit = + fun _client_address request_descriptor -> + + let request = Reqd.request request_descriptor in + match request.meth with + | `POST -> + let request_body = Reqd.request_body request_descriptor in + + let response_content_type = + match Headers.get request.headers "Content-Type" with + | Some request_content_type -> request_content_type + | None -> "application/octet-stream" + in + + (* Due to a possible bug in http/af, read from the body only once, and + create the response based on the data in that first read. + + The bug is (possibly) in the client. Client_connection seems to go into + a read loop despite Client_connection.shutdown being called, due to the + reader being in the Partial state, and the next operation function + unconditionally returning `Read in that case. + + One workaround for this is to have the server send a Content-Length + header. To do that, this code has the server simply reply after the + first chunk is read, and use that chunk's length. + + The code I would expect to work, without the possible bug, is commented + out below. *) + + Body.schedule_read + request_body + ~on_eof:ignore + ~on_read:(fun request_data ~off ~len -> + let response = + Response.create + ~headers:(Headers.of_list [ + "Content-Type", response_content_type; + "Content-Length", string_of_int len; + "Connection", "close"; + ]) + `OK + in + + let response_body = + Reqd.respond_with_streaming request_descriptor response in + + Body.write_bigstring response_body request_data ~off ~len; + Body.close_writer response_body) + + (* + let rec respond () = + Body.schedule_read + request_body + ~on_eof:(fun () -> Body.close_writer response_body) + ~on_read:(fun request_data ~off ~len -> + Body.write_bigstring response_body request_data ~off ~len; + respond ()) + in + respond () + *) + + | _ -> + Reqd.respond_with_string + request_descriptor (Response.create `Method_not_allowed) "" + in + + let error_handler : + Unix.sockaddr -> + ?request:Httpaf.Request.t -> + _ -> + (Headers.t -> [`write] Body.t) -> + unit = + fun _client_address ?request:_ error start_response -> + + let response_body = start_response Headers.empty in + + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + + Body.close_writer response_body + in + + Httpaf_lwt.Server.create_connection_handler + ?config:None + ~request_handler + ~error_handler + + + +let () = + let open Lwt.Infix in + + let port = ref 8080 in + Arg.parse + ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] + ignore + "Echoes POST requests. Runs forever."; + + let listen_address = Unix.(ADDR_INET (inet_addr_loopback, !port)) in + + Lwt.async begin fun () -> + Lwt_io.establish_server_with_client_socket + listen_address connection_handler + >>= fun _server -> + Lwt.return_unit + end; + + let forever, _ = Lwt.wait () in + Lwt_main.run forever diff --git a/examples/lwt/lwt_get.ml b/examples/lwt/lwt_get.ml new file mode 100644 index 0000000..3c871be --- /dev/null +++ b/examples/lwt/lwt_get.ml @@ -0,0 +1,73 @@ +(* TODO Cleanup *) + +module Body = Httpaf.Body +module Response = Httpaf.Response + +let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = + fun notify_request_finished response response_body -> + + match response.status with + | `OK -> + let rec read_response () = + Body.schedule_read + response_body + ~on_eof:(fun () -> Lwt.wakeup_later notify_request_finished ()) + ~on_read:(fun response_fragment ~off ~len -> + let response_fragment_string = Bytes.create len in + Lwt_bytes.blit_to_bytes + response_fragment off + response_fragment_string 0 + len; + print_string (Bytes.unsafe_to_string response_fragment_string); + + read_response ()) + in + read_response () + + | _ -> + Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; + exit 1 + +(* TODO A real error handler *) +let error_handler _ = + assert false + +open Lwt.Infix + +let () = + let host = ref None in + let port = ref 80 in + + Arg.parse + ["-p", Set_int port, " port number"] + (fun host_argument -> host := Some host_argument) + "lwt_get.exe [-p N] HOST"; + + let host = + match !host with + | None -> failwith "No hostname provided" + | Some host -> host + in + + Lwt_main.run begin + Lwt_unix.getaddrinfo host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] + >>= fun addresses -> + + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr + >>= fun () -> + + let headers = Httpaf.Headers.of_list ["Host", host] in + let request = Httpaf.Request.create ~headers `GET "/" in + let request_finished, notify_request_finished = Lwt.wait () in + let request_body = + Httpaf_lwt.Client.request + socket + request + ~error_handler + ~response_handler:(response_handler notify_request_finished) + in + Body.close_writer request_body; + + request_finished + end diff --git a/examples/lwt/lwt_post.ml b/examples/lwt/lwt_post.ml new file mode 100644 index 0000000..df81053 --- /dev/null +++ b/examples/lwt/lwt_post.ml @@ -0,0 +1,81 @@ +module Body = Httpaf.Body +module Response = Httpaf.Response + +let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = + fun notify_request_finished response response_body -> + + match response.status with + | `OK -> + let rec read_response () = + Body.schedule_read + response_body + ~on_eof:(fun () -> Lwt.wakeup_later notify_request_finished ()) + ~on_read:(fun response_fragment ~off ~len -> + let response_fragment_string = Bytes.create len in + Lwt_bytes.blit_to_bytes + response_fragment off + response_fragment_string 0 + len; + print_string (Bytes.unsafe_to_string response_fragment_string); + + read_response ()) + in + read_response () + + | _ -> + Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; + exit 1 + +(* TODO Real error handler *) +let error_handler _ = + assert false + +open Lwt.Infix + +let () = + let host = ref None in + let port = ref 8080 in + + Arg.parse + ["-p", Set_int port, " port number"] + (fun host_argument -> host := Some host_argument) + "lwt_get.exe [-p N] HOST"; + + let host = + match !host with + | None -> failwith "No hostname provided" + | Some host -> host + in + + Lwt_main.run begin + Lwt_io.(read stdin) + >>= fun request_content -> + + Lwt_unix.getaddrinfo host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] + >>= fun addresses -> + + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr + >>= fun () -> + + let headers = + Httpaf.Headers.of_list [ + "Host", host; + "Connection", "close"; + "Content-Length", string_of_int (String.length request_content); + ] + in + let request = Httpaf.Request.create ~headers `POST "/" in + let request_finished, notify_request_finished = Lwt.wait () in + let request_body = + Httpaf_lwt.Client.request + socket + request + ~error_handler + ~response_handler:(response_handler notify_request_finished) + in + Body.write_string request_body request_content; + Body.close_writer request_body; + + request_finished + end diff --git a/httpaf-lwt.opam b/httpaf-lwt.opam new file mode 100644 index 0000000..53e1a9d --- /dev/null +++ b/httpaf-lwt.opam @@ -0,0 +1,23 @@ +opam-version: "1.2" +name: "httpaf-lwt" +maintainer: "Spiros Eliopoulos " +authors: [ "Anton Bachin " ] +license: "BSD-3-clause" +homepage: "https://github.com/inhabitedtype/httpaf" +bug-reports: "https://github.com/inhabitedtype/httpaf/issues" +dev-repo: "https://github.com/inhabitedtype/httpaf.git" +build: [ + ["jbuilder" "subst"] {pinned} + ["jbuilder" "build" "-p" name "-j" jobs] +] +build-test: [ + ["jbuilder" "runtest" "-p" name] +] +depends: [ + "angstrom-lwt-unix" + "faraday-lwt-unix" + "httpaf" + "jbuilder" {build & >= "1.0+beta10"} + "lwt" +] +available: [ ocaml-version >= "4.03.0" ] diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml new file mode 100644 index 0000000..7e5d972 --- /dev/null +++ b/lwt/httpaf_lwt.ml @@ -0,0 +1,285 @@ +(* TODO Need a buffer module? *) +(* TODO Look into a possible one in Angstrom. *) +(* TODO Yes, need to provide own buffer. Copy the one from Httpaf_async. *) +(* TODO Fix all the whitespace and evetything *) +(* TODO There are too many copies here, because Lwt_io is already buffered. *) + +open Lwt.Infix + + + +(* TODO Note where this came from. *) +(* TODO What to do about cleanup exceptions in both server and client? *) +module Buffer : sig + type t + + val create : int -> t + + val get : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int) -> int + val put : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int Lwt.t) -> int Lwt.t +end = struct + type t = + { buffer : Lwt_bytes.t + ; mutable off : int + ; mutable len : int } + + let create size = + let buffer = Lwt_bytes.create size in + { buffer; off = 0; len = 0 } + + let compress t = + if t.len = 0 + then begin + t.off <- 0; + t.len <- 0; + end else if t.off > 0 + then begin + Lwt_bytes.blit t.buffer t.off t.buffer 0 t.len; + t.off <- 0; + end + + let get t ~f = + let n = f t.buffer ~off:t.off ~len:t.len in + t.off <- t.off + n; + t.len <- t.len - n; + if t.len = 0 + then t.off <- 0; + n + + let put t ~f = + compress t; + f t.buffer ~off:(t.off + t.len) ~len:(Lwt_bytes.length t.buffer - t.len) + >>= fun n -> + t.len <- t.len + n; + Lwt.return n +end + +(* TODO Should probably send exceptions into notify_read_complete. *) +let read fd buffer = + Lwt.catch + (fun () -> + Buffer.put buffer ~f:(fun bigstring ~off ~len -> + Lwt_bytes.read fd bigstring off len)) + (function + | Unix.Unix_error (Unix.EBADF, _, _) as exn -> + raise exn + | exn -> + Lwt.async (fun () -> + Lwt_unix.close fd); + raise exn) + + >>= fun bytes_read -> + if bytes_read = 0 then + Lwt.return `Eof + else + Lwt.return (`Ok bytes_read) + + + + +(* TODO But is this really awkward? We just need a finalize call on the joined + promise. *) +(* TODO Close the server's client connection, even though Lwt_io will also close + it. This is for better error handling. *) +(* TODO Get exceptions passed to the error handler? *) + +module Server = struct + type request_handler = + Lwt_unix.file_descr Httpaf.Server_connection.request_handler + + + + let create_connection_handler ?config ~request_handler ~error_handler = + fun client_addr socket -> + let module Server_connection = Httpaf.Server_connection in + let connection = + Server_connection.create + ?config + ~error_handler:(error_handler client_addr) + (request_handler client_addr) + in + + + let read_buffer = Buffer.create 0x1000 in + let read_loop_exited, notify_read_loop_exited = Lwt.wait () in + + (* TODO Explain loops and steps. *) + let rec read_loop () = + let rec read_loop_step () = + match Server_connection.next_read_operation connection with + | `Read -> + read socket read_buffer >>= begin function + | `Eof -> + Server_connection.shutdown_reader connection; + read_loop_step () + | `Ok _ -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read connection bigstring ~off ~len) + |> ignore; + read_loop_step () + end + + | `Yield -> + Server_connection.yield_reader connection read_loop; + Lwt.return_unit + + | `Close -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + Lwt_unix.shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + read_loop_step + (fun exn -> + Lwt.wakeup_later_exn notify_read_loop_exited exn; + Lwt.return_unit)) + in + + + let writev = Faraday_lwt_unix.writev_of_fd socket in + let write_loop_exited, notify_write_loop_exited = Lwt.wait () in + + let rec write_loop () = + let rec write_loop_step () = + match Server_connection.next_write_operation connection with + | `Write io_vectors -> + writev io_vectors >>= fun result -> + Server_connection.report_write_result connection result; + write_loop_step () + + | `Yield -> + Server_connection.yield_writer connection write_loop; + Lwt.return_unit + + | `Close _ -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + Lwt_unix.shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + write_loop_step + (fun exn -> + Lwt.wakeup_later_exn notify_write_loop_exited exn; + Lwt.return_unit)) + in + + + read_loop (); + write_loop (); + + let handler_finished = Lwt.join [read_loop_exited; write_loop_exited] in + + Lwt.on_failure handler_finished begin fun _exn -> + Server_connection.shutdown connection; + if not (Lwt_unix.state socket = Lwt_unix.Closed) then + Lwt.async (fun () -> + Lwt_unix.close socket) + end; + + handler_finished +end + + + +module Client = struct + let request socket request ~error_handler ~response_handler = + let module Client_connection = Httpaf.Client_connection in + let request_body, connection = + Client_connection.request request ~error_handler ~response_handler in + + + let read_buffer = Buffer.create 0x1000 in + let read_loop_exited, notify_read_loop_exited = Lwt.wait () in + + let read_loop () = + let rec read_loop_step () = + match Client_connection.next_read_operation connection with + | `Read -> + read socket read_buffer >>= begin function + | `Eof -> + Client_connection.shutdown_reader connection; + read_loop_step () + | `Ok _ -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Client_connection.read connection bigstring ~off ~len) + |> ignore; + read_loop_step () + end + + | `Close -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + Lwt_unix.shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + read_loop_step + (fun exn -> + Lwt.wakeup_later_exn notify_read_loop_exited exn; + Lwt.return_unit)) + in + + + let writev = Faraday_lwt_unix.writev_of_fd socket in + let write_loop_exited, notify_write_loop_exited = Lwt.wait () in + + let rec write_loop () = + let rec write_loop_step () = + match Client_connection.next_write_operation connection with + | `Write io_vectors -> + writev io_vectors >>= fun result -> + Client_connection.report_write_result connection result; + write_loop_step () + + | `Yield -> + Client_connection.yield_writer connection write_loop; + Lwt.return_unit + + | `Close _ -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + Lwt_unix.shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + write_loop_step + (fun exn -> + Lwt.wakeup_later_exn notify_write_loop_exited exn; + Lwt.return_unit)) + in + + + read_loop (); + write_loop (); + + let handler_finished = Lwt.join [read_loop_exited; write_loop_exited] in + + Lwt.on_failure handler_finished begin fun _exn -> + Client_connection.shutdown connection; + if not (Lwt_unix.state socket = Lwt_unix.Closed) then + Lwt.async (fun () -> + Lwt_unix.close socket) + end; + + Lwt.on_success handler_finished begin fun () -> + if not (Lwt_unix.state socket = Lwt_unix.Closed) then + Lwt.async (fun () -> + Lwt_unix.close socket) + end; + + request_body +end diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli new file mode 100644 index 0000000..ef12085 --- /dev/null +++ b/lwt/httpaf_lwt.mli @@ -0,0 +1,24 @@ +(* TODO Document the server is meant to be used with establish_server', and +whatever client is meant to be used with. *) +(* TODO Explain where exceptions go and how to wrap the server callback. *) +(* TODO Local usage examples, or refer people to the example. *) + +module Server : sig + type request_handler = + Lwt_unix.file_descr Httpaf.Server_connection.request_handler + + val create_connection_handler + : ?config:Httpaf.Server_connection.Config.t + -> request_handler:(Unix.sockaddr -> request_handler) + -> error_handler:(Unix.sockaddr -> Httpaf.Server_connection.error_handler) + -> (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t) +end + +module Client : sig + val request + : Lwt_unix.file_descr + -> Httpaf.Request.t + -> error_handler : Httpaf.Client_connection.error_handler + -> response_handler : Httpaf.Client_connection.response_handler + -> [`write] Httpaf.Body.t +end diff --git a/lwt/jbuild b/lwt/jbuild new file mode 100644 index 0000000..179e7f9 --- /dev/null +++ b/lwt/jbuild @@ -0,0 +1,7 @@ +(jbuild_version 1) + +(library + ((name httpaf_lwt) + (public_name httpaf-lwt) + (libraries (faraday-lwt-unix httpaf lwt.unix)) + (flags (:standard -safe-string)))) From 7b990d8b88f3a8756f97d899fa834273d9ba28cb Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Wed, 27 Jun 2018 09:23:13 -0500 Subject: [PATCH 02/10] Have the echo server read the whole request --- examples/lwt/lwt_echo_server.ml | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/examples/lwt/lwt_echo_server.ml b/examples/lwt/lwt_echo_server.ml index 0c87ca8..e6b4dfc 100644 --- a/examples/lwt/lwt_echo_server.ml +++ b/examples/lwt/lwt_echo_server.ml @@ -37,7 +37,7 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = The code I would expect to work, without the possible bug, is commented out below. *) - Body.schedule_read + (* Body.schedule_read request_body ~on_eof:ignore ~on_read:(fun request_data ~off ~len -> @@ -55,9 +55,20 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = Reqd.respond_with_streaming request_descriptor response in Body.write_bigstring response_body request_data ~off ~len; - Body.close_writer response_body) + Body.close_writer response_body) *) + + let response = + Response.create + ~headers:(Headers.of_list [ + "Content-Type", response_content_type; + "Connection", "close"; + ]) + `OK + in + + let response_body = + Reqd.respond_with_streaming request_descriptor response in - (* let rec respond () = Body.schedule_read request_body @@ -67,7 +78,6 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = respond ()) in respond () - *) | _ -> Reqd.respond_with_string From d7b33f9fcae8035dea22593b9331704c283ea8e2 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Wed, 27 Jun 2018 10:05:55 -0500 Subject: [PATCH 03/10] Use (Server|Client)_connection.report_exn --- lwt/httpaf_lwt.ml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index 7e5d972..c6bb722 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -135,7 +135,7 @@ module Server = struct Lwt.catch read_loop_step (fun exn -> - Lwt.wakeup_later_exn notify_read_loop_exited exn; + Server_connection.report_exn connection exn; Lwt.return_unit)) in @@ -167,7 +167,7 @@ module Server = struct Lwt.catch write_loop_step (fun exn -> - Lwt.wakeup_later_exn notify_write_loop_exited exn; + Server_connection.report_exn connection exn; Lwt.return_unit)) in @@ -226,7 +226,7 @@ module Client = struct Lwt.catch read_loop_step (fun exn -> - Lwt.wakeup_later_exn notify_read_loop_exited exn; + Client_connection.report_exn connection exn; Lwt.return_unit)) in @@ -258,7 +258,7 @@ module Client = struct Lwt.catch write_loop_step (fun exn -> - Lwt.wakeup_later_exn notify_write_loop_exited exn; + Client_connection.report_exn connection exn; Lwt.return_unit)) in From edc2e5ec8ae798295118943da5b291df3f37d9a3 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Wed, 27 Jun 2018 10:38:01 -0500 Subject: [PATCH 04/10] Handle ENOTCONN in shutdown --- lwt/httpaf_lwt.ml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index c6bb722..84b3e42 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -76,6 +76,11 @@ let read fd buffer = +let shutdown socket command = + try Lwt_unix.shutdown socket command + with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () + + (* TODO But is this really awkward? We just need a finalize call on the joined promise. *) @@ -126,7 +131,7 @@ module Server = struct | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - Lwt_unix.shutdown socket Unix.SHUTDOWN_RECEIVE + shutdown socket Unix.SHUTDOWN_RECEIVE end; Lwt.return_unit in @@ -158,7 +163,7 @@ module Server = struct | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - Lwt_unix.shutdown socket Unix.SHUTDOWN_SEND + shutdown socket Unix.SHUTDOWN_SEND end; Lwt.return_unit in @@ -217,7 +222,7 @@ module Client = struct | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - Lwt_unix.shutdown socket Unix.SHUTDOWN_RECEIVE + shutdown socket Unix.SHUTDOWN_RECEIVE end; Lwt.return_unit in @@ -249,7 +254,7 @@ module Client = struct | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin - Lwt_unix.shutdown socket Unix.SHUTDOWN_SEND + shutdown socket Unix.SHUTDOWN_SEND end; Lwt.return_unit in From 73251d459686f1119573e72ed67377982873f604 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Mon, 19 Nov 2018 21:12:24 -0600 Subject: [PATCH 05/10] Switch from shutdown_reader to read_eof --- lwt/httpaf_lwt.ml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index 84b3e42..d5ff1c7 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -115,7 +115,9 @@ module Server = struct | `Read -> read socket read_buffer >>= begin function | `Eof -> - Server_connection.shutdown_reader connection; + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read_eof connection bigstring ~off ~len) + |> ignore; read_loop_step () | `Ok _ -> Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> @@ -210,7 +212,9 @@ module Client = struct | `Read -> read socket read_buffer >>= begin function | `Eof -> - Client_connection.shutdown_reader connection; + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Client_connection.read_eof connection bigstring ~off ~len) + |> ignore; read_loop_step () | `Ok _ -> Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> From 8f19233e4dde47b5d358a86cd9946f9e98ab0de3 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Mon, 19 Nov 2018 22:03:29 -0600 Subject: [PATCH 06/10] Cleanup --- benchmarks/wrk_lwt_benchmark.ml | 114 +++++++++++++++++++++----------- examples/lwt/lwt_echo_server.ml | 44 ++---------- examples/lwt/lwt_get.ml | 32 ++++----- examples/lwt/lwt_post.ml | 56 ++++++++-------- lwt/httpaf_lwt.ml | 10 +-- lwt/httpaf_lwt.mli | 6 +- 6 files changed, 127 insertions(+), 135 deletions(-) diff --git a/benchmarks/wrk_lwt_benchmark.ml b/benchmarks/wrk_lwt_benchmark.ml index f49ed00..e19be22 100644 --- a/benchmarks/wrk_lwt_benchmark.ml +++ b/benchmarks/wrk_lwt_benchmark.ml @@ -1,41 +1,78 @@ -(* TODO Cleanup *) -(* TODO Organize like the echo server? *) - -module Body = Httpaf.Body -module Headers = Httpaf.Headers -module Reqd = Httpaf.Reqd -module Response = Httpaf.Response -module Status = Httpaf.Status - -let text = "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, thought Alice So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her. There was nothing so very remarkable in that; nor did Alice think it so very much out of the way to hear the Rabbit say to itself, (when she thought it over afterwards, it occurred to her that she ought to have wondered at this, but at the time it all seemed quite natural); but when the Rabbit actually took a watch out of its waistcoat-pocket, and looked at it, and then hurried on, Alice started to her feet, for it flashed across her mind that she had never before seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and burning with curiosity, she ran across the field after it, and fortunately was just in time to see it pop down a large rabbit-hole under the hedge. In another moment down went Alice after it, never once considering how in the world she was to get out again. The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well. Either the well was very deep, or she fell very slowly, for she had plenty of time as she went down to look about her and to wonder what was going to happen next. First, she tried to look down and make out what she was coming to, but it was too dark to see anything; then she looked at the sides of the well, and noticed that they were filled with cupboards......" -let text = Lwt_bytes.of_string text - -let headers = - Headers.of_list [ - "Content-Length", string_of_int (Lwt_bytes.length text) - ] - -let error_handler _ ?request error start_response = - let response_body = start_response Headers.empty in - begin match error with - | `Exn exn -> - Body.write_string response_body (Printexc.to_string exn); - Body.write_string response_body "\n"; - | #Status.standard as error -> - Body.write_string response_body (Status.default_reason_phrase error) - end; - Body.close_writer response_body - -let request_handler _ reqd = - let { Httpaf.Request.target } = Reqd.request reqd in - let request_body = Reqd.request_body reqd in - Body.close_reader request_body; - match target with - | "/" -> Reqd.respond_with_bigstring reqd (Response.create ~headers `OK) text; - | _ -> Reqd.respond_with_string reqd (Response.create `Not_found) "Route not found" +let text = +{|CHAPTER I. Down the Rabbit-Hole +Alice was beginning to get very tired of sitting by her sister on the bank, and +of having nothing to do: once or twice she had peeped into the book her sister +was reading, but it had no pictures or conversations in it, thought Alice So she was +considering in her own mind (as well as she could, for the hot day made her feel +very sleepy and stupid), whether the pleasure of making a daisy-chain would be +worth the trouble of getting up and picking the daisies, when suddenly a White +Rabbit with pink eyes ran close by her. There was nothing so very remarkable in +that; nor did Alice think it so very much out of the way to hear the Rabbit say +to itself, (when she thought it over +afterwards, it occurred to her that she ought to have wondered at this, but at +the time it all seemed quite natural); but when the Rabbit actually took a watch +out of its waistcoat-pocket, and looked at it, and then hurried on, Alice +started to her feet, for it flashed across her mind that she had never before +seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and +burning with curiosity, she ran across the field after it, and fortunately was +just in time to see it pop down a large rabbit-hole under the hedge. In another +moment down went Alice after it, never once considering how in the world she was +to get out again. The rabbit-hole went straight on like a tunnel for some way, +and then dipped suddenly down, so suddenly that Alice had not a moment to think +about stopping herself before she found herself falling down a very deep well. +Either the well was very deep, or she fell very slowly, for she had plenty of +time as she went down to look about her and to wonder what was going to happen +next. First, she tried to look down and make out what she was coming to, but it +was too dark to see anything; then she looked at the sides of the well, and +noticed that they were filled with cupboards......|} let connection_handler = - Httpaf_lwt.Server.create_connection_handler ?config:None ~request_handler ~error_handler + let module Body = Httpaf.Body in + let module Headers = Httpaf.Headers in + let module Reqd = Httpaf.Reqd in + let module Response = Httpaf.Response in + let module Status = Httpaf.Status in + + let text = Lwt_bytes.of_string text in + + let response_headers = + Headers.of_list [ + "Content-Length", string_of_int (Lwt_bytes.length text) + ] + in + + let request_handler _ reqd = + let {Httpaf.Request.target; _} = Reqd.request reqd in + let request_body = Reqd.request_body reqd in + Body.close_reader request_body; + + match target with + | "/" -> + Reqd.respond_with_bigstring + reqd (Response.create ~headers:response_headers `OK) text; + | _ -> + Reqd.respond_with_string + reqd (Response.create `Not_found) "Route not found" + in + + let error_handler _ ?request error start_response = + let response_body = start_response Headers.empty in + + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + + Body.close_writer response_body + in + + Httpaf_lwt.Server.create_connection_handler + ?config:None ~request_handler ~error_handler let () = let open Lwt.Infix in @@ -44,15 +81,14 @@ let () = Arg.parse ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] ignore - "Echoes POST requests. Runs forever."; + "Responds to requests with a fixed string for benchmarking purposes."; let listen_address = Unix.(ADDR_INET (inet_addr_loopback, !port)) in Lwt.async begin fun () -> Lwt_io.establish_server_with_client_socket ~backlog:11_000 listen_address connection_handler - >>= fun _server -> - Lwt.return_unit + >>= fun _server -> Lwt.return_unit end; let forever, _ = Lwt.wait () in diff --git a/examples/lwt/lwt_echo_server.ml b/examples/lwt/lwt_echo_server.ml index e6b4dfc..1f2bc38 100644 --- a/examples/lwt/lwt_echo_server.ml +++ b/examples/lwt/lwt_echo_server.ml @@ -1,6 +1,3 @@ -(* TODO This needs to be paired with the requester example. *) -(* TODO Usage to comment. *) - let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = let module Body = Httpaf.Body in let module Headers = Httpaf.Headers in @@ -22,41 +19,6 @@ let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = | None -> "application/octet-stream" in - (* Due to a possible bug in http/af, read from the body only once, and - create the response based on the data in that first read. - - The bug is (possibly) in the client. Client_connection seems to go into - a read loop despite Client_connection.shutdown being called, due to the - reader being in the Partial state, and the next operation function - unconditionally returning `Read in that case. - - One workaround for this is to have the server send a Content-Length - header. To do that, this code has the server simply reply after the - first chunk is read, and use that chunk's length. - - The code I would expect to work, without the possible bug, is commented - out below. *) - - (* Body.schedule_read - request_body - ~on_eof:ignore - ~on_read:(fun request_data ~off ~len -> - let response = - Response.create - ~headers:(Headers.of_list [ - "Content-Type", response_content_type; - "Content-Length", string_of_int len; - "Connection", "close"; - ]) - `OK - in - - let response_body = - Reqd.respond_with_streaming request_descriptor response in - - Body.write_bigstring response_body request_data ~off ~len; - Body.close_writer response_body) *) - let response = Response.create ~headers:(Headers.of_list [ @@ -128,7 +90,11 @@ let () = Lwt_io.establish_server_with_client_socket listen_address connection_handler >>= fun _server -> - Lwt.return_unit + Printf.printf "Listening on port %i and echoing POST requests.\n" !port; + print_string "To send a POST request, try\n\n"; + print_string " echo foo | dune exec examples/lwt/lwt_post.exe\n\n"; + flush stdout; + Lwt.return_unit end; let forever, _ = Lwt.wait () in diff --git a/examples/lwt/lwt_get.ml b/examples/lwt/lwt_get.ml index 3c871be..271e3e2 100644 --- a/examples/lwt/lwt_get.ml +++ b/examples/lwt/lwt_get.ml @@ -1,17 +1,13 @@ -(* TODO Cleanup *) - module Body = Httpaf.Body -module Response = Httpaf.Response - -let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = - fun notify_request_finished response response_body -> - match response.status with +let response_handler notify_response_received response response_body = + let module Response = Httpaf.Response in + match Response.(response.status) with | `OK -> let rec read_response () = Body.schedule_read response_body - ~on_eof:(fun () -> Lwt.wakeup_later notify_request_finished ()) + ~on_eof:(fun () -> Lwt.wakeup_later notify_response_received ()) ~on_read:(fun response_fragment ~off ~len -> let response_fragment_string = Bytes.create len in Lwt_bytes.blit_to_bytes @@ -28,7 +24,6 @@ let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; exit 1 -(* TODO A real error handler *) let error_handler _ = assert false @@ -39,7 +34,7 @@ let () = let port = ref 80 in Arg.parse - ["-p", Set_int port, " port number"] + ["-p", Set_int port, " Port number (80 by default)"] (fun host_argument -> host := Some host_argument) "lwt_get.exe [-p N] HOST"; @@ -57,17 +52,22 @@ let () = Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr >>= fun () -> - let headers = Httpaf.Headers.of_list ["Host", host] in - let request = Httpaf.Request.create ~headers `GET "/" in - let request_finished, notify_request_finished = Lwt.wait () in + let request_headers = + Httpaf.Request.create + `GET "/" ~headers:(Httpaf.Headers.of_list ["Host", host]) + in + + let response_received, notify_response_received = Lwt.wait () in + let response_handler = response_handler notify_response_received in + let request_body = Httpaf_lwt.Client.request socket - request + request_headers ~error_handler - ~response_handler:(response_handler notify_request_finished) + ~response_handler in Body.close_writer request_body; - request_finished + response_received end diff --git a/examples/lwt/lwt_post.ml b/examples/lwt/lwt_post.ml index df81053..b9d2e84 100644 --- a/examples/lwt/lwt_post.ml +++ b/examples/lwt/lwt_post.ml @@ -1,15 +1,13 @@ module Body = Httpaf.Body -module Response = Httpaf.Response -let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = - fun notify_request_finished response response_body -> - - match response.status with +let response_handler notify_response_received response response_body = + let module Response = Httpaf.Response in + match Response.(response.status) with | `OK -> let rec read_response () = Body.schedule_read response_body - ~on_eof:(fun () -> Lwt.wakeup_later notify_request_finished ()) + ~on_eof:(fun () -> Lwt.wakeup_later notify_response_received ()) ~on_read:(fun response_fragment ~off ~len -> let response_fragment_string = Bytes.create len in Lwt_bytes.blit_to_bytes @@ -26,56 +24,54 @@ let response_handler : unit Lwt.u -> Response.t -> [ `read ] Body.t -> unit = Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; exit 1 -(* TODO Real error handler *) let error_handler _ = assert false open Lwt.Infix let () = - let host = ref None in + let host = ref "127.0.0.1" in let port = ref 8080 in Arg.parse - ["-p", Set_int port, " port number"] - (fun host_argument -> host := Some host_argument) - "lwt_get.exe [-p N] HOST"; - - let host = - match !host with - | None -> failwith "No hostname provided" - | Some host -> host - in + [ + "-h", Set_string host, " Hostname (127.0.0.1 by default)"; + "-p", Set_int port, " Port number (8080 by default)"; + ] + ignore + "lwt_get.exe [-h HOST] [-p N]"; Lwt_main.run begin Lwt_io.(read stdin) - >>= fun request_content -> + >>= fun text_to_send -> - Lwt_unix.getaddrinfo host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] + Lwt_unix.getaddrinfo !host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] >>= fun addresses -> let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr >>= fun () -> - let headers = - Httpaf.Headers.of_list [ - "Host", host; + let request_headers = + Httpaf.Request.create `POST "/" ~headers:(Httpaf.Headers.of_list [ + "Host", !host; "Connection", "close"; - "Content-Length", string_of_int (String.length request_content); - ] + "Content-Length", string_of_int (String.length text_to_send); + ]) in - let request = Httpaf.Request.create ~headers `POST "/" in - let request_finished, notify_request_finished = Lwt.wait () in + + let response_received, notify_response_received = Lwt.wait () in + let response_handler = response_handler notify_response_received in + let request_body = Httpaf_lwt.Client.request socket - request + request_headers ~error_handler - ~response_handler:(response_handler notify_request_finished) + ~response_handler in - Body.write_string request_body request_content; + Body.write_string request_body text_to_send; Body.close_writer request_body; - request_finished + response_received end diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index d5ff1c7..d657d33 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -1,19 +1,13 @@ -(* TODO Need a buffer module? *) -(* TODO Look into a possible one in Angstrom. *) -(* TODO Yes, need to provide own buffer. Copy the one from Httpaf_async. *) -(* TODO Fix all the whitespace and evetything *) -(* TODO There are too many copies here, because Lwt_io is already buffered. *) - open Lwt.Infix -(* TODO Note where this came from. *) (* TODO What to do about cleanup exceptions in both server and client? *) +(* Based on the Buffer module in httpaf_async.ml. *) module Buffer : sig type t - val create : int -> t + val create : int -> t val get : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int) -> int val put : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int Lwt.t) -> int Lwt.t diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli index ef12085..a961b17 100644 --- a/lwt/httpaf_lwt.mli +++ b/lwt/httpaf_lwt.mli @@ -8,9 +8,9 @@ module Server : sig Lwt_unix.file_descr Httpaf.Server_connection.request_handler val create_connection_handler - : ?config:Httpaf.Server_connection.Config.t - -> request_handler:(Unix.sockaddr -> request_handler) - -> error_handler:(Unix.sockaddr -> Httpaf.Server_connection.error_handler) + : ?config : Httpaf.Server_connection.Config.t + -> request_handler : (Unix.sockaddr -> request_handler) + -> error_handler : (Unix.sockaddr -> Httpaf.Server_connection.error_handler) -> (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t) end From 177296e8ae1e5071c2f1912a91c8e9ceaad9d3b9 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Mon, 19 Nov 2018 22:07:01 -0600 Subject: [PATCH 07/10] Convert httpaf-lwt.opam to opam 2.0 format --- httpaf-lwt.opam | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/httpaf-lwt.opam b/httpaf-lwt.opam index 53e1a9d..553ef42 100644 --- a/httpaf-lwt.opam +++ b/httpaf-lwt.opam @@ -1,23 +1,20 @@ -opam-version: "1.2" +opam-version: "2.0" name: "httpaf-lwt" maintainer: "Spiros Eliopoulos " authors: [ "Anton Bachin " ] license: "BSD-3-clause" homepage: "https://github.com/inhabitedtype/httpaf" bug-reports: "https://github.com/inhabitedtype/httpaf/issues" -dev-repo: "https://github.com/inhabitedtype/httpaf.git" +dev-repo: "git+https://github.com/inhabitedtype/httpaf.git" build: [ - ["jbuilder" "subst"] {pinned} + ["jbuilder" "subst" "-p" name] {pinned} ["jbuilder" "build" "-p" name "-j" jobs] ] -build-test: [ - ["jbuilder" "runtest" "-p" name] -] depends: [ + "ocaml" {>= "4.03.0"} "angstrom-lwt-unix" "faraday-lwt-unix" "httpaf" "jbuilder" {build & >= "1.0+beta10"} "lwt" ] -available: [ ocaml-version >= "4.03.0" ] From 7e177f9ba24c59088b19393a0f4dec61d5cdbd8e Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Mon, 19 Nov 2018 22:37:35 -0600 Subject: [PATCH 08/10] Review error handling --- lwt/httpaf_lwt.ml | 51 ++++++++++++++++------------------------------ lwt/httpaf_lwt.mli | 9 ++++---- 2 files changed, 21 insertions(+), 39 deletions(-) diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml index d657d33..2da8c10 100644 --- a/lwt/httpaf_lwt.ml +++ b/lwt/httpaf_lwt.ml @@ -2,7 +2,6 @@ open Lwt.Infix -(* TODO What to do about cleanup exceptions in both server and client? *) (* Based on the Buffer module in httpaf_async.ml. *) module Buffer : sig type t @@ -48,7 +47,6 @@ end = struct Lwt.return n end -(* TODO Should probably send exceptions into notify_read_complete. *) let read fd buffer = Lwt.catch (fun () -> @@ -56,11 +54,11 @@ let read fd buffer = Lwt_bytes.read fd bigstring off len)) (function | Unix.Unix_error (Unix.EBADF, _, _) as exn -> - raise exn + Lwt.fail exn | exn -> Lwt.async (fun () -> Lwt_unix.close fd); - raise exn) + Lwt.fail exn) >>= fun bytes_read -> if bytes_read = 0 then @@ -76,12 +74,6 @@ let shutdown socket command = -(* TODO But is this really awkward? We just need a finalize call on the joined - promise. *) -(* TODO Close the server's client connection, even though Lwt_io will also close - it. This is for better error handling. *) -(* TODO Get exceptions passed to the error handler? *) - module Server = struct type request_handler = Lwt_unix.file_descr Httpaf.Server_connection.request_handler @@ -102,7 +94,6 @@ module Server = struct let read_buffer = Buffer.create 0x1000 in let read_loop_exited, notify_read_loop_exited = Lwt.wait () in - (* TODO Explain loops and steps. *) let rec read_loop () = let rec read_loop_step () = match Server_connection.next_read_operation connection with @@ -175,17 +166,14 @@ module Server = struct read_loop (); write_loop (); + Lwt.join [read_loop_exited; write_loop_exited] >>= fun () -> - let handler_finished = Lwt.join [read_loop_exited; write_loop_exited] in - - Lwt.on_failure handler_finished begin fun _exn -> - Server_connection.shutdown connection; - if not (Lwt_unix.state socket = Lwt_unix.Closed) then - Lwt.async (fun () -> - Lwt_unix.close socket) - end; - - handler_finished + if Lwt_unix.state socket <> Lwt_unix.Closed then + Lwt.catch + (fun () -> Lwt_unix.close socket) + (fun _exn -> Lwt.return_unit) + else + Lwt.return_unit end @@ -269,20 +257,15 @@ module Client = struct read_loop (); write_loop (); - let handler_finished = Lwt.join [read_loop_exited; write_loop_exited] in + Lwt.async (fun () -> + Lwt.join [read_loop_exited; write_loop_exited] >>= fun () -> - Lwt.on_failure handler_finished begin fun _exn -> - Client_connection.shutdown connection; - if not (Lwt_unix.state socket = Lwt_unix.Closed) then - Lwt.async (fun () -> - Lwt_unix.close socket) - end; - - Lwt.on_success handler_finished begin fun () -> - if not (Lwt_unix.state socket = Lwt_unix.Closed) then - Lwt.async (fun () -> - Lwt_unix.close socket) - end; + if Lwt_unix.state socket <> Lwt_unix.Closed then + Lwt.catch + (fun () -> Lwt_unix.close socket) + (fun _exn -> Lwt.return_unit) + else + Lwt.return_unit); request_body end diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli index a961b17..63fe2a5 100644 --- a/lwt/httpaf_lwt.mli +++ b/lwt/httpaf_lwt.mli @@ -1,8 +1,6 @@ -(* TODO Document the server is meant to be used with establish_server', and -whatever client is meant to be used with. *) -(* TODO Explain where exceptions go and how to wrap the server callback. *) -(* TODO Local usage examples, or refer people to the example. *) - +(* The function that results from [create_connection_handler] should be passed + to [Lwt_io.establish_server_with_client_socket]. For an example, see + [examples/lwt_echo_server.ml]. *) module Server : sig type request_handler = Lwt_unix.file_descr Httpaf.Server_connection.request_handler @@ -14,6 +12,7 @@ module Server : sig -> (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t) end +(* For an example, see [examples/lwt_get.ml]. *) module Client : sig val request : Lwt_unix.file_descr From 43b77ed3aaeb3fbdbed09600b11ac685a6261aa6 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Tue, 20 Nov 2018 07:04:10 -0600 Subject: [PATCH 09/10] httpaf-lwt.opam: no direct dependency on angstrom --- httpaf-lwt.opam | 1 - 1 file changed, 1 deletion(-) diff --git a/httpaf-lwt.opam b/httpaf-lwt.opam index 553ef42..bff6c35 100644 --- a/httpaf-lwt.opam +++ b/httpaf-lwt.opam @@ -12,7 +12,6 @@ build: [ ] depends: [ "ocaml" {>= "4.03.0"} - "angstrom-lwt-unix" "faraday-lwt-unix" "httpaf" "jbuilder" {build & >= "1.0+beta10"} From 486627e891961bd1c1be1db85c06d8e6e8452597 Mon Sep 17 00:00:00 2001 From: Anton Bachin Date: Tue, 20 Nov 2018 07:18:46 -0600 Subject: [PATCH 10/10] httpaf-lwt.opam: add synopsis --- httpaf-lwt.opam | 1 + 1 file changed, 1 insertion(+) diff --git a/httpaf-lwt.opam b/httpaf-lwt.opam index bff6c35..1c47a2c 100644 --- a/httpaf-lwt.opam +++ b/httpaf-lwt.opam @@ -17,3 +17,4 @@ depends: [ "jbuilder" {build & >= "1.0+beta10"} "lwt" ] +synopsis: "Lwt support for http/af"