Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor request queue mechanics #172

Merged
merged 3 commits into from
May 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,6 @@ let output_state t : Output_state.t =
| Waiting -> Waiting
;;

let is_complete t =
match input_state t with
| Ready -> false
| Complete ->
(match output_state t with
| Waiting | Ready -> false
| Complete -> true)
;;

let flush_request_body t =
let request_body = request_body t in
if Body.has_pending_output request_body
Expand Down
104 changes: 60 additions & 44 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ let current_reqd_exn t =

let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
Expand Down Expand Up @@ -182,49 +182,36 @@ let set_error_and_handle ?request t error =
let report_exn t exn =
set_error_and_handle t (`Exn exn)

let advance_request_queue_if_necessary t =
if is_active t then begin
let reqd = current_reqd_exn t in
if Reqd.persistent_connection reqd then begin
if Reqd.is_complete reqd then begin
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (current_reqd_exn t);
wakeup_reader t;
end
end else begin
(* Take the head of the queue, close the remaining request bodies, clear
* the queue, and push the head back on. We do not plan on processing any
* more requests after the current one. *)
ignore (Queue.take t.request_queue);
Queue.iter Reqd.close_request_body t.request_queue;
Queue.clear t.request_queue;
Queue.push reqd t.request_queue;
if Reqd.is_complete reqd
then shutdown t
else
match Reqd.input_state reqd with
| Ready -> ()
| Complete -> shutdown_reader t
end
end else if Reader.is_closed t.reader
then shutdown t

let _next_read_operation t =
advance_request_queue_if_necessary t;
if is_active t
let advance_request_queue t =
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (Queue.peek_exn t.request_queue);
;;

let rec _next_read_operation t =
if not (is_active t)
then (
if Reader.is_closed t.reader
then shutdown t;
Reader.next t.reader
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Ready -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
)

and _final_read_operation_for t reqd =
if not (Reqd.persistent_connection reqd) then (
shutdown_reader t;
Reader.next t.reader;
) else (
match Reqd.output_state reqd with
| Waiting | Ready -> `Yield
| Complete ->
if Reqd.persistent_connection reqd
then `Yield
else (
shutdown_reader t;
Reader.next t.reader)
advance_request_queue t;
_next_read_operation t;
)
else Reader.next t.reader
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read looks good.

;;

let next_read_operation t =
Expand Down Expand Up @@ -259,13 +246,42 @@ let read t bs ~off ~len =
let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let next_write_operation t =
advance_request_queue_if_necessary t;
if is_active t
then (
let rec _next_write_operation t =
if not (is_active t)
then Writer.next t.writer
else (
let reqd = current_reqd_exn t in
Reqd.flush_response_body reqd);
Writer.next t.writer
match Reqd.output_state reqd with
| Waiting ->
(* XXX(dpatti): I don't think we should need to call this, but it is
necessary in the case of a streaming, non-chunked body so that you can
set the appropriate flag. *)
Reqd.flush_response_body reqd;
Writer.next t.writer
| Ready ->
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
)

and _final_write_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Ready -> Writer.next t.writer;
| Complete ->
advance_request_queue t;
_next_write_operation t;
)
in
wakeup_reader t;
next
;;

let next_write_operation t = _next_write_operation t
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writer looks good.


let report_write_result t result =
Writer.report_result t.writer result