Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ crate-type = ["cdylib"]

[dependencies]
arrow = { path = "../arrow", features = ["pyarrow"] }
pyo3 = { version = "0.26.0", features = ["extension-module"] }
pyo3 = { version = "0.27.1", features = ["extension-module"] }
2 changes: 1 addition & 1 deletion arrow-pyarrow-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ publish = false
# Note no dependency on arrow, to ensure arrow-pyarrow can be used by itself
arrow-array = { path = "../arrow-array" }
arrow-pyarrow = { path = "../arrow-pyarrow" }
pyo3 = { version = "0.26.0", default-features = false }
pyo3 = { version = "0.27.1", default-features = false }
1 change: 1 addition & 0 deletions arrow-pyarrow-testing/tests/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ fn test_to_pyarrow() {
let res = Python::attach(|py| {
let py_input = input.to_pyarrow(py)?;
let records = RecordBatch::from_pyarrow_bound(&py_input)?;
println!("records: {records:#?}");
let py_records = records.to_pyarrow(py)?;
RecordBatch::from_pyarrow_bound(&py_records)
})
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ all-features = true
arrow-array = { workspace = true, features = ["ffi"] }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
pyo3 = { version = "0.26.0", default-features = false }
pyo3 = { version = "0.27.1", default-features = false }
104 changes: 73 additions & 31 deletions arrow-pyarrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
//! and then importing the reader as a [ArrowArrayStreamReader].

use std::convert::{From, TryFrom};
use std::ffi::CStr;
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;

Expand All @@ -80,6 +81,10 @@ import_exception!(pyarrow, ArrowException);
/// Represents an exception raised by PyArrow.
pub type PyArrowException = ArrowException;

const ARROW_ARRAY_STREAM_CAPSULE_NAME: &CStr = c"arrow_array_stream";
const ARROW_SCHEMA_CAPSULE_NAME: &CStr = c"arrow_schema";
const ARROW_ARRAY_CAPSULE_NAME: &CStr = c"arrow_array";

fn to_py_err(err: ArrowError) -> PyErr {
PyArrowException::new_err(err.to_string())
}
Expand Down Expand Up @@ -136,7 +141,7 @@ fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
));
}

let capsule_name = capsule_name.unwrap().to_str()?;
let capsule_name = unsafe { capsule_name.unwrap().as_cstr().to_str()? };
if capsule_name != name {
return Err(PyValueError::new_err(format!(
"Expected name '{name}' in PyCapsule, instead got '{capsule_name}'",
Expand All @@ -153,12 +158,16 @@ impl FromPyArrow for DataType {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(dtype);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let dtype = DataType::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(dtype);
}
}

validate_class("DataType", value)?;
Expand Down Expand Up @@ -189,12 +198,16 @@ impl FromPyArrow for Field {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(field);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let field = Field::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(field);
}
}

validate_class("Field", value)?;
Expand Down Expand Up @@ -225,12 +238,16 @@ impl FromPyArrow for Schema {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(schema);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let schema = Schema::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(schema);
}
}

validate_class("Schema", value)?;
Expand Down Expand Up @@ -269,16 +286,25 @@ impl FromPyArrow for ArrayData {
}

let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.downcast::<PyCapsule>()?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
let array = unsafe {
FFI_ArrowArray::from_raw(
array_capsule
.pointer_checked(Some(ARROW_ARRAY_CAPSULE_NAME))?
.cast::<FFI_ArrowArray>()
.as_ptr(),
)
};
return unsafe { ffi::from_ffi(array, schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class("Array", value)?;
Expand Down Expand Up @@ -322,7 +348,7 @@ impl ToPyArrow for ArrayData {

impl<T: FromPyArrow> FromPyArrow for Vec<T> {
fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
let list = value.downcast::<PyList>()?;
let list = value.cast::<PyList>()?;
list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
}
}
Expand All @@ -342,6 +368,7 @@ impl FromPyArrow for RecordBatch {
// Newer versions of PyArrow as well as other libraries with Arrow data implement this
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

if value.hasattr("__arrow_c_array__")? {
let tuple = value.getattr("__arrow_c_array__")?.call0()?;

Expand All @@ -352,17 +379,22 @@ impl FromPyArrow for RecordBatch {
}

let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.downcast::<PyCapsule>()?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
let array_ptr = array_capsule
.pointer_checked(Some(ARROW_ARRAY_CAPSULE_NAME))?
.cast::<FFI_ArrowArray>();
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_ptr.as_ptr()) };
let mut array_data =
unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
unsafe { ffi::from_ffi(ffi_array, schema_ptr.as_ref()) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
"Expected Struct type from __arrow_c_array.",
Expand All @@ -377,7 +409,8 @@ impl FromPyArrow for RecordBatch {
let array = StructArray::from(array_data);
// StructArray does not embed metadata from schema. We need to override
// the output schema with the schema from the capsule.
let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
let schema =
unsafe { Arc::new(Schema::try_from(schema_ptr.as_ref()).map_err(to_py_err)?) };
let (_fields, columns, nulls) = array.into_parts();
assert_eq!(
nulls.map(|n| n.null_count()).unwrap_or_default(),
Expand All @@ -394,7 +427,7 @@ impl FromPyArrow for RecordBatch {

let arrays = value.getattr("columns")?;
let arrays = arrays
.downcast::<PyList>()?
.cast::<PyList>()?
.iter()
.map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
.collect::<PyResult<_>>()?;
Expand Down Expand Up @@ -429,10 +462,17 @@ impl FromPyArrow for ArrowArrayStreamReader {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_stream__")? {
let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_array_stream")?;

let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
let stream = unsafe {
FFI_ArrowArrayStream::from_raw(
capsule
.pointer_checked(Some(ARROW_ARRAY_STREAM_CAPSULE_NAME))?
.cast::<FFI_ArrowArrayStream>()
.as_ptr(),
)
};

let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
Expand Down Expand Up @@ -492,9 +532,11 @@ impl IntoPyArrow for ArrowArrayStreamReader {
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);

impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
Ok(Self(T::from_pyarrow_bound(value)?))
impl<'source, T: FromPyArrow> FromPyObject<'source, 'source> for PyArrowType<T> {
type Error = PyErr;

fn extract(value: Borrowed<'source, 'source, PyAny>) -> PyResult<Self> {
Ok(Self(T::from_pyarrow_bound(&*value)?))
}
}

Expand Down
Loading