Skip to content

Commit

Permalink
Fix bug for dmfile is remove unexpectly (release-7.0) (#7130)
Browse files Browse the repository at this point in the history
close #7128
  • Loading branch information
JaySon-Huang authored Mar 22, 2023
1 parent ce52266 commit 348d424
Show file tree
Hide file tree
Showing 21 changed files with 284 additions and 59 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ namespace DB
M(invalid_mpp_version) \
M(force_fail_in_flush_region_data) \
M(force_use_dmfile_format_v3) \
M(force_set_mocked_s3_object_mtime)

M(force_set_mocked_s3_object_mtime) \
M(force_stop_background_checkpoint_upload)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ void DAGStorageInterpreter::prepare()

// Do learner read
const DAGContext & dag_context = *context.getDAGContext();
if (dag_context.isBatchCop() || dag_context.isMPPTask())
if (dag_context.isBatchCop() || dag_context.isMPPTask() || dag_context.is_disaggregated_task)
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Disaggregated/WNEstablishDisaggTaskHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,9 @@ void WNEstablishDisaggTaskHandler::execute(disaggregated::EstablishDisaggTaskRes
}

using DM::Remote::Serializer;
for (const auto & [table_id, table_tasks] : snap->tableSnapshots())
{
response->add_tables(Serializer::serializeTo(table_tasks, task_id).SerializeAsString());
}
snap->iterateTableSnapshots([&](const DM::Remote::DisaggPhysicalTableReadSnapshotPtr & snap) {
response->add_tables(Serializer::serializeTo(snap, task_id).SerializeAsString());
});
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ TrackedMppDataPacketPtr ToCompressedPacket(
assert(uncompressed_source);
for ([[maybe_unused]] const auto & chunk : uncompressed_source->getPacket().chunks())
{
assert(chunk.empty());
assert(!chunk.empty());
assert(static_cast<CompressionMethodByte>(chunk[0]) == CompressionMethodByte::NONE);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,11 +1495,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
// So, stop threads explicitly before `TiFlashTestEnv::shutdown()`.
DB::DM::SegmentReaderPoolManager::instance().stop();
FileCache::shutdown();
global_context->shutdown();
if (storage_config.s3_config.isS3Enabled())
{
S3::ClientFactory::instance().shutdown();
}
global_context->shutdown();
LOG_DEBUG(log, "Shutted down storages.");
});

Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ class ColumnFileInMemory : public ColumnFile
explicit ColumnFileInMemory(const ColumnFileSchemaPtr & schema_, const CachePtr & cache_ = nullptr)
: schema(schema_)
, cache(cache_ ? cache_ : std::make_shared<Cache>(schema_->getSchema()))
{}
{
rows = cache->block.rows();
bytes = cache->block.bytes();
}

Type getType() const override { return Type::INMEMORY_FILE; }

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class DeltaMergeStore : private boost::noncopyable
bool ingestSegmentDataIntoSegmentUsingSplit(
DMContext & dm_context,
const SegmentPtr & segment,
const RowKeyRange & ingest_range,
const SegmentPtr & segment_to_ingest);

void ingestSegmentsFromCheckpointInfo(const DMContextPtr & dm_context,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ std::vector<SegmentPtr> DeltaMergeStore::ingestSegmentsUsingSplit(
segment->simpleInfo(),
segment_ingest_range.toDebugString());

const bool succeeded = ingestSegmentDataIntoSegmentUsingSplit(*dm_context, segment, target_segments[segment_idx]);
const bool succeeded = ingestSegmentDataIntoSegmentUsingSplit(*dm_context, segment, segment_ingest_range, target_segments[segment_idx]);
if (succeeded)
{
updated_segments.insert(segment);
Expand Down Expand Up @@ -904,10 +904,10 @@ std::vector<SegmentPtr> DeltaMergeStore::ingestSegmentsUsingSplit(
bool DeltaMergeStore::ingestSegmentDataIntoSegmentUsingSplit(
DMContext & dm_context,
const SegmentPtr & segment,
const RowKeyRange & ingest_range,
const SegmentPtr & segment_to_ingest)
{
const auto & segment_range = segment->getRowKeyRange();
const auto & ingest_range = segment_to_ingest->getRowKeyRange();

// The ingest_range must fall in segment's range.
RUNTIME_CHECK(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ SegmentPagesFetchTask DisaggReadSnapshot::popSegTask(TableID physical_table_id,
return task;
}

void DisaggReadSnapshot::iterateTableSnapshots(std::function<void(const DisaggPhysicalTableReadSnapshotPtr &)> fn) const
{
std::shared_lock read_lock(mtx);
for (const auto & [_, table_snapshot] : table_snapshots)
fn(table_snapshot);
}

bool DisaggReadSnapshot::empty() const
{
std::shared_lock read_lock(mtx);
for (const auto & tbl : table_snapshots)
{
if (!tbl.second->empty())
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,25 @@ class DisaggReadSnapshot
DisaggReadSnapshot() = default;

// Add read tasks for a physical table.
// This function is not thread safe
void addTask(TableID physical_table_id, DisaggPhysicalTableReadSnapshotPtr && task)
{
if (!task)
return;
std::unique_lock lock(mtx);
table_snapshots.emplace(physical_table_id, std::move(task));
}

// Pop one segment task for reading
SegmentPagesFetchTask popSegTask(TableID physical_table_id, UInt64 segment_id);

void iterateTableSnapshots(std::function<void(const DisaggPhysicalTableReadSnapshotPtr &)>) const;

bool empty() const;
const TableSnapshotMap & tableSnapshots() const { return table_snapshots; }

DISALLOW_COPY(DisaggReadSnapshot);

private:
mutable std::mutex mtx;
mutable std::shared_mutex mtx;
TableSnapshotMap table_snapshots;
};

Expand All @@ -105,7 +106,11 @@ class DisaggPhysicalTableReadSnapshot

SegmentReadTaskPtr popTask(UInt64 segment_id);

ALWAYS_INLINE bool empty() const { return tasks.empty(); }
ALWAYS_INLINE bool empty() const
{
std::shared_lock read_lock(mtx);
return tasks.empty();
}

DISALLOW_COPY(DisaggPhysicalTableReadSnapshot);

Expand All @@ -118,7 +123,7 @@ class DisaggPhysicalTableReadSnapshot
std::shared_ptr<std::vector<tipb::FieldType>> output_field_types;

private:
mutable std::mutex mtx;
mutable std::shared_mutex mtx;
// segment_id -> SegmentReadTaskPtr
std::unordered_map<UInt64, SegmentReadTaskPtr> tasks;
};
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ message ColumnFileInMemory {
repeated bytes block_columns = 2;

uint64 rows = 3;
// uint64 bytes = 4;
}

message ColumnFileTiny {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
while (!cur_stream)
{
watch.restart();
auto task = read_tasks->nextReadyTask();
cur_read_task = read_tasks->nextReadyTask();
seconds_pop += watch.elapsedSeconds();
watch.restart();
if (!task)
if (!cur_read_task)
{
// There is no task left or error happen
done = true;
Expand All @@ -127,17 +127,17 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
}

// Note that the segment task could come from different physical tables
cur_segment_id = task->segment_id;
physical_table_id = task->table_id;
cur_segment_id = cur_read_task->segment_id;
physical_table_id = cur_read_task->table_id;
UNUSED(read_mode); // TODO: support more read mode
cur_stream = task->getInputStream(
cur_stream = cur_read_task->getInputStream(
columns_to_read,
task->getReadRanges(),
cur_read_task->getReadRanges(),
max_version,
filter,
expected_block_size);
seconds_build += watch.elapsedSeconds();
LOG_TRACE(log, "Read blocks from remote segment begin, segment={} state={}", cur_segment_id, magic_enum::enum_name(task->state));
LOG_TRACE(log, "Read blocks from remote segment begin, segment={} state={}", cur_segment_id, magic_enum::enum_name(cur_read_task->state));
}

Block res = cur_stream->read(res_filter, return_filter);
Expand All @@ -146,6 +146,7 @@ Block RNRemoteSegmentThreadInputStream::readImpl(FilterPtr & res_filter, bool re
LOG_TRACE(log, "Read blocks from remote segment end, segment={}", cur_segment_id);
cur_segment_id = 0;
cur_stream = {};
cur_read_task = nullptr;
// try read from next task
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class RNRemoteSegmentThreadInputStream : public IProfilingBlockInputStream
bool done = false;

BlockInputStreamPtr cur_stream;
RNRemoteSegmentReadTaskPtr cur_read_task; // When reading from cur_stream we need cur_read_task is alive.

UInt64 cur_segment_id;

LoggerPtr log;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace DB::DM::Remote
RemotePb::RemotePhysicalTable
Serializer::serializeTo(const DisaggPhysicalTableReadSnapshotPtr & snap, const DisaggTaskId & task_id)
{
std::shared_lock read_lock(snap->mtx);
RemotePb::RemotePhysicalTable remote_table;
remote_table.set_snapshot_id(task_id.toMeta().SerializeAsString());
remote_table.set_table_id(snap->physical_table_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace DB
namespace FailPoints
{
extern const char force_use_dmfile_format_v3[];
extern const char force_stop_background_checkpoint_upload[];
} // namespace FailPoints
namespace DM
{
Expand All @@ -62,6 +63,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
void SetUp() override
{
FailPointHelper::enableFailPoint(FailPoints::force_use_dmfile_format_v3);
FailPointHelper::enableFailPoint(FailPoints::force_stop_background_checkpoint_upload);
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client));
TiFlashStorageTestBasic::SetUp();
Expand Down Expand Up @@ -95,6 +97,7 @@ class DeltaMergeStoreTestFastAddPeer : public DB::base::TiFlashStorageTestBasic
void TearDown() override
{
FailPointHelper::disableFailPoint(FailPoints::force_use_dmfile_format_v3);
FailPointHelper::disableFailPoint(FailPoints::force_stop_background_checkpoint_upload);
auto & global_context = TiFlashTestEnv::getGlobalContext();
if (!already_initialize_data_store)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Poco/File.h>
Expand All @@ -32,6 +33,10 @@

namespace DB
{
namespace FailPoints
{
extern const char force_stop_background_checkpoint_upload[];
}

UniversalPageStorageService::UniversalPageStorageService(Context & global_context_)
: global_context(global_context_)
Expand Down Expand Up @@ -103,6 +108,12 @@ bool UniversalPageStorageService::uploadCheckpoint()
if (!is_checkpoint_uploading.compare_exchange_strong(v, true))
return false;

fiu_do_on(FailPoints::force_stop_background_checkpoint_upload, {
// Disable background upload checkpoint process in unit tests
LOG_WARNING(log, "!!!force disable UniversalPageStorageService::uploadCheckpoint!!!");
return false;
});

SCOPE_EXIT({
bool is_running = true;
is_checkpoint_uploading.compare_exchange_strong(is_running, false);
Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Storages/S3/MockS3Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/GetObjectResult.h>
#include <aws/s3/model/GetObjectTaggingRequest.h>
#include <aws/s3/model/GetObjectTaggingResult.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Request.h>
Expand Down Expand Up @@ -142,12 +143,26 @@ Model::CopyObjectOutcome MockS3Client::CopyObject(const Model::CopyObjectRequest
Model::GetObjectTaggingOutcome MockS3Client::GetObjectTagging(const Model::GetObjectTaggingRequest & request) const
{
std::lock_guard lock(mtx);
auto itr = storage_tagging.find(request.GetBucket());
if (itr == storage_tagging.end())
bool object_exist = false;
{
return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuckBucket");
auto itr = storage.find(request.GetBucket());
if (itr == storage.end())
{
return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuckBucket");
}
object_exist = itr->second.count(normalizedKey(request.GetKey())) > 0;
}

auto object_tagging_iter = storage_tagging[request.GetBucket()].find(normalizedKey(request.GetKey()));
RUNTIME_CHECK_MSG(object_exist, "try to get tagging of non-exist object, bucket={} key={}", request.GetBucket(), request.GetKey());

// object exist but tag not exist, consider it as empty
if (object_tagging_iter == storage_tagging[request.GetBucket()].end())
{
return Model::GetObjectTaggingResult{};
}
auto taggings = storage_tagging[request.GetBucket()][normalizedKey(request.GetKey())];

auto taggings = object_tagging_iter->second;
auto pos = taggings.find('=');
RUNTIME_CHECK(pos != String::npos, taggings, pos, taggings.size());
Aws::S3::Model::Tag tag;
Expand Down
Loading

0 comments on commit 348d424

Please sign in to comment.