Skip to content

Commit dcf7692

Browse files
authored
Arrow IPC serializers and parsers (#968)
1 parent a3af795 commit dcf7692

11 files changed

+107
-1
lines changed

DESCRIPTION

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,5 @@ Collate:
9898
'validate_api_spec.R'
9999
'zzz.R'
100100
Language: en-US
101-
Config/Needs/check: Cairo
101+
Config/Needs/check: Cairo, r-quantities/units
102102
Config/Needs/website: tidyverse/tidytemplate

NAMESPACE

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export(include_md)
2626
export(include_rmd)
2727
export(is_plumber)
2828
export(options_plumber)
29+
export(parser_arrow_ipc_stream)
2930
export(parser_csv)
3031
export(parser_excel)
3132
export(parser_feather)
@@ -76,6 +77,7 @@ export(registered_serializers)
7677
export(serializer_agg_jpeg)
7778
export(serializer_agg_png)
7879
export(serializer_agg_tiff)
80+
export(serializer_arrow_ipc_stream)
7981
export(serializer_bmp)
8082
export(serializer_cat)
8183
export(serializer_content_type)

NEWS.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# plumber (development version)
22

3+
* Added support for [Arrow IPC Streams](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) (@josiahparry #968)
4+
35
# plumber 1.3.0
46

57
* The port may now be specified as an environment variable. User-provided ports must be between 1024 and 49151 (following [IANA guidelines](https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml)) and may not be a known unsafe port. plumber will now throw an error if an invalid port is requested. (@shikokuchuo @gadenbuie #963)
@@ -20,6 +22,7 @@
2022

2123
* Added `operationId` to each operation within the auto-generated OpenAPI output. The value is similar to the `PATH-VERB`, e.g. `/users/create-POST`. (#986)
2224

25+
* Added support for graphic devices provided by ragg and svglite (@thomasp85 #964)
2326

2427
# plumber 1.2.2
2528

R/parse-body.R

+12
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,17 @@ parser_feather <- function(...) {
496496
}
497497
}
498498

499+
#' @describeIn parsers Arrow IPC parser. See [arrow::read_ipc_stream()] for more details.
#' @export
parser_arrow_ipc_stream <- function(...) {
  # Reader handed to `parser_read_file()`. It receives a file path and forwards
  # the `...` captured above to `arrow::read_ipc_stream()`.
  read_stream <- function(tmpfile) {
    # The availability of `arrow` is checked each time the reader runs,
    # not when the parser is created.
    arrow_available <- requireNamespace("arrow", quietly = TRUE)
    if (!arrow_available) {
      stop("`arrow` must be installed for `parser_arrow_ipc_stream` to work")
    }
    arrow::read_ipc_stream(tmpfile, ...)
  }

  parser_read_file(read_stream)
}
509+
499510
#' @describeIn parsers parquet parser. See [arrow::read_parquet()] for more details.
500511
#' @export
501512
parser_parquet <- function(...) {
@@ -623,6 +634,7 @@ register_parsers_onLoad <- function() {
623634
register_parser("rds", parser_rds, fixed = "application/rds")
624635
register_parser("feather", parser_feather, fixed = c("application/vnd.apache.arrow.file", "application/feather"))
625636
register_parser("parquet", parser_parquet, fixed = "application/vnd.apache.parquet")
637+
register_parser("arrow_ipc_stream", parser_arrow_ipc_stream, fixed = "application/vnd.apache.arrow.stream")
626638
register_parser("excel", parser_excel, fixed = c("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel"))
627639
register_parser("text", parser_text, fixed = "text/plain", regex = "^text/")
628640
register_parser("tsv", parser_tsv, fixed = c("application/tab-separated-values", "text/tab-separated-values"))

R/serializer.R

+16
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,21 @@ serializer_feather <- function(type = "application/vnd.apache.arrow.file") {
291291
)
292292
}
293293

294+
#' @describeIn serializers Arrow IPC serializer. See also: [arrow::write_ipc_stream()]
#' @export
serializer_arrow_ipc_stream <- function(type = "application/vnd.apache.arrow.stream") {
  # Fail as soon as the serializer is constructed if `arrow` is unavailable.
  has_arrow <- requireNamespace("arrow", quietly = TRUE)
  if (!has_arrow) {
    stop("`arrow` must be installed for `serializer_arrow_ipc_stream` to work")
  }

  # Writer handed to `serializer_write_file()`: stores `val` at `tmpfile`
  # using `arrow::write_ipc_stream()`.
  write_stream <- function(val, tmpfile) {
    arrow::write_ipc_stream(val, tmpfile)
  }

  serializer_write_file(
    type = type,
    fileext = "",
    write_fn = write_stream
  )
}
308+
294309
#' @describeIn serializers parquet serializer. See also: [arrow::write_parquet()]
295310
#' @export
296311
serializer_parquet <- function(type = "application/vnd.apache.parquet") {
@@ -708,6 +723,7 @@ add_serializers_onLoad <- function() {
708723
register_serializer("csv", serializer_csv)
709724
register_serializer("tsv", serializer_tsv)
710725
register_serializer("feather", serializer_feather)
726+
register_serializer("arrow_ipc_stream", serializer_arrow_ipc_stream)
711727
register_serializer("parquet", serializer_parquet)
712728
register_serializer("excel", serializer_excel)
713729
register_serializer("yaml", serializer_yaml)

inst/WORDLIST

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ HTTPOnly
2121
HTTPS
2222
Hookable
2323
IANA
24+
IPC
2425
IPs
2526
IPv
2627
JSON

man/parsers.Rd

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/serializers.Rd

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/testthat/test-parse-body.R

+20
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,26 @@ test_that("Test feather parser", {
109109
expect_equal(parsed, r_object)
110110
})
111111

112+
test_that("Test Arrow IPC parser", {
  skip_if_not_installed("arrow")

  # Scratch file holding the serialized stream; removed when the test exits.
  tmp <- tempfile()
  on.exit({
    file.remove(tmp)
  }, add = TRUE)

  r_object <- iris
  arrow::write_ipc_stream(r_object, tmp)
  # Read the whole file instead of a fixed 10000-byte cap, so the payload
  # can never be silently truncated if the serialized size grows.
  val <- readBin(tmp, "raw", n = file.size(tmp))

  parsed <- parse_body(val, "application/vnd.apache.arrow.stream", make_parser("arrow_ipc_stream"))
  # convert the parsed Arrow IPC result to a plain data.frame for comparison
  parsed <- as.data.frame(parsed, stringsAsFactors = FALSE)
  # NOTE(review): kept from the sibling parser tests; presumably a no-op
  # when no "spec" attribute is present -- confirm before removing.
  attr(parsed, "spec") <- NULL

  expect_equal(parsed, r_object)
})
131+
112132
test_that("Test parquet parser", {
113133
skip_if_not_installed("arrow")
114134

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
context("Arrow IPC serializer")
2+
3+
test_that("Arrow IPC serializes properly", {
  skip_if_not_installed("arrow")

  input_df <- data.frame(a=1, b=2, c="hi")
  # Build the serializer first, then invoke it on the data frame.
  serialize <- serializer_arrow_ipc_stream()
  res <- serialize(input_df, data.frame(), PlumberResponse$new(), stop)
  expect_equal(res$status, 200L)
  expect_equal(res$headers$`Content-Type`, "application/vnd.apache.arrow.stream")

  # Full round trip through the matching parser, which is itself exercised
  # in `test-parse-body.R`.
  arrow_parser <- make_parser("arrow_ipc_stream")
  round_trip <- parse_body(res$body, "application/vnd.apache.arrow.stream", arrow_parser)
  # Coerce the parsed result to a plain data.frame before comparing.
  round_trip <- as.data.frame(round_trip, stringsAsFactors = FALSE)
  attr(round_trip, "spec") <- NULL

  expect_equal(round_trip, input_df)
})
19+
20+
test_that("Errors call error handler", {
  skip_if_not_installed("arrow")

  # Counts how many times the serializer hands an error to the handler.
  errors <- 0
  errHandler <- function(req, res, err){
    errors <<- errors + 1
  }

  expect_equal(errors, 0)
  # Exercise the Arrow IPC serializer itself (this file previously called
  # `serializer_feather()` here). An expression object is passed as the
  # value so that serialization is expected to fail.
  serializer_arrow_ipc_stream()(parse(text="hi"), data.frame(), PlumberResponse$new("csv"), errorHandler = errHandler)
  expect_equal(errors, 1)
})
32+
33+
test_that("Errors are rendered correctly with debug TRUE", {
  skip_if_not_installed("arrow")

  # Route whose handler always fails, wired to the Arrow IPC serializer
  # (this file previously used `serializer_feather()` here) with debug on.
  pr <- pr() %>% pr_get("/", function() stop("myerror"), serializer = serializer_arrow_ipc_stream()) %>% pr_set_debug(TRUE)
  # `capture.output()` keeps anything printed while serving out of the test log.
  capture.output(res <- pr$serve(make_req(pr = pr), PlumberResponse$new("csv")))

  expect_match(res$body, "Error in (function () : myerror", fixed = TRUE)
})
41+

tests/testthat/test-serializer-feather.R

+1
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ test_that("Errors are rendered correctly with debug TRUE", {
3838

3939
expect_match(res$body, "Error in (function () : myerror", fixed = TRUE)
4040
})
41+

0 commit comments

Comments
 (0)