diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp
index f83b1357026..3c01991fd34 100644
--- a/dbms/src/Common/FailPoint.cpp
+++ b/dbms/src/Common/FailPoint.cpp
@@ -97,8 +97,8 @@ namespace DB
     M(invalid_mpp_version)                     \
    M(force_fail_in_flush_region_data)         \
    M(force_use_dmfile_format_v3)              \
-    M(force_set_mocked_s3_object_mtime)
-
+    M(force_set_mocked_s3_object_mtime)        \
+    M(force_stop_background_checkpoint_upload)
 
 #define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
     M(pause_with_alter_locks_acquired)         \
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index 07e62424847..e4c41364bf5 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -378,7 +378,7 @@ void DAGStorageInterpreter::prepare()
 
     // Do learner read
     const DAGContext & dag_context = *context.getDAGContext();
-    if (dag_context.isBatchCop() || dag_context.isMPPTask())
+    if (dag_context.isBatchCop() || dag_context.isMPPTask() || dag_context.is_disaggregated_task)
         learner_read_snapshot = doBatchCopLearnerRead();
     else
         learner_read_snapshot = doCopLearnerRead();
diff --git a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
index 5d1670bc230..4cf7f879b54 100644
--- a/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
+++ b/dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
@@ -101,10 +101,9 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes
     }
 
     using DM::Remote::Serializer;
-    for (const auto & [table_id, table_tasks] : snap->tableSnapshots())
-    {
-        response->add_tables(Serializer::serializeTo(table_tasks, task_id).SerializeAsString());
-    }
+    snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) {
+        response->add_tables(Serializer::serializeTo(snap, task_id).SerializeAsString());
+    });
 }
 } // namespace DB
diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
index b5f47b7bc78..5499a9cad68 100644
--- a/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
+++ b/dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
@@ -175,7 +175,7 @@ TrackedMppDataPacketPtr ToCompressedPacket(
     assert(uncompressed_source);
     for ([[maybe_unused]] const auto & chunk : uncompressed_source->getPacket().chunks())
     {
-        assert(chunk.empty());
+        assert(!chunk.empty());
         assert(static_cast<CompressionMethodByte>(chunk[0]) == CompressionMethodByte::NONE);
     }
diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp
index 67daacea5cf..1a435308970 100644
--- a/dbms/src/Server/Server.cpp
+++ b/dbms/src/Server/Server.cpp
@@ -1495,11 +1495,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
         // So, stop threads explicitly before `TiFlashTestEnv::shutdown()`.
         DB::DM::SegmentReaderPoolManager::instance().stop();
         FileCache::shutdown();
+        global_context->shutdown();
         if (storage_config.s3_config.isS3Enabled())
         {
             S3::ClientFactory::instance().shutdown();
         }
-        global_context->shutdown();
         LOG_DEBUG(log, "Shutted down storages.");
     });
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
index 9d50850b843..4d40da30d85 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
@@ -55,7 +55,10 @@ class ColumnFileInMemory : public ColumnFile
     explicit ColumnFileInMemory(const ColumnFileSchemaPtr & schema_, const CachePtr & cache_ = nullptr)
         : schema(schema_)
         , cache(cache_ ? cache_ : std::make_shared<Cache>(schema_->getSchema()))
-    {}
+    {
+        rows = cache->block.rows();
+        bytes = cache->block.bytes();
+    }
 
     Type getType() const override { return Type::INMEMORY_FILE; }
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
index 28b71c2024c..d7c1e659159 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -310,6 +310,7 @@ class DeltaMergeStore : private boost::noncopyable
     bool ingestSegmentDataIntoSegmentUsingSplit(
         DMContext & dm_context,
         const SegmentPtr & segment,
+        const RowKeyRange & ingest_range,
         const SegmentPtr & segment_to_ingest);
 
     void ingestSegmentsFromCheckpointInfo(const DMContextPtr & dm_context,
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
index 0a27138ef29..6cef65f1e05 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
@@ -875,7 +875,7 @@ std::vector<SegmentPtr> DeltaMergeStore::ingestSegmentsUsingSplit(
             segment->simpleInfo(),
             segment_ingest_range.toDebugString());
 
-        const bool succeeded = ingestSegmentDataIntoSegmentUsingSplit(*dm_context, segment, target_segments[segment_idx]);
+        const bool succeeded = ingestSegmentDataIntoSegmentUsingSplit(*dm_context, segment, segment_ingest_range, target_segments[segment_idx]);
         if (succeeded)
         {
             updated_segments.insert(segment);
@@ -904,10 +904,10 @@ std::vector<SegmentPtr> DeltaMergeStore::ingestSegmentsUsingSplit(
 bool DeltaMergeStore::ingestSegmentDataIntoSegmentUsingSplit(
     DMContext & dm_context,
     const SegmentPtr & segment,
+    const RowKeyRange & ingest_range,
     const SegmentPtr & segment_to_ingest)
 {
     const auto & segment_range = segment->getRowKeyRange();
-    const auto & ingest_range = segment_to_ingest->getRowKeyRange();
 
     // The ingest_range must fall in segment's range.
     RUNTIME_CHECK(
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp
index 3904e915d4d..20dd65e4e85 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp
+++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp
@@ -51,8 +51,16 @@ SegmentPagesFetchTask DisaggReadSnapshot::popSegTask(TableID physical_table_id,
     return task;
 }
 
+void DisaggReadSnapshot::iterateTableSnapshots(std::function<void(const DisaggPhysicalTableReadSnapshotPtr &)> fn) const
+{
+    std::shared_lock read_lock(mtx);
+    for (const auto & [_, table_snapshot] : table_snapshots)
+        fn(table_snapshot);
+}
+
 bool DisaggReadSnapshot::empty() const
 {
+    std::shared_lock read_lock(mtx);
     for (const auto & tbl : table_snapshots)
     {
         if (!tbl.second->empty())
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
index ebc61d24634..4c57499b4ca 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
+++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
@@ -74,24 +74,25 @@ class DisaggReadSnapshot
     DisaggReadSnapshot() = default;
 
     // Add read tasks for a physical table.
-    // This function is not thread safe
     void addTask(TableID physical_table_id, DisaggPhysicalTableReadSnapshotPtr && task)
     {
         if (!task)
             return;
+        std::unique_lock lock(mtx);
         table_snapshots.emplace(physical_table_id, std::move(task));
     }
 
     // Pop one segment task for reading
     SegmentPagesFetchTask popSegTask(TableID physical_table_id, UInt64 segment_id);
 
+    void iterateTableSnapshots(std::function<void(const DisaggPhysicalTableReadSnapshotPtr &)>) const;
+
     bool empty() const;
 
-    const TableSnapshotMap & tableSnapshots() const { return table_snapshots; }
 
     DISALLOW_COPY(DisaggReadSnapshot);
 
 private:
-    mutable std::mutex mtx;
+    mutable std::shared_mutex mtx;
     TableSnapshotMap table_snapshots;
 };
 
@@ -105,7 +106,11 @@ class DisaggPhysicalTableReadSnapshot
 
     SegmentReadTaskPtr popTask(UInt64 segment_id);
 
-    ALWAYS_INLINE bool empty() const { return tasks.empty(); }
+    ALWAYS_INLINE bool empty() const
+    {
+        std::shared_lock read_lock(mtx);
+        return tasks.empty();
+    }
 
     DISALLOW_COPY(DisaggPhysicalTableReadSnapshot);
 
@@ -118,7 +123,7 @@ class DisaggPhysicalTableReadSnapshot
     std::shared_ptr<std::vector<tipb::FieldType>> output_field_types;
 
 private:
-    mutable std::mutex mtx;
+    mutable std::shared_mutex mtx;
     // segment_id -> SegmentReadTaskPtr
     std::unordered_map<UInt64, SegmentReadTaskPtr> tasks;
 };
diff --git a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto
index f622b2f08c9..c6cd3e3801b 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto
+++ b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto
@@ -66,7 +66,6 @@ message ColumnFileInMemory {
   repeated bytes block_columns = 2;
   uint64 rows = 3;
-  // uint64 bytes = 4;
 }
 
 message ColumnFileTiny {
diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.cpp
index 9d694d510a2..e5b8b832f17 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.cpp
@@ -111,10 +111,10 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
         while (!cur_stream)
         {
             watch.restart();
-            auto task = read_tasks->nextReadyTask();
+            cur_read_task = read_tasks->nextReadyTask();
             seconds_pop += watch.elapsedSeconds();
             watch.restart();
-            if (!task)
+            if (!cur_read_task)
             {
                 // There is no task left or error happen
                 done = true;
@@ -127,17 +127,17 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
             }
 
             // Note that the segment task could come from different physical tables
-            cur_segment_id = task->segment_id;
-            physical_table_id = task->table_id;
+            cur_segment_id = cur_read_task->segment_id;
+            physical_table_id = cur_read_task->table_id;
             UNUSED(read_mode); // TODO: support more read mode
-            cur_stream = task->getInputStream(
+            cur_stream = cur_read_task->getInputStream(
                 columns_to_read,
-                task->getReadRanges(),
+                cur_read_task->getReadRanges(),
                 max_version,
                 filter,
                 expected_block_size);
             seconds_build += watch.elapsedSeconds();
-            LOG_TRACE(log, "Read blocks from remote segment begin, segment={} state={}", cur_segment_id, magic_enum::enum_name(task->state));
+            LOG_TRACE(log, "Read blocks from remote segment begin, segment={} state={}", cur_segment_id, magic_enum::enum_name(cur_read_task->state));
         }
 
         Block res = cur_stream->read(res_filter, return_filter);
@@ -146,6 +146,7 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
             LOG_TRACE(log, "Read blocks from remote segment end, segment={}", cur_segment_id);
             cur_segment_id = 0;
             cur_stream = {};
+            cur_read_task = nullptr;
             // try read from next task
             continue;
         }
diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.h
index 81c5811ee2c..281b702a516 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/Remote/RNRemoteSegmentThreadInputStream.h
@@ -102,6 +102,8 @@ class RNRemoteSegmentThreadInputStream : public IProfilingBlockInputStream
     bool done = false;
     BlockInputStreamPtr cur_stream;
+    RNRemoteSegmentReadTaskPtr cur_read_task; // While reading from cur_stream, cur_read_task must be kept alive.
+
     UInt64 cur_segment_id;
     LoggerPtr log;
diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
index 492cfcb3424..58fb84afaf5 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
+++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
@@ -46,6 +46,7 @@ namespace DB::DM::Remote
 RemotePb::RemotePhysicalTable Serializer::serializeTo(const DisaggPhysicalTableReadSnapshotPtr & snap, const DisaggTaskId & task_id)
 {
+    std::shared_lock read_lock(snap->mtx);
     RemotePb::RemotePhysicalTable remote_table;
     remote_table.set_snapshot_id(task_id.toMeta().SerializeAsString());
     remote_table.set_table_id(snap->physical_table_id);
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
index e9fcff84318..800ee362e71 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp
@@ -45,6 +45,7 @@ namespace DB
 namespace FailPoints
 {
 extern const char force_use_dmfile_format_v3[];
+extern const char force_stop_background_checkpoint_upload[];
 } // namespace FailPoints
 namespace DM
 {
@@ -62,6 +63,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
     void SetUp() override
     {
         FailPointHelper::enableFailPoint(FailPoints::force_use_dmfile_format_v3);
+        FailPointHelper::enableFailPoint(FailPoints::force_stop_background_checkpoint_upload);
         auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
         ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client));
         TiFlashStorageTestBasic::SetUp();
@@ -95,6 +97,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
     void TearDown() override
     {
         FailPointHelper::disableFailPoint(FailPoints::force_use_dmfile_format_v3);
+        FailPointHelper::disableFailPoint(FailPoints::force_stop_background_checkpoint_upload);
         auto & global_context = TiFlashTestEnv::getGlobalContext();
         if (!already_initialize_data_store)
         {
diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp
index 87590b10edd..32eececb826 100644
--- a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp
+++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorageService.cpp
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include
+#include <Common/FailPoint.h>
 #include
 #include
 
@@ -32,6 +33,10 @@
 namespace DB
 {
+namespace FailPoints
+{
+extern const char force_stop_background_checkpoint_upload[];
+}
 
 UniversalPageStorageService::UniversalPageStorageService(Context & global_context_)
     : global_context(global_context_)
@@ -103,6 +108,12 @@ bool UniversalPageStorageService::uploadCheckpoint()
     if (!is_checkpoint_uploading.compare_exchange_strong(v, true))
         return false;
 
+    fiu_do_on(FailPoints::force_stop_background_checkpoint_upload, {
+        // Disable the background checkpoint upload process in unit tests
+        LOG_WARNING(log, "!!!force disable UniversalPageStorageService::uploadCheckpoint!!!");
+        return false;
+    });
+
     SCOPE_EXIT({
         bool is_running = true;
         is_checkpoint_uploading.compare_exchange_strong(is_running, false);
diff --git a/dbms/src/Storages/S3/MockS3Client.cpp b/dbms/src/Storages/S3/MockS3Client.cpp
index 56f1e40b856..ebd5c1d3b37 100644
--- a/dbms/src/Storages/S3/MockS3Client.cpp
+++ b/dbms/src/Storages/S3/MockS3Client.cpp
@@ -35,6 +35,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -142,12 +143,26 @@ Model::CopyObjectOutcome MockS3Client::CopyObject(const Model::CopyObjectRequest
 Model::GetObjectTaggingOutcome MockS3Client::GetObjectTagging(const Model::GetObjectTaggingRequest & request) const
 {
     std::lock_guard lock(mtx);
-    auto itr = storage_tagging.find(request.GetBucket());
-    if (itr == storage_tagging.end())
+    bool object_exist = false;
     {
-        return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuckBucket");
+        auto itr = storage.find(request.GetBucket());
+        if (itr == storage.end())
+        {
+            return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuckBucket");
+        }
+        object_exist = itr->second.count(normalizedKey(request.GetKey())) > 0;
+    }
+
+    RUNTIME_CHECK_MSG(object_exist, "try to get tagging of a non-existent object, bucket={} key={}", request.GetBucket(), request.GetKey());
+    auto object_tagging_iter = storage_tagging[request.GetBucket()].find(normalizedKey(request.GetKey()));
+
+    // the object exists but its tag does not, consider the tagging as empty
+    if (object_tagging_iter == storage_tagging[request.GetBucket()].end())
+    {
+        return Model::GetObjectTaggingResult{};
     }
-    auto taggings = storage_tagging[request.GetBucket()][normalizedKey(request.GetKey())];
+
+    auto taggings = object_tagging_iter->second;
     auto pos = taggings.find('=');
     RUNTIME_CHECK(pos != String::npos, taggings, pos, taggings.size());
     Aws::S3::Model::Tag tag;
diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp
index eca40faffc4..cc264e432ef 100644
--- a/dbms/src/Storages/S3/S3Common.cpp
+++ b/dbms/src/Storages/S3/S3Common.cpp
@@ -182,6 +182,7 @@ bool ClientFactory::isEnabled() const
 void ClientFactory::init(const StorageS3Config & config_, bool mock_s3_)
 {
     config = config_;
+    RUNTIME_CHECK(!config.root.starts_with("//"), config.root);
     config.root = normalizedRoot(config.root);
     Aws::InitAPI(aws_options);
     Aws::Utils::Logging::InitializeAWSLogging(std::make_shared());
@@ -379,12 +380,18 @@ void ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days)
 {
     bool lifecycle_rule_has_been_set = false;
     Aws::Vector<Aws::S3::Model::LifecycleRule> old_rules;
-    {
+    do {
         Aws::S3::Model::GetBucketLifecycleConfigurationRequest req;
         req.SetBucket(client.bucket());
         auto outcome = client.GetBucketLifecycleConfiguration(req);
         if (!outcome.IsSuccess())
         {
+            const auto & error = outcome.GetError();
+            // The lifecycle rule has not been added at all
+            if (error.GetExceptionName() == "NoSuchLifecycleConfiguration")
+            {
+                break;
+            }
"GetBucketLifecycle fail"); } @@ -413,7 +420,7 @@ void ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days) break; } } - } + } while (false); if (lifecycle_rule_has_been_set) { @@ -471,11 +478,13 @@ void listPrefix( { Stopwatch sw; Aws::S3::Model::ListObjectsV2Request req; - req.WithBucket(client.bucket()).WithPrefix(client.root() + prefix); + bool is_root_single_slash = client.root() == "/"; + // If the `root == '/'`, don't prepend the root to the prefix, otherwise S3 list doesn't work. + req.WithBucket(client.bucket()).WithPrefix(is_root_single_slash ? prefix : client.root() + prefix); // If the `root == '/'`, then the return result will cut it off // else we need to cut the root in the following codes - bool need_cut = client.root() != "/"; + bool need_cut = !is_root_single_slash; size_t cut_size = client.root().size(); bool done = false; @@ -534,7 +543,9 @@ void listPrefixWithDelimiter( { Stopwatch sw; Aws::S3::Model::ListObjectsV2Request req; - req.WithBucket(client.bucket()).WithPrefix(client.root() + prefix); + bool is_root_single_slash = client.root() == "/"; + // If the `root == '/'`, don't prepend the root to the prefix, otherwise S3 list doesn't work. + req.WithBucket(client.bucket()).WithPrefix(is_root_single_slash ? prefix : client.root() + prefix); if (!delimiter.empty()) { req.SetDelimiter(String(delimiter)); @@ -542,7 +553,7 @@ void listPrefixWithDelimiter( // If the `root == '/'`, then the return result will cut it off // else we need to cut the root in the following codes - bool need_cut = client.root() != "/"; + bool need_cut = !is_root_single_slash; size_t cut_size = client.root().size(); bool done = false; diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index a58ccbafa7c..0dc15c7bcda 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -70,7 +70,9 @@ class TiFlashS3Client : public Aws::S3::S3Client template void setBucketAndKeyWithRoot(Request & req, const String & key) const { - req.WithBucket(bucket_name).WithKey(key_root + key); + bool is_root_single_slash = key_root == "/"; + // If the `root == '/'`, don't prepend the root to the prefix, otherwise S3 list doesn't work. + req.WithBucket(bucket_name).WithKey(is_root_single_slash ? key : key_root + key); } private: diff --git a/dbms/src/Storages/S3/S3GCManager.cpp b/dbms/src/Storages/S3/S3GCManager.cpp index 47a76308e8a..21542da8ac3 100644 --- a/dbms/src/Storages/S3/S3GCManager.cpp +++ b/dbms/src/Storages/S3/S3GCManager.cpp @@ -423,26 +423,29 @@ void S3GCManager::lifecycleMarkDataFileDeleted(const String & datafile_key) assert(config.method == S3GCMethod::Lifecycle); auto view = S3FilenameView::fromKey(datafile_key); + RUNTIME_CHECK(view.isDataFile(), magic_enum::enum_name(view.type), datafile_key); auto client = S3::ClientFactory::instance().sharedTiFlashClient(); + auto sub_logger = log->getChild(fmt::format("remove_key={}", datafile_key)); if (!view.isDMFile()) { // CheckpointDataFile is a single object, add tagging for it and update its mtime rewriteObjectWithTagging(*client, datafile_key, String(TaggingObjectIsDeleted)); - LOG_INFO(log, "datafile deleted by lifecycle tagging, key={}", datafile_key); + LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging", datafile_key); } else { // DMFile is composed by multiple objects, need extra work to remove all of them. - // Rewrite all objects with tagging belong to this DMFile + // Rewrite all objects with tagging belong to this DMFile. 
Note "/" is need for + // scanning only the sub objects of given key of this DMFile // TODO: If GCManager unexpectedly exit in the middle, it will leave some broken // sub file for DMFile, try clean them later. - S3::listPrefix(*client, datafile_key, [this, &client, &datafile_key](const Aws::S3::Model::Object & object) { + S3::listPrefix(*client, datafile_key + "/", [&client, &datafile_key, &sub_logger](const Aws::S3::Model::Object & object) { const auto & sub_key = object.GetKey(); rewriteObjectWithTagging(*client, sub_key, String(TaggingObjectIsDeleted)); - LOG_INFO(log, "datafile deleted by lifecycle tagging, key={} sub_key={}", datafile_key, sub_key); + LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging, sub_key={}", datafile_key, sub_key); return PageResult{.num_keys = 1, .more = true}; }); - LOG_INFO(log, "datafile deleted by lifecycle tagging, all sub keys are deleted, key={}", datafile_key); + LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging, all sub keys are deleted", datafile_key); } } @@ -451,26 +454,29 @@ void S3GCManager::physicalRemoveDataFile(const String & datafile_key) assert(config.method == S3GCMethod::ScanThenDelete); auto view = S3FilenameView::fromKey(datafile_key); + RUNTIME_CHECK(view.isDataFile(), magic_enum::enum_name(view.type), datafile_key); auto client = S3::ClientFactory::instance().sharedTiFlashClient(); + auto sub_logger = log->getChild(fmt::format("remove_key={}", datafile_key)); if (!view.isDMFile()) { // CheckpointDataFile is a single object, remove it. deleteObject(*client, datafile_key); - LOG_INFO(log, "datafile deleted, key={}", datafile_key); + LOG_INFO(sub_logger, "datafile deleted, key={}", datafile_key); } else { // DMFile is composed by multiple objects, need extra work to remove all of them. - // Remove all objects belong to this DMFile + // Remove all objects belong to this DMFile. Note suffix "/" is need for scanning + // only the sub objects of given key of this DMFile. // TODO: If GCManager unexpectedly exit in the middle, it will leave some broken // sub file for DMFile, try clean them later. 
-        S3::listPrefix(*client, datafile_key, [this, &client, &datafile_key](const Aws::S3::Model::Object & object) {
+        S3::listPrefix(*client, datafile_key + "/", [&client, &sub_logger](const Aws::S3::Model::Object & object) {
             const auto & sub_key = object.GetKey();
             deleteObject(*client, sub_key);
-            LOG_INFO(log, "datafile deleted, key={} sub_key={}", datafile_key, sub_key);
+            LOG_INFO(sub_logger, "datafile deleted, sub_key={}", sub_key);
             return PageResult{.num_keys = 1, .more = true};
         });
-        LOG_INFO(log, "datafile deleted, all sub keys are deleted, key={}", datafile_key);
+        LOG_INFO(sub_logger, "datafile deleted, all sub keys are deleted");
     }
 }
diff --git a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
index a8244b7e7e8..1390842bb8e 100644
--- a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
+++ b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
@@ -185,33 +185,98 @@
 try
 CATCH
 
-TEST_F(S3GCManagerByScanDeleteTest, RemoveDataFile)
+TEST_F(S3GCManagerByScanDeleteTest, RemoveCheckpointData)
 try
 {
     auto timepoint = Aws::Utils::DateTime("2023-02-01T08:00:00Z", Aws::Utils::DateFormat::ISO_8601);
+    StoreID store_id = 100;
+    UInt64 upload_seq = 99;
+    UInt64 file_idx = 5;
+
+    // test for checkpoint data removal
     {
-        uploadEmptyFile(*mock_s3_client, "datafile_key");
-        uploadEmptyFile(*mock_s3_client, "datafile_key.del");
+        const auto cp_data = S3Filename::newCheckpointData(store_id, upload_seq, file_idx);
+        const auto df_key = cp_data.toFullKey();
+        const auto delmark_key = cp_data.toView().getDelMarkKey();
+        uploadEmptyFile(*mock_s3_client, df_key);
+        uploadEmptyFile(*mock_s3_client, delmark_key);
 
         // delmark expired
         auto delmark_mtime = timepoint - std::chrono::milliseconds(3601 * 1000);
-        gc_mgr->removeDataFileIfDelmarkExpired("datafile_key", "datafile_key.del", timepoint, delmark_mtime);
+        gc_mgr->removeDataFileIfDelmarkExpired(df_key, delmark_key, timepoint, delmark_mtime);
 
         // removed
-        ASSERT_FALSE(S3::objectExists(*mock_s3_client, "datafile_key"));
-        ASSERT_FALSE(S3::objectExists(*mock_s3_client, "datafile_key.del"));
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, df_key));
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, delmark_key));
     }
     {
-        uploadEmptyFile(*mock_s3_client, "datafile_key");
-        uploadEmptyFile(*mock_s3_client, "datafile_key.del");
+        const auto cp_data = S3Filename::newCheckpointData(store_id, upload_seq, file_idx);
+        const auto df_key = cp_data.toFullKey();
+        const auto delmark_key = cp_data.toView().getDelMarkKey();
+        uploadEmptyFile(*mock_s3_client, df_key);
+        uploadEmptyFile(*mock_s3_client, delmark_key);
 
         // delmark not expired
        auto delmark_mtime = timepoint - std::chrono::milliseconds(3599 * 1000);
-        gc_mgr->removeDataFileIfDelmarkExpired("datafile_key", "datafile_key.del", timepoint, delmark_mtime);
+        gc_mgr->removeDataFileIfDelmarkExpired(df_key, delmark_key, timepoint, delmark_mtime);
+
+        // not removed
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, df_key));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, delmark_key));
+    }
+}
+CATCH
+
+TEST_F(S3GCManagerByScanDeleteTest, RemoveDMFile)
+try
+{
+    auto timepoint = Aws::Utils::DateTime("2023-02-01T08:00:00Z", Aws::Utils::DateFormat::ISO_8601);
+    // test for dmfile removal
+    StoreID store_id = 100;
+    TableID table_id = 1000;
+    UInt64 file_id2 = 2;
+    UInt64 file_id27 = 27;
+    {
+        const auto cp_dmf2 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id2});
+        const auto df2_key = cp_dmf2.toFullKey();
+        const auto delmark_key = cp_dmf2.toView().getDelMarkKey();
+        uploadEmptyFile(*mock_s3_client, df2_key + "/meta");
+        uploadEmptyFile(*mock_s3_client, delmark_key);
+
+        const auto cp_dmf27 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id27});
+        const auto df27_key = cp_dmf27.toFullKey();
+        uploadEmptyFile(*mock_s3_client, df27_key + "/meta");
+
+        // delmark expired
+        auto delmark_mtime = timepoint - std::chrono::milliseconds(3601 * 1000);
+        gc_mgr->removeDataFileIfDelmarkExpired(df2_key, delmark_key, timepoint, delmark_mtime);
 
         // removed
-        ASSERT_TRUE(S3::objectExists(*mock_s3_client, "datafile_key"));
-        ASSERT_TRUE(S3::objectExists(*mock_s3_client, "datafile_key.del"));
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, df2_key + "/meta"));
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, delmark_key));
+        // dmf_27 is not removed
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, df27_key + "/meta"));
+    }
+    {
+        const auto cp_dmf2 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id2});
+        const auto df2_key = cp_dmf2.toFullKey();
+        const auto delmark_key = cp_dmf2.toView().getDelMarkKey();
+        uploadEmptyFile(*mock_s3_client, df2_key + "/meta");
+        uploadEmptyFile(*mock_s3_client, delmark_key);
+
+        const auto cp_dmf27 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id27});
+        const auto df27_key = cp_dmf27.toFullKey();
+        uploadEmptyFile(*mock_s3_client, df27_key + "/meta");
+
+        // delmark not expired
+        auto delmark_mtime = timepoint - std::chrono::milliseconds(3599 * 1000);
+        gc_mgr->removeDataFileIfDelmarkExpired(df2_key, delmark_key, timepoint, delmark_mtime);
+
+        // not removed
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, df2_key + "/meta"));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, delmark_key));
+        // dmf_27 is not removed
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, df27_key + "/meta"));
     }
 }
 CATCH
@@ -297,7 +362,7 @@
 try
 CATCH
 
-TEST_F(S3GCManagerTest, RemoveLock)
+TEST_F(S3GCManagerTest, RemoveLockOfCheckpointData)
 try
 {
     StoreID store_id = 20;
@@ -324,7 +389,7 @@
 
     gc_mgr->cleanOneLock(lock_key, lock_view, timepoint);
 
-    // lock is deleted and delmark is created, object is rewrite with tagging
+    // lock is deleted, delmark is created, object is rewritten with tagging
     ASSERT_FALSE(S3::objectExists(*mock_s3_client, lock_key));
     ASSERT_TRUE(S3::objectExists(*mock_s3_client, delmark_key));
     ASSERT_TRUE(S3::objectExists(*mock_s3_client, df.toFullKey()));
@@ -357,6 +422,99 @@
 }
 CATCH
 
+TEST_F(S3GCManagerTest, RemoveLockOfDMFile)
+try
+{
+    StoreID store_id = 20;
+    TableID table_id = 1000;
+    UInt64 file_id2 = 2;
+    UInt64 file_id27 = 27;
+
+    const auto cp_dmf2 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id2});
+    const auto dmf2_key = cp_dmf2.toFullKey();
+    auto lock_key = cp_dmf2.toView().getLockKey(store_id, 400);
+    auto lock_view = S3FilenameView::fromKey(lock_key);
+    auto delmark_key = cp_dmf2.toView().getDelMarkKey();
+
+    const auto cp_dmf27 = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = table_id, .file_id = file_id27});
+    const auto dmf27_key = cp_dmf27.toFullKey();
+
+    auto timepoint = Aws::Utils::DateTime("2023-02-01T08:00:00Z", Aws::Utils::DateFormat::ISO_8601);
+    auto clear_bucket = [&] {
+        DB::tests::TiFlashTestEnv::deleteBucket(*mock_s3_client);
+        DB::tests::TiFlashTestEnv::createBucketIfNotExist(*mock_s3_client);
+    };
+
+    {
+        clear_bucket();
+        // delmark does not exist, and no more lockfiles
+        S3::uploadEmptyFile(*mock_s3_client, dmf2_key + "/meta");
+        S3::uploadEmptyFile(*mock_s3_client, dmf27_key + "/meta");
+        S3::uploadEmptyFile(*mock_s3_client, lock_key);
+
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, delmark_key));
+
+        gc_mgr->cleanOneLock(lock_key, lock_view, timepoint);
+
+        // lock is deleted, delmark is created, object is rewritten with tagging
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, lock_key));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, delmark_key));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, dmf2_key + "/meta"));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, dmf27_key + "/meta"));
+
+        // dmf2 is rewritten
+        {
+            auto req = Aws::S3::Model::GetObjectTaggingRequest(); //
+            mock_s3_client->setBucketAndKeyWithRoot(req, dmf2_key + "/meta");
+            const auto res = mock_s3_client->GetObjectTagging(req);
+
+            auto tags = res.GetResult().GetTagSet();
+            ASSERT_EQ(tags.size(), 1);
+            EXPECT_EQ(tags[0].GetKey(), "tiflash_deleted");
+            EXPECT_EQ(tags[0].GetValue(), "true");
+        }
+
+        // dmf27 is not rewritten
+        {
+            auto req = Aws::S3::Model::GetObjectTaggingRequest(); //
+            mock_s3_client->setBucketAndKeyWithRoot(req, dmf27_key + "/meta");
+            const auto res = mock_s3_client->GetObjectTagging(req);
+            ASSERT_TRUE(res.GetResult().GetTagSet().empty());
+        }
+    }
+    {
+        clear_bucket();
+        // delmark does not exist, but the DMFile is still locked by another lockfile
+        S3::uploadEmptyFile(*mock_s3_client, dmf2_key + "/meta");
+        S3::uploadEmptyFile(*mock_s3_client, dmf27_key + "/meta");
+        S3::uploadEmptyFile(*mock_s3_client, lock_key);
+        // another lock
+        auto another_lock_key = cp_dmf2.toView().getLockKey(store_id + 1, 450);
+        S3::uploadEmptyFile(*mock_s3_client, another_lock_key);
+        gc_mgr->cleanOneLock(lock_key, lock_view, timepoint);
+
+        // lock is deleted but delmark is not created
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, lock_key));
+        ASSERT_FALSE(S3::objectExists(*mock_s3_client, delmark_key));
+        ASSERT_TRUE(S3::objectExists(*mock_s3_client, another_lock_key));
+        // dmf2 is not rewritten
+        {
+            auto req = Aws::S3::Model::GetObjectTaggingRequest(); //
+            mock_s3_client->setBucketAndKeyWithRoot(req, dmf2_key + "/meta");
+            const auto res = mock_s3_client->GetObjectTagging(req);
+            ASSERT_TRUE(res.GetResult().GetTagSet().empty());
+        }
+        // dmf27 is not rewritten
+        {
+            auto req = Aws::S3::Model::GetObjectTaggingRequest(); //
+            mock_s3_client->setBucketAndKeyWithRoot(req, dmf27_key + "/meta");
+            const auto res = mock_s3_client->GetObjectTagging(req);
+            ASSERT_TRUE(res.GetResult().GetTagSet().empty());
+        }
+    }
+}
+CATCH
+
 TEST_F(S3GCManagerTest, ScanLocks)
 try
 {