Skip to content

Commit 5dc94f1

Browse files
author
ZENOTME
committed
update iceberg
1 parent 99e74d7 commit 5dc94f1

File tree

4 files changed

+73
-56
lines changed

4 files changed

+73
-56
lines changed

Cargo.lock

+15-15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
153153
"prometheus",
154154
] }
155155
# branch dev-rebase-main-20241030
156-
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
157-
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
158-
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
156+
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d34ec0c8071dc1ea72147a4a66b44c4065ddaba9" }
157+
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d34ec0c8071dc1ea72147a4a66b44c4065ddaba9" }
158+
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "d34ec0c8071dc1ea72147a4a66b44c4065ddaba9" }
159159
opendal = "0.49"
160160
# used only by arrow-udf-flight
161161
arrow-flight = "53"

src/batch/executors/src/executor/iceberg_scan.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ impl IcebergScanExecutor {
190190
.build();
191191
let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
192192

193-
let mut record_batch_stream = reader.read(Box::pin(file_scan_stream))?.enumerate();
193+
let mut record_batch_stream =
194+
reader.read(Box::pin(file_scan_stream)).await?.enumerate();
194195

195196
while let Some((index, record_batch)) = record_batch_stream.next().await {
196197
let record_batch = record_batch?;
@@ -321,7 +322,9 @@ impl PositionDeleteFilter {
321322

322323
let reader = table.reader_builder().with_batch_size(batch_size).build();
323324

324-
let mut record_batch_stream = reader.read(Box::pin(position_delete_file_scan_stream))?;
325+
let mut record_batch_stream = reader
326+
.read(Box::pin(position_delete_file_scan_stream))
327+
.await?;
325328

326329
while let Some(record_batch) = record_batch_stream.next().await {
327330
let record_batch = record_batch?;

src/connector/src/sink/iceberg/mod.rs

+50-36
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ use std::sync::Arc;
2121

2222
use anyhow::{anyhow, Context};
2323
use async_trait::async_trait;
24-
use iceberg::arrow::schema_to_arrow_schema;
24+
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
2525
use iceberg::spec::{DataFile, SerializedDataFile};
2626
use iceberg::table::Table;
2727
use iceberg::transaction::Transaction;
2828
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
29-
use iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder;
30-
use iceberg::writer::base_writer::sort_position_delete_writer::SortPositionDeleteWriterBuilder;
29+
use iceberg::writer::base_writer::equality_delete_writer::{
30+
EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
31+
};
32+
use iceberg::writer::base_writer::sort_position_delete_writer::{
33+
SortPositionDeleteWriterBuilder, POSITION_DELETE_SCHEMA,
34+
};
3135
use iceberg::writer::file_writer::location_generator::{
3236
DefaultFileNameGenerator, DefaultLocationGenerator,
3337
};
@@ -540,6 +544,7 @@ impl IcebergSinkWriter {
540544

541545
let parquet_writer_builder = ParquetWriterBuilder::new(
542546
WriterProperties::new(),
547+
schema.clone(),
543548
table.file_io().clone(),
544549
DefaultLocationGenerator::new(table.metadata().clone())
545550
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -549,8 +554,7 @@ impl IcebergSinkWriter {
549554
iceberg::spec::DataFileFormat::Parquet,
550555
),
551556
);
552-
let data_file_builder =
553-
DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder, None);
557+
let data_file_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
554558
if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
555559
Err(SinkError::Iceberg(anyhow!(
556560
"Extra partition column is not supported in append-only mode"
@@ -586,8 +590,12 @@ impl IcebergSinkWriter {
586590
})
587591
} else {
588592
let partition_builder = MonitoredGeneralWriterBuilder::new(
589-
FanoutPartitionWriterBuilder::new(data_file_builder, partition_spec.clone())
590-
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
593+
FanoutPartitionWriterBuilder::new(
594+
data_file_builder,
595+
partition_spec.clone(),
596+
schema.clone(),
597+
)
598+
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
591599
write_qps.clone(),
592600
write_latency.clone(),
593601
);
@@ -662,6 +670,7 @@ impl IcebergSinkWriter {
662670
let data_file_builder = {
663671
let parquet_writer_builder = ParquetWriterBuilder::new(
664672
WriterProperties::new(),
673+
schema.clone(),
665674
table.file_io().clone(),
666675
DefaultLocationGenerator::new(table.metadata().clone())
667676
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -671,11 +680,12 @@ impl IcebergSinkWriter {
671680
iceberg::spec::DataFileFormat::Parquet,
672681
),
673682
);
674-
DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder.clone(), None)
683+
DataFileWriterBuilder::new(parquet_writer_builder.clone(), None)
675684
};
676685
let position_delete_builder = {
677686
let parquet_writer_builder = ParquetWriterBuilder::new(
678687
WriterProperties::new(),
688+
POSITION_DELETE_SCHEMA.clone(),
679689
table.file_io().clone(),
680690
DefaultLocationGenerator::new(table.metadata().clone())
681691
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -691,8 +701,18 @@ impl IcebergSinkWriter {
691701
)
692702
};
693703
let equality_delete_builder = {
704+
let config = EqualityDeleteWriterConfig::new(
705+
unique_column_ids.clone(),
706+
table.metadata().current_schema().clone(),
707+
None,
708+
)
709+
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
694710
let parquet_writer_builder = ParquetWriterBuilder::new(
695711
WriterProperties::new(),
712+
Arc::new(
713+
arrow_schema_to_schema(config.projected_arrow_schema_ref())
714+
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
715+
),
696716
table.file_io().clone(),
697717
DefaultLocationGenerator::new(table.metadata().clone())
698718
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -703,13 +723,7 @@ impl IcebergSinkWriter {
703723
),
704724
);
705725

706-
EqualityDeleteFileWriterBuilder::new(
707-
parquet_writer_builder.clone(),
708-
unique_column_ids.clone(),
709-
table.metadata().current_schema().clone(),
710-
None,
711-
)
712-
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?
726+
EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
713727
};
714728
let delta_builder = EqualityDeltaWriterBuilder::new(
715729
data_file_builder,
@@ -762,19 +776,6 @@ impl IcebergSinkWriter {
762776
},
763777
})
764778
} else {
765-
let partition_builder = MonitoredGeneralWriterBuilder::new(
766-
FanoutPartitionWriterBuilder::new(delta_builder, partition_spec.clone())
767-
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
768-
write_qps.clone(),
769-
write_latency.clone(),
770-
);
771-
let inner_writer = Some(Box::new(
772-
partition_builder
773-
.clone()
774-
.build()
775-
.await
776-
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
777-
) as Box<dyn IcebergWriter>);
778779
let original_arrow_schema = Arc::new(
779780
schema_to_arrow_schema(table.metadata().current_schema())
780781
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -788,6 +789,23 @@ impl IcebergSinkWriter {
788789
)));
789790
Arc::new(ArrowSchema::new(new_fields))
790791
};
792+
let partition_builder = MonitoredGeneralWriterBuilder::new(
793+
FanoutPartitionWriterBuilder::new_with_custom_schema(
794+
delta_builder,
795+
schema_with_extra_op_column.clone(),
796+
partition_spec.clone(),
797+
table.metadata().current_schema().clone(),
798+
),
799+
write_qps.clone(),
800+
write_latency.clone(),
801+
);
802+
let inner_writer = Some(Box::new(
803+
partition_builder
804+
.clone()
805+
.build()
806+
.await
807+
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
808+
) as Box<dyn IcebergWriter>);
791809
Ok(Self {
792810
arrow_schema: original_arrow_schema,
793811
metrics: IcebergWriterMetrics {
@@ -1020,11 +1038,7 @@ impl SinkWriter for IcebergSinkWriter {
10201038
match close_result {
10211039
Some(Ok(result)) => {
10221040
let version = self.table.metadata().format_version() as u8;
1023-
let partition_type = self
1024-
.table
1025-
.metadata()
1026-
.default_partition_spec()
1027-
.partition_type();
1041+
let partition_type = self.table.metadata().default_partition_type();
10281042
let data_files = result
10291043
.into_iter()
10301044
.map(|f| {
@@ -1216,17 +1230,17 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
12161230
write_results[0].partition_spec_id
12171231
)));
12181232
};
1219-
let bound_partition_spec = partition_spec
1233+
let partition_type = partition_spec
12201234
.as_ref()
12211235
.clone()
1222-
.bind(schema.clone())
1236+
.partition_type(schema)
12231237
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
12241238

12251239
let data_files = write_results
12261240
.into_iter()
12271241
.flat_map(|r| {
12281242
r.data_files.into_iter().map(|f| {
1229-
f.try_into(bound_partition_spec.partition_type(), schema)
1243+
f.try_into(&partition_type, schema)
12301244
.map_err(|err| SinkError::Iceberg(anyhow!(err)))
12311245
})
12321246
})

0 commit comments

Comments (0)