Skip to content

Commit

Permalink
Lower-level support for long-running cohttp-async connections
Browse files Browse the repository at this point in the history
This adds Cohttp_async.Client.Connection which allows you to open
a connection and re-use it for multiple requests.

Unlike callv, this makes it easy to handle errors and correlate
requests and responses.
  • Loading branch information
brendanlong committed Oct 22, 2020
1 parent cfac3e6 commit ce24b30
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 34 deletions.
89 changes: 55 additions & 34 deletions cohttp-async/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -108,41 +108,62 @@ let request ?ssl_ctx ?uri ?(body=`Empty) req =
raise e
end

module Connection = struct
type t' =
{ ic : Reader.t
; oc : Writer.t }

(* we can't send concurrent requests over HTTP/1 *)
type t = t' Sequencer.t

let connect ?ssl_ctx uri =
Net.connect_uri ?ssl_ctx uri
>>| fun (ic, oc) ->
let t =
{ ic ; oc }
|> Sequencer.create ~continue_on_error:false
in
Throttle.at_kill t (fun { ic ; oc } ->
Deferred.both (Writer.close oc) (Reader.close ic)
>>| fun ((), ()) -> ());
(Deferred.any [ Writer.consumer_left oc ; Reader.close_finished ic ]
>>| fun () ->
Throttle.kill t)
|> don't_wait_for;
t

let close t =
Throttle.kill t;
Throttle.cleaned t

let is_closed t =
Throttle.is_dead t

let request ?(body=Body.empty) t req =
let res = Ivar.create () in
Throttle.enqueue t (fun { ic ; oc } ->
Request.write (fun writer ->
Body_raw.write_body Request.write_body body writer) req oc
>>= fun () ->
read_response ic
>>= fun (resp, body) ->
Ivar.fill res (resp, `Pipe body);
(* block starting any more requests until the consumer has finished reading this request *)
Pipe.closed body)
|> don't_wait_for;
Ivar.read res
end

let callv ?ssl_ctx uri reqs =
let reqs_c = ref 0 in
let resp_c = ref 0 in
Net.connect_uri ?ssl_ctx uri >>= fun (ic, oc) ->
try_with (fun () ->
reqs
|> Pipe.iter ~f:(fun (req, body) ->
Int.incr reqs_c;
Request.write (fun w -> Body_raw.write_body Request.write_body body w)
req oc)
|> don't_wait_for;
let last_body_drained = ref Deferred.unit in
let responses = Reader.read_all ic (fun ic ->
!last_body_drained >>= fun () ->
if Pipe.is_closed reqs && (!resp_c >= !reqs_c) then
return `Eof
else
ic |> read_response >>| fun (resp, body) ->
Int.incr resp_c;
last_body_drained := Pipe.closed body;
`Ok (resp, `Pipe body)
) in
don't_wait_for (
Pipe.closed reqs >>= fun () ->
Pipe.closed responses >>= fun () ->
Writer.close oc
);
return responses)
>>= begin function
| Ok x -> return x
| Error e ->
don't_wait_for (Reader.close ic);
don't_wait_for (Writer.close oc);
raise e
end
Connection.connect ?ssl_ctx uri
>>| fun connection ->
let responses =
Pipe.map' ~max_queue_length:1 reqs ~f:(fun reqs ->
Deferred.Queue.map reqs ~f:(fun (req, body) ->
Connection.request ~body connection req))
in
(Pipe.closed responses >>= fun () -> Connection.close connection) |> don't_wait_for;
responses

let call ?ssl_ctx ?headers ?(chunked=false) ?(body=`Empty) meth uri =
(* Create a request, then make the request. Figure out an appropriate
Expand Down
20 changes: 20 additions & 0 deletions cohttp-async/src/client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ val call :
Uri.t ->
(Cohttp.Response.t * Body.t) Async_kernel.Deferred.t


module Connection : sig
type t

val connect :
?ssl_ctx:Conduit_async_ssl.context ->
Uri.t ->
t Async_kernel.Deferred.t

val close : t -> unit Async_kernel.Deferred.t

val is_closed : t -> bool

val request :
?body: Body.t ->
t ->
Cohttp.Request.t ->
(Cohttp.Response.t * Body.t) Async_kernel.Deferred.t
end

val callv :
?ssl_ctx:Conduit_async_ssl.context ->
Uri.t ->
Expand Down

0 comments on commit ce24b30

Please sign in to comment.