diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index bd7bea5b2b5..e58821cb20d 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -8,7 +8,7 @@ namespace DB { std::unordered_map> FailPointHelper::fail_point_wait_channels; -#define APPLY_FOR_FAILPOINTS(M) \ +#define APPLY_FOR_FAILPOINTS_ONCE(M) \ M(exception_between_drop_meta_and_data) \ M(exception_between_alter_data_and_meta) \ M(exception_drop_table_during_remove_meta) \ @@ -41,19 +41,26 @@ std::unordered_map> FailPointHelper::f M(force_set_sst_to_dtfile_block_size) \ M(force_set_sst_decode_rand) -#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \ - M(pause_after_learner_read) \ - M(hang_in_execution) \ - M(pause_before_dt_background_delta_merge) \ - M(pause_until_dt_background_delta_merge) \ - M(pause_before_apply_raft_cmd) \ - M(pause_before_apply_raft_snapshot) \ +#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \ + M(pause_after_learner_read) \ + M(hang_in_execution) \ + M(pause_before_dt_background_delta_merge) \ + M(pause_until_dt_background_delta_merge) \ + M(pause_before_apply_raft_cmd) \ + M(pause_before_apply_raft_snapshot) \ M(pause_until_apply_raft_snapshot) +#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \ + M(pause_when_reading_from_dt_stream) \ + M(pause_when_writing_to_dt_store) \ + M(pause_when_ingesting_to_dt_store) \ + M(pause_when_altering_dt_store) + namespace FailPoints { #define M(NAME) extern const char NAME[] = #NAME ""; -APPLY_FOR_FAILPOINTS(M) +APPLY_FOR_FAILPOINTS_ONCE(M) +APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) #undef M } // namespace FailPoints @@ -92,20 +99,28 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name) return; \ } - APPLY_FOR_FAILPOINTS(M) + APPLY_FOR_FAILPOINTS_ONCE(M) #undef M -#define M(NAME) \ +#define SUB_M(NAME, flags) \ if (fail_point_name == FailPoints::NAME) \ { \ /* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \ - fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \ + fiu_enable(FailPoints::NAME, 1, nullptr, flags); \ fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared()); \ return; \ } +#define M(NAME) SUB_M(NAME, FIU_ONETIME) + + APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) +#undef M + +#define M(NAME) SUB_M(NAME, 0) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) #undef M +#undef SUB_M + throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR); } diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 3594764513e..11aa3c5af70 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -75,6 +75,7 @@ DBGInvoker::DBGInvoker() regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService); regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas); + regSchemalessFunc("gc_schemas", dbgFuncGcSchemas); regSchemalessFunc("reset_schemas", dbgFuncResetSchemas); regSchemalessFunc("is_tombstone", dbgFuncIsTombstone); diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index e2adf76e9e5..ee0b4af4fac 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -50,6 +50,24 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer output(ss.str()); } +// Trigger gc on all databases / tables. +// Usage: +// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])" +void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output) +{ + auto & service = context.getSchemaSyncService(); + Timestamp gc_safe_point = 0; + if (args.size() == 0) + gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient()); + else + gc_safe_point = safeGet(typeid_cast(*args[0]).value); + service->gc(gc_safe_point); + + std::stringstream ss; + ss << "schemas gc done"; + output(ss.str()); +} + void dbgFuncResetSchemas(Context & context, const ASTs &, DBGInvoker::Printer output) { TMTContext & tmt = context.getTMTContext(); diff --git a/dbms/src/Debug/dbgFuncSchema.h b/dbms/src/Debug/dbgFuncSchema.h index fd9f87f9224..de75ce895c8 100644 --- a/dbms/src/Debug/dbgFuncSchema.h +++ b/dbms/src/Debug/dbgFuncSchema.h @@ -18,6 +18,11 @@ void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInv // ./storage-client.sh "DBGInvoke refresh_schemas()" void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output); +// Trigger gc on all databases / tables. +// Usage: +// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])" +void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output); + // Reset schemas. // Usage: // ./storages-client.sh "DBGInvoke reset_schemas()" diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index c554dd08a53..ade9d2f59f2 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -93,6 +93,65 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(Context & context_, const std } } +static std::tuple, std::optional> // +buildRemoteTS(const std::unordered_map & region_retry, const DAGQueryBlock & query_block, + const tipb::TableScan & ts, const String & handle_column_name, const TableStructureLockHolder &, const ManageableStoragePtr & storage, + Context & context, Poco::Logger * log) +{ + if (region_retry.empty()) + return std::make_tuple(std::nullopt, std::nullopt); + + for (auto it : region_retry) + { + context.getQueryContext().getDAGContext()->retry_regions.push_back(it.second); + } + LOG_DEBUG(log, ({ + std::stringstream ss; + ss << "Start to retry " << region_retry.size() << " regions ("; + for (auto & r : region_retry) + ss << r.first << ","; + ss << ")"; + ss.str(); + })); + + DAGSchema schema; + ::tipb::DAGRequest dag_req; + + { + const auto & table_info = storage->getTableInfo(); + tipb::Executor * ts_exec = dag_req.add_executors(); + ts_exec->set_tp(tipb::ExecType::TypeTableScan); + ts_exec->set_executor_id(query_block.source->executor_id()); + *(ts_exec->mutable_tbl_scan()) = ts; + + for (int i = 0; i < ts.columns().size(); ++i) + { + const auto & col = ts.columns(i); + auto col_id = col.column_id(); + + if (col_id == DB::TiDBPkColumnID) + { + ColumnInfo ci; + ci.tp = TiDB::TypeLongLong; + ci.setPriKeyFlag(); + ci.setNotNullFlag(); + schema.emplace_back(std::make_pair(handle_column_name, std::move(ci))); + } + else + { + auto & col_info = table_info.getColumnInfo(col_id); + schema.emplace_back(std::make_pair(col_info.name, col_info)); + } + dag_req.add_output_offsets(i); + } + dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock); + } + /// do not collect execution summaries because in this case because the execution summaries + /// will be collected by CoprocessorBlockInputStream + dag_req.set_collect_execution_summaries(false); + return std::make_tuple(dag_req, schema); +} + // the flow is the same as executeFetchcolumns void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) { @@ -238,9 +297,13 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & bool need_local_read = !query_info.mvcc_query_info->regions_query_info.empty(); if (need_local_read) { - readFromLocalStorage(table_id, required_columns, query_info, max_block_size, learner_read_snapshot, pipeline, region_retry); + readFromLocalStorage( + table_structure_lock, table_id, required_columns, query_info, max_block_size, learner_read_snapshot, pipeline, region_retry); } + // Should build these vars under protect of `table_structure_lock`. + auto [dag_req, schema] = buildRemoteTS(region_retry, query_block, ts, handle_column_name, table_structure_lock, storage, context, log); + auto null_stream_if_empty = std::make_shared(storage->getSampleBlockForColumns(required_columns)); // The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result // from those streams even if DDL operations are applied. Release the alter lock so that reading does not // block DDL operations, keep the drop lock so that the storage not to be dropped during reading. @@ -249,55 +312,11 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop mode. if (!region_retry.empty()) { - for (auto it : region_retry) - { - context.getQueryContext().getDAGContext()->retry_regions.push_back(it.second); - } - LOG_DEBUG(log, ({ - std::stringstream ss; - ss << "Start to retry " << region_retry.size() << " regions ("; - for (auto & r : region_retry) - ss << r.first << ","; - ss << ")"; - ss.str(); - })); - - DAGSchema schema; - ::tipb::DAGRequest dag_req; - - { - const auto & table_info = storage->getTableInfo(); - tipb::Executor * ts_exec = dag_req.add_executors(); - ts_exec->set_tp(tipb::ExecType::TypeTableScan); - ts_exec->set_executor_id(query_block.source->executor_id()); - *(ts_exec->mutable_tbl_scan()) = ts; - - for (int i = 0; i < ts.columns().size(); ++i) - { - const auto & col = ts.columns(i); - auto col_id = col.column_id(); - - if (col_id == DB::TiDBPkColumnID) - { - ColumnInfo ci; - ci.tp = TiDB::TypeLongLong; - ci.setPriKeyFlag(); - ci.setNotNullFlag(); - schema.emplace_back(std::make_pair(handle_column_name, std::move(ci))); - } - else - { - auto & col_info = table_info.getColumnInfo(col_id); - schema.emplace_back(std::make_pair(col_info.name, col_info)); - } - dag_req.add_output_offsets(i); - } - dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock); - } - /// do not collect execution summaries because in this case because the execution summaries - /// will be collected by CoprocessorBlockInputStream - dag_req.set_collect_execution_summaries(false); - +#ifndef NDEBUG + if (unlikely(!dag_req.has_value() || !schema.has_value())) + throw TiFlashException( + "Try to read from remote but can not build DAG request. Should not happen!", Errors::Coprocessor::Internal); +#endif std::vector ranges; for (auto & info : region_retry) { @@ -305,12 +324,12 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & ranges.emplace_back(*range.first, *range.second); } sort(ranges.begin(), ranges.end()); - executeRemoteQueryImpl(pipeline, ranges, dag_req, schema); + executeRemoteQueryImpl(pipeline, ranges, *dag_req, *schema); } if (pipeline.streams.empty()) { - pipeline.streams.emplace_back(std::make_shared(storage->getSampleBlockForColumns(required_columns))); + pipeline.streams.emplace_back(null_stream_if_empty); } pipeline.transform([&](auto & stream) { stream->addTableLock(table_drop_lock); }); @@ -346,6 +365,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & } void DAGQueryBlockInterpreter::readFromLocalStorage( // + const TableStructureLockHolder &, // const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size, const LearnerReadSnapshot & learner_read_snapshot, // Pipeline & pipeline, std::unordered_map & region_retry) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 1e8bde17930..76bf68e2304 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -117,7 +117,8 @@ class DAGQueryBlockInterpreter TiDB::TiDBCollators & collators, AggregateDescriptions & aggregate_descriptions); void executeProject(Pipeline & pipeline, NamesWithAliases & project_cols); - void readFromLocalStorage( // + void readFromLocalStorage( // + const TableStructureLockHolder &, // const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size, const LearnerReadSnapshot & learner_read_snapshot, // Pipeline & pipeline, std::unordered_map & region_retry); diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 0ea90f6c1ac..b6fe3354a9e 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -1,11 +1,17 @@ #pragma once +#include #include #include #include namespace DB { +namespace FailPoints +{ +extern const char pause_when_reading_from_dt_stream[]; +} // namespace FailPoints + namespace DM { @@ -80,6 +86,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream } LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment->segmentId()) + "]"); } + FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream); Block res = cur_stream->read(res_filter, return_filter); @@ -122,4 +129,4 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream }; } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index e3433a6cb4a..e40c833b17e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -61,6 +61,9 @@ namespace FailPoints { extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; +extern const char pause_when_writing_to_dt_store[]; +extern const char pause_when_ingesting_to_dt_store[]; +extern const char pause_when_altering_dt_store[]; extern const char force_triggle_background_merge_delta[]; extern const char force_triggle_foreground_flush[]; } // namespace FailPoints @@ -458,6 +461,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ segment = segment_it->second; } + FAIL_POINT_PAUSE(FailPoints::pause_when_writing_to_dt_store); waitForWrite(dm_context, segment); if (segment->hasAbandoned()) continue; @@ -610,6 +614,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, segment = segment_it->second; } + FAIL_POINT_PAUSE(FailPoints::pause_when_ingesting_to_dt_store); waitForWrite(dm_context, segment); if (segment->hasAbandoned()) continue; @@ -1422,10 +1427,13 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segment->getLastCheckGCSafePoint() >= gc_safe_point) continue; + const auto segment_id = segment->segmentId(); RowKeyRange segment_range = segment->getRowKeyRange(); if (segment->getDelta()->isUpdating()) { - LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_DEBUG(log, + "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name + << "]"); continue; } @@ -1458,15 +1466,20 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) checkSegmentUpdate(dm_context, segment, type); gc_segments_num++; finish_gc_on_segment = true; - LOG_INFO(log, "GC-merge-delta done [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_INFO(log, + "GC-merge-delta done Segment [" << segment_id << "] [range=" << segment_range.toDebugString() + << "] [table=" << table_name << "]"); } } if (!finish_gc_on_segment) - LOG_DEBUG(log, "GC is skipped [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); + LOG_DEBUG(log, + "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() + << "] [table=" << table_name << "]"); } catch (Exception & e) { - e.addMessage("while apply gc [range=" + segment_range.toDebugString() + "] [table=" + table_name + "]"); + e.addMessage("while apply gc Segment [" + DB::toString(segment_id) + "] [range=" + segment_range.toDebugString() + + "] [table=" + table_name + "]"); e.rethrow(); } } @@ -1901,6 +1914,8 @@ void DeltaMergeStore::applyAlters(const AlterCommands & commands, { std::unique_lock lock(read_write_mutex); + FAIL_POINT_PAUSE(FailPoints::pause_when_altering_dt_store); + ColumnDefines new_original_table_columns(original_table_columns.begin(), original_table_columns.end()); for (const auto & command : commands) { diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.cpp b/dbms/src/Storages/Transaction/SchemaBuilder.cpp index f31755d1140..984aede5296 100644 --- a/dbms/src/Storages/Transaction/SchemaBuilder.cpp +++ b/dbms/src/Storages/Transaction/SchemaBuilder.cpp @@ -822,7 +822,7 @@ void SchemaBuilder::applyDropSchema(DatabaseID schema_id) if (unlikely(it == databases.end())) { LOG_INFO( - log, "Syncer wants to drop database: " << std::to_string(schema_id) << " . But database is not found, may has been dropped."); + log, "Syncer wants to drop database [id=" << std::to_string(schema_id) << "], but database is not found, may has been dropped."); return; } applyDropSchema(name_mapper.mapDatabaseName(*it->second)); diff --git a/dbms/src/Storages/Transaction/SchemaSyncService.cpp b/dbms/src/Storages/Transaction/SchemaSyncService.cpp index 0cb18a985e8..06b768bb146 100644 --- a/dbms/src/Storages/Transaction/SchemaSyncService.cpp +++ b/dbms/src/Storages/Transaction/SchemaSyncService.cpp @@ -8,9 +8,14 @@ #include #include #include +#include namespace DB { +namespace ErrorCodes +{ +extern const int DEADLOCK_AVOIDED; +} // namespace ErrorCodes SchemaSyncService::SchemaSyncService(DB::Context & context_) : context(context_), background_pool(context_.getBackgroundPool()), log(&Logger::get("SchemaSyncService")) @@ -68,11 +73,10 @@ inline bool isSafeForGC(const DatabaseOrTablePtr & ptr, Timestamp gc_safe_point) bool SchemaSyncService::gc(Timestamp gc_safe_point) { auto & tmt_context = context.getTMTContext(); - auto pd_client = tmt_context.getPDClient(); if (gc_safe_point == gc_context.last_gc_safe_point) return false; - LOG_DEBUG(log, "Performing GC using safe point " << gc_safe_point); + LOG_INFO(log, "Performing GC using safe point " << gc_safe_point); // The storages that are ready for gc std::vector> storages_to_gc; @@ -98,6 +102,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) } // Physically drop tables + bool succeeded = true; for (auto & storage_ : storages_to_gc) { // Get a shared_ptr from weak_ptr, it should always success. @@ -129,8 +134,13 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) } catch (DB::Exception & e) { + succeeded = false; + String err_msg; // Maybe a read lock of a table is held for a long time, just ignore it this round. - auto err_msg = getCurrentExceptionMessage(true); + if (e.code() == ErrorCodes::DEADLOCK_AVOIDED) + err_msg = "locking attempt has timed out!"; // ignore verbose stack for this error + else + err_msg = getCurrentExceptionMessage(true); LOG_INFO(log, "Physically drop table " << canonical_name << " is skipped, reason: " << err_msg); } } @@ -169,14 +179,26 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point) } catch (DB::Exception & e) { - auto err_msg = getCurrentExceptionMessage(true); + succeeded = false; + String err_msg; + if (e.code() == ErrorCodes::DEADLOCK_AVOIDED) + err_msg = "locking attempt has timed out!"; // ignore verbose stack for this error + else + err_msg = getCurrentExceptionMessage(true); LOG_INFO(log, "Physically drop database " << db_name << " is skipped, reason: " << err_msg); } } - gc_context.last_gc_safe_point = gc_safe_point; - - LOG_DEBUG(log, "Performed GC using safe point " << gc_safe_point); + if (succeeded) + { + gc_context.last_gc_safe_point = gc_safe_point; + LOG_INFO(log, "Performed GC using safe point " << gc_safe_point); + } + else + { + // Don't update last_gc_safe_point and retry later + LOG_INFO(log, "Performed GC using safe point " << gc_safe_point << " meet error, will try again later"); + } return true; } diff --git a/dbms/src/Storages/Transaction/SchemaSyncService.h b/dbms/src/Storages/Transaction/SchemaSyncService.h index 8b2d385fec8..b7841b2daed 100644 --- a/dbms/src/Storages/Transaction/SchemaSyncService.h +++ b/dbms/src/Storages/Transaction/SchemaSyncService.h @@ -2,17 +2,27 @@ #include #include -#include #include #include +namespace Poco +{ +class Logger; +} + namespace DB { class Context; class BackgroundProcessingPool; +class IAST; +using ASTPtr = std::shared_ptr; +using ASTs = std::vector; +using DBGInvokerPrinter = std::function; +extern void dbgFuncGcSchemas(Context &, const ASTs &, DBGInvokerPrinter); + class SchemaSyncService : public std::enable_shared_from_this, private boost::noncopyable { public: @@ -32,10 +42,12 @@ class SchemaSyncService : public std::enable_shared_from_this private: Context & context; + friend void dbgFuncGcSchemas(Context &, const ASTs &, DBGInvokerPrinter); + BackgroundProcessingPool & background_pool; BackgroundProcessingPool::TaskHandle handle; - Logger * log; + Poco::Logger * log; }; using SchemaSyncServicePtr = std::shared_ptr;