Skip to content

Commit

Permalink
ddl: Fix zero safepoint making schema gc run repeatedly (#8361) (#8363)
Browse files Browse the repository at this point in the history
close #8356
  • Loading branch information
ti-chi-bot authored Nov 13, 2023
1 parent ade8b50 commit 922f80c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 22 deletions.
9 changes: 6 additions & 3 deletions dbms/src/TestUtils/FunctionTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,13 @@ ColumnWithTypeAndName executeFunction(
for (size_t i = 0; i < columns.size(); ++i)
argument_column_numbers.push_back(i);

/// Seed from `std::chrono::system_clock` instead of `std::random_device` here to avoid
/// exceptions like 'random_device failed to open /dev/urandom: Operation not permitted'.
/// The cause of these exceptions is unknown, but they occur fairly often in the unit test
/// TestDateTimeDayMonthYear.dayMonthYearTest.
/// Since this function is only used for testing, using the current timestamp as a random seed is not a problem.
std::mt19937 g(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
/// Shuffle the input columns to ensure the function correctly uses physical offsets instead of logical offsets.
std::random_device rd;
std::mt19937 g(rd());

std::shuffle(argument_column_numbers.begin(), argument_column_numbers.end(), g);
const auto columns_reordered = toColumnsReordered(columns, argument_column_numbers);

Expand Down
57 changes: 41 additions & 16 deletions dbms/src/TiDB/Schema/SchemaSyncService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <common/logger_useful.h>

#include <optional>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -54,9 +56,10 @@ SchemaSyncService::SchemaSyncService(DB::Context & context_)
void SchemaSyncService::addKeyspaceGCTasks()
{
const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces();
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);

UInt64 num_add_tasks = 0;
// Add new sync schema task for new keyspace.
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);
for (auto const iter : keyspaces)
{
auto keyspace = iter.first;
Expand All @@ -75,14 +78,14 @@ void SchemaSyncService::addKeyspaceGCTasks()
/// They must be performed synchronously,
/// otherwise a table may get mis-GC-ed if a RECOVER was not properly synced (due to a schema sync pause) while GC runs too aggressively.
// GC safe point must be obtained ahead of syncing schema.
auto gc_safe_point
= PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace);
stage = "Sync schemas";
done_anything = syncSchemas(keyspace);
if (done_anything)
GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment();

stage = "GC";
auto gc_safe_point
= PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace);
done_anything = gc(gc_safe_point, keyspace);

return done_anything;
Expand All @@ -91,34 +94,40 @@ void SchemaSyncService::addKeyspaceGCTasks()
{
LOG_ERROR(
ks_log,
"{} failed by {} \n stack : {}",
"{}, keyspace={} failed by {} \n stack : {}",
stage,
keyspace,
e.displayText(),
e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText());
LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(ks_log, "{} failed by {}", stage, e.what());
LOG_ERROR(ks_log, "{}, keyspace={} failed by {}", stage, keyspace, e.what());
}
return false;
},
false,
context.getSettingsRef().ddl_sync_interval_seconds * 1000);

keyspace_handle_map.emplace(keyspace, task_handle);
num_add_tasks += 1;
}

auto log_level = num_add_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "add sync schema task for keyspaces done, num_add_tasks={}", num_add_tasks);
}

void SchemaSyncService::removeKeyspaceGCTasks()
{
const auto keyspaces = context.getTMTContext().getStorages().getAllKeyspaces();
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);

UInt64 num_remove_tasks = 0;
// Remove stale sync schema task.
std::unique_lock<std::shared_mutex> lock(keyspace_map_mutex);
for (auto keyspace_handle_iter = keyspace_handle_map.begin(); keyspace_handle_iter != keyspace_handle_map.end();
/*empty*/)
{
Expand All @@ -128,24 +137,28 @@ void SchemaSyncService::removeKeyspaceGCTasks()
++keyspace_handle_iter;
continue;
}

auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace));
LOG_INFO(keyspace_log, "remove sync schema task");
background_pool.removeTask(keyspace_handle_iter->second);
keyspace_handle_iter = keyspace_handle_map.erase(keyspace_handle_iter);

context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace);
PDClientHelper::remove_ks_gc_sp(keyspace);

keyspace_gc_context.erase(keyspace);
keyspace_gc_context.erase(keyspace); // clear the last gc safepoint
}

auto log_level = num_remove_tasks > 0 ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
LOG_IMPL(log, log_level, "remove sync schema task for keyspaces done, num_remove_tasks={}", num_remove_tasks);
}

SchemaSyncService::~SchemaSyncService()
void SchemaSyncService::shutdown()
{
if (handle)
{
// stop the root handle first
background_pool.removeTask(handle);
handle = nullptr;
}

for (auto const & iter : keyspace_handle_map)
Expand All @@ -156,6 +169,11 @@ SchemaSyncService::~SchemaSyncService()
LOG_INFO(log, "SchemaSyncService stopped");
}

/// Destructor: performs no teardown of its own and simply delegates to shutdown().
SchemaSyncService::~SchemaSyncService()
{
shutdown();
}

bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id)
{
return context.getTMTContext().getSchemaSyncerManager()->syncSchemas(context, keyspace_id);
Expand All @@ -168,12 +186,12 @@ inline std::tuple<bool, Timestamp> isSafeForGC(const DatabaseOrTablePtr & ptr, T
return {tombstone_ts != 0 && tombstone_ts < gc_safepoint, tombstone_ts};
}

Timestamp SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const
std::optional<Timestamp> SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const
{
std::shared_lock lock(keyspace_map_mutex);
auto iter = keyspace_gc_context.find(keyspace_id);
if (iter == keyspace_gc_context.end())
return 0;
return std::nullopt;
return iter->second.last_gc_safepoint;
}

Expand All @@ -185,12 +203,19 @@ void SchemaSyncService::updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp

bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id)
{
const Timestamp last_gc_safepoint = lastGcSafePoint(keyspace_id);
if (last_gc_safepoint != 0 && gc_safepoint == last_gc_safepoint)
const std::optional<Timestamp> last_gc_safepoint = lastGcSafePoint(keyspace_id);
// for a newly deployed cluster, there is an interval during which gc_safepoint is 0; skip it
if (gc_safepoint == 0)
return false;
// the gc safepoint has not changed since the last schema gc run; skip it
if (last_gc_safepoint.has_value() && gc_safepoint == *last_gc_safepoint)
return false;

String last_gc_safepoint_str = "none";
if (last_gc_safepoint.has_value())
last_gc_safepoint_str = fmt::format("{}", *last_gc_safepoint);
auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace_id));
LOG_INFO(keyspace_log, "Schema GC begin, last_safepoint={} safepoint={}", last_gc_safepoint, gc_safepoint);
LOG_INFO(keyspace_log, "Schema GC begin, last_safepoint={} safepoint={}", last_gc_safepoint_str, gc_safepoint);

size_t num_tables_removed = 0;
size_t num_databases_removed = 0;
Expand Down Expand Up @@ -363,7 +388,7 @@ bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id)
LOG_INFO(
keyspace_log,
"Schema GC meet error, will try again later, last_safepoint={} safepoint={}",
last_gc_safepoint,
last_gc_safepoint_str,
gc_safepoint);
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/TiDB/Schema/SchemaSyncService.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ class SchemaSyncService

bool gc(Timestamp gc_safepoint, KeyspaceID keyspace_id);

void shutdown();

private:
bool syncSchemas(KeyspaceID keyspace_id);

void addKeyspaceGCTasks();
void removeKeyspaceGCTasks();

Timestamp lastGcSafePoint(KeyspaceID keyspace_id) const;
std::optional<Timestamp> lastGcSafePoint(KeyspaceID keyspace_id) const;
void updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint);

private:
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ try
auto table_ids = MockTiDB::instance().newTables(db_name, tables, pd_client->getTS(), "dt");

refreshSchema();

for (auto table_id : table_ids)
{
refreshTableSchema(table_id);
Expand All @@ -286,15 +285,26 @@ try
MockTiDB::instance().dropTable(global_ctx, db_name, "t1", true);

refreshSchema();
for (auto table_id : table_ids)
{
refreshTableSchema(table_id);
}

global_ctx.initializeSchemaSyncService();
auto sync_service = global_ctx.getSchemaSyncService();
// run gc with safepoint == 0, will be skipped
ASSERT_FALSE(sync_service->gc(0, NullspaceID));
ASSERT_TRUE(sync_service->gc(10000000, NullspaceID));
// run gc with the same safepoint, will be skipped
ASSERT_FALSE(sync_service->gc(10000000, NullspaceID));
// run gc for another keyspace
// run gc for another keyspace with same safepoint, will be executed
ASSERT_TRUE(sync_service->gc(10000000, 1024));
// run gc with changed safepoint
ASSERT_TRUE(sync_service->gc(20000000, 1024));
// run gc with the same safepoint
ASSERT_FALSE(sync_service->gc(20000000, 1024));

sync_service->shutdown();
}
CATCH

Expand Down

0 comments on commit 922f80c

Please sign in to comment.