Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lower-level support for long-running cohttp-async connections #704

Merged
merged 1 commit into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 55 additions & 34 deletions cohttp-async/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -108,41 +108,62 @@ let request ?ssl_ctx ?uri ?(body=`Empty) req =
raise e
end

module Connection = struct
type t' =
{ ic : Reader.t
; oc : Writer.t }

(* we can't send concurrent requests over HTTP/1 *)
type t = t' Sequencer.t

let connect ?ssl_ctx uri =
Net.connect_uri ?ssl_ctx uri
>>| fun (ic, oc) ->
let t =
{ ic ; oc }
|> Sequencer.create ~continue_on_error:false
in
Throttle.at_kill t (fun { ic ; oc } ->
Deferred.both (Writer.close oc) (Reader.close ic)
>>| fun ((), ()) -> ());
(Deferred.any [ Writer.consumer_left oc ; Reader.close_finished ic ]
>>| fun () ->
Throttle.kill t)
|> don't_wait_for;
t

let close t =
Throttle.kill t;
Throttle.cleaned t

let is_closed t =
Throttle.is_dead t

let request ?(body=Body.empty) t req =
let res = Ivar.create () in
Throttle.enqueue t (fun { ic ; oc } ->
Request.write (fun writer ->
Body_raw.write_body Request.write_body body writer) req oc
>>= fun () ->
read_response ic
>>= fun (resp, body) ->
Ivar.fill res (resp, `Pipe body);
(* block starting any more requests until the consumer has finished reading this request *)
Pipe.closed body)
|> don't_wait_for;
Ivar.read res
end

let callv ?ssl_ctx uri reqs =
let reqs_c = ref 0 in
let resp_c = ref 0 in
Net.connect_uri ?ssl_ctx uri >>= fun (ic, oc) ->
try_with (fun () ->
reqs
|> Pipe.iter ~f:(fun (req, body) ->
Int.incr reqs_c;
Request.write (fun w -> Body_raw.write_body Request.write_body body w)
req oc)
|> don't_wait_for;
let last_body_drained = ref Deferred.unit in
let responses = Reader.read_all ic (fun ic ->
!last_body_drained >>= fun () ->
if Pipe.is_closed reqs && (!resp_c >= !reqs_c) then
return `Eof
else
ic |> read_response >>| fun (resp, body) ->
Int.incr resp_c;
last_body_drained := Pipe.closed body;
`Ok (resp, `Pipe body)
) in
don't_wait_for (
Pipe.closed reqs >>= fun () ->
Pipe.closed responses >>= fun () ->
Writer.close oc
);
return responses)
>>= begin function
| Ok x -> return x
| Error e ->
don't_wait_for (Reader.close ic);
don't_wait_for (Writer.close oc);
raise e
end
Connection.connect ?ssl_ctx uri
>>| fun connection ->
let responses =
Pipe.map' ~max_queue_length:1 reqs ~f:(fun reqs ->
Deferred.Queue.map reqs ~f:(fun (req, body) ->
Connection.request ~body connection req))
in
(Pipe.closed responses >>= fun () -> Connection.close connection) |> don't_wait_for;
responses

let call ?ssl_ctx ?headers ?(chunked=false) ?(body=`Empty) meth uri =
(* Create a request, then make the request. Figure out an appropriate
Expand Down
20 changes: 20 additions & 0 deletions cohttp-async/src/client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ val call :
Uri.t ->
(Cohttp.Response.t * Body.t) Async_kernel.Deferred.t


module Connection : sig
type t

val connect :
?ssl_ctx:Conduit_async_ssl.context ->
Uri.t ->
t Async_kernel.Deferred.t

val close : t -> unit Async_kernel.Deferred.t

val is_closed : t -> bool

val request :
?body: Body.t ->
t ->
Cohttp.Request.t ->
(Cohttp.Response.t * Body.t) Async_kernel.Deferred.t
end

val callv :
?ssl_ctx:Conduit_async_ssl.context ->
Uri.t ->
Expand Down