Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

give the body direct access to its sink writer #218

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@

module Reader = struct
type t =
{ faraday : Faraday.t
; mutable read_scheduled : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
{ faraday : Faraday.t
; mutable read_scheduled : bool
; mutable on_eof : unit -> unit
; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit
}

let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())
let default_on_eof = Sys.opaque_identity (fun () -> ())
let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ())

let create buffer =
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
; on_eof = default_on_eof
; on_read = default_on_read
{ faraday = Faraday.of_bigstring buffer
; read_scheduled = false
; on_eof = default_on_eof
; on_read = default_on_read
}

let create_empty () =
Expand Down Expand Up @@ -107,26 +107,33 @@ module Writer = struct
| Chunked of { mutable written_final_chunk : bool }

type t =
{ faraday : Faraday.t
; encoding : encoding
; when_ready_to_write : unit -> unit
; buffered_bytes : int ref
{ faraday : Faraday.t
; encoding : encoding
; writer : Serialize.Writer.t
; buffered_bytes : int ref
}

let of_faraday faraday ~encoding ~when_ready_to_write =
let _of_faraday faraday ~encoding ~writer =
let encoding =
match encoding with
| `Fixed _ | `Close_delimited -> Identity
| `Chunked -> Chunked { written_final_chunk = false }
in
{ faraday
; encoding
; when_ready_to_write
; writer
; buffered_bytes = ref 0
}

let create buffer ~encoding ~when_ready_to_write =
of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write
let create buffer ~encoding ~writer =
_of_faraday (Faraday.of_bigstring buffer) ~encoding ~writer

(* XXX(dpatti): [create_direct] allows you to write directly to the response
writer instead of going through an intermediary buffer, but at the cost of
not having the ability to transfer with the correct encoding. We should get
rid of this. *)
let create_direct ~encoding ~writer =
_of_faraday (Serialize.Writer.faraday writer) ~encoding ~writer

let write_char t c =
Faraday.write_char t.faraday c
Expand All @@ -140,7 +147,7 @@ module Writer = struct
let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_write t = t.when_ready_to_write ()
let ready_to_write t = Serialize.Writer.wakeup t.writer

let flush t kontinue =
Faraday.flush t.faraday kontinue;
Expand All @@ -166,7 +173,7 @@ module Writer = struct
in
faraday_has_output || additional_encoding_output

let transfer_to_writer t writer =
let transfer_to_writer t =
let faraday = t.faraday in
begin match Faraday.operation faraday with
| `Yield -> ()
Expand All @@ -176,9 +183,9 @@ module Writer = struct
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk writer [];
Serialize.Writer.schedule_chunk t.writer [];
end);
Serialize.Writer.unyield writer;
Serialize.Writer.unyield t.writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
Expand All @@ -187,10 +194,10 @@ module Writer = struct
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
end;
Serialize.Writer.flush writer (fun () ->
Serialize.Writer.flush t.writer (fun () ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ module Oneshot = struct
failwith "Httpaf.Client_connection.request: invalid body length"
in
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size)
~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer)
~encoding ~writer
in
let t =
{ request
Expand All @@ -89,7 +89,7 @@ module Oneshot = struct

let flush_request_body t =
if Body.Writer.has_pending_output t.request_body
then Body.Writer.transfer_to_writer t.request_body t.writer
then Body.Writer.transfer_to_writer t.request_body
;;

let set_error_and_handle_without_shutdown t error =
Expand Down
5 changes: 2 additions & 3 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
failwith "httpaf.Reqd.respond_with_streaming: invalid response body length"
in
let response_body =
Body.Writer.create t.response_body_buffer ~encoding ~when_ready_to_write:(fun () ->
Writer.wakeup t.writer)
Body.Writer.create t.response_body_buffer ~encoding ~writer:t.writer
in
Writer.write_response t.writer response;
if t.persistent then
Expand Down Expand Up @@ -257,5 +256,5 @@ let flush_request_body t =
let flush_response_body t =
match t.response_state with
| Streaming (_, response_body) ->
Body.Writer.transfer_to_writer response_body t.writer
Body.Writer.transfer_to_writer response_body
| _ -> ()
3 changes: 1 addition & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ let set_error_and_handle ?request t error =
| `Error (`Bad_gateway | `Internal_server_error) ->
failwith "httpaf.Server_connection.error_handler: invalid response body length"
in
Body.Writer.of_faraday (Writer.faraday writer) ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup writer));
Body.Writer.create_direct ~encoding ~writer);
end

let report_exn t exn =
Expand Down