diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp
index 42b111c53ab..73855588bb7 100644
--- a/dbms/src/Common/FailPoint.cpp
+++ b/dbms/src/Common/FailPoint.cpp
@@ -49,7 +49,8 @@ std::unordered_map> FailPointHelper::f
     M(force_legacy_or_checkpoint_page_file_exists) \
     M(exception_in_creating_set_input_stream)      \
     M(exception_when_read_from_log)                \
-    M(exception_mpp_hash_build)
+    M(exception_mpp_hash_build)                    \
+    M(exception_between_schema_change_in_the_same_diff)
 
 #define APPLY_FOR_FAILPOINTS(M)                    \
     M(force_set_page_file_write_errno)             \
diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp
index 876101c744e..3dd0db177c2 100644
--- a/dbms/src/Debug/dbgFuncSchema.cpp
+++ b/dbms/src/Debug/dbgFuncSchema.cpp
@@ -16,6 +16,7 @@ namespace DB
 {
 namespace ErrorCodes
 {
+extern const int FAIL_POINT_ERROR;
 extern const int UNKNOWN_TABLE;
 } // namespace ErrorCodes
 
@@ -44,7 +45,22 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
 {
     TMTContext & tmt = context.getTMTContext();
     auto schema_syncer = tmt.getSchemaSyncer();
-    schema_syncer->syncSchemas(context);
+    try
+    {
+        schema_syncer->syncSchemas(context);
+    }
+    catch (Exception & e)
+    {
+        if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
+        {
+            output(e.message());
+            return;
+        }
+        else
+        {
+            throw;
+        }
+    }
     output("schemas refreshed");
 }
 
@@ -56,7 +72,7 @@ void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer
 {
     auto & service = context.getSchemaSyncService();
     Timestamp gc_safe_point = 0;
-    if (args.size() == 0)
+    if (args.empty())
         gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
     else
         gc_safe_point = safeGet(typeid_cast(*args[0]).value);
@@ -76,7 +92,7 @@ void dbgFuncResetSchemas(Context & context, const ASTs &, DBGInvoker::Printer ou
 
 void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printer output)
 {
-    if (args.size() < 1 || args.size() > 2)
+    if (args.empty() || args.size() > 2)
         throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);
 
     const String & database_name = typeid_cast(*args[0]).name;
diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h
index d31facd7b84..9d6f6153fc2 100644
--- a/dbms/src/Storages/IManageableStorage.h
+++ b/dbms/src/Storages/IManageableStorage.h
@@ -146,15 +146,18 @@ class IManageableStorage : public IStorage
 
     virtual size_t getRowKeyColumnSize() const { return 1; }
 
-    // when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
-    // and `releaseDecodingBlock` need to be called when the block is free
-    // when `need_block` is false, it will just return an nullptr
-    virtual std::pair getSchemaSnapshotAndBlockForDecoding(bool /* need_block */)
+    /// when `need_block` is true, it will try to return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
+    /// and `releaseDecodingBlock` needs to be called when the block is freed
+    /// when `need_block` is false, it will just return a nullptr
+    /// This method must be called under the protection of the table structure lock
+    virtual std::pair getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & /* table_structure_lock */, bool /* need_block */)
     {
         throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     };
 
-    virtual void releaseDecodingBlock(Int64 /* schema_version */, BlockUPtr /* block */)
+    /// The `block_decoding_schema_version` is just an internal version for `DecodingStorageSchemaSnapshot`,
+    /// and it has no relation to the table schema version.
+    virtual void releaseDecodingBlock(Int64 /* block_decoding_schema_version */, BlockUPtr /* block */)
     {
         throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
     }
diff --git a/dbms/src/Storages/PrimaryKeyNotMatchException.cpp b/dbms/src/Storages/PrimaryKeyNotMatchException.cpp
index 7eaf272ae17..57c16a5237e 100644
--- a/dbms/src/Storages/PrimaryKeyNotMatchException.cpp
+++ b/dbms/src/Storages/PrimaryKeyNotMatchException.cpp
@@ -15,26 +15,29 @@
 namespace DB
 {
-
 String fixCreateStatementWithPriKeyNotMatchException( //
-    Context & context, const String old_definition, const String & table_metadata_path, const PrimaryKeyNotMatchException & ex,
+    Context & context,
+    const String & old_definition,
+    const String & table_metadata_path,
+    const PrimaryKeyNotMatchException & ex,
     Poco::Logger * log)
 {
     LOG_WARNING(
-        log, "Try to fix statement in " + table_metadata_path + ", primary key [" + ex.pri_key + "] -> [" + ex.actual_pri_key + "]");
+        log,
+        "Try to fix statement in " + table_metadata_path + ", primary key [" + ex.pri_key + "] -> [" + ex.actual_pri_key + "]");
 
     // Try to fix the create statement.
     ParserCreateQuery parser;
     ASTPtr ast = parseQuery(parser, old_definition.data(), old_definition.data() + old_definition.size(), "in file " + table_metadata_path, 0);
     ASTCreateQuery & ast_create_query = typeid_cast(*ast);
     auto args = ast_create_query.storage->engine->arguments;
-    if (args->children.size() >= 1)
+    if (!args->children.empty())
     {
         ASTPtr pk_ast = std::make_shared();
         pk_ast->children.emplace_back(std::make_shared(ex.actual_pri_key));
         args->children[0] = pk_ast;
     }
 
-    const String statement = getTableDefinitionFromCreateQuery(ast);
+    String statement = getTableDefinitionFromCreateQuery(ast);
 
     const String table_metadata_tmp_path = table_metadata_path + ".tmp";
     {
@@ -47,9 +50,8 @@ String fixCreateStatementWithPriKeyNotMatchException( //
         EncryptionPath encryption_path
             = use_target_encrypt_info ? EncryptionPath(table_metadata_path, "") : EncryptionPath(table_metadata_tmp_path, "");
         {
-            bool create_new_encryption_info = !use_target_encrypt_info && statement.size();
-            WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info,
-                nullptr, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
+            bool create_new_encryption_info = !use_target_encrypt_info && !statement.empty();
+            WriteBufferFromFileProvider out(context.getFileProvider(), table_metadata_tmp_path, encryption_path, create_new_encryption_info, nullptr, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
             writeString(statement, out);
             out.next();
             if (context.getSettingsRef().fsync_metadata)
@@ -60,8 +62,7 @@ String fixCreateStatementWithPriKeyNotMatchException( //
     try
     {
         /// rename atomically replaces the old file with the new one.
-        context.getFileProvider()->renameFile(table_metadata_tmp_path, encryption_path, table_metadata_path,
-            EncryptionPath(table_metadata_path, ""), !use_target_encrypt_info);
+        context.getFileProvider()->renameFile(table_metadata_tmp_path, encryption_path, table_metadata_path, EncryptionPath(table_metadata_path, ""), !use_target_encrypt_info);
     }
     catch (...)
     {
diff --git a/dbms/src/Storages/PrimaryKeyNotMatchException.h b/dbms/src/Storages/PrimaryKeyNotMatchException.h
index f16cad063f6..ae454202c9f 100644
--- a/dbms/src/Storages/PrimaryKeyNotMatchException.h
+++ b/dbms/src/Storages/PrimaryKeyNotMatchException.h
@@ -9,22 +9,21 @@ class Logger;
 
 namespace DB
 {
-
 class Context;
 
-struct PrimaryKeyNotMatchException
+struct PrimaryKeyNotMatchException : public std::exception
 {
     // The primary key name in definition
     const String pri_key;
    // The actual primary key name in TiDB::TableInfo
     const String actual_pri_key;
     PrimaryKeyNotMatchException(const String & pri_key_, const String & actual_pri_key_)
-        : pri_key(pri_key_), actual_pri_key(actual_pri_key_)
+        : pri_key(pri_key_)
+        , actual_pri_key(actual_pri_key_)
     {}
 };
 
 // This function will replace the primary key and update statement in `table_metadata_path`. The correct statement will be return.
-String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String old_definition, const String & table_metadata_path,
-    const PrimaryKeyNotMatchException & ex, Poco::Logger * log);
+String fixCreateStatementWithPriKeyNotMatchException(Context & context, const String & old_definition, const String & table_metadata_path, const PrimaryKeyNotMatchException & ex, Poco::Logger * log);
 
 } // namespace DB
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 0c37525785e..64389d982cb 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -424,7 +424,7 @@ class DMBlockOutputStream : public IBlockOutputStream
 
 BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settings & settings)
 {
-    auto & insert_query = typeid_cast(*query);
+    const auto & insert_query = typeid_cast(*query);
     auto decorator = [&](const Block & block) { //
         return this->buildInsertBlock(insert_query.is_import, insert_query.is_delete, block);
     };
@@ -563,7 +563,7 @@ BlockInputStreams StorageDeltaMerge::read( //
     // failed to parsed.
     ColumnDefines columns_to_read;
     auto header = store->getHeader();
-    for (auto & n : column_names)
+    for (const auto & n : column_names)
     {
         ColumnDefine col_define;
         if (n == EXTRA_HANDLE_COLUMN_NAME)
@@ -798,7 +798,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM
 
 DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows)
 {
-    auto start_index = rand() % (total_rows - delete_rows + 1);
+    auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp)
 
     DM::RowKeyRange range = DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize());
     {
@@ -849,14 +849,16 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows)
         LOG_FMT_ERROR(log, "Rows after delete range not match, expected: {}, got: {}", (total_rows - delete_rows), after_delete_rows);
 }
 
-std::pair StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(bool need_block)
+std::pair StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool need_block)
 {
+    (void)table_structure_lock;
     std::lock_guard lock{decode_schema_mutex};
-    if (!decoding_schema_snapshot || decoding_schema_snapshot->schema_version < tidb_table_info.schema_version)
+    if (!decoding_schema_snapshot || decoding_schema_changed)
     {
         auto & store = getAndMaybeInitStore();
-        decoding_schema_snapshot = std::make_shared(store->getStoreColumns(), tidb_table_info, store->getHandle());
+        decoding_schema_snapshot = std::make_shared(store->getStoreColumns(), tidb_table_info, store->getHandle(), decoding_schema_version++);
         cache_blocks.clear();
+        decoding_schema_changed = false;
     }
 
     if (need_block)
@@ -878,10 +880,10 @@ std::pair StorageDeltaMerg
     {
     }
 }
-void StorageDeltaMerge::releaseDecodingBlock(Int64 schema_version, BlockUPtr block_ptr)
+void StorageDeltaMerge::releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block_ptr)
 {
     std::lock_guard lock{decode_schema_mutex};
-    if (!decoding_schema_snapshot || schema_version < decoding_schema_snapshot->schema_version)
+    if (!decoding_schema_snapshot || block_decoding_schema_version < decoding_schema_snapshot->decoding_schema_version)
         return;
     if (cache_blocks.size() >= max_cached_blocks_num)
         return;
@@ -931,12 +933,12 @@ static void updateDeltaMergeTableCreateStatement(
     const SortDescription & pk_names,
     const ColumnsDescription & columns,
     const OrderedNameSet & hidden_columns,
-    const OptionTableInfoConstRef table_info,
+    OptionTableInfoConstRef table_info,
     Timestamp tombstone,
     const Context & context);
 
 inline OptionTableInfoConstRef getTableInfoForCreateStatement(
-    const OptionTableInfoConstRef table_info_from_tidb,
+    OptionTableInfoConstRef table_info_from_tidb,
     TiDB::TableInfo & table_info_from_store,
     const ColumnDefines & store_table_columns,
     const OrderedNameSet & hidden_columns)
@@ -1060,6 +1062,7 @@ try
             updateTableColumnInfo();
         }
     }
+    decoding_schema_changed = true;
 
     SortDescription pk_desc = getPrimarySortDescription();
     ColumnDefines store_columns = getStoreColumnDefines();
@@ -1416,7 +1419,7 @@ void StorageDeltaMerge::startup()
     tmt.getStorages().put(std::static_pointer_cast(shared_from_this()));
 }
 
-void StorageDeltaMerge::shutdown()
+void StorageDeltaMerge::shutdownImpl()
 {
     bool v = false;
     if (!shutdown_called.compare_exchange_strong(v, true))
@@ -1427,6 +1430,11 @@ void StorageDeltaMerge::shutdown()
     }
 }
 
+void StorageDeltaMerge::shutdown()
+{
+    shutdownImpl();
+}
+
 void StorageDeltaMerge::removeFromTMTContext()
 {
     // remove this table from TMTContext
@@ -1437,7 +1445,7 @@ void StorageDeltaMerge::removeFromTMTContext()
 
 StorageDeltaMerge::~StorageDeltaMerge()
 {
-    shutdown();
+    shutdownImpl();
 }
 
 DataTypePtr StorageDeltaMerge::getPKTypeImpl() const
diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h
index d38feced258..7b0a7745dce 100644
--- a/dbms/src/Storages/StorageDeltaMerge.h
+++ b/dbms/src/Storages/StorageDeltaMerge.h
@@ -87,7 +87,7 @@ class StorageDeltaMerge
     // Apply AlterCommands synced from TiDB should use `alterFromTiDB` instead of `alter(...)`
     void alterFromTiDB(
         const TableLockHolder &,
-        const AlterCommands & commands,
+        const AlterCommands & params,
         const String & database_name,
         const TiDB::TableInfo & table_info,
         const SchemaNameMapper & name_mapper,
@@ -123,9 +123,9 @@ class StorageDeltaMerge
 
     size_t getRowKeyColumnSize() const override { return rowkey_column_size; }
 
-    std::pair getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) override;
+    std::pair getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool /* need_block */) override;
 
-    void releaseDecodingBlock(Int64 schema_version, BlockUPtr block) override;
+    void releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block) override;
 
     bool initStoreIfDataDirExist() override;
 
@@ -168,6 +168,7 @@ class StorageDeltaMerge
     void updateTableColumnInfo();
     DM::ColumnDefines getStoreColumnDefines() const;
     bool dataDirExist();
+    void shutdownImpl();
 
 #ifndef DBMS_PUBLIC_GTEST
 private:
@@ -206,6 +207,11 @@ class StorageDeltaMerge
     mutable std::mutex decode_schema_mutex;
     DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot;
 
+    // The following two members must be used under the protection of the table structure lock
+    bool decoding_schema_changed = false;
+    // internal version for `decoding_schema_snapshot`
+    Int64 decoding_schema_version = 1;
+
     // avoid creating block every time when decoding row
     std::vector cache_blocks;
     // avoid creating too many cached blocks(the typical num should be less and equal than raft apply thread)
diff --git a/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h b/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
index 8af93ee8db5..24590b9b71d 100644
--- a/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
+++ b/dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
@@ -52,13 +52,14 @@ struct DecodingStorageSchemaSnapshot
     bool pk_is_handle;
     bool is_common_handle;
     TMTPKType pk_type = TMTPKType::UNSPECIFIED;
-    Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
+    // an internal increasing version for `DecodingStorageSchemaSnapshot`; it has no relation to the table schema version
+    Int64 decoding_schema_version;
 
-    DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_)
+    DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_, Int64 decoding_schema_version_)
         : column_defines{std::move(column_defines_)}
         , pk_is_handle{table_info_.pk_is_handle}
         , is_common_handle{table_info_.is_common_handle}
-        , schema_version{table_info_.schema_version}
+        , decoding_schema_version{decoding_schema_version_}
     {
         std::unordered_map column_lut;
         for (size_t i = 0; i < table_info_.columns.size(); i++)
diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp
index d2e5fbd2b75..c708490a6e4 100644
--- a/dbms/src/Storages/Transaction/PartitionStreams.cpp
+++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp
@@ -100,14 +100,14 @@ static void writeRegionDataToStorage(
     /// Read region data as block.
     Stopwatch watch;
 
-    Int64 block_schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
+    Int64 block_decoding_schema_version = -1;
     BlockUPtr block_ptr = nullptr;
     if (need_decode)
     {
         LOG_FMT_TRACE(log, "{} begin to decode table {}, region {}", FUNCTION_NAME, table_id, region->id());
         DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
-        std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(true);
-        block_schema_version = decoding_schema_snapshot->schema_version;
+        std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(lock, true);
+        block_decoding_schema_version = decoding_schema_snapshot->decoding_schema_version;
 
         auto reader = RegionBlockReader(decoding_schema_snapshot);
         if (!reader.read(*block_ptr, data_list_read, force_decode))
@@ -139,7 +139,7 @@ static void writeRegionDataToStorage(
         write_part_cost = watch.elapsedMilliseconds();
         GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0);
         if (need_decode)
-            storage->releaseDecodingBlock(block_schema_version, std::move(block_ptr));
+            storage->releaseDecodingBlock(block_decoding_schema_version, std::move(block_ptr));
 
         LOG_TRACE(log,
                   FUNCTION_NAME << ": table " << table_id << ", region " << region->id() << ", cost [region decode " << region_decode_cost
@@ -496,7 +496,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
         }
 
         DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
-        std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
+        std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
         res_block = createBlockSortByColumnID(decoding_schema_snapshot);
         auto reader = RegionBlockReader(decoding_schema_snapshot);
         if (!reader.read(res_block, *data_list_read, force_decode))
@@ -548,7 +548,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
         auto table_lock = storage->lockStructureForShare(getThreadName());
         dm_storage = std::dynamic_pointer_cast(storage);
         // only dt storage engine support `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception
-        std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
+        std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
         std::tie(std::ignore, drop_lock) = std::move(table_lock).release();
         return true;
     };
diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.cpp b/dbms/src/Storages/Transaction/SchemaBuilder.cpp
index 84fed766ab7..161d5e08ea8 100644
--- a/dbms/src/Storages/Transaction/SchemaBuilder.cpp
+++ b/dbms/src/Storages/Transaction/SchemaBuilder.cpp
@@ -45,6 +45,7 @@ extern const char exception_before_step_2_rename_in_exchange_partition[];
 extern const char exception_after_step_2_in_exchange_partition[];
 extern const char exception_before_step_3_rename_in_exchange_partition[];
 extern const char exception_after_step_3_in_exchange_partition[];
+extern const char exception_between_schema_change_in_the_same_diff[];
 } // namespace FailPoints
 
 bool isReservedDatabase(Context & context, const String & database_name)
@@ -337,8 +338,16 @@ void SchemaBuilder::applyAlterPhysicalTable(DBInfoPtr db_inf
     // Using original table info with updated columns instead of using new_table_info directly,
     // so that other changes (RENAME commands) won't be saved.
     // Also, updating schema_version as altering column is structural.
-    for (const auto & schema_change : schema_changes)
+    for (size_t i = 0; i < schema_changes.size(); i++)
     {
+        if (i > 0)
+        {
+            /// If there are multiple schema changes in the same diff,
+            /// the table schema version will be set to the latest schema version after the first schema change is applied.
+            /// Throw an exception in the middle of applying the schema changes to mock the case that there is a race between data decoding and applying different schema changes.
+            FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_schema_change_in_the_same_diff);
+        }
+        const auto & schema_change = schema_changes[i];
         /// Update column infos by applying schema change in this step.
         schema_change.second(orig_table_info);
         /// Update schema version aggressively for the sake of correctness.
@@ -564,14 +573,14 @@ void SchemaBuilder::applyPartitionDiff(TiDB::DBInfoPtr db_in
     updated_table_info.partition = table_info->partition;
     /// Apply changes to physical tables.
-    for (auto orig_def : orig_defs)
+    for (const auto & orig_def : orig_defs)
     {
         if (new_part_id_set.count(orig_def.id) == 0)
         {
             applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
         }
     }
 
-    for (auto new_def : new_defs)
+    for (const auto & new_def : new_defs)
     {
         if (orig_part_id_set.count(new_def.id) == 0)
         {
@@ -979,7 +988,7 @@ void SchemaBuilder::applyCreatePhysicalTable(DBInfoPtr db_in
     /// Check if this is a RECOVER table.
     {
         auto & tmt_context = context.getTMTContext();
-        if (auto storage = tmt_context.getStorages().get(table_info->id).get(); storage)
+        if (auto * storage = tmt_context.getStorages().get(table_info->id).get(); storage)
         {
             if (!storage->isTombstone())
             {
@@ -1093,7 +1102,7 @@ template 
 void SchemaBuilder::applyDropTable(DBInfoPtr db_info, TableID table_id)
 {
     auto & tmt_context = context.getTMTContext();
-    auto storage = tmt_context.getStorages().get(table_id).get();
+    auto * storage = tmt_context.getStorages().get(table_id).get();
     if (storage == nullptr)
     {
         LOG_DEBUG(log, "table " << table_id << " does not exist.");
diff --git a/dbms/src/Storages/Transaction/SchemaBuilder.h b/dbms/src/Storages/Transaction/SchemaBuilder.h
index 2f8e321a04b..17840b7a0af 100644
--- a/dbms/src/Storages/Transaction/SchemaBuilder.h
+++ b/dbms/src/Storages/Transaction/SchemaBuilder.h
@@ -37,7 +37,7 @@ struct SchemaBuilder
     void applyDropSchema(DatabaseID schema_id);
 
     /// Parameter schema_name should be mapped.
-    void applyDropSchema(const String & schema_name);
+    void applyDropSchema(const String & db_name);
 
     bool applyCreateSchema(DatabaseID schema_id);
 
diff --git a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
index faba530d27b..d05430b2426 100644
--- a/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
+++ b/dbms/src/Storages/Transaction/TiDBSchemaSyncer.h
@@ -14,6 +14,11 @@
 namespace DB
 {
+namespace ErrorCodes
+{
+extern const int FAIL_POINT_ERROR;
+};
+
 template 
 struct TiDBSchemaSyncer : public SchemaSyncer
 {
@@ -165,6 +170,10 @@ struct TiDBSchemaSyncer : public SchemaSyncer
         }
         catch (Exception & e)
         {
+            if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
+            {
+                throw;
+            }
             GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
             LOG_WARNING(log, "apply diff meets exception : " << e.displayText() << " \n stack is " << e.getStackTrace().toString());
             return false;
diff --git a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
index ed046538a9b..31fad745eaf 100644
--- a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
+++ b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
@@ -271,11 +271,11 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co
     if (handle_id != EXTRA_HANDLE_COLUMN_ID)
     {
         auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; });
-        return std::make_shared(std::make_shared(store_columns), table_info, *iter);
+        return std::make_shared(std::make_shared(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1);
     }
     else
     {
-        return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0]);
+        return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1);
    }
 }
diff --git a/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test b/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test
index c6f366bfd07..189c1f48846 100644
--- a/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test
+++ b/tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test
@@ -23,9 +23,25 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
 | 1 | world |  0.00 |    2 | NULL |
 +---+-------+------+------+------+
 
-# Need to apply a lossy type change to reorganize data. issue#3714
+=> DBGInvoke __enable_schema_sync_service('false')
+
+>> DBGInvoke __enable_fail_point(exception_between_schema_change_in_the_same_diff)
+
+# stop decoding data
+>> DBGInvoke __enable_fail_point(pause_before_apply_raft_cmd)
+
+# Need to apply a lossy type change to reorganize data. issue#3714
 mysql> alter table test.t modify c decimal(6,3)
 
+# refresh schema and hit the `exception_between_schema_change_in_the_same_diff` failpoint
+>> DBGInvoke __refresh_schemas()
+
+>> DBGInvoke __disable_fail_point(exception_between_schema_change_in_the_same_diff)
+
+>> DBGInvoke __disable_fail_point(pause_before_apply_raft_cmd)
+
+=> DBGInvoke __enable_schema_sync_service('true')
+
 mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
 +---+-------+-------+------+------+
 | a | b | c | d | e |
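The core mechanism of this change: instead of comparing the snapshot's TiDB schema version against the table's current schema version (which misfires when several schema changes arrive in one schema diff and the version jumps ahead after the first change is applied), the decoding snapshot now carries its own monotonically increasing decoding_schema_version, and the alter path only sets a decoding_schema_changed flag; the snapshot and its block cache are rebuilt lazily on the next decode under the table structure lock. Below is a minimal, self-contained C++ sketch of this caching pattern; the names Snapshot and DecodingBlockCache are illustrative stand-ins, not the actual TiFlash classes.

#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Illustrative stand-in for DecodingStorageSchemaSnapshot: it owns an internal
// version that has no relation to the TiDB table schema version.
struct Snapshot
{
    long long decoding_schema_version;
};

class DecodingBlockCache
{
public:
    // In the real code this is called under the table structure lock; here the
    // mutex alone stands in for that protection.
    std::shared_ptr<Snapshot> getSnapshot()
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (!snapshot || schema_changed)
        {
            snapshot = std::make_shared<Snapshot>(Snapshot{next_version++});
            cached_blocks.clear(); // blocks built with the old column layout must not be reused
            schema_changed = false;
        }
        return snapshot;
    }

    // The alter path only marks the snapshot as stale; it is rebuilt lazily on the next decode.
    void onSchemaChange()
    {
        std::lock_guard<std::mutex> guard(mutex);
        schema_changed = true;
    }

    // A block is returned to the pool only if it still matches the live snapshot's internal version.
    void releaseBlock(long long block_version, std::vector<int> && block)
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (!snapshot || block_version < snapshot->decoding_schema_version)
            return; // stale block: drop it instead of caching it
        if (cached_blocks.size() >= max_cached_blocks)
            return;
        cached_blocks.emplace_back(std::move(block));
    }

private:
    std::mutex mutex;
    std::shared_ptr<Snapshot> snapshot;
    bool schema_changed = false;
    long long next_version = 1;
    std::vector<std::vector<int>> cached_blocks; // stands in for the cached decoding Blocks
    static constexpr size_t max_cached_blocks = 10;
};

Under this scheme, a block handed out before an ALTER carries the old internal version; once onSchemaChange() has run, the next getSnapshot() bumps the version and clears the pool, so releasing the old block becomes a no-op instead of re-caching a block whose column layout is stale.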