diff --git a/async/buffer.ml b/async/buffer.ml new file mode 100644 index 00000000..83f3e606 --- /dev/null +++ b/async/buffer.ml @@ -0,0 +1,52 @@ +(** XXX(seliopou): Replace Angstrom.Buffered with a module like this, while + also supporting growing the buffer. Clients can use this to buffer and the + use the unbuffered interface for actually running the parser. *) +open Core +open Async + +type t = + { buffer : Bigstring.t + ; mutable off : int + ; mutable len : int } + +let create size = + let buffer = Bigstring.create size in + { buffer; off = 0; len = 0 } +;; + +let compress t = + if t.len = 0 + then begin + t.off <- 0; + t.len <- 0; + end else if t.off > 0 + then begin + Bigstring.blit ~src:t.buffer ~src_pos:t.off ~dst:t.buffer ~dst_pos:0 ~len:t.len; + t.off <- 0; + end +;; + +let get t ~f = + let n = f t.buffer ~off:t.off ~len:t.len in + t.off <- t.off + n; + t.len <- t.len - n; + if t.len = 0 + then t.off <- 0; + n +;; + +let put t ~f = + compress t; + let n = f t.buffer ~off:(t.off + t.len) ~len:(Bigstring.length t.buffer - t.len) in + t.len <- t.len + n; + n +;; + +let put_async t ~f = + compress t; + f t.buffer ~off:(t.off + t.len) ~len:(Bigstring.length t.buffer - t.len) + >>= fun n -> + t.len <- t.len + n; + Deferred.return n +;; + diff --git a/async/buffer.mli b/async/buffer.mli new file mode 100644 index 00000000..8767b4af --- /dev/null +++ b/async/buffer.mli @@ -0,0 +1,10 @@ +open Core +open Async + +type t + +val create : int -> t + +val get : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int +val put : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int +val put_async : t -> f:(Bigstring.t -> off:int -> len:int -> int Deferred.t) -> int Deferred.t diff --git a/async/dune b/async/dune index 411e6ed7..94c5da1f 100644 --- a/async/dune +++ b/async/dune @@ -1,7 +1,10 @@ (library (name httpaf_async) (public_name httpaf-async) - (wrapped false) (libraries - async core angstrom-async faraday-async httpaf) + async core angstrom-async faraday-async httpaf + (select ssl_io.ml from + (async_ssl -> ssl_io_real.ml) + (!async_ssl -> ssl_io_dummy.ml))) + (modules buffer httpaf_async ssl_io) (flags (:standard -safe-string))) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 0ff9421b..9a7cc0ec 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -1,56 +1,6 @@ open Core open Async -(** XXX(seliopou): Replace Angstrom.Buffered with a module like this, while - also supporting growing the buffer. Clients can use this to buffer and the - use the unbuffered interface for actually running the parser. *) -module Buffer : sig - type t - - val create : int -> t - - val get : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int - val put : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int -end= struct - type t = - { buffer : Bigstring.t - ; mutable off : int - ; mutable len : int } - - let create size = - let buffer = Bigstring.create size in - { buffer; off = 0; len = 0 } - ;; - - let compress t = - if t.len = 0 - then begin - t.off <- 0; - t.len <- 0; - end else if t.off > 0 - then begin - Bigstring.blit ~src:t.buffer ~src_pos:t.off ~dst:t.buffer ~dst_pos:0 ~len:t.len; - t.off <- 0; - end - ;; - - let get t ~f = - let n = f t.buffer ~off:t.off ~len:t.len in - t.off <- t.off + n; - t.len <- t.len - n; - if t.len = 0 - then t.off <- 0; - n - ;; - - let put t ~f = - compress t; - let n = f t.buffer ~off:(t.off + t.len) ~len:(Bigstring.length t.buffer - t.len) in - t.len <- t.len + n; - n - ;; -end - let read fd buffer = let badfd fd = failwithf "read got back fd: %s" (Fd.to_string fd) () in let rec finish fd buffer result = @@ -89,126 +39,175 @@ let read fd buffer = open Httpaf +let close_read socket = + let fd = Socket.fd socket in + if not (Fd.is_closed fd) + then Socket.shutdown socket `Receive; + Deferred.return () + +let close_write socket = + let fd = Socket.fd socket in + if not (Fd.is_closed fd) + then Socket.shutdown socket `Send; + Deferred.return () + module Server = struct + let start_read_write_loops + ?(readf=read) + ?(writev=Faraday_async.writev_of_fd) + ?(close_read=close_read) + ?(close_write=close_write) + ~config + ~socket + connection = + let fd = Socket.fd socket in + let writev = writev fd in + let read_complete = Ivar.create () in + let buffer = Buffer.create config.Config.read_buffer_size in + let rec reader_thread () = + match Server_connection.next_read_operation connection with + | `Read -> + (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) + readf fd buffer + >>> begin function + | `Eof -> + Buffer.get buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read_eof connection bigstring ~off ~len) + |> ignore; + reader_thread () + | `Ok _ -> + Buffer.get buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read connection bigstring ~off ~len) + |> ignore; + reader_thread () + end + | `Yield -> + (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *) + Server_connection.yield_reader connection reader_thread + | `Close -> + (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) + Deferred.don't_wait_for + (close_read socket >>| Ivar.fill read_complete) + in + let write_complete = Ivar.create () in + let rec writer_thread () = + match Server_connection.next_write_operation connection with + | `Write iovecs -> + (* Log.Global.printf "write(%d)%!" (Fd.to_int_exn fd); *) + writev iovecs >>> fun result -> + Server_connection.report_write_result connection result; + writer_thread () + | `Yield -> + (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) + Server_connection.yield_writer connection writer_thread; + | `Close _ -> + (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) + Deferred.don't_wait_for + (close_write socket >>| Ivar.fill write_complete) + in + let conn_monitor = Monitor.create () in + Scheduler.within ~monitor:conn_monitor reader_thread; + Scheduler.within ~monitor:conn_monitor writer_thread; + Monitor.detach_and_iter_errors conn_monitor ~f:(fun exn -> + Server_connection.shutdown connection; + Log.Global.error "%s" (Exn.to_string exn); + if not (Fd.is_closed fd) + then don't_wait_for (Fd.close fd)); + (* The Tcp module will close the file descriptor once this becomes determined. *) + Deferred.all_unit + [ Ivar.read read_complete + ; Ivar.read write_complete ] + let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = fun client_addr socket -> - let fd = Socket.fd socket in - let writev = Faraday_async.writev_of_fd fd in let request_handler = request_handler client_addr in let error_handler = error_handler client_addr in let conn = Server_connection.create ~config ~error_handler request_handler in - let read_complete = Ivar.create () in - let buffer = Buffer.create config.read_buffer_size in - let rec reader_thread () = - match Server_connection.next_read_operation conn with - | `Read -> - (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) - read fd buffer - >>> begin function - | `Eof -> - Buffer.get buffer ~f:(fun bigstring ~off ~len -> - Server_connection.read_eof conn bigstring ~off ~len) - |> ignore; - reader_thread () - | `Ok _ -> - Buffer.get buffer ~f:(fun bigstring ~off ~len -> - Server_connection.read conn bigstring ~off ~len) - |> ignore; - reader_thread () - end - | `Yield -> - (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *) - Server_connection.yield_reader conn reader_thread - | `Close -> - (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) - Ivar.fill read_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Receive - in - let write_complete = Ivar.create () in - let rec writer_thread () = - match Server_connection.next_write_operation conn with - | `Write iovecs -> - (* Log.Global.printf "write(%d)%!" (Fd.to_int_exn fd); *) - writev iovecs >>> fun result -> - Server_connection.report_write_result conn result; - writer_thread () - | `Yield -> - (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) - Server_connection.yield_writer conn writer_thread; - | `Close _ -> - (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) - Ivar.fill write_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Send - in - let conn_monitor = Monitor.create () in - Scheduler.within ~monitor:conn_monitor reader_thread; - Scheduler.within ~monitor:conn_monitor writer_thread; - Monitor.detach_and_iter_errors conn_monitor ~f:(fun exn -> - Server_connection.shutdown conn; - Log.Global.error "%s" (Exn.to_string exn); - if not (Fd.is_closed fd) - then don't_wait_for (Fd.close fd)); - (* The Tcp module will close the file descriptor once this becomes determined. *) - Deferred.all_unit - [ Ivar.read read_complete - ; Ivar.read write_complete ] + start_read_write_loops ~config ~socket conn + + module SSL = struct + let create_connection_handler + ?server + ?certfile + ?keyfile + ?(config=Config.default) + ~request_handler + ~error_handler = + fun client_addr socket -> + let request_handler = request_handler client_addr in + let error_handler = error_handler client_addr in + let conn = Server_connection.create ~config ~error_handler request_handler in + Ssl_io.make_server ?server ?certfile ?keyfile socket >>= fun ssl -> + let ssl_reader = Ssl_io.reader ssl in + let ssl_writer = Ssl_io.writer ssl in + let readf = Ssl_io.readf ssl_reader in + let writev = Ssl_io.writev ssl_writer in + let close_read = Ssl_io.close_read ssl_reader in + let close_write = Ssl_io.close_write ssl_writer in + start_read_write_loops + ~config + ~readf + ~writev + ~socket + ~close_read + ~close_write + conn + end end module Client = struct - let request ?(config=Config.default) socket request ~error_handler ~response_handler = + let start_read_write_loops + ?(readf=read) + ?(writev=Faraday_async.writev_of_fd) + ?(close_read=close_read) + ~config + ~socket + connection = let fd = Socket.fd socket in - let writev = Faraday_async.writev_of_fd fd in - let request_body, conn = - Client_connection.request request ~error_handler ~response_handler in + let writev = writev fd in let read_complete = Ivar.create () in - let buffer = Buffer.create config.read_buffer_size in + let buffer = Buffer.create config.Config.read_buffer_size in let rec reader_thread () = - match Client_connection.next_read_operation conn with + match Client_connection.next_read_operation connection with | `Read -> (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) - read fd buffer + readf fd buffer >>> begin function | `Eof -> Buffer.get buffer ~f:(fun bigstring ~off ~len -> - Client_connection.read_eof conn bigstring ~off ~len) + Client_connection.read_eof connection bigstring ~off ~len) |> ignore; reader_thread () | `Ok _ -> Buffer.get buffer ~f:(fun bigstring ~off ~len -> - Client_connection.read conn bigstring ~off ~len) + Client_connection.read connection bigstring ~off ~len) |> ignore; reader_thread () end | `Close -> (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *) - Ivar.fill read_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Receive + Deferred.don't_wait_for (close_read socket >>| Ivar.fill read_complete) in let write_complete = Ivar.create () in let rec writer_thread () = - match Client_connection.next_write_operation conn with + match Client_connection.next_write_operation connection with | `Write iovecs -> (* Log.Global.printf "write(%d)%!" (Fd.to_int_exn fd); *) writev iovecs >>> fun result -> - Client_connection.report_write_result conn result; + Client_connection.report_write_result connection result; writer_thread () | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) - Client_connection.yield_writer conn writer_thread; + Client_connection.yield_writer connection writer_thread; | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); - if not (Fd.is_closed fd) - then Socket.shutdown socket `Send in let conn_monitor = Monitor.create () in Scheduler.within ~monitor:conn_monitor reader_thread; Scheduler.within ~monitor:conn_monitor writer_thread; Monitor.detach_and_iter_errors conn_monitor ~f:(fun exn -> - Client_connection.shutdown conn; + Client_connection.shutdown connection; Log.Global.error "%s" (Exn.to_string exn); if not (Fd.is_closed fd) then don't_wait_for (Fd.close fd)); @@ -218,6 +217,33 @@ module Client = struct ; Ivar.read write_complete ] >>| fun () -> if not (Fd.is_closed fd) - then don't_wait_for (Fd.close fd)); + then don't_wait_for (Fd.close fd)) + + let request ?(config=Config.default) socket request ~error_handler ~response_handler = + let request_body, conn = + Client_connection.request request ~error_handler ~response_handler in + + start_read_write_loops ~config ~socket conn; request_body + + module SSL = struct + let request ?client ?(config=Config.default) socket request ~error_handler ~response_handler = + let request_body, conn = + Client_connection.request request ~error_handler ~response_handler in + + Ssl_io.make_client ?client socket >>| begin fun ssl -> + let ssl_reader = Ssl_io.reader ssl in + let ssl_writer = Ssl_io.writer ssl in + let readf = Ssl_io.readf ssl_reader in + let writev = Ssl_io.writev ssl_writer in + + start_read_write_loops + ~config + ~readf + ~writev + ~socket + conn + end |> Deferred.don't_wait_for; + request_body + end end diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..bc3695e2 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -11,6 +11,20 @@ module Server : sig -> ([< Socket.Address.t] as 'a) -> ([`Active], 'a) Socket.t -> unit Deferred.t + + module SSL : sig + val create_connection_handler + : ?server : Ssl_io.server + -> ?certfile : string + -> ?keyfile : string + -> ?config : Config.t + -> request_handler : ('a -> Server_connection.request_handler) + -> error_handler : ('a -> Server_connection.error_handler) + -> ([< Socket.Address.t] as 'a) + -> ([`Active], 'a) Socket.t + -> unit Deferred.t + end + end module Client : sig @@ -21,4 +35,15 @@ module Client : sig -> error_handler : Client_connection.error_handler -> response_handler : Client_connection.response_handler -> [`write] Body.t + + module SSL : sig + val request + : ?client : Ssl_io.client + -> ?config : Config.t + -> ([`Active], [< Socket.Address.t]) Socket.t + -> Request.t + -> error_handler : Client_connection.error_handler + -> response_handler : Client_connection.response_handler + -> [`write] Body.t + end end diff --git a/async/ssl_io_dummy.ml b/async/ssl_io_dummy.ml new file mode 100644 index 00000000..f6d49675 --- /dev/null +++ b/async/ssl_io_dummy.ml @@ -0,0 +1,28 @@ +let readf _socket = + fun _fd _buffer -> + failwith "Ssl not available" + +let writev _socket _fd = + fun _iovecs -> + failwith "Ssl not available" + +let close_read _ssl_reader = fun _socket -> + failwith "Ssl not available" + +let close_write _ssl_writer = fun _socket -> + failwith "Ssl not available" + +type client = [ `Ssl_not_available ] +type server = [ `Ssl_not_available ] + +let reader _ = + failwith "Ssl not available" + +let writer _ = + failwith "Ssl not available" + +let make_client ?client:_ _socket = + failwith "Ssl not available" + +let make_server ?server:_ ?certfile:_ ?keyfile:_ _socket = + failwith "Ssl not available" diff --git a/async/ssl_io_real.ml b/async/ssl_io_real.ml new file mode 100644 index 00000000..da891dc4 --- /dev/null +++ b/async/ssl_io_real.ml @@ -0,0 +1,116 @@ +open Core +open Async +open Async_ssl + +module Unix = Core.Unix + + +let readf ssl_reader = + fun _fd buffer -> + Buffer.put_async buffer ~f:(fun bigstring ~off ~len -> + let bigsubstr = Bigsubstring.create ~pos:off ~len bigstring in + Reader.read_bigsubstring ssl_reader bigsubstr >>| function + | `Eof -> 0 + | `Ok n -> n) + >>| function + | 0 -> `Eof + | n -> `Ok n + +let writev ssl_writer _fd = + fun iovecs -> + let iovecs_q = Queue.create ~capacity:(List.length iovecs) () in + let len = List.fold ~init:0 ~f:(fun acc { Faraday.buffer; off = pos; len } -> + Queue.enqueue iovecs_q (Unix.IOVec.of_bigstring ~pos ~len buffer); + acc + len) iovecs + in + Writer.schedule_iovecs ssl_writer iovecs_q; + Writer.flushed ssl_writer + >>| fun () -> `Ok len + +let close_read ssl_reader = fun _socket -> + Reader.close ssl_reader + +let close_write ssl_writer = fun _socket -> + Writer.close ssl_writer + +type client = Reader.t * Writer.t +type server = Reader.t * Writer.t + +let reader (r, _) = r +let writer (_, w) = w + +(* taken from https://github.com/janestreet/async_extra/blob/master/src/tcp.ml *) +let reader_writer_of_sock ?buffer_age_limit ?reader_buffer_size ?writer_buffer_size s = + let fd = Socket.fd s in + ( Reader.create ?buf_len:reader_buffer_size fd + , Writer.create ?buffer_age_limit ?buf_len:writer_buffer_size fd ) + +let connect r w = + let net_to_ssl = Reader.pipe r in + let ssl_to_net = Writer.pipe w in + let app_to_ssl, app_wr = Pipe.create () in + let app_rd, ssl_to_app = Pipe.create () in + Ssl.client + ~app_to_ssl + ~ssl_to_app + ~net_to_ssl + ~ssl_to_net + () + |> Deferred.Or_error.ok_exn + >>= fun conn -> + Reader.of_pipe (Info.of_string "httpaf_async_ssl_reader") app_rd >>= fun app_reader -> + Writer.of_pipe (Info.of_string "httpaf_async_ssl_writer") app_wr >>| fun (app_writer,_) -> + don't_wait_for begin + Deferred.all_unit [ + Writer.close_finished app_writer ; + Reader.close_finished app_reader ; + ] >>= fun () -> + Ssl.Connection.close conn ; + Pipe.close_read app_rd ; + Writer.close w ; + end ; + (app_reader, app_writer) + +let make_client ?client socket = + match client with + | Some client -> Deferred.return client + | None -> + let reader, writer = reader_writer_of_sock socket in + connect reader writer + +let listen ~crt_file ~key_file r w = + let net_to_ssl = Reader.pipe r in + let ssl_to_net = Writer.pipe w in + let app_to_ssl, app_wr = Pipe.create () in + let app_rd, ssl_to_app = Pipe.create () in + Ssl.server + ~crt_file + ~key_file + ~app_to_ssl + ~ssl_to_app + ~net_to_ssl + ~ssl_to_net + () + |> Deferred.Or_error.ok_exn + >>= fun conn -> + Reader.of_pipe (Info.of_string "httpaf_async_ssl_reader") app_rd >>= fun app_reader -> + Writer.of_pipe (Info.of_string "httpaf_async_ssl_writer") app_wr >>| fun (app_writer,_) -> + don't_wait_for begin + Deferred.all_unit [ + Reader.close_finished app_reader; + Writer.close_finished app_writer + ] >>= fun () -> + Ssl.Connection.close conn ; + Pipe.close_read app_rd ; + Writer.close w ; + end; + (app_reader, app_writer) + +let make_server ?server ?certfile ?keyfile socket = + match server, certfile, keyfile with + | Some server, _, _ -> Deferred.return server + | None, Some crt_file, Some key_file -> + let reader, writer = reader_writer_of_sock socket in + listen ~crt_file ~key_file reader writer + | _ -> + failwith "Certfile and Keyfile required when server isn't provided" diff --git a/certificates/server.key b/certificates/server.key new file mode 100644 index 00000000..5a92851d --- /dev/null +++ b/certificates/server.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQC2QEje5rwhlD2iq162+Ng3AH9BfA/jNJLDqi9VPk1eMUNGicJv +K+aOANKIsOOr9v4RiEXZSYmFEvGSy+Sf1bCDHwHLLSdNs6Y49b77POgatrVZOTRE +BE/t1soVT3a/vVJWCLtVCjm70u0S5tcfn4S6IapeIYAVAmcaqwSa+GQNoQIDAQAB +AoGAd/CShG8g/JBMh9Nz/8KAuKHRHc2BvysIM1C62cSosgaFmdRrazJfBrEv3Nlc +2/0uc2dVYIxuvm8bIFqi2TWOdX9jWJf6oXwEPXCD0SaDbJTaoh0b+wjyHuaGlttY +Ztvmf8mK1BOhyl3vNMxh/8Re0dGvGgPZHpn8zanaqfGVz+ECQQDngieUpwzxA0QZ +GZKRYhHoLEaPiQzBaXphqWcCLLN7oAKxZlUCUckxRRe0tKINf0cB3Kr9gGQjPpm0 +YoqXo8mNAkEAyYgdd+JDi9FH3Cz6ijvPU0hYkriwTii0V09+Ar5DvYQNzNEIEJu8 +Q3Yte/TPRuK8zhnp97Bsy9v/Ji/LSWbtZQJBAJe9y8u3otfmWCBLjrIUIcCYJLe4 +ENBFHp4ctxPJ0Ora+mjkthuLF+BfdSZQr1dBcX1a8giuuvQO+Bgv7r9t75ECQC7F +omEyaA7JEW5uGe9/Fgz0G2ph5rkdBU3GKy6jzcDsJu/EC6UfH8Bgawn7tSd0c/E5 +Xm2Xyog9lKfeK8XrV2kCQQCTico5lQPjfIwjhvn45ALc/0OrkaK0hQNpXgUNFJFQ +tuX2WMD5flMyA5PCx5XBU8gEMHYa8Kr5d6uoixnbS0cZ +-----END RSA PRIVATE KEY----- diff --git a/certificates/server.pem b/certificates/server.pem new file mode 100644 index 00000000..22a82fb7 --- /dev/null +++ b/certificates/server.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICYzCCAcwCCQDLbE6ES1ih1DANBgkqhkiG9w0BAQUFADB2MQswCQYDVQQGEwJB +VTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMRUwEwYDVQQDDAxZT1VSIE5BTUUhISExGDAWBgkqhkiG9w0BCQEW +CW1lQGJhci5kZTAeFw0xNDAyMTcyMjA4NDVaFw0xNTAyMTcyMjA4NDVaMHYxCzAJ +BgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5l +dCBXaWRnaXRzIFB0eSBMdGQxFTATBgNVBAMMDFlPVVIgTkFNRSEhITEYMBYGCSqG +SIb3DQEJARYJbWVAYmFyLmRlMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC2 +QEje5rwhlD2iq162+Ng3AH9BfA/jNJLDqi9VPk1eMUNGicJvK+aOANKIsOOr9v4R +iEXZSYmFEvGSy+Sf1bCDHwHLLSdNs6Y49b77POgatrVZOTREBE/t1soVT3a/vVJW +CLtVCjm70u0S5tcfn4S6IapeIYAVAmcaqwSa+GQNoQIDAQABMA0GCSqGSIb3DQEB +BQUAA4GBAIo4ZppIlp3JRyltRC1/AyCC0tsh5TdM3W7258wdoP3lEe08UlLwpnPc +aJ/cX8rMG4Xf4it77yrbVrU3MumBEGN5TW4jn4+iZyFbp6TT3OUF55nsXDjNHBbu +deDVpGuPTI6CZQVhU5qEMF3xmlokG+VV+HCDTglNQc+fdLM0LoNF +-----END CERTIFICATE----- diff --git a/examples/async/async_https_echo_post.ml b/examples/async/async_https_echo_post.ml new file mode 100644 index 00000000..de7bb372 --- /dev/null +++ b/examples/async/async_https_echo_post.ml @@ -0,0 +1,69 @@ +open Core +open Async + +open Httpaf +open Httpaf_async + + +let error_handler _ ?request:_ error start_response = + let response_body = start_response Headers.empty in + begin match error with + | `Exn exn -> + Body.write_string response_body (Exn.to_string exn); + Body.write_string response_body "\n"; + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + Body.close_writer response_body +;; + +let request_handler _ reqd = + match Reqd.request reqd with + | { Request.meth = `POST; headers } -> + let response = + let content_type = + match Headers.get headers "content-type" with + | None -> "application/octet-stream" + | Some x -> x + in + Response.create ~headers:(Headers.of_list ["content-type", content_type; "connection", "close"]) `OK + in + let request_body = Reqd.request_body reqd in + let response_body = Reqd.respond_with_streaming reqd response in + let rec on_read buffer ~off ~len = + Body.write_bigstring response_body buffer ~off ~len; + Body.schedule_read request_body ~on_eof ~on_read; + and on_eof () = + print_endline "eof"; + Body.close_writer response_body + in + Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read + | _ -> Reqd.respond_with_string reqd (Response.create `Method_not_allowed) "" +;; + +let main port max_accepts_per_batch () = + let where_to_listen = Tcp.Where_to_listen.bind_to + (Tcp.Bind_to_address.Localhost) + (Tcp.Bind_to_port.On_port port) in + Tcp.(Server.create_sock ~on_handler_error:`Ignore + ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) + (Server.SSL.create_connection_handler + ?server:None + ~certfile:"./certificates/server.pem" + ~keyfile:"./certificates/server.key" + ~request_handler + ~error_handler) + >>= fun _server -> + Deferred.never () + +let () = + Command.async_spec + ~summary:"Start a hello world Async server" + Command.Spec.(empty +> + flag "-p" (optional_with_default 8080 int) + ~doc:"int Source port to listen on" + +> + flag "-a" (optional_with_default 1 int) + ~doc:"int Maximum accepts per batch" + ) main + |> Command.run diff --git a/examples/async/async_https_get.ml b/examples/async/async_https_get.ml new file mode 100644 index 00000000..9679f212 --- /dev/null +++ b/examples/async/async_https_get.ml @@ -0,0 +1,49 @@ +open Core +open Async + +open Httpaf +open Httpaf_async + +let response_handler finished response response_body = + match response with + | { Response.status = `OK; _ } -> + let rec on_read bs ~off ~len = + Bigstring.to_string ~off ~len bs |> print_endline; + Body.schedule_read response_body ~on_read ~on_eof + and on_eof () = Ivar.fill finished () in + Body.schedule_read response_body ~on_read ~on_eof; + | response -> + Format.fprintf Format.std_formatter "%a\n%!" Response.pp_hum response; + Core.exit 1 +;; + +let error_handler _ = assert false + +let main port host () = + let where_to_connect = Tcp.Where_to_connect.of_host_and_port { host; port } in + let finished = Ivar.create () in + Tcp.connect_sock where_to_connect + >>= fun socket -> + let headers = Headers.of_list [ "host", host ] in + let request_body = + Client.SSL.request + ~error_handler + ~response_handler:(response_handler finished) + socket + (Request.create ~headers `GET "/") + in + Body.close_writer request_body; + Ivar.read finished +;; + +let () = + Command.async_spec + ~summary:"Start a hello world Async server" + Command.Spec.(empty +> + flag "-p" (optional_with_default 443 int) + ~doc:"int destination port" + +> + flag "-h" (required string) + ~doc:"string destination host" + ) main + |> Command.run diff --git a/examples/async/dune b/examples/async/dune index 465e7fed..0d495b26 100644 --- a/examples/async/dune +++ b/examples/async/dune @@ -1,5 +1,5 @@ (executables (libraries httpaf httpaf-async async core) - (modules async_echo_post async_get async_post) - (names async_echo_post async_get async_post) + (modules async_echo_post async_get async_post async_https_get async_https_echo_post) + (names async_echo_post async_get async_post async_https_get async_https_echo_post) (flags (:standard -w -9)))