From f6803676e703d69224e988231311530ac369edfb Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Mon, 6 Jan 2025 14:00:42 -0600 Subject: [PATCH] Add `resp_stream_is_complete()` (#605) To determine if there is data remaining on the stream. Fixes #559 --- NAMESPACE | 1 + NEWS.md | 1 + R/resp-stream.R | 11 +++++++++++ man/resp_stream_raw.Rd | 6 ++++++ tests/testthat/test-resp-stream.R | 33 ++++++++++++++++++++++++------- 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 129a96cb..165326b0 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -122,6 +122,7 @@ export(resp_retry_after) export(resp_status) export(resp_status_desc) export(resp_stream_aws) +export(resp_stream_is_complete) export(resp_stream_lines) export(resp_stream_raw) export(resp_stream_sse) diff --git a/NEWS.md b/NEWS.md index a72ab2d5..c67a4a98 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,6 @@ # httr2 (development version) +* `resp_stream_is_complete()` tells you if there is still data remaining to be streamed (#559). * New `url_modify()`, `url_modify_query()`, and `url_modify_relative()` make it easier to modify an existing url (#464). * New `url_query_parse()` and `url_query_build()` allow you to parse and build a query string (#425). * `req_url_query()` gains the ability to control how spaces are encoded (#432). diff --git a/R/resp-stream.R b/R/resp-stream.R index 20f632ce..85717c80 100644 --- a/R/resp-stream.R +++ b/R/resp-stream.R @@ -10,6 +10,9 @@ #' * `resp_stream_aws()` retrieves a single event from an AWS stream #' (i.e. mime type `application/vnd.amazon.eventstream``). #' +#' Use `resp_stream_is_complete()` to determine if there is further data +#' waiting on the stream. +#' #' @returns #' * `resp_stream_raw()`: a raw vector. #' * `resp_stream_lines()`: a character vector. @@ -80,6 +83,14 @@ resp_stream_sse <- function(resp, max_size = Inf) { } } +#' @export +#' @rdname resp_stream_raw +resp_stream_is_complete <- function(resp) { + check_response(resp) + + !isIncomplete(resp$body) +} + #' @export #' @param ... Not used; included for compatibility with generic. #' @rdname resp_stream_raw diff --git a/man/resp_stream_raw.Rd b/man/resp_stream_raw.Rd index e386a889..561b4257 100644 --- a/man/resp_stream_raw.Rd +++ b/man/resp_stream_raw.Rd @@ -6,6 +6,7 @@ \alias{resp_stream_sse} \alias{resp_stream_aws} \alias{close.httr2_response} +\alias{resp_stream_is_complete} \title{Read a streaming body a chunk at a time} \usage{ resp_stream_raw(resp, kb = 32) @@ -17,6 +18,8 @@ resp_stream_sse(resp, max_size = Inf) resp_stream_aws(resp, max_size = Inf) \method{close}{httr2_response}(con, ...) + +resp_stream_is_complete(resp) } \arguments{ \item{resp, con}{A streaming \link{response} created by \code{\link[=req_perform_connection]{req_perform_connection()}}.} @@ -56,4 +59,7 @@ no event is currently available. \item \code{resp_stream_aws()} retrieves a single event from an AWS stream (i.e. mime type `application/vnd.amazon.eventstream``). } + +Use \code{resp_stream_is_complete()} to determine if there are further events +waiting on the stream. } diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index 718b824a..36137e27 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -1,4 +1,3 @@ - test_that("can stream bytes from a connection", { resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() withr::defer(close(resp)) @@ -16,6 +15,26 @@ test_that("can stream bytes from a connection", { expect_length(out, 0) }) +test_that("can determine if a stream is complete (blocking)", { + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() + withr::defer(close(resp)) + + expect_false(resp_stream_is_complete(resp)) + expect_length(resp_stream_raw(resp, kb = 2), 2048) + expect_length(resp_stream_raw(resp, kb = 1), 0) + expect_true(resp_stream_is_complete(resp)) +}) + +test_that("can determine if a stream is complete (non-blocking)", { + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection(blocking = FALSE) + withr::defer(close(resp)) + + expect_false(resp_stream_is_complete(resp)) + expect_length(resp_stream_raw(resp, kb = 2), 2048) + expect_length(resp_stream_raw(resp, kb = 1), 0) + expect_true(resp_stream_is_complete(resp)) +}) + test_that("can't read from a closed connection", { resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() close(resp) @@ -42,7 +61,7 @@ test_that("can join lines across multiple reads", { expect_equal(out, character()) expect_equal(resp1$cache$push_back, charToRaw("This is a ")) - while(length(out) == 0) { + while (length(out) == 0) { Sys.sleep(0.1) out <- resp_stream_lines(resp1) } @@ -147,7 +166,7 @@ test_that("streams the specified number of lines", { test_that("can feed sse events one at a time", { req <- local_app_request(function(req, res) { - for(i in 1:3) { + for (i in 1:3) { res$send_chunk(sprintf("data: %s\n\n", i)) } }) @@ -185,7 +204,7 @@ test_that("can join sse events across multiple reads", { expect_equal(out, NULL) expect_equal(resp1$cache$push_back, charToRaw("data: 1\n")) - while(is.null(out)) { + while (is.null(out)) { Sys.sleep(0.1) out <- resp_stream_sse(resp1) } @@ -213,7 +232,7 @@ test_that("sse always interprets data as UTF-8", { withr::defer(close(resp1)) out <- NULL - while(is.null(out)) { + while (is.null(out)) { Sys.sleep(0.1) out <- resp_stream_sse(resp1) } @@ -236,7 +255,7 @@ test_that("streaming size limits enforced", { resp1 <- req_perform_connection(req, blocking = FALSE) withr::defer(close(resp1)) expect_error( - while(is.null(out)) { + while (is.null(out)) { Sys.sleep(0.1) out <- resp_stream_sse(resp1, max_size = 999) } @@ -266,7 +285,7 @@ test_that("has a working find_event_boundary", { } expect_identical( result, - list(matched=charToRaw(matched), remaining = charToRaw(remaining)) + list(matched = charToRaw(matched), remaining = charToRaw(remaining)) ) }