Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Default values for [`WriterProperties::coerce_types`]
pub const DEFAULT_COERCE_TYPES: bool = false;
/// Default values for [`WriterProperties::internal_buffer_enabled`]
pub const DEFAULT_INTERNAL_BUFFER_ENABLED: bool = true;

/// Parquet writer version.
///
Expand Down Expand Up @@ -169,6 +171,7 @@ pub struct WriterProperties {
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
internal_buffer_enabled: bool,
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
}
Expand Down Expand Up @@ -335,6 +338,13 @@ impl WriterProperties {
self.coerce_types
}

/// Returns `true` if internal buffer is enabled.
///
/// For more details see [`WriterPropertiesBuilder::set_internal_buffer_enabled`]
pub fn internal_buffer_enabled(&self) -> bool {
self.internal_buffer_enabled
}

/// Returns encoding for a data page, when dictionary encoding is enabled.
///
/// This is not configurable.
Expand Down Expand Up @@ -458,6 +468,7 @@ pub struct WriterPropertiesBuilder {
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
internal_buffer_enabled: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<FileEncryptionProperties>,
}
Expand All @@ -481,6 +492,7 @@ impl Default for WriterPropertiesBuilder {
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
internal_buffer_enabled: DEFAULT_INTERNAL_BUFFER_ENABLED,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
Expand All @@ -506,6 +518,7 @@ impl WriterPropertiesBuilder {
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
internal_buffer_enabled: self.internal_buffer_enabled,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
}
Expand Down Expand Up @@ -700,6 +713,21 @@ impl WriterPropertiesBuilder {
self
}

/// Enables an internal buffer to improve small write operations.
///
/// If `true` (default), small writes will be collected into an internal
/// buffer before calling the underlying `Write::write` method. This is
/// essential for maintaining good performance when the underlying writer
/// is an I/O-sensitive sink (like a network socket or raw file handle).
///
/// Buffering is often redundant and can be slightly less performant when
/// the underlying writer is already buffered (e.g., writing to a `Vec<u8>`
/// or a `std::io::BufWriter`). In such cases, this option can be set to `false`.
pub fn set_internal_buffer_enabled(mut self, internal_buffer_enabled: bool) -> Self {
self.internal_buffer_enabled = internal_buffer_enabled;
self
}

/// Sets FileEncryptionProperties (defaults to `None`)
#[cfg(feature = "encryption")]
pub fn with_file_encryption_properties(
Expand Down Expand Up @@ -958,6 +986,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
sorting_columns: props.sorting_columns,
column_index_truncate_length: props.column_index_truncate_length,
statistics_truncate_length: props.statistics_truncate_length,
internal_buffer_enabled: props.internal_buffer_enabled,
coerce_types: props.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
Expand Down
66 changes: 51 additions & 15 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,35 @@ use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

/// WriterMode for the [`TrackedWrite`]
enum WriterMode<W: Write> {
Buffered(BufWriter<W>),
Unbuffered(W),
}

/// A wrapper around a [`Write`] that keeps track of the number
/// of bytes that have been written. The given [`Write`] is wrapped
/// with a [`BufWriter`] to optimize writing performance.
/// of bytes that have been written. In buffered mode the given
/// [`Write`] is wrapped with a [`BufWriter`] to optimize writing
/// performance for I/O-sensitive sinks.
pub struct TrackedWrite<W: Write> {
inner: BufWriter<W>,
inner: WriterMode<W>,
bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
/// Create a new [`TrackedWrite`] from a [`Write`]
/// Create a new [`TrackedWrite`] from a [`Write`] with buffer
pub fn new(inner: W) -> Self {
let buf_write = BufWriter::new(inner);
Self {
inner: buf_write,
inner: WriterMode::Buffered(buf_write),
bytes_written: 0,
}
}

/// Create a new unbuffered [`TrackedWrite`] from a [`Write`]
pub fn new_unbuffered(inner: W) -> Self {
Self {
inner: WriterMode::Unbuffered(inner),
bytes_written: 0,
}
}
Expand All @@ -70,47 +85,64 @@ impl<W: Write> TrackedWrite<W> {

/// Returns a reference to the underlying writer.
pub fn inner(&self) -> &W {
self.inner.get_ref()
match &self.inner {
WriterMode::Buffered(w) => w.get_ref(),
WriterMode::Unbuffered(w) => w,
}
}

/// Returns a mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer, doing so
/// will likely result in data corruption
pub fn inner_mut(&mut self) -> &mut W {
self.inner.get_mut()
match &mut self.inner {
WriterMode::Buffered(w) => w.get_mut(),
WriterMode::Unbuffered(w) => w,
}
}

/// Returns the underlying writer.
pub fn into_inner(self) -> Result<W> {
self.inner.into_inner().map_err(|err| {
ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
})
match self.inner {
WriterMode::Buffered(w) => w.into_inner().map_err(|err| {
ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
}),
WriterMode::Unbuffered(w) => Ok(w),
}
}

/// Returns the selected writer for write operations
fn writer(&mut self) -> &mut dyn Write {
match &mut self.inner {
WriterMode::Buffered(w) => w,
WriterMode::Unbuffered(w) => w,
}
}
}

impl<W: Write> Write for TrackedWrite<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let bytes = self.inner.write(buf)?;
let bytes = self.writer().write(buf)?;
self.bytes_written += bytes;
Ok(bytes)
}

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
let bytes = self.inner.write_vectored(bufs)?;
let bytes = self.writer().write_vectored(bufs)?;
self.bytes_written += bytes;
Ok(bytes)
}

fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
self.inner.write_all(buf)?;
self.writer().write_all(buf)?;
self.bytes_written += buf.len();

Ok(())
}

fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
self.writer().flush()
}
}

Expand Down Expand Up @@ -185,7 +217,11 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
impl<W: Write + Send> SerializedFileWriter<W> {
/// Creates new file writer.
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
let mut buf = TrackedWrite::new(buf);
let mut buf = if properties.internal_buffer_enabled() {
TrackedWrite::new(buf)
} else {
TrackedWrite::new_unbuffered(buf)
};

let schema_descriptor = SchemaDescriptor::new(schema.clone());

Expand Down
Loading