@@ -24,10 +24,11 @@ use uuid::Uuid;
2424use crate :: error:: Result ;
2525use crate :: io:: OutputFile ;
2626use crate :: spec:: {
27- DataFile , DataFileFormat , FormatVersion , MAIN_BRANCH , ManifestEntry , ManifestFile ,
28- ManifestListWriter , ManifestWriterBuilder , Operation , PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT ,
29- PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT , Snapshot , SnapshotReference , SnapshotRetention ,
30- SnapshotSummaryCollector , Struct , StructType , Summary , update_snapshot_summaries,
27+ DataContentType , DataFile , DataFileFormat , FormatVersion , MAIN_BRANCH , ManifestContentType ,
28+ ManifestEntry , ManifestFile , ManifestListWriter , ManifestWriterBuilder , Operation ,
29+ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT , PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT ,
30+ Snapshot , SnapshotReference , SnapshotRetention , SnapshotSummaryCollector , Struct , StructType ,
31+ Summary , update_snapshot_summaries,
3132} ;
3233use crate :: transaction:: Transaction ;
3334use crate :: { Error , ErrorKind , TableRequirement , TableUpdate } ;
@@ -66,6 +67,7 @@ pub(crate) struct SnapshotProduceAction<'a> {
6667 commit_uuid : Uuid ,
6768 snapshot_properties : HashMap < String , String > ,
6869 pub added_data_files : Vec < DataFile > ,
70+ pub added_delete_files : Vec < DataFile > ,
6971 // A counter used to generate unique manifest file names.
7072 // It starts from 0 and increments for each new manifest file.
7173 // Note: This counter is limited to the range of (0..u64::MAX).
@@ -86,6 +88,7 @@ impl<'a> SnapshotProduceAction<'a> {
8688 commit_uuid,
8789 snapshot_properties,
8890 added_data_files : vec ! [ ] ,
91+ added_delete_files : vec ! [ ] ,
8992 manifest_counter : ( 0 ..) ,
9093 key_metadata,
9194 } )
@@ -129,12 +132,14 @@ impl<'a> SnapshotProduceAction<'a> {
129132 ) -> Result < & mut Self > {
130133 let data_files: Vec < DataFile > = data_files. into_iter ( ) . collect ( ) ;
131134 for data_file in & data_files {
132- if data_file. content_type ( ) != crate :: spec:: DataContentType :: Data {
133- return Err ( Error :: new (
134- ErrorKind :: DataInvalid ,
135- "Only data content type is allowed for fast append" ,
136- ) ) ;
135+ match data_file. content_type ( ) {
136+ crate :: spec:: DataContentType :: Data => self . added_data_files . push ( data_file. clone ( ) ) ,
137+ crate :: spec:: DataContentType :: PositionDeletes
138+ | crate :: spec:: DataContentType :: EqualityDeletes => {
139+ self . added_delete_files . push ( data_file. clone ( ) )
140+ }
137141 }
142+
138143 // Check if the data file partition spec id matches the table default partition spec id.
139144 if self . tx . current_table . metadata ( ) . default_partition_spec_id ( )
140145 != data_file. partition_spec_id
@@ -149,7 +154,6 @@ impl<'a> SnapshotProduceAction<'a> {
149154 self . tx . current_table . metadata ( ) . default_partition_type ( ) ,
150155 ) ?;
151156 }
152- self . added_data_files . extend ( data_files) ;
153157 Ok ( self )
154158 }
155159
@@ -169,15 +173,41 @@ impl<'a> SnapshotProduceAction<'a> {
169173 }
170174
171175 // Write manifest file for added data files and return the ManifestFile for ManifestList.
172- async fn write_added_manifest ( & mut self ) -> Result < ManifestFile > {
173- let added_data_files = std:: mem:: take ( & mut self . added_data_files ) ;
176+ async fn write_added_manifest (
177+ & mut self ,
178+ added_data_files : Vec < DataFile > ,
179+ ) -> Result < ManifestFile > {
174180 if added_data_files. is_empty ( ) {
175181 return Err ( Error :: new (
176182 ErrorKind :: PreconditionFailed ,
177183 "No added data files found when write a manifest file" ,
178184 ) ) ;
179185 }
180186
187+ let file_count = added_data_files. len ( ) ;
188+ let manifest_content_type = {
189+ let mut data_num = 0 ;
190+ let mut delete_num = 0 ;
191+ for f in & added_data_files {
192+ match f. content_type ( ) {
193+ DataContentType :: Data => data_num += 1 ,
194+ DataContentType :: PositionDeletes | DataContentType :: EqualityDeletes => {
195+ delete_num += 1
196+ }
197+ }
198+ }
199+ if data_num == file_count {
200+ ManifestContentType :: Data
201+ } else if delete_num == file_count {
202+ ManifestContentType :: Deletes
203+ } else {
204+ return Err ( Error :: new (
205+ ErrorKind :: DataInvalid ,
206+ "added DataFile for a ManifestFile should be same type (Data or Delete)" ,
207+ ) ) ;
208+ }
209+ } ;
210+
181211 let snapshot_id = self . snapshot_id ;
182212 let format_version = self . tx . current_table . metadata ( ) . format_version ( ) ;
183213 let manifest_entries = added_data_files. into_iter ( ) . map ( |data_file| {
@@ -207,8 +237,10 @@ impl<'a> SnapshotProduceAction<'a> {
207237 ) ;
208238 if self . tx . current_table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
209239 builder. build_v1 ( )
210- } else {
240+ } else if manifest_content_type == ManifestContentType :: Data {
211241 builder. build_v2_data ( )
242+ } else {
243+ builder. build_v2_deletes ( )
212244 }
213245 } ;
214246 for entry in manifest_entries {
@@ -222,14 +254,21 @@ impl<'a> SnapshotProduceAction<'a> {
222254 snapshot_produce_operation : & OP ,
223255 manifest_process : & MP ,
224256 ) -> Result < Vec < ManifestFile > > {
225- let added_manifest = self . write_added_manifest ( ) . await ?;
226- let existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
227- // # TODO
228- // Support process delete entries.
229-
230- let mut manifest_files = vec ! [ added_manifest] ;
231- manifest_files. extend ( existing_manifests) ;
232- let manifest_files = manifest_process. process_manifests ( manifest_files) ;
257+ let mut existing_manifests = snapshot_produce_operation. existing_manifest ( self ) . await ?;
258+
259+ if !self . added_data_files . is_empty ( ) {
260+ let added_data_files = std:: mem:: take ( & mut self . added_data_files ) ;
261+ let added_manifest = self . write_added_manifest ( added_data_files) . await ?;
262+ existing_manifests. push ( added_manifest) ;
263+ }
264+
265+ if !self . added_delete_files . is_empty ( ) {
266+ let added_delete_files = std:: mem:: take ( & mut self . added_delete_files ) ;
267+ let added_manifest = self . write_added_manifest ( added_delete_files) . await ?;
268+ existing_manifests. push ( added_manifest) ;
269+ }
270+
271+ let manifest_files = manifest_process. process_manifests ( existing_manifests) ;
233272 Ok ( manifest_files)
234273 }
235274
0 commit comments