Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resp_stream_is_complete() #605

Merged
merged 5 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export(resp_retry_after)
export(resp_status)
export(resp_status_desc)
export(resp_stream_aws)
export(resp_stream_is_complete)
export(resp_stream_lines)
export(resp_stream_raw)
export(resp_stream_sse)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* `resp_stream_is_complete()` tells you if there is still data remaining to be streamed (#559).
* New `url_modify()`, `url_modify_query()`, and `url_modify_relative()` make it easier to modify an existing url (#464).
* New `url_query_parse()` and `url_query_build()` allow you to parse and build a query string (#425).
* `req_url_query()` gains the ability to control how spaces are encoded (#432).
Expand Down
11 changes: 11 additions & 0 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#' * `resp_stream_aws()` retrieves a single event from an AWS stream
#' (i.e. mime type `application/vnd.amazon.eventstream``).
#'
#' Use `resp_stream_is_complete()` to determine if there is further data
#' waiting on the stream.
#'
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
Expand Down Expand Up @@ -80,6 +83,14 @@
}
}

#' @export
#' @rdname resp_stream_raw
resp_stream_is_complete <- function(resp) {
check_response(resp)

Check warning on line 89 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L89

Added line #L89 was not covered by tests

!isIncomplete(resp$body)

Check warning on line 91 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L91

Added line #L91 was not covered by tests
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
Expand Down
6 changes: 6 additions & 0 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions tests/testthat/test-resp-stream.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

test_that("can stream bytes from a connection", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
withr::defer(close(resp))
Expand All @@ -16,6 +15,26 @@ test_that("can stream bytes from a connection", {
expect_length(out, 0)
})

test_that("can determine if a stream is complete (blocking)", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
withr::defer(close(resp))

expect_false(resp_stream_is_complete(resp))
expect_length(resp_stream_raw(resp, kb = 2), 2048)
expect_length(resp_stream_raw(resp, kb = 1), 0)
expect_true(resp_stream_is_complete(resp))
})

test_that("can determine if a stream is complete (non-blocking)", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection(blocking = FALSE)
withr::defer(close(resp))

expect_false(resp_stream_is_complete(resp))
expect_length(resp_stream_raw(resp, kb = 2), 2048)
expect_length(resp_stream_raw(resp, kb = 1), 0)
expect_true(resp_stream_is_complete(resp))
})

test_that("can't read from a closed connection", {
resp <- request_test("/stream-bytes/1024") %>% req_perform_connection()
close(resp)
Expand All @@ -42,7 +61,7 @@ test_that("can join lines across multiple reads", {
expect_equal(out, character())
expect_equal(resp1$cache$push_back, charToRaw("This is a "))

while(length(out) == 0) {
while (length(out) == 0) {
Sys.sleep(0.1)
out <- resp_stream_lines(resp1)
}
Expand Down Expand Up @@ -147,7 +166,7 @@ test_that("streams the specified number of lines", {

test_that("can feed sse events one at a time", {
req <- local_app_request(function(req, res) {
for(i in 1:3) {
for (i in 1:3) {
res$send_chunk(sprintf("data: %s\n\n", i))
}
})
Expand Down Expand Up @@ -185,7 +204,7 @@ test_that("can join sse events across multiple reads", {
expect_equal(out, NULL)
expect_equal(resp1$cache$push_back, charToRaw("data: 1\n"))

while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
Expand Down Expand Up @@ -213,7 +232,7 @@ test_that("sse always interprets data as UTF-8", {
withr::defer(close(resp1))

out <- NULL
while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
Expand All @@ -236,7 +255,7 @@ test_that("streaming size limits enforced", {
resp1 <- req_perform_connection(req, blocking = FALSE)
withr::defer(close(resp1))
expect_error(
while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1, max_size = 999)
}
Expand Down Expand Up @@ -266,7 +285,7 @@ test_that("has a working find_event_boundary", {
}
expect_identical(
result,
list(matched=charToRaw(matched), remaining = charToRaw(remaining))
list(matched = charToRaw(matched), remaining = charToRaw(remaining))
)
}

Expand Down
Loading