diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 603db6660f45..49a299b859a9 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -62,6 +62,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = Some(64); pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false; /// Default values for [`WriterProperties::coerce_types`] pub const DEFAULT_COERCE_TYPES: bool = false; +/// Default values for [`WriterProperties::internal_buffer_enabled`] +pub const DEFAULT_INTERNAL_BUFFER_ENABLED: bool = true; /// Parquet writer version. /// @@ -169,6 +171,7 @@ pub struct WriterProperties { column_index_truncate_length: Option, statistics_truncate_length: Option, coerce_types: bool, + internal_buffer_enabled: bool, #[cfg(feature = "encryption")] pub(crate) file_encryption_properties: Option, } @@ -335,6 +338,13 @@ impl WriterProperties { self.coerce_types } + /// Returns `true` if internal buffer is enabled. + /// + /// For more details see [`WriterPropertiesBuilder::set_internal_buffer_enabled`] + pub fn internal_buffer_enabled(&self) -> bool { + self.internal_buffer_enabled + } + /// Returns encoding for a data page, when dictionary encoding is enabled. /// /// This is not configurable. @@ -458,6 +468,7 @@ pub struct WriterPropertiesBuilder { column_index_truncate_length: Option, statistics_truncate_length: Option, coerce_types: bool, + internal_buffer_enabled: bool, #[cfg(feature = "encryption")] file_encryption_properties: Option, } @@ -481,6 +492,7 @@ impl Default for WriterPropertiesBuilder { column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, coerce_types: DEFAULT_COERCE_TYPES, + internal_buffer_enabled: DEFAULT_INTERNAL_BUFFER_ENABLED, #[cfg(feature = "encryption")] file_encryption_properties: None, } @@ -506,6 +518,7 @@ impl WriterPropertiesBuilder { column_index_truncate_length: self.column_index_truncate_length, statistics_truncate_length: self.statistics_truncate_length, coerce_types: self.coerce_types, + internal_buffer_enabled: self.internal_buffer_enabled, #[cfg(feature = "encryption")] file_encryption_properties: self.file_encryption_properties, } @@ -700,6 +713,21 @@ impl WriterPropertiesBuilder { self } + /// Enables an internal buffer to improve small write operations. + /// + /// If `true` (default), small writes will be collected into an internal + /// buffer before calling the underlying `Write::write` method. This is + /// essential for maintaining good performance when the underlying writer + /// is an I/O-sensitive sink (like a network socket or raw file handle). + /// + /// Buffering is often redundant and can be slightly less performant when + /// the underlying writer is already buffered (e.g., writing to a `Vec` + /// or a `std::io::BufWriter`). In such cases, this option can be set to `false`. + pub fn set_internal_buffer_enabled(mut self, internal_buffer_enabled: bool) -> Self { + self.internal_buffer_enabled = internal_buffer_enabled; + self + } + /// Sets FileEncryptionProperties (defaults to `None`) #[cfg(feature = "encryption")] pub fn with_file_encryption_properties( @@ -958,6 +986,7 @@ impl From for WriterPropertiesBuilder { sorting_columns: props.sorting_columns, column_index_truncate_length: props.column_index_truncate_length, statistics_truncate_length: props.statistics_truncate_length, + internal_buffer_enabled: props.internal_buffer_enabled, coerce_types: props.coerce_types, #[cfg(feature = "encryption")] file_encryption_properties: props.file_encryption_properties, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index fa72b060ea84..22e88b79a275 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -45,20 +45,35 @@ use crate::file::PARQUET_MAGIC_ENCR_FOOTER; use crate::file::{metadata::*, PARQUET_MAGIC}; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; +/// WriterMode for the [`TrackedWrite`] +enum WriterMode { + Buffered(BufWriter), + Unbuffered(W), +} + /// A wrapper around a [`Write`] that keeps track of the number -/// of bytes that have been written. The given [`Write`] is wrapped -/// with a [`BufWriter`] to optimize writing performance. +/// of bytes that have been written. In buffered mode the given +/// [`Write`] is wrapped with a [`BufWriter`] to optimize writing +/// performance for I/O-sensitive sinks. pub struct TrackedWrite { - inner: BufWriter, + inner: WriterMode, bytes_written: usize, } impl TrackedWrite { - /// Create a new [`TrackedWrite`] from a [`Write`] + /// Create a new [`TrackedWrite`] from a [`Write`] with buffer pub fn new(inner: W) -> Self { let buf_write = BufWriter::new(inner); Self { - inner: buf_write, + inner: WriterMode::Buffered(buf_write), + bytes_written: 0, + } + } + + /// Create a new unbuffered [`TrackedWrite`] from a [`Write`] + pub fn new_unbuffered(inner: W) -> Self { + Self { + inner: WriterMode::Unbuffered(inner), bytes_written: 0, } } @@ -70,7 +85,10 @@ impl TrackedWrite { /// Returns a reference to the underlying writer. pub fn inner(&self) -> &W { - self.inner.get_ref() + match &self.inner { + WriterMode::Buffered(w) => w.get_ref(), + WriterMode::Unbuffered(w) => w, + } } /// Returns a mutable reference to the underlying writer. @@ -78,39 +96,53 @@ impl TrackedWrite { /// It is inadvisable to directly write to the underlying writer, doing so /// will likely result in data corruption pub fn inner_mut(&mut self) -> &mut W { - self.inner.get_mut() + match &mut self.inner { + WriterMode::Buffered(w) => w.get_mut(), + WriterMode::Unbuffered(w) => w, + } } /// Returns the underlying writer. pub fn into_inner(self) -> Result { - self.inner.into_inner().map_err(|err| { - ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string())) - }) + match self.inner { + WriterMode::Buffered(w) => w.into_inner().map_err(|err| { + ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string())) + }), + WriterMode::Unbuffered(w) => Ok(w), + } + } + + /// Returns the selected writer for write operations + fn writer(&mut self) -> &mut dyn Write { + match &mut self.inner { + WriterMode::Buffered(w) => w, + WriterMode::Unbuffered(w) => w, + } } } impl Write for TrackedWrite { fn write(&mut self, buf: &[u8]) -> std::io::Result { - let bytes = self.inner.write(buf)?; + let bytes = self.writer().write(buf)?; self.bytes_written += bytes; Ok(bytes) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result { - let bytes = self.inner.write_vectored(bufs)?; + let bytes = self.writer().write_vectored(bufs)?; self.bytes_written += bytes; Ok(bytes) } fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { - self.inner.write_all(buf)?; + self.writer().write_all(buf)?; self.bytes_written += buf.len(); Ok(()) } fn flush(&mut self) -> std::io::Result<()> { - self.inner.flush() + self.writer().flush() } } @@ -185,7 +217,11 @@ impl Debug for SerializedFileWriter { impl SerializedFileWriter { /// Creates new file writer. pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result { - let mut buf = TrackedWrite::new(buf); + let mut buf = if properties.internal_buffer_enabled() { + TrackedWrite::new(buf) + } else { + TrackedWrite::new_unbuffered(buf) + }; let schema_descriptor = SchemaDescriptor::new(schema.clone());