diff --git a/lib/reqd.ml b/lib/reqd.ml index 106c06f4..8277a637 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -243,15 +243,6 @@ let output_state t : Output_state.t = | Waiting -> Waiting ;; -let is_complete t = - match input_state t with - | Ready -> false - | Complete -> - (match output_state t with - | Waiting | Ready -> false - | Complete -> true) -;; - let flush_request_body t = let request_body = request_body t in if Body.has_pending_output request_body diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 2de70414..6cfe6755 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -81,7 +81,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -155,6 +155,7 @@ let error_code t = else None let shutdown t = + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -182,53 +183,44 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - (* Take the head of the queue, close the remaining request bodies, clear - * the queue, and push the head back on. We do not plan on processing any - * more requests after the current one. *) - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader - | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + | Complete -> _final_read_operation_for t reqd ) - else Reader.next t.reader + +and _final_read_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next ;; let next_read_operation t = match _next_read_operation t with + (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -252,13 +244,39 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let next_write_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( let reqd = current_reqd_exn t in - Reqd.flush_response_body reqd); - Writer.next t.writer + match Reqd.output_state reqd with + | Waiting -> `Yield + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> assert false + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result