2020#include " arrow/result.h"
2121#include " arrow/status.h"
2222#include " iceberg/common/error.h"
23+ #include " iceberg/common/logger.h"
2324#include " iceberg/common/threadpool.h"
2425#include " iceberg/experimental_representations.h"
2526#include " iceberg/filter/representation/value.h"
@@ -685,13 +686,14 @@ std::optional<ManifestEntry> AllEntriesStream::ReadNext() {
685686arrow::Result<ScanMetadata> GetScanMetadataMultiThreaded (std::shared_ptr<arrow::fs::FileSystem> fs,
686687 std::shared_ptr<TableMetadataV2> table_metadata,
687688 bool use_reader_schema, uint32_t threads_num,
688- const ManifestEntryDeserializerConfig& config);
689+ const ManifestEntryDeserializerConfig& config,
690+ std::shared_ptr<ILogger> logger);
689691
690692arrow::Result<ScanMetadata> GetScanMetadata (std::shared_ptr<arrow::fs::FileSystem> fs,
691693 const std::string& metadata_location,
692694 std::function<bool (iceberg::Schema& schema)> use_avro_reader_schema,
693695 std::shared_ptr<filter::StatsFilter> stats_filter, uint32_t threads_num,
694- const GetScanMetadataConfig& config) {
696+ const GetScanMetadataConfig& config, std::shared_ptr<ILogger> logger ) {
695697 auto data = ValueSafe (ReadFile (fs, metadata_location));
696698 std::shared_ptr<TableMetadataV2> table_metadata = ReadTableMetadataV2 (data);
697699 if (!table_metadata) {
@@ -705,35 +707,91 @@ arrow::Result<ScanMetadata> GetScanMetadata(std::shared_ptr<arrow::fs::FileSyste
705707 auto entries_stream =
706708 AllEntriesStream::Make (fs, table_metadata, use_avro_reader_schema (*table_metadata->GetCurrentSchema ()),
707709 stats_filter, config.manifest_entry_deserializer_config );
708- return GetScanMetadata (*entries_stream, *table_metadata);
710+ return GetScanMetadata (*entries_stream, *table_metadata, logger );
709711 }
710712 return GetScanMetadataMultiThreaded (fs, table_metadata, use_avro_reader_schema (*table_metadata->GetCurrentSchema ()),
711- threads_num, config.manifest_entry_deserializer_config );
713+ threads_num, config.manifest_entry_deserializer_config , logger );
712714}
713715
714716class ScanMetadataBuilder {
715717 public:
716- explicit ScanMetadataBuilder (const TableMetadataV2& table_metadata)
717- : table_metadata_(table_metadata), schema_(table_metadata_.GetCurrentSchema()) {}
718+ explicit ScanMetadataBuilder (const TableMetadataV2& table_metadata, std::shared_ptr<ILogger> logger )
719+ : table_metadata_(table_metadata), schema_(table_metadata_.GetCurrentSchema()), logger_(std::move(logger)) {}
718720
719721 ScanMetadata GetResult () {
720722 ScanMetadata result;
721723 result.schema = table_metadata_.GetCurrentSchema ();
722724
723- for (auto & [partition_key, partition_map ] : partitions) {
725+ for (auto & [partition_key, layers ] : partitions) {
724726 for (const auto & [seqnum, equality_delete_entries] : global_equality_deletes) {
725- auto & deletes_at_current_layer = partition_map [seqnum].equality_delete_entries_ ;
727+ auto & deletes_at_current_layer = layers [seqnum].equality_delete_entries_ ;
726728 deletes_at_current_layer.insert (deletes_at_current_layer.end (), equality_delete_entries.begin (),
727729 equality_delete_entries.end ());
728730 }
729731 }
730732
731- for (auto & [partition_key, partition_map ] : partitions) {
733+ for (auto & [partition_key, layers ] : partitions) {
732734 ScanMetadata::Partition partition;
733- for (auto & [seqnum, layer] : partition_map) {
734- partition.emplace_back (std::move (layer));
735+
736+ std::optional<std::string> min_data_path;
737+ std::optional<std::string> max_data_path;
738+
739+ // to remove dangling positional delete file, we need to make sure that there are no data files in the range
740+ // [min_referenced_file, max_referenced_file]. Delete files in layer X are applied to data files in layers greater
741+ // than or equal to X. To find all dangling deletes in one pass, we start from max layer
742+ for (auto it = layers.rbegin (); it != layers.rend (); ++it) {
743+ auto & [seqno, layer] = *it;
744+
745+ for (const auto & data_entry : layer.data_entries_ ) {
746+ if (!min_data_path.has_value () || *min_data_path > data_entry.path ) {
747+ min_data_path = data_entry.path ;
748+ }
749+ if (!max_data_path.has_value () || *max_data_path < data_entry.path ) {
750+ max_data_path = data_entry.path ;
751+ }
752+ }
753+
754+ ScanMetadata::Layer result_layer;
755+ result_layer.data_entries_ = std::move (layer.data_entries_ );
756+ result_layer.equality_delete_entries_ = std::move (result_layer.equality_delete_entries_ );
757+
758+ int64_t dangling_positional_delete_files = 0 ;
759+ for (const auto & pos_delete : layer.positional_delete_entries_ ) {
760+ bool has_stats =
761+ pos_delete.min_max_referenced_path_ .has_value () && min_data_path.has_value () && max_data_path.has_value ();
762+ if (has_stats) {
763+ const auto & [min_referenced_path, max_referenced_path] = *pos_delete.min_max_referenced_path_ ;
764+ if (*min_data_path > max_referenced_path || *max_data_path < min_referenced_path) {
765+ ++dangling_positional_delete_files;
766+ continue ;
767+ }
768+ }
769+ result_layer.positional_delete_entries_ .emplace_back (std::move (pos_delete.positional_delete_ .path ));
770+ }
771+
772+ if (logger_) {
773+ auto log_if_not_null = [logger = this ->logger_ ](int64_t value, const std::string& name) {
774+ if (value > 0 ) {
775+ logger->Log (std::to_string (value), name);
776+ }
777+ };
778+
779+ log_if_not_null (dangling_positional_delete_files, " metrics:plan:dangling_positional_files" );
780+ log_if_not_null (result_layer.data_entries_ .size (), " metrics:plan:data_files" );
781+ log_if_not_null (result_layer.positional_delete_entries_ .size (), " metrics:plan:positional_files" );
782+ log_if_not_null (result_layer.equality_delete_entries_ .size (), " metrics:plan:equality_files" );
783+ }
784+
785+ if (!result_layer.data_entries_ .empty () || !result_layer.positional_delete_entries_ .empty () ||
786+ !result_layer.equality_delete_entries_ .empty ()) {
787+ partition.emplace_back (std::move (result_layer));
788+ }
789+ }
790+
791+ std::reverse (partition.begin (), partition.end ());
792+ if (min_data_path.has_value ()) {
793+ result.partitions .emplace_back (std::move (partition));
735794 }
736- result.partitions .emplace_back (std::move (partition));
737795 }
738796
739797 return result;
@@ -779,7 +837,19 @@ class ScanMetadataBuilder {
779837 // - There is no deletion vector that must be applied to the data file (when added, such a vector must
780838 // contain
781839 // all deletes from existing position delete files)
782- AddPositionDeletes (serialized_partition_key, sequence_number, entry.data_file .file_path );
840+ std::optional<std::pair<std::string, std::string>> min_max_referenced_path;
841+ constexpr uint32_t kFilePathId = 2147483546 ;
842+ if (entry.data_file .lower_bounds .contains (kFilePathId ) && entry.data_file .upper_bounds .contains (kFilePathId )) {
843+ const std::vector<uint8_t >& min_bytes = entry.data_file .lower_bounds .at (kFilePathId );
844+ const std::vector<uint8_t >& max_bytes = entry.data_file .upper_bounds .at (kFilePathId );
845+
846+ std::string min_path (min_bytes.begin (), min_bytes.end ());
847+ std::string max_path (max_bytes.begin (), max_bytes.end ());
848+
849+ min_max_referenced_path.emplace (std::move (min_path), std::move (max_path));
850+ }
851+ AddPositionDeletes (serialized_partition_key, sequence_number, entry.data_file .file_path ,
852+ min_max_referenced_path);
783853 break ;
784854 }
785855
@@ -807,8 +877,10 @@ class ScanMetadataBuilder {
807877 }
808878
809879 virtual void AddPositionDeletes (const std::string& serialized_partition_key, SequenceNumber sequence_number,
810- const std::string& path) {
811- partitions[serialized_partition_key][sequence_number].positional_delete_entries_ .emplace_back (path);
880+ const std::string& path,
881+ const std::optional<std::pair<std::string, std::string>>& min_max_referenced_path) {
882+ partitions[serialized_partition_key][sequence_number].positional_delete_entries_ .emplace_back (
883+ path, min_max_referenced_path);
812884 }
813885
814886 virtual void AddGlobalEqualityDeletes (SequenceNumber sequence_number, const std::string& path,
@@ -850,15 +922,36 @@ class ScanMetadataBuilder {
850922 const TableMetadataV2& table_metadata_;
851923 std::shared_ptr<const iceberg::Schema> schema_;
852924
853- std::map<std::string, std::map<SequenceNumber, ScanMetadata::Layer>> partitions;
925+ struct PositionalDeleteWithExtraInfo {
926+ PositionalDeleteInfo positional_delete_;
927+ std::optional<std::pair<std::string, std::string>> min_max_referenced_path_;
928+
929+ PositionalDeleteWithExtraInfo (std::string path,
930+ std::optional<std::pair<std::string, std::string>> min_max_referenced_path)
931+ : positional_delete_(std::move(path)), min_max_referenced_path_(std::move(min_max_referenced_path)) {}
932+ };
933+
934+ struct LayerWithExtraInfo {
935+ std::vector<DataEntry> data_entries_;
936+ std::vector<PositionalDeleteWithExtraInfo> positional_delete_entries_;
937+ std::vector<EqualityDeleteInfo> equality_delete_entries_;
938+
939+ bool operator ==(const LayerWithExtraInfo& layer) const = default ;
940+
941+ bool Empty () const ;
942+ };
943+
944+ std::map<std::string, std::map<SequenceNumber, LayerWithExtraInfo>> partitions;
854945 // if there are k partitions and t global equality delete entries, k * t entries will be created
855946 // TODO(gmusya): improve
856947 std::map<SequenceNumber, std::vector<EqualityDeleteInfo>> global_equality_deletes;
948+ std::shared_ptr<ILogger> logger_;
857949};
858950
859951class ScanMetadataBuilderMT : public ScanMetadataBuilder {
860952 public:
861- explicit ScanMetadataBuilderMT (const TableMetadataV2& table_metadata) : ScanMetadataBuilder(table_metadata) {}
953+ explicit ScanMetadataBuilderMT (const TableMetadataV2& table_metadata, std::shared_ptr<ILogger> logger)
954+ : ScanMetadataBuilder(table_metadata, logger) {}
862955
863956 private:
864957 void AddDataFile (const std::string& serialized_partition_key, SequenceNumber sequence_number, const std::string& path,
@@ -868,9 +961,10 @@ class ScanMetadataBuilderMT : public ScanMetadataBuilder {
868961 }
869962
870963 void AddPositionDeletes (const std::string& serialized_partition_key, SequenceNumber sequence_number,
871- const std::string& path) override {
964+ const std::string& path,
965+ const std::optional<std::pair<std::string, std::string>>& min_max_referenced_path) override {
872966 std::lock_guard<std::mutex> guard (mutex_);
873- ScanMetadataBuilder::AddPositionDeletes (serialized_partition_key, sequence_number, path);
967+ ScanMetadataBuilder::AddPositionDeletes (serialized_partition_key, sequence_number, path, min_max_referenced_path );
874968 }
875969
876970 void AddGlobalEqualityDeletes (SequenceNumber sequence_number, const std::string& path,
@@ -892,8 +986,8 @@ class ScanMetadataBuilderMT : public ScanMetadataBuilder {
892986
893987class ReferencedDataFileAwareScanPlanner {
894988 public:
895- ReferencedDataFileAwareScanPlanner (const TableMetadataV2& table_metadata)
896- : builder_(std::make_shared<ScanMetadataBuilder>(table_metadata)) {}
989+ ReferencedDataFileAwareScanPlanner (const TableMetadataV2& table_metadata, std::shared_ptr<ILogger> logger )
990+ : builder_(std::make_shared<ScanMetadataBuilder>(table_metadata, std::move(logger) )) {}
897991
898992 arrow::Status AddEntry (const iceberg::ManifestEntry& entry) {
899993 if (entry.status == iceberg::ManifestEntry::Status::kDeleted ) {
@@ -1050,8 +1144,8 @@ class ReferencedDataFileAwareScanPlanner {
10501144
10511145class ReferencedDataFileAwareScanPlannerMT : public ReferencedDataFileAwareScanPlanner {
10521146 public:
1053- explicit ReferencedDataFileAwareScanPlannerMT (const TableMetadataV2& table_metadata)
1054- : ReferencedDataFileAwareScanPlanner(std::make_shared<ScanMetadataBuilderMT>(table_metadata)) {}
1147+ explicit ReferencedDataFileAwareScanPlannerMT (const TableMetadataV2& table_metadata, std::shared_ptr<ILogger> logger )
1148+ : ReferencedDataFileAwareScanPlanner(std::make_shared<ScanMetadataBuilderMT>(table_metadata, logger )) {}
10551149
10561150 private:
10571151 void SetHasEqualityDelete () override {
@@ -1074,9 +1168,9 @@ class ReferencedDataFileAwareScanPlannerMT : public ReferencedDataFileAwareScanP
10741168 std::mutex mutex_;
10751169};
10761170
1077- arrow::Result<ScanMetadata> GetScanMetadata (IcebergEntriesStream& entries_stream,
1078- const TableMetadataV2& table_metadata ) {
1079- ReferencedDataFileAwareScanPlanner scan_metadata_builder (table_metadata);
1171+ arrow::Result<ScanMetadata> GetScanMetadata (IcebergEntriesStream& entries_stream, const TableMetadataV2& table_metadata,
1172+ std::shared_ptr<ILogger> logger ) {
1173+ ReferencedDataFileAwareScanPlanner scan_metadata_builder (table_metadata, logger );
10801174
10811175 while (true ) {
10821176 std::optional<ManifestEntry> maybe_entry = entries_stream.ReadNext ();
@@ -1142,11 +1236,12 @@ class ManifestFileTask {
11421236arrow::Result<ScanMetadata> GetScanMetadataMultiThreaded (std::shared_ptr<arrow::fs::FileSystem> fs,
11431237 std::shared_ptr<TableMetadataV2> table_metadata,
11441238 bool use_reader_schema, uint32_t threads_num,
1145- const ManifestEntryDeserializerConfig& config) {
1239+ const ManifestEntryDeserializerConfig& config,
1240+ std::shared_ptr<ILogger> logger) {
11461241 const std::string manifest_list_path = table_metadata->GetCurrentManifestListPathOrFail ();
11471242 auto manifest_metadatas = GetManifestFiles (fs, manifest_list_path);
11481243
1149- auto scan_metadata_builder = std::make_shared<ReferencedDataFileAwareScanPlannerMT>(*table_metadata);
1244+ auto scan_metadata_builder = std::make_shared<ReferencedDataFileAwareScanPlannerMT>(*table_metadata, logger );
11501245
11511246 auto pool = std::make_shared<ThreadPool>(threads_num);
11521247 std::vector<std::future<std::vector<std::future<arrow::Status>>>> futures;
0 commit comments