diff --git a/examples/hello_world.ml b/examples/hello_world.ml index 44957e5a..08e6fcae 100644 --- a/examples/hello_world.ml +++ b/examples/hello_world.ml @@ -11,16 +11,17 @@ let print_param = `String ("Hello " ^ param req "name") |> respond') let streaming = + let open Lwt.Infix in get "/hello/stream" (fun _req -> - let count = ref 0 in - let chunk = "00000000000" in - `Streaming - (Lwt_stream.from_direct (fun () -> - if !count < 1000 then ( - incr count ; - Some (chunk ^ "\n") ) - else None)) - |> respond') + let f, push = App.create_stream () in + let timers = + List.map + (fun t -> + Lwt_unix.sleep t + >|= fun () -> push (Printf.sprintf "Hello after %f seconds\n" t)) + [1.; 2.; 3.] + in + f (Lwt.join timers)) let default = not_found (fun _req -> diff --git a/opium/app.ml b/opium/app.ml index 77de5873..a973c9c2 100644 --- a/opium/app.ml +++ b/opium/app.ml @@ -266,6 +266,16 @@ module Response_helpers = struct let respond' ?headers ?code s = s |> respond ?headers ?code |> return + let create_stream () = + let open Lwt.Infix in + let stream, push = Lwt_stream.create () in + let p' w = push (Some w) in + let f ?headers ?code p = + Lwt.async (fun () -> p >|= fun () -> push None) ; + respond' ?headers ?code (`Streaming stream) + in + (f, p') + let redirect ?headers uri = let headers = Cohttp.Header.add_opt headers "Location" (Uri.to_string uri) @@ -302,3 +312,5 @@ let respond' = Response_helpers.respond' let redirect = Response_helpers.redirect let redirect' = Response_helpers.redirect' + +let create_stream = Response_helpers.create_stream diff --git a/opium/app.mli b/opium/app.mli index e17e2ad3..80933705 100644 --- a/opium/app.mli +++ b/opium/app.mli @@ -108,6 +108,14 @@ val respond' : -> body -> Response.t Lwt.t +val create_stream : + unit + -> ( ?headers:Cohttp.Header.t + -> ?code:Cohttp.Code.status_code + -> unit Lwt.t + -> Response.t Lwt.t) + * (string -> unit) + val redirect : ?headers:Cohttp.Header.t -> Uri.t -> Response.t (* Same as return (redirect ...) *)