
Investigate weak performance #3

Open

krlmlr opened this issue Oct 23, 2022 · 30 comments

@krlmlr
Member

krlmlr commented Oct 23, 2022

For duckdb, it seems that going through a RecordBatchReader is slower than through data frames. Further investigation is needed.

I've tested various scenarios for 10k rows of flights data: streaming, reading as a data frame, and writing to Parquet files.

adbc + duckdb

Stream

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(conflicted)

con <- dbConnect(
  adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.438   0.036   3.450

bench::mark(
  as.data.frame(dbStreamTable(con, "flights"))[10000:1, ]
)
#> # A tibble: 1 × 6
#>   expression                                                  min median itr/s…¹
#>   <bch:expr>                                              <bch:t> <bch:>   <dbl>
#> 1 as.data.frame(dbStreamTable(con, "flights"))[10000:1, ]  32.3ms 32.9ms    30.2
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-23 with reprex v2.0.2

Read

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(conflicted)

con <- dbConnect(
  adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.395   0.035   3.438

bench::mark(
  dbReadTable(con, "flights")[10000:1, ]
)
#> # A tibble: 1 × 6
#>   expression                                  min median itr/s…¹ mem_a…² gc/se…³
#>   <bch:expr>                             <bch:tm> <bch:>   <dbl> <bch:b>   <dbl>
#> 1 dbReadTable(con, "flights")[10000:1, ]   29.2ms 30.4ms    32.6  19.5MB    4.34
#> # … with abbreviated variable names ¹​`itr/sec`, ²​mem_alloc, ³​`gc/sec`

Created on 2022-10-23 with reprex v2.0.2

Parquet

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(arrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(conflicted)

con <- dbConnect(
  adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.407   0.034   3.451

bench::mark(
  write_dataset(dbStreamTable(con, "flights"), "f.p")
)
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 write_dataset(dbStreamTable(con, "flights"), "f.p")   35.8ms   36.9ms     26.9
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-23 with reprex v2.0.2

duckdb + DBI classic

Stream

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(conflicted)

con <- dbConnect(
  duckdb::duckdb()
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.660   0.038   3.685

bench::mark(
  as.data.frame(dbStreamTable(con, "flights"))[10000:1, ]
)
#> # A tibble: 1 × 6
#>   expression                                                  min median itr/s…¹
#>   <bch:expr>                                              <bch:t> <bch:>   <dbl>
#> 1 as.data.frame(dbStreamTable(con, "flights"))[10000:1, ]  22.2ms   23ms    43.7
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-23 with reprex v2.0.2

Read

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(conflicted)

con <- dbConnect(
  duckdb::duckdb()
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.665   0.034   3.673

bench::mark(
  dbReadTable(con, "flights")[10000:1, ]
)
#> # A tibble: 1 × 6
#>   expression                                  min median itr/s…¹ mem_a…² gc/se…³
#>   <bch:expr>                             <bch:tm> <bch:>   <dbl> <bch:b>   <dbl>
#> 1 dbReadTable(con, "flights")[10000:1, ]   12.8ms 13.1ms    75.5   2.3MB       0
#> # … with abbreviated variable names ¹​`itr/sec`, ²​mem_alloc, ³​`gc/sec`

Created on 2022-10-23 with reprex v2.0.2

Parquet

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(arrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(conflicted)

con <- dbConnect(
  duckdb::duckdb()
)

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:10000, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.629   0.034   3.663

bench::mark(
  write_dataset(dbStreamTable(con, "flights"), "f.p")
)
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 write_dataset(dbStreamTable(con, "flights"), "f.p")   26.1ms   26.7ms     37.2
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-23 with reprex v2.0.2

@nealrichardson

I might be missing the point here, but a couple of thoughts:

  • I wouldn't expect the duckdb -> data.frame conversion to get faster because you went duckdb -> arrow::RecordBatchReader -> data.frame. That said, significantly slower does merit investigation, and I wouldn't be surprised if there was room for more/better parallelization, ALTREP, etc. that would help. I know @paleolimbot has been working on some of this kind of stuff in nanoarrow.
  • Where I would expect performance (and fidelity) benefits of ADBC: (1) connecting multiple ADBC services, so you don't have to convert to an R data.frame in the middle (see the sketch below); (2) connecting to databases that aren't in-process or aren't local (duckdb, being in-process, can create R data structures directly).
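For point (1), a minimal sketch of what skipping the data.frame in the middle could look like, reusing the con and flights table from the reprexes above (the ADBC result streams straight into Arrow's Parquet writer):

library(arrow)

# ADBC result -> Arrow C stream -> Parquet, no R data.frame in between
rbr <- dbStreamTable(con, "flights")
write_dataset(rbr, "flights_parquet")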

@paleolimbot

If you point me to the code that's getting evaluated here I might be able to help! (I browsed the repo a bit but I'm not familiar enough with duckdb/DBI to find exactly what's getting evaluated).

@jonkeane

I also have a theory that I was hoping to test, but had trouble getting adbc+duckdb installed.

What I tried:

Installing duckdb from source (with R CMD INSTALL . inside of duckdb/tools/rpkg)

and then:

> library(adbc)
> library(duckdb)
> #> Loading required package: DBI
> library(dbplyr)
> library(glue)
> 
> load_driver(glue("Driver={asNamespace('duckdb')$.__NAMESPACE__.$DLLs$duckdb[['path']]};Entrypoint=duckdb_adbc_driver_init"))
Error in normalizePath(path, mustWork = TRUE) : 
  path[1]="Driver=/Library/Frameworks/R.framework/Versions/4.1/Resources/library/duckdb/libs/duckdb.so;Entrypoint=duckdb_adbc_driver_init": No such file or directory
> con <- dbConnect(
+   adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
+ )
Error in h(simpleError(msg, call)) : 
  error in evaluating the argument 'drv' in selecting a method for function 'dbConnect': Could not load driver: error code 9: dlsym() failed: dlsym(0x214695940, duckdb_adbc_init): symbol not found.

Am I missing something obvious?

@krlmlr
Member Author

krlmlr commented Oct 24, 2022

No, it currently works on my machine (TM).

You need:

Sorry for the mess, will clean up soon.

@jonkeane

Thanks for that — with those pointers I was able to get this running, and my theory was wrong: I was worried that this might be using duckdb's chunk size (IIRC 1024 rows) as the row group size, which can lead to performance issues (we ended up needing to do some work in the C bridge to make sure we requested slightly larger groups).

> library(adbc)
> library(duckdb)
Loading required package: DBI
> con <- dbConnect(
+   adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
+ )
> # filter out time_hour, or else we get a type error
> # I'm using dbWriteTable here because I ran into a d(b)plyr error about the backend
> # not supporting all()
> dbWriteTable(con, "flights", dplyr::select(nycflights13::flights[1:10000, ], -time_hour), overwrite = TRUE)
> RBR <- dbStreamTable(con, "flights")
> length(RBR$batches())
[1] 1

I was worried this would be something like 10 batches where each batch was too small — but it looks good

@krlmlr
Member Author

krlmlr commented Oct 27, 2022

Thanks. I am confused about the Parquet results, though. I suspect we're comparing duckdb -> Arrow C -> Parquet with duckdb -> data frame -> Parquet; the latter is still faster in my results.
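In other words, the two benchmarks are suspected to reduce to these pipelines (a sketch of the suspicion, not the packages' actual internals):

# adbc: duckdb -> Arrow C stream -> Parquet
arrow::write_dataset(dbStreamTable(con, "flights"), "f.p")

# DBI classic: duckdb -> R data frame -> Arrow -> Parquet
arrow::write_dataset(dbReadTable(con, "flights"), "f.p")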

Useful next steps:

  • Create renv or rstudio.cloud project or similar for accurate replication
    • Also test with arrow 10.0.0
  • Measure with larger tables
    • Work around dbWriteTable() problems
  • Test with out-of-memory backend, e.g., Postgres?
    • Needs building the Arrow ADBC driver, failed for me last time I tried
  • Profile this code to see where time is spent (see the profvis sketch below)

This gives us a few more data points that can help understand future directions.
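For the profiling step, something like this profvis run could be a starting point (a sketch assuming the adbc con from the reprexes above):

library(profvis)

# profile the streaming path end to end; repeat to get a usable sample
profvis({
  for (i in 1:20) {
    as.data.frame(dbStreamTable(con, "flights"))
  }
})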

CC @lidavidm.

@lidavidm

lidavidm commented Oct 27, 2022

What was the error you ran into with Postgres? (Also the Postgres driver is still very much a WIP, unfortunately - I've been too distracted to invest in it)

I was meaning to take the time to profile here; I'll see if I can catch up today.

@paleolimbot

I spent some time trying to wrap my mind around ADBC so I can be of some help here and ended up with https://github.com/paleolimbot/radbc (may or may not be useful here, but feel free to use anything that is!).

@lidavidm

lidavidm commented Oct 28, 2022

Is there a package I'm missing? I'm getting this when I try to run the ADBC example:

Error in `sql_render()`:
! `all` argument not supported by this backend

I'm pretty sure I have the latest DBI from GitHub…

> rlang::last_error()
<error/rlang_error>
Error in `sql_render()`:
! `all` argument not supported by this backend
---
Backtrace:
  1. flights %>% sql_render()
  3. dbplyr:::sql_render.tbl_lazy(.)
  5. dbplyr:::sql_render.lazy_values_query(...)
  7. dbplyr:::sql_values_subquery.DBIConnection(con, query$x, lvl = lvl)
  8. dbplyr:::sql_values_subquery_default(con, df, lvl = lvl, row = FALSE)
 10. dbplyr:::sql_render.set_op_query(union_query, con = con, lvl = lvl + 1)
Run `rlang::last_trace()` to see the full context.
> rlang::last_trace()
<error/rlang_error>
Error in `sql_render()`:
! `all` argument not supported by this backend
---
Backtrace:
     ▆
  1. ├─flights %>% sql_render()
  2. ├─dbplyr::sql_render(.)
  3. └─dbplyr:::sql_render.tbl_lazy(.)
  4.   ├─dbplyr::sql_render(...)
  5.   └─dbplyr:::sql_render.lazy_values_query(...)
  6.     ├─dbplyr:::sql_values_subquery(con, query$x, lvl = lvl)
  7.     └─dbplyr:::sql_values_subquery.DBIConnection(con, query$x, lvl = lvl)
  8.       └─dbplyr:::sql_values_subquery_default(con, df, lvl = lvl, row = FALSE)
  9.         ├─dbplyr::sql_render(union_query, con = con, lvl = lvl + 1)
 10.         └─dbplyr:::sql_render.set_op_query(union_query, con = con, lvl = lvl + 1)
 11.           └─cli::cli_abort("{.arg all} argument not supported by this backend")
 12.             └─rlang::abort(...)

@krlmlr
Member Author

krlmlr commented Oct 29, 2022

Sorry, I forgot to push a change to the adbc package. 😱

Below are results for one more iteration, with 100000 rows. DBI classic is still faster everywhere.

I'll spin up an rstudio.cloud project and try to do some profiling.

adbc + duckdb

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(arrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(conflicted)

adbc <- TRUE
n <- 100000

if (adbc) {
  con <- dbConnect(
    adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
  )
} else {
  con <- dbConnect(
    duckdb::duckdb()
  )
}

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:n, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>  34.828   0.620  36.235

bench::mark(
  as.data.frame(dbStreamTable(con, "flights"))[n:1, ]
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 as.data.frame(dbStreamTable(con, "flights"))[n:1, ]    286ms    288ms     3.48
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

bench::mark(
  dbReadTable(con, "flights")[n:1, ]
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 1 × 6
#>   expression                              min   median `itr/sec` mem_a…¹ gc/se…²
#>   <bch:expr>                         <bch:tm> <bch:tm>     <dbl> <bch:b>   <dbl>
#> 1 dbReadTable(con, "flights")[n:1, ]    438ms    444ms      2.25  22.4MB    2.25
#> # … with abbreviated variable names ¹​mem_alloc, ²​`gc/sec`

bench::mark(
  write_dataset(dbStreamTable(con, "flights"), "f.p")
)
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 write_dataset(dbStreamTable(con, "flights"), "f.p")    248ms    248ms     4.03
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-29 with reprex v2.0.2

duckdb + DBI classic

library(adbc)
library(duckdb)
#> Loading required package: DBI
library(dbplyr)
library(arrow)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(conflicted)

adbc <- FALSE
n <- 100000

if (adbc) {
  con <- dbConnect(
    adbc(asNamespace("duckdb")$.__NAMESPACE__.$DLLs$duckdb[["path"]], "duckdb_adbc_init")
  )
} else {
  con <- dbConnect(
    duckdb::duckdb()
  )
}

# dbWriteTable() doesn't work yet
flights <- copy_inline(con, nycflights13::flights[1:n, ])
sql <- flights %>% sql_render()
system.time(dbExecute(con, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>  36.585   0.494  37.184

bench::mark(
  as.data.frame(dbStreamTable(con, "flights"))[n:1, ]
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 as.data.frame(dbStreamTable(con, "flights"))[n:1, ]    192ms    193ms     5.13
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

bench::mark(
  dbReadTable(con, "flights")[n:1, ]
)
#> # A tibble: 1 × 6
#>   expression                              min   median `itr/sec` mem_a…¹ gc/se…²
#>   <bch:expr>                         <bch:tm> <bch:tm>     <dbl> <bch:b>   <dbl>
#> 1 dbReadTable(con, "flights")[n:1, ]    107ms    107ms      9.36  22.4MB    28.1
#> # … with abbreviated variable names ¹​mem_alloc, ²​`gc/sec`

bench::mark(
  write_dataset(dbStreamTable(con, "flights"), "f.p")
)
#> # A tibble: 1 × 6
#>   expression                                               min   median itr/se…¹
#>   <bch:expr>                                          <bch:tm> <bch:tm>    <dbl>
#> 1 write_dataset(dbStreamTable(con, "flights"), "f.p")    154ms    156ms     6.41
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

Created on 2022-10-29 with reprex v2.0.2

@krlmlr
Member Author

krlmlr commented Oct 29, 2022

Can't build duckdb on rstudio.cloud, out of memory?

The new home of the adbc branch is https://github.com/r-dbi/duckdb; I will keep the main branch of this fork updated. I'll try to get binary packages from r-universe.

Next: profiling.

@paleolimbot

paleolimbot commented Oct 31, 2022

I did some poking, and it seems like the majority of the time is ALTREP materialization in both cases, but there does seem to be extra overhead on the ADBC/DBI end of things (DBI is 3x faster at getting the record batch reader). My S4 know-how is not good and I had trouble locating the code that's generating the RecordBatchReader (but with a few links I'd be happy to investigate further).

# install.packages("remotes")
# remotes::install_github("r-dbi/adbc")
# remotes::install_github("r-dbi/duckdb/tools/rpkg", build = FALSE)
library(duckadbc)
#> Loading required package: DBI
library(adbc)
library(dbplyr)
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

n <- 100000

duck_con_adbc <- dbConnect(
  adbc(asNamespace("duckadbc")$.__NAMESPACE__.$DLLs$duckadbc[["path"]], "duckdb_adbc_init")
)

duck_con_dbi <- dbConnect(duckadbc::duckdb())

flights <- copy_inline(duck_con_adbc, nycflights13::flights[1:n, ])
sql <- flights %>% sql_render()

system.time(dbExecute(duck_con_adbc, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.988   0.318   4.608
system.time(dbExecute(duck_con_dbi, paste0("CREATE TABLE flights AS ", sql)))
#>    user  system elapsed 
#>   3.876   0.251   4.281

bench::mark(
  adbc_rbr = dbStreamTable(duck_con_adbc, "flights"),
  adbc = as.data.frame(dbStreamTable(duck_con_adbc, "flights")),
  adbc_materialized = as.data.frame(dbStreamTable(duck_con_adbc, "flights"))[n:1, ],
  dbi_rbr = dbStreamTable(duck_con_dbi, "flights"),
  dbi = as.data.frame(dbStreamTable(duck_con_dbi, "flights")),
  dbi_materialized = as.data.frame(dbStreamTable(duck_con_dbi, "flights"))[n:1, ],
  iterations = 5,
  check = FALSE
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 6 × 6
#>   expression             min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>        <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 adbc_rbr           103.2ms  106.8ms      9.34     8.9MB     0   
#> 2 adbc               105.2ms  108.7ms      9.23    6.58MB     0   
#> 3 adbc_materialized  197.4ms  202.3ms      4.77   23.56MB     5.72
#> 4 dbi_rbr             33.8ms   34.4ms     27.5    11.47MB     5.51
#> 5 dbi                 34.5ms   35.8ms     27.6    12.63MB    11.0 
#> 6 dbi_materialized   125.6ms  126.3ms      7.23   29.66MB     7.23

I had some fun last week while waiting for Arrow's CI and prototyped a less useful but more literal wrapper around ADBC...a quick check suggests that the bottleneck is data.frame conversion:

# install.packages("remotes")
# remotes::install_github("r-dbi/adbc")
# remotes::install_github("r-dbi/duckdb/tools/rpkg", build = FALSE)
# remotes::install_github("paleolimbot/radbc")
library(duckadbc)
#> Loading required package: DBI
library(adbc)
library(dbplyr)
library(radbc)
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

duck_con_adbc <- dbConnect(
  adbc(asNamespace("duckadbc")$.__NAMESPACE__.$DLLs$duckadbc[["path"]], "duckdb_adbc_init")
)

radbc_driver_duckdb <- function() {
  radbc_driver(
    asNamespace("duckadbc")$.__NAMESPACE__.$DLLs$duckadbc[["path"]],
    "duckdb_adbc_init",
    subclass = "radbc_driver_duckdb"
  )
}

n <- 100000
flights <- copy_inline(duck_con_adbc, nycflights13::flights[1:n, ])
flights_sql <- paste0("CREATE TABLE flights AS ", flights %>% sql_render())

db <- radbc_database_init(radbc_driver_duckdb())
con <- radbc_connection_init(db)

con |> 
  radbc_statement_init() |> 
  radbc_statement_set_sql_query(flights_sql) |> 
  radbc_statement_execute_query() |> 
  as_record_batch_reader() |> 
  as_arrow_table() |> 
  as.data.frame()
#> # A tibble: 1 × 1
#>    Count
#>    <int>
#> 1 100000

dbExecute(duck_con_adbc, flights_sql)
#> [1] 0

bench::mark(
  radbc_nanoarrow_stream = con |> 
    radbc_statement_init() |> 
    radbc_statement_set_sql_query("SELECT * from flights") |> 
    radbc_statement_execute_query(),
  radbc_arrow_record_batch_reader = con |> 
    radbc_statement_init() |> 
    radbc_statement_set_sql_query("SELECT * from flights") |> 
    radbc_statement_execute_query() |> 
    as_record_batch_reader(),
  radbc_arrow_table = con |> 
    radbc_statement_init() |> 
    radbc_statement_set_sql_query("SELECT * from flights") |> 
    radbc_statement_execute_query() |> 
    as_record_batch_reader(),
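  # NB: this pipeline is identical to radbc_arrow_record_batch_reader above;
  # an as_arrow_table() step appears to be missing here, which would explain
  # the near-identical timing in the results below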
  radbc = con |> 
    radbc_statement_init() |> 
    radbc_statement_set_sql_query("SELECT * from flights") |> 
    radbc_statement_execute_query() |> 
    as_record_batch_reader() |> 
    as_arrow_table() |> 
    as.data.frame(),
  adbc = as.data.frame(dbStreamTable(duck_con_adbc, "flights")),
  check = FALSE
)
#> Warning: Some expressions had a GC in every iteration; so filtering is disabled.
#> # A tibble: 5 × 6
#>   expression                           min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>                      <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 radbc_nanoarrow_stream            4.64ms   4.82ms    204.      1.09KB     0   
#> 2 radbc_arrow_record_batch_reader    4.9ms   5.17ms    184.      1.09KB     0   
#> 3 radbc_arrow_table                 4.93ms   5.44ms    178.      1.09KB     0   
#> 4 radbc                            93.08ms  95.01ms     10.5     5.35MB     0   
#> 5 adbc                            629.93ms 629.93ms      1.59    7.41MB     1.59

Created on 2022-10-31 with reprex v2.0.2

On my build it looks like this is coming through in a batch size of 1024 (98 batches total). If that value can be configured from duckdb's end, I imagine that a much larger value would result in faster R conversion.

@jonkeane

On my build it looks like this is coming through in a batch size of 1024 (98 batches total)

Oh interesting — where did you see that? I suspected the same thing, but when I saw the Record Batch Reader coming out of DBI|adbc was one batch I thought it was good (but it's possible it was re-wrapping that — or I totally looked in the wrong place...)

@lidavidm

Thanks for the slick example, Dewey! I ran the same script and got this for the final results:

> bench::mark(
  adbc_rbr = dbStreamTable(duck_con_adbc, "flights"),
  adbc = as.data.frame(dbStreamTable(duck_con_adbc, "flights")),
  adbc_materialized = as.data.frame(dbStreamTable(duck_con_adbc, "flights"))[n:1, ],
  dbi_rbr = dbStreamTable(duck_con_dbi, "flights"),
  dbi = as.data.frame(dbStreamTable(duck_con_dbi, "flights")),
  dbi_materialized = as.data.frame(dbStreamTable(duck_con_dbi, "flights"))[n:1, ],
  iterations = 5,
  check = FALSE
)
# A tibble: 6 × 13
  expression            min  median itr/s…¹ mem_a…² gc/se…³ n_itr  n_gc total_…⁴
  <bch:expr>        <bch:t> <bch:t>   <dbl> <bch:b>   <dbl> <int> <dbl> <bch:tm>
1 adbc_rbr           61.9ms  67.3ms   15.1    8.6MB    0        5     0 331.74ms
2 adbc               70.1ms  70.7ms   13.9   6.57MB    0        5     0 359.77ms
3 adbc_materialized 213.8ms 219.2ms    4.51 23.56MB    7.21     5     8    1.11s
4 dbi_rbr            46.8ms  47.4ms   15.0  11.47MB    3.01     5     1 332.45ms
5 dbi                49.9ms    50ms   19.9  12.63MB    0        5     0 250.74ms
6 dbi_materialized  192.3ms 195.3ms    5.05 29.66MB    7.07     5     7 990.67ms
# … with 4 more variables: result <list>, memory <list>, time <list>,
#   gc <list>, and abbreviated variable names ¹​`itr/sec`, ²​mem_alloc,
#   ³​`gc/sec`, ⁴​total_time
# ℹ Use `colnames()` to see all variable names
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled. 

So the difference is much less pronounced for me, though it's still there, and it seems the initial RecordBatchReader is still the issue. I'll see about profiling it…

@lidavidm

FWIW I get a 100k row batch:

> rbr <- dbStreamTable(duck_con_adbc, "flights")
> b <- rbr$read_next_batch()
> b
RecordBatch
100000 rows x 19 columns
$year <int32>
$month <int32>
$day <int32>
$dep_time <int32>
$sched_dep_time <int32>
$dep_delay <double>
$arr_time <int32>
$sched_arr_time <int32>
$arr_delay <double>
$carrier <string>
$flight <int32>
$tailnum <string>
$origin <string>
$dest <string>
$air_time <double>
$distance <double>
$hour <double>
$minute <double>
$time_hour <timestamp[us, tz=America/New_York]>

See $metadata for additional Schema metadata
> rbr$read_next_batch()
NULL

@lidavidm

Sorry for the naive question - I did a quick dig into the call graph and spotted this: https://github.com/r-dbi/adbc/blob/main/R/dbFetch_Result.R

Does this materialize the data into R before sending it back into Arrow?

dbStreamTable
	dbGetStream
		dbSendQueryArrow 
			dbSendQuery
				[ADBC] cpp_send_query
				[ADBC] adbcResult (trivial?)
		dbStream
			dbFetch
				as.data.frame(as.data.frame(res@record_batch_reader))
			arrow::as_record_batch_reader
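If that reading is right, the difference is roughly the following (a sketch built on the res@record_batch_reader slot named in the graph above, not the actual package code):

# what dbFetch() appears to do: drain the stream into an R data.frame,
# then wrap the data.frame back up for Arrow
df <- as.data.frame(res@record_batch_reader)
rbr <- arrow::as_record_batch_reader(df)

# what a streaming path could do instead: pass the reader through untouched
rbr <- arrow::as_record_batch_reader(res@record_batch_reader)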

@lidavidm

Also ran it through Profvis:
[Profvis screenshot]

It really does seem like we're materializing the result into R before sending it back into Arrow?

@paleolimbot

It really does seem like we're materializing the result into R before sending it back into Arrow?

The S4 method thing makes it hard to get the source refs (or at least they don't show up on my install), but I did a debug(arrow:::as.data.frame.RecordBatchReader) and this does indeed seem to be the case (only for the adbc package result).

[Screenshot of the debug() session]

Oh interesting — where did you see that? I suspected the same thing, but when I saw the Record Batch Reader coming out of DBI|adbc was one batch I thought it was good (but it's possible it was re-wrapping that — or I totally looked in the wrong place...)

Y'all are totally right: the readers coming off both the DBI and the adbc packages return the entire dataset as a single batch. The 1024 number I got was from the radbc version, which is a literal mapping to the ADBC C calls. I assume this is configurable somewhere (the default for the R package appears to be 1 million rows).
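For reference, this is roughly how I counted (a sketch assuming the nanoarrow array stream API, where get_next() returns NULL once the stream is exhausted):

stream <- con |> 
  radbc_statement_init() |> 
  radbc_statement_set_sql_query("SELECT * from flights") |> 
  radbc_statement_execute_query()

sizes <- integer()
while (!is.null(batch <- stream$get_next())) {
  sizes <- c(sizes, batch$length)
}
length(sizes)  # number of batches (98 on my build)
unique(sizes)  # rows per batch (mostly 1024)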

@lidavidm

lidavidm commented Nov 1, 2022

Oh, that is a neat trick to learn about, thank you. I'd guess this accounts for the difference, then…

@lidavidm

lidavidm commented Nov 2, 2022

@krlmlr one other thing: what's the plan for the DuckDB fork? I'm guessing we'll try to get it upstreamed at some point?

@krlmlr
Member Author

krlmlr commented Nov 7, 2022

  • The duckdb fork will be upstreamed; there are a few loose ends at the moment that make it useful to keep it in a fork.
  • The adbc R package has indeed implemented dbFetch() in a way that fetches all data from the record batch reader. This is subject to improvement. The intention was that dbStream() calls DuckDB's streaming methods directly, bypassing data frames. I'll double-check why this isn't happening.

@krlmlr
Member Author

krlmlr commented Nov 7, 2022

Indeed, it seems that a bunch of methods aren't implemented yet, both for duckadbc and the adbc package. 🙀🙀🙀

I'll follow up with updated benchmarks; I now really expect improvements.

@lidavidm

lidavidm commented Nov 7, 2022

Thanks!

I suppose I don't understand why we'd need anything particular to DuckDB here, vs connecting the RecordBatchReader/ArrowArrayStream from ADBC to an R object directly, but I'll defer to people who actually know R 🙂

@krlmlr
Member Author

krlmlr commented Nov 7, 2022

DuckDB is our adbc implementation. We could also use another adbc driver, but so far I haven't been successful in building and using one. I'll try again soon.

@krlmlr
Member Author

krlmlr commented Nov 7, 2022

I have now implemented the Arrow path locally, but I'm seeing segfaults, likely from duckdb. Next steps:

  • Understand and fix segfaults
  • Try with existing adbc driver

For the second option, what driver would work best?

@lidavidm

lidavidm commented Nov 7, 2022

I don't have a good option sadly, since I've been distracted as of late.

I think I fixed the issues you likely had with the Postgres driver: apache/arrow-adbc#161

However the driver is still very feature-incomplete and hasn't been benchmarked.

I'm still working on the SQLite driver but haven't had much time.

@paleolimbot

I found that implementing a driver that does nothing ( https://github.com/paleolimbot/radbc/blob/master/src/driver_void.c ) and a driver that spits out an ArrowArrayStream of your choosing ( https://github.com/paleolimbot/radbc/blob/master/src/driver_monkey.c ) was useful for testing in lieu of a driver I could build.

@krlmlr
Member Author

krlmlr commented Nov 12, 2022

Thanks. With a tweak to radbc (paleolimbot/radbc#1) and adbc (pushed to the main branch), I can now run benchmarks with a simple static driver that replays the data it was given as input. This shows that the adbc R package doesn't add a lot of overhead, and that there are benefits (if not huge) if the output is processed further by Arrow.

Can you confirm?

# install.packages("remotes")
# remotes::install_github("r-dbi/adbc")
# remotes::install_github("paleolimbot/radbc#1")
library(DBI)

drv <- radbc::radbc_driver_broken_record(nycflights13::flights)

dbi_con <- dbConnect(adbc::from_radbc(drv))

bench::mark(
  adbc_rbr = dbStreamTable(dbi_con, "flights"),
  adbc = as.data.frame(dbStreamTable(dbi_con, "flights")),
  df = dbReadTable(dbi_con, "flights"),
  adbc_roundtrip = as.data.frame(arrow::as_record_batch_reader(dbStreamTable(dbi_con, "flights"))),
  df_roundtrip = as.data.frame(arrow::as_record_batch_reader(dbReadTable(dbi_con, "flights"))),
  check = FALSE
)
#> # A tibble: 5 × 6
#>   expression          min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>     <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 adbc_rbr         23.4ms   23.9ms      41.8   13.75MB     0   
#> 2 adbc             23.9ms   24.6ms      40.6    3.94MB     2.14
#> 3 df               24.2ms   24.8ms      40.4    3.09MB     0   
#> 4 adbc_roundtrip   23.6ms   24.3ms      40.6    2.58MB     2.14
#> 5 df_roundtrip     27.6ms   28.6ms      35.0    6.75MB     2.06

Created on 2022-11-12 with reprex v2.0.2

@paleolimbot

Looks good! One note: I think in this case you're mostly seeing the time it takes to convert the string columns in flights:

bench::mark(arrow::as_record_batch_reader(nycflights13::flights))
#> # A tibble: 1 × 6
#>   expression                                                min   median itr/s…¹
#>   <bch:expr>                                           <bch:tm> <bch:tm>   <dbl>
#> 1 arrow::as_record_batch_reader(nycflights13::flights)   23.8ms   24.5ms    40.8
#> # … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>, and abbreviated
#> #   variable name ¹​`itr/sec`

...and if you care about ms-level R evaluation overhead, then there are some opportunities to shave that down to 100 µs or so:

# install.packages("remotes")
# remotes::install_github("r-dbi/adbc")
# remotes::install_github("paleolimbot/radbc#1")
library(DBI)

drv <- radbc::radbc_driver_broken_record(nycflights13::flights[0, ])

dbi_con <- dbConnect(adbc::from_radbc(drv))
adbc_table <- arrow::as_arrow_table(dbStreamTable(dbi_con, "flights"))

bench::mark(
  base = nanoarrow::as_nanoarrow_array_stream(nycflights13::flights[0, ]),
  adbc_rbr = dbStreamTable(dbi_con, "flights"),
  adbc = as.data.frame(dbStreamTable(dbi_con, "flights")),
  df = dbReadTable(dbi_con, "flights"),
  adbc_roundtrip = as.data.frame(arrow::as_record_batch_reader(dbStreamTable(dbi_con, "flights"))),
  df_roundtrip = as.data.frame(arrow::as_record_batch_reader(dbReadTable(dbi_con, "flights"))),
  radbc_rbr = radbc::radbc_database_init(
    radbc::radbc_driver_broken_record(nycflights13::flights[0, ])
  ) |> 
    radbc::radbc_connection_init() |> 
    radbc::radbc_statement_init() |> 
    radbc::radbc_statement_execute_query(),
  
  radbc_df = radbc::radbc_database_init(
    radbc::radbc_driver_broken_record(nycflights13::flights[0, ])
  ) |> 
    radbc::radbc_connection_init() |> 
    radbc::radbc_statement_init() |> 
    radbc::radbc_statement_execute_query() |> 
    # requires remotes::install_github("apache/arrow-nanoarrow/r#65")
    # which implements all the conversions
    as.data.frame(),
  check = FALSE
)
#> # A tibble: 8 × 6
#>   expression          min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>     <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 base           447.43µs 458.19µs     2135.    3.51KB     21.6
#> 2 adbc_rbr       730.13µs 748.37µs     1298.    3.37KB     19.3
#> 3 adbc              1.2ms   1.26ms      779.  773.67KB     17.5
#> 4 df               1.39ms   1.44ms      675.  311.68KB     19.7
#> 5 adbc_roundtrip    1.2ms   1.26ms      776.   14.01KB     17.3
#> 6 df_roundtrip     2.85ms   2.95ms      328.  251.94KB     18.0
#> 7 radbc_rbr      499.09µs 509.14µs     1855.   107.3KB     21.6
#> 8 radbc_df       539.15µs 563.22µs     1691.  188.02KB     22.4

Created on 2022-11-14 with reprex v2.0.2

@paleolimbot

It's a complete hack, but I cobbled together https://github.com/paleolimbot/adbcsqlite3 thanks to the new SQLite driver, which will hopefully help here!

# install.packages("remotes")
# remotes::install_github("paleolimbot/adbcsqlite3")
library(adbcsqlite3)
library(radbc)

dummy_dbi_con <- DBI::dbConnect(RSQLite::SQLite())
n <- 100000
flights <- dbplyr::copy_inline(dummy_dbi_con, nycflights13::flights[1:n, ])
flights_sql <- paste0("CREATE TABLE flights AS ", dbplyr::sql_render(flights))
DBI::dbDisconnect(dummy_dbi_con)

db <- radbc_database_init(adbcsqlite3())
con <- radbc_connection_init(db)

con |> 
  radbc_statement_init() |> 
  radbc_statement_set_sql_query(flights_sql) |> 
  radbc_statement_execute_query() |> 
  as.data.frame()
#> data frame with 0 columns and 0 rows

con |> 
  radbc_statement_init() |> 
  radbc_statement_set_sql_query("SELECT * from flights") |> 
  radbc_statement_execute_query() |> 
  tibble::as_tibble()
#> # A tibble: 100,000 × 19
#>     year month   day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>    <dbl> <dbl> <dbl>    <dbl>      <dbl>   <dbl>   <dbl>   <dbl>   <dbl> <chr>  
#>  1  2013     1     1      517        515       2     830     819      11 UA     
#>  2  2013     1     1      533        529       4     850     830      20 UA     
#>  3  2013     1     1      542        540       2     923     850      33 AA     
#>  4  2013     1     1      544        545      -1    1004    1022     -18 B6     
#>  5  2013     1     1      554        600      -6     812     837     -25 DL     
#>  6  2013     1     1      554        558      -4     740     728      12 UA     
#>  7  2013     1     1      555        600      -5     913     854      19 B6     
#>  8  2013     1     1      557        600      -3     709     723     -14 EV     
#>  9  2013     1     1      557        600      -3     838     846      -8 B6     
#> 10  2013     1     1      558        600      -2     753     745       8 AA     
#> # … with 99,990 more rows, 9 more variables: flight <dbl>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <chr>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

Created on 2022-11-30 with reprex v2.0.2
