@@ -21,13 +21,17 @@ use std::sync::Arc;
21
21
22
22
use anyhow:: { anyhow, Context } ;
23
23
use async_trait:: async_trait;
24
- use iceberg:: arrow:: schema_to_arrow_schema;
24
+ use iceberg:: arrow:: { arrow_schema_to_schema , schema_to_arrow_schema} ;
25
25
use iceberg:: spec:: { DataFile , SerializedDataFile } ;
26
26
use iceberg:: table:: Table ;
27
27
use iceberg:: transaction:: Transaction ;
28
28
use iceberg:: writer:: base_writer:: data_file_writer:: DataFileWriterBuilder ;
29
- use iceberg:: writer:: base_writer:: equality_delete_writer:: EqualityDeleteFileWriterBuilder ;
30
- use iceberg:: writer:: base_writer:: sort_position_delete_writer:: SortPositionDeleteWriterBuilder ;
29
+ use iceberg:: writer:: base_writer:: equality_delete_writer:: {
30
+ EqualityDeleteFileWriterBuilder , EqualityDeleteWriterConfig ,
31
+ } ;
32
+ use iceberg:: writer:: base_writer:: sort_position_delete_writer:: {
33
+ SortPositionDeleteWriterBuilder , POSITION_DELETE_SCHEMA ,
34
+ } ;
31
35
use iceberg:: writer:: file_writer:: location_generator:: {
32
36
DefaultFileNameGenerator , DefaultLocationGenerator ,
33
37
} ;
@@ -540,6 +544,7 @@ impl IcebergSinkWriter {
540
544
541
545
let parquet_writer_builder = ParquetWriterBuilder :: new (
542
546
WriterProperties :: new ( ) ,
547
+ schema. clone ( ) ,
543
548
table. file_io ( ) . clone ( ) ,
544
549
DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) )
545
550
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
@@ -549,8 +554,7 @@ impl IcebergSinkWriter {
549
554
iceberg:: spec:: DataFileFormat :: Parquet ,
550
555
) ,
551
556
) ;
552
- let data_file_builder =
553
- DataFileWriterBuilder :: new ( schema. clone ( ) , parquet_writer_builder, None ) ;
557
+ let data_file_builder = DataFileWriterBuilder :: new ( parquet_writer_builder, None ) ;
554
558
if let Some ( _extra_partition_col_idx) = extra_partition_col_idx {
555
559
Err ( SinkError :: Iceberg ( anyhow ! (
556
560
"Extra partition column is not supported in append-only mode"
@@ -586,8 +590,12 @@ impl IcebergSinkWriter {
586
590
} )
587
591
} else {
588
592
let partition_builder = MonitoredGeneralWriterBuilder :: new (
589
- FanoutPartitionWriterBuilder :: new ( data_file_builder, partition_spec. clone ( ) )
590
- . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
593
+ FanoutPartitionWriterBuilder :: new (
594
+ data_file_builder,
595
+ partition_spec. clone ( ) ,
596
+ schema. clone ( ) ,
597
+ )
598
+ . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
591
599
write_qps. clone ( ) ,
592
600
write_latency. clone ( ) ,
593
601
) ;
@@ -662,6 +670,7 @@ impl IcebergSinkWriter {
662
670
let data_file_builder = {
663
671
let parquet_writer_builder = ParquetWriterBuilder :: new (
664
672
WriterProperties :: new ( ) ,
673
+ schema. clone ( ) ,
665
674
table. file_io ( ) . clone ( ) ,
666
675
DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) )
667
676
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
@@ -671,11 +680,12 @@ impl IcebergSinkWriter {
671
680
iceberg:: spec:: DataFileFormat :: Parquet ,
672
681
) ,
673
682
) ;
674
- DataFileWriterBuilder :: new ( schema . clone ( ) , parquet_writer_builder. clone ( ) , None )
683
+ DataFileWriterBuilder :: new ( parquet_writer_builder. clone ( ) , None )
675
684
} ;
676
685
let position_delete_builder = {
677
686
let parquet_writer_builder = ParquetWriterBuilder :: new (
678
687
WriterProperties :: new ( ) ,
688
+ POSITION_DELETE_SCHEMA . clone ( ) ,
679
689
table. file_io ( ) . clone ( ) ,
680
690
DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) )
681
691
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
@@ -691,8 +701,18 @@ impl IcebergSinkWriter {
691
701
)
692
702
} ;
693
703
let equality_delete_builder = {
704
+ let config = EqualityDeleteWriterConfig :: new (
705
+ unique_column_ids. clone ( ) ,
706
+ table. metadata ( ) . current_schema ( ) . clone ( ) ,
707
+ None ,
708
+ )
709
+ . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?;
694
710
let parquet_writer_builder = ParquetWriterBuilder :: new (
695
711
WriterProperties :: new ( ) ,
712
+ Arc :: new (
713
+ arrow_schema_to_schema ( config. projected_arrow_schema_ref ( ) )
714
+ . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
715
+ ) ,
696
716
table. file_io ( ) . clone ( ) ,
697
717
DefaultLocationGenerator :: new ( table. metadata ( ) . clone ( ) )
698
718
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
@@ -703,13 +723,7 @@ impl IcebergSinkWriter {
703
723
) ,
704
724
) ;
705
725
706
- EqualityDeleteFileWriterBuilder :: new (
707
- parquet_writer_builder. clone ( ) ,
708
- unique_column_ids. clone ( ) ,
709
- table. metadata ( ) . current_schema ( ) . clone ( ) ,
710
- None ,
711
- )
712
- . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?
726
+ EqualityDeleteFileWriterBuilder :: new ( parquet_writer_builder. clone ( ) , config)
713
727
} ;
714
728
let delta_builder = EqualityDeltaWriterBuilder :: new (
715
729
data_file_builder,
@@ -762,19 +776,6 @@ impl IcebergSinkWriter {
762
776
} ,
763
777
} )
764
778
} else {
765
- let partition_builder = MonitoredGeneralWriterBuilder :: new (
766
- FanoutPartitionWriterBuilder :: new ( delta_builder, partition_spec. clone ( ) )
767
- . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
768
- write_qps. clone ( ) ,
769
- write_latency. clone ( ) ,
770
- ) ;
771
- let inner_writer = Some ( Box :: new (
772
- partition_builder
773
- . clone ( )
774
- . build ( )
775
- . await
776
- . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
777
- ) as Box < dyn IcebergWriter > ) ;
778
779
let original_arrow_schema = Arc :: new (
779
780
schema_to_arrow_schema ( table. metadata ( ) . current_schema ( ) )
780
781
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
@@ -788,6 +789,23 @@ impl IcebergSinkWriter {
788
789
) ) ) ;
789
790
Arc :: new ( ArrowSchema :: new ( new_fields) )
790
791
} ;
792
+ let partition_builder = MonitoredGeneralWriterBuilder :: new (
793
+ FanoutPartitionWriterBuilder :: new_with_custom_schema (
794
+ delta_builder,
795
+ schema_with_extra_op_column. clone ( ) ,
796
+ partition_spec. clone ( ) ,
797
+ table. metadata ( ) . current_schema ( ) . clone ( ) ,
798
+ ) ,
799
+ write_qps. clone ( ) ,
800
+ write_latency. clone ( ) ,
801
+ ) ;
802
+ let inner_writer = Some ( Box :: new (
803
+ partition_builder
804
+ . clone ( )
805
+ . build ( )
806
+ . await
807
+ . map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?,
808
+ ) as Box < dyn IcebergWriter > ) ;
791
809
Ok ( Self {
792
810
arrow_schema : original_arrow_schema,
793
811
metrics : IcebergWriterMetrics {
@@ -1020,11 +1038,7 @@ impl SinkWriter for IcebergSinkWriter {
1020
1038
match close_result {
1021
1039
Some ( Ok ( result) ) => {
1022
1040
let version = self . table . metadata ( ) . format_version ( ) as u8 ;
1023
- let partition_type = self
1024
- . table
1025
- . metadata ( )
1026
- . default_partition_spec ( )
1027
- . partition_type ( ) ;
1041
+ let partition_type = self . table . metadata ( ) . default_partition_type ( ) ;
1028
1042
let data_files = result
1029
1043
. into_iter ( )
1030
1044
. map ( |f| {
@@ -1216,17 +1230,17 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
1216
1230
write_results[ 0 ] . partition_spec_id
1217
1231
) ) ) ;
1218
1232
} ;
1219
- let bound_partition_spec = partition_spec
1233
+ let partition_type = partition_spec
1220
1234
. as_ref ( )
1221
1235
. clone ( )
1222
- . bind ( schema. clone ( ) )
1236
+ . partition_type ( schema)
1223
1237
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) ) ?;
1224
1238
1225
1239
let data_files = write_results
1226
1240
. into_iter ( )
1227
1241
. flat_map ( |r| {
1228
1242
r. data_files . into_iter ( ) . map ( |f| {
1229
- f. try_into ( bound_partition_spec . partition_type ( ) , schema)
1243
+ f. try_into ( & partition_type, schema)
1230
1244
. map_err ( |err| SinkError :: Iceberg ( anyhow ! ( err) ) )
1231
1245
} )
1232
1246
} )
0 commit comments