Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 186 additions & 20 deletions crates/duckdb/src/appender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{ffi::c_void, fmt, os::raw::c_char};
use crate::{
Error,
error::result_from_duckdb_appender,
types::{ToSql, ToSqlOutput},
types::{ToSql, ToSqlOutput, value_ref_from_value},
};

/// Appender for fast import data
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Appender<'_> {
///
/// # Failure
///
/// Will return `Err` if append column count not the same with the table schema
/// Returns `Err` if a row cannot be appended.
#[inline]
pub fn append_rows<P, I>(&mut self, rows: I) -> Result<()>
where
Expand Down Expand Up @@ -108,38 +108,61 @@ impl Appender<'_> {
///
/// # Failure
///
/// Will return `Err` if append column count not the same with the table schema
/// Returns `Err` if the row cannot be appended.
#[inline]
pub fn append_row<P: AppenderParams>(&mut self, params: P) -> Result<()> {
let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
params.__bind_in(self)?;
// NOTE: we only check end_row return value
let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
result_from_duckdb_appender(rc, &mut self.app)
// AppenderParams normalizes arrays, tuples, slices, and iterators into
// append_parameter_row, which validates up-front, then wraps binding
// with begin_row/end_row.
params.__bind_in(self)
}

#[inline]
pub(crate) fn bind_parameters<P>(&mut self, params: P) -> Result<()>
pub(crate) fn append_parameter_row<P>(&mut self, params: P) -> Result<()>
where
P: IntoIterator,
P::Item: ToSql,
{
for p in params.into_iter() {
self.bind_parameter(&p)?;
let params = params.into_iter().collect::<Vec<_>>();
let values = params
.iter()
.map(ToSql::to_sql)
.collect::<Result<Vec<ToSqlOutput<'_>>>>()?;

self.validate_parameter_values(&values)?;

let _ = unsafe { ffi::duckdb_appender_begin_row(self.app) };
if let Err(err) = self.bind_parameter_values(&values) {
// validate_parameter_values catches unsupported types up-front; this guards
// against an unmapped variant slipping through bind_parameter.
let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
// Prefer cleanup failure here: result_from_duckdb_appender destroys
// an appender that DuckDB reports as invalid after a partial row.
result_from_duckdb_appender(rc, &mut self.app)?;
return Err(err);
}
let rc = unsafe { ffi::duckdb_appender_end_row(self.app) };
result_from_duckdb_appender(rc, &mut self.app)
}

fn validate_parameter_values(&self, values: &[ToSqlOutput<'_>]) -> Result<()> {
for value in values {
let value = to_value_ref(value)?;
validate_appender_value_ref(value)?;
}
Ok(())
}

fn bind_parameter<P: ?Sized + ToSql>(&self, param: &P) -> Result<()> {
let value = param.to_sql()?;
fn bind_parameter_values(&self, values: &[ToSqlOutput<'_>]) -> Result<()> {
for value in values {
self.bind_parameter(value)?;
}
Ok(())
}

fn bind_parameter(&self, value: &ToSqlOutput<'_>) -> Result<()> {
let ptr = self.app;
let value = match value {
ToSqlOutput::Borrowed(v) => v,
ToSqlOutput::Owned(ref v) => ValueRef::from(v),
};
// NOTE: we ignore the return value here
// because if anything failed, end_row will fail
let value = to_value_ref(value)?;
// TODO: append more
let rc = match value {
ValueRef::Null => unsafe { ffi::duckdb_append_null(ptr) },
Expand Down Expand Up @@ -193,7 +216,11 @@ impl Appender<'_> {
ffi::duckdb_destroy_value(&mut value);
res
},
_ => unreachable!("not supported"),
_ => {
return Err(Error::ToSqlConversionFailure(
appending_unsupported_value(value.data_type()).into(),
));
}
};
if rc != 0 {
return Err(Error::AppendError);
Expand Down Expand Up @@ -257,6 +284,48 @@ impl fmt::Debug for Appender<'_> {
}
}

fn appending_unsupported_value(value_type: impl fmt::Display) -> String {
format!("appending {value_type} values is not yet supported")
}

fn to_value_ref<'value, 'output>(value: &'value ToSqlOutput<'output>) -> Result<ValueRef<'value>>
where
'output: 'value,
{
match *value {
ToSqlOutput::Borrowed(v) => Ok(v),
ToSqlOutput::Owned(ref v) => value_ref_from_value(v, appending_unsupported_value),
}
}

fn validate_appender_value_ref(value: ValueRef<'_>) -> Result<()> {
match value {
ValueRef::Null
| ValueRef::Boolean(_)
| ValueRef::TinyInt(_)
| ValueRef::SmallInt(_)
| ValueRef::Int(_)
| ValueRef::BigInt(_)
| ValueRef::HugeInt(_)
| ValueRef::UTinyInt(_)
| ValueRef::USmallInt(_)
| ValueRef::UInt(_)
| ValueRef::UBigInt(_)
| ValueRef::Float(_)
| ValueRef::Double(_)
| ValueRef::Text(_)
| ValueRef::Timestamp(_, _)
| ValueRef::Blob(_)
| ValueRef::Date32(_)
| ValueRef::Time64(_, _)
| ValueRef::Interval { .. }
| ValueRef::Decimal(_) => Ok(()),
_ => Err(Error::ToSqlConversionFailure(
appending_unsupported_value(value.data_type()).into(),
)),
}
}

#[cfg(test)]
mod test {
use rust_decimal::Decimal;
Expand All @@ -278,6 +347,103 @@ mod test {
Ok(())
}

#[test]
fn test_append_unsupported_container_type_returns_error() -> Result<()> {
use arrow::{array::ListArray, datatypes::Int32Type};

use crate::{
ToSql,
types::{ListType, ToSqlOutput, Value, ValueRef},
};

struct OwnedList;
impl ToSql for OwnedList {
fn to_sql(&self) -> Result<ToSqlOutput<'_>> {
Ok(ToSqlOutput::Owned(Value::List(vec![Value::Int(1), Value::Int(2)])))
}
}

struct BorrowedList(ListArray);
impl BorrowedList {
fn new() -> Self {
Self(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
Some(1),
Some(2),
])]))
}
}
impl ToSql for BorrowedList {
fn to_sql(&self) -> Result<ToSqlOutput<'_>> {
Ok(ToSqlOutput::Borrowed(ValueRef::List(ListType::Regular(&self.0), 0)))
}
}

fn assert_unsupported_list_error(err: Error) {
match err {
Error::ToSqlConversionFailure(e) => {
assert!(
e.to_string().contains("appending List values is not yet supported"),
"unexpected message: {e}"
);
}
other => panic!("expected ToSqlConversionFailure, got {other:?}"),
}
}

let db = Connection::open_in_memory()?;
db.execute_batch("CREATE TABLE foo(id INTEGER, name TEXT)")?;

let list = OwnedList;
let mut app = db.appender("foo")?;
app.append_row(params![10, "before"])?;
app.append_row(params![11, "also before"])?;
let err = app.append_row(params![1, list]).unwrap_err();
assert_unsupported_list_error(err);

let borrowed_list = BorrowedList::new();
let err = app.append_row(params![3, borrowed_list]).unwrap_err();
assert_unsupported_list_error(err);
app.append_row(params![2, "ok"])?;
app.flush()?;

let rows = db
.prepare("SELECT id, name FROM foo ORDER BY id")?
.query_map([], |row| Ok((row.get::<_, i32>(0)?, row.get::<_, String>(1)?)))?
.collect::<Result<Vec<_>>>()?;
assert_eq!(
rows,
vec![
(2, "ok".to_string()),
(10, "before".to_string()),
(11, "also before".to_string())
]
);
let count: i32 = db.query_row("SELECT COUNT(*) FROM foo", [], |row| row.get(0))?;
assert_eq!(count, 3);
Ok(())
}

#[test]
fn test_append_bind_failure_prefers_cleanup_error() -> Result<()> {
let db = Connection::open_in_memory()?;
db.execute_batch("CREATE TABLE foo(id INTEGER, value UUID)")?;

let mut app = db.appender("foo")?;
let err = app.append_row(params![1, 2]).unwrap_err();

match err {
Error::DuckDBFailure(_, Some(msg)) => {
assert!(
msg.contains("Call to EndRow before all columns have been appended to"),
"unexpected message: {msg}"
);
}
other => panic!("expected DuckDBFailure from appender cleanup, got {other:?}"),
}
assert!(app.app.is_null());
Ok(())
}

#[test]
fn test_append_rows() -> Result<()> {
let db = Connection::open_in_memory()?;
Expand Down
21 changes: 10 additions & 11 deletions crates/duckdb/src/appender_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ use sealed::Sealed;
pub trait AppenderParams: Sealed {
// XXX not public api, might not need to expose.
//
// Binds the parameters to the statement. It is unlikely calling this
// explicitly will do what you want. Please use `Statement::query` or
// Binds the parameters to the appender. It is unlikely calling this
// explicitly will do what you want. Please use `Appender::append_row` or
// similar directly.
//
// For now, just hide the function in the docs...
Expand All @@ -134,18 +134,17 @@ impl Sealed for [&dyn ToSql; 0] {}
impl AppenderParams for [&dyn ToSql; 0] {
#[inline]
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
// Note: Can't just return `Ok(())` — `Statement::bind_parameters`
// checks that the right number of params were passed too.
// TODO: we should have tests for `Error::InvalidParameterCount`...
stmt.bind_parameters(&[] as &[&dyn ToSql])
// Route through the normal append path so DuckDB can reject empty rows
// for tables that require values.
stmt.append_parameter_row(&[] as &[&dyn ToSql])
}
}

impl Sealed for &[&dyn ToSql] {}
impl AppenderParams for &[&dyn ToSql] {
#[inline]
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
stmt.bind_parameters(self)
stmt.append_parameter_row(self)
}
}

Expand All @@ -156,14 +155,14 @@ macro_rules! impl_for_array_ref {
impl<T: ToSql + ?Sized> Sealed for &[&T; $N] {}
impl<T: ToSql + ?Sized> AppenderParams for &[&T; $N] {
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
stmt.bind_parameters(self)
stmt.append_parameter_row(self)
}
}
impl<T: ToSql> Sealed for [T; $N] {}
impl<T: ToSql> AppenderParams for [T; $N] {
#[inline]
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
stmt.bind_parameters(&self)
stmt.append_parameter_row(&self)
}
}
)+};
Expand All @@ -189,7 +188,7 @@ macro_rules! impl_appender_params_for_tuple {
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
#[allow(non_snake_case)]
let ($($T,)+) = &self;
stmt.bind_parameters(&[$($T as &dyn ToSql),+])
stmt.append_parameter_row([$($T as &dyn ToSql),+])
}
}
};
Expand Down Expand Up @@ -340,7 +339,7 @@ where
{
#[inline]
fn __bind_in(self, stmt: &mut Appender<'_>) -> Result<()> {
stmt.bind_parameters(self.0)
stmt.append_parameter_row(self.0)
}
}

Expand Down
27 changes: 24 additions & 3 deletions crates/duckdb/src/pragma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
Connection, DatabaseName, Result, Row,
error::Error,
ffi,
types::{ToSql, ToSqlOutput, ValueRef},
types::{ToSql, ToSqlOutput, ValueRef, binding_unsupported_value, value_ref_from_value},
};

pub struct Sql {
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Sql {
let value = value.to_sql()?;
let value = match value {
ToSqlOutput::Borrowed(v) => v,
ToSqlOutput::Owned(ref v) => ValueRef::from(v),
ToSqlOutput::Owned(ref v) => value_ref_from_value(v, binding_unsupported_value)?,
};
match value {
ValueRef::BigInt(i) => {
Expand All @@ -76,7 +76,7 @@ impl Sql {
_ => {
return Err(Error::DuckDBFailure(
ffi::Error::new(ffi::DuckDBError),
Some(format!("Unsupported value \"{value:?}\"")),
Some(format!("Unsupported value type {}", value.data_type())),
));
}
};
Expand Down Expand Up @@ -305,6 +305,27 @@ mod test {
Ok(())
}

#[test]
fn pragma_rejects_unsupported_container_type() -> Result<()> {
use crate::{Error, types::Value};

let db = Connection::open_in_memory()?;
let err = db
.pragma(None, "table_info", &Value::List(vec![Value::Int(1)]), |_| Ok(()))
.unwrap_err();

match err {
Error::ToSqlConversionFailure(e) => {
assert!(
e.to_string().contains("binding List parameters is not yet supported"),
"unexpected message: {e}"
);
}
other => panic!("expected ToSqlConversionFailure, got {other:?}"),
}
Ok(())
}

#[test]
fn pragma_update() -> Result<()> {
let db = Connection::open_in_memory()?;
Expand Down
Loading
Loading