From 3f2a0f5880ddb8a2b011b91f0a755338fbc39f90 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Wed, 9 Oct 2024 13:29:48 +0800 Subject: [PATCH] Fix alter (#1987) ### What problem does this PR solve? Fix some bugs that occur when restarting after altering a table. Add a test case to reproduce the bug. Issue link: #1967, #1989 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Test cases --- .../sparse/sparse_benchmark_util.h | 1 + python/restart_test/test_alter.py | 27 ++++++- python/test_pysdk/test_alter.py | 80 ++++++++++++++++++- src/executor/operator/physical_alter.cpp | 40 +--------- src/storage/buffer/buffer_obj.cpp | 6 +- src/storage/meta/catalog.cpp | 20 ++++- src/storage/meta/entry/block_column_entry.cpp | 2 +- src/storage/meta/entry/block_entry.cpp | 16 +++- src/storage/meta/entry/block_entry.cppm | 4 +- src/storage/meta/entry/table_entry.cpp | 43 +++++++++- src/storage/meta/entry/table_entry.cppm | 2 +- src/storage/txn/txn.cpp | 35 ++++---- src/storage/txn/txn.cppm | 2 +- src/storage/wal/catalog_delta_entry.cpp | 5 +- src/storage/wal/wal_manager.cpp | 20 +---- .../config/restart_test/test_alter/1.toml | 1 + .../config/restart_test/test_alter/2.toml | 1 + test/sql/ddl/alter/drop_columns.slt | 4 +- 18 files changed, 213 insertions(+), 96 deletions(-) diff --git a/benchmark/local_infinity/sparse/sparse_benchmark_util.h b/benchmark/local_infinity/sparse/sparse_benchmark_util.h index 87dec992f1..54af631300 100644 --- a/benchmark/local_infinity/sparse/sparse_benchmark_util.h +++ b/benchmark/local_infinity/sparse/sparse_benchmark_util.h @@ -24,6 +24,7 @@ import infinity_exception; import sparse_util; import compilation_config; import bmp_util; +import local_file_handle; using namespace infinity; diff --git a/python/restart_test/test_alter.py b/python/restart_test/test_alter.py index 96de3c0059..f97199270e 100644 --- a/python/restart_test/test_alter.py +++ b/python/restart_test/test_alter.py @@ -74,9 +74,18 @@ def part2(infinity_obj): part1() part2() -
def test_alter_with_deltalog(self, infinity_runner: InfinityRunner): + @pytest.mark.parametrize( + "config, sleep, flush_mid", + [ + ("test/data/config/restart_test/test_alter/1.toml", 0, False), + ("test/data/config/restart_test/test_alter/2.toml", 2, False), + ("test/data/config/restart_test/test_alter/2.toml", 2, True), + ], + ) + def test_alter_complex( + self, infinity_runner: InfinityRunner, config: str, sleep: int, flush_mid: bool + ): table_name = "test_alter2" - config = "test/data/config/restart_test/test_alter/2.toml" infinity_runner.clear() uri = common_values.TEST_LOCAL_HOST @@ -102,6 +111,10 @@ def part1(infinity_obj): table_obj.insert([{"c1": 1, "c2": 2, "c3": "test"}]) + if sleep > 0 and flush_mid: + print(f"sleep {sleep} seconds") + time.sleep(sleep) # wait for delta log flush + res = table_obj.add_columns({"c4": {"type": "varchar", "default": "tttt"}}) assert res.error_code == ErrorCode.OK @@ -116,7 +129,9 @@ def part1(infinity_obj): res = table_obj.add_columns({"c5": {"type": "int", "default": 0}}) assert res.error_code == ErrorCode.OK - time.sleep(2) # wait for delta log flush + if sleep > 0: + print(f"sleep {sleep} seconds") + time.sleep(sleep) # wait for delta log flush @decorator def part2(infinity_obj): @@ -143,11 +158,15 @@ def part2(infinity_obj): } ), ) + dropped_column_dirs = pathlib.Path("/var/infinity/data").rglob("1.col") + assert len(list(dropped_column_dirs)) == 0 + + db_obj.drop_table(table_name) part1() part2() - def test_alter_cleanup(self, infinity_runner: InfinityRunner): + def test_alter_cleanup_simple(self, infinity_runner: InfinityRunner): table_name = "test_alter3" config = "test/data/config/restart_test/test_alter/3.toml" diff --git a/python/test_pysdk/test_alter.py b/python/test_pysdk/test_alter.py index a9d7baee53..be52eb233e 100644 --- a/python/test_pysdk/test_alter.py +++ b/python/test_pysdk/test_alter.py @@ -144,6 +144,84 @@ def test_simple_drop_columns(self): db_obj.drop_table(table_name) + def 
test_insert_after_drop_columns(self): + table_name = "testing_table" + self.suffix + db_obj = self.infinity_obj.get_database("default_db") + + db_obj.drop_table(table_name, infinity.common.ConflictType.Ignore) + + table_obj = db_obj.create_table( + table_name, + { + "num": {"type": "integer"}, + "body": {"type": "varchar"}, + "vec": {"type": "vector,4,float"}, + }, + ) + + table_obj.add_columns( + { + "column_name1": {"type": "integer", "default": 0}, + "column_name2": {"type": "float", "default": 0.0}, + "column_name3": {"type": "varchar", "default": ""}, + } + ) + table_obj.drop_columns(["column_name1"]) + table_obj.insert( + [ + { + "num": 1, + "body": "unnecessary and harmful", + "vec": [1.0, 1.2, 0.8, 0.9], + }, + { + "num": 2, + "body": "Office for Harmful Blooms", + "vec": [4.0, 4.2, 4.3, 4.5], + }, + { + "num": 3, + "body": "A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.", + "vec": [4.0, 4.2, 4.3, 4.2], + }, + ] + ) + + result = table_obj.output(["*"]).to_df() + print(result) + pd.testing.assert_frame_equal( + result, + pd.DataFrame( + { + "num": [1, 2, 3], + "body": [ + "unnecessary and harmful", + "Office for Harmful Blooms", + "A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.", + ], + "vec": [ + [1.0, 1.2, 0.8, 0.9], + [4.0, 4.2, 4.3, 4.5], + [4.0, 4.2, 4.3, 4.2], + ], + "column_name2": [0.0, 0.0, 0.0], + "column_name3": ["", "", ""], + } + ).astype( + { + "num": dtype("int32"), + "body": dtype("object"), + "vec": dtype("object"), + "column_name2": dtype("float32"), + "column_name3": dtype("object"), + } + ), + ) + db_obj.drop_table(table_name) + self.infinity_obj.disconnect() + + print("test done") + def test_add_drop_column_with_index(self): table_name = "test_add_drop_column_with_index" + self.suffix db_obj = 
self.infinity_obj.get_database("default_db") @@ -194,4 +272,4 @@ def test_add_drop_column_with_index(self): ), ) - db_obj.drop_table(table_name) \ No newline at end of file + db_obj.drop_table(table_name) diff --git a/src/executor/operator/physical_alter.cpp b/src/executor/operator/physical_alter.cpp index 12ba0f0d90..128825174c 100644 --- a/src/executor/operator/physical_alter.cpp +++ b/src/executor/operator/physical_alter.cpp @@ -25,17 +25,7 @@ import txn; import status; import infinity_exception; import value; -import bind_context; -import value_expression; -import expression_binder; import defer_op; -import cast_function; -import bound_cast_func; -import base_expression; -import cast_expression; -import expression_evaluator; -import column_vector; -import expression_state; namespace infinity { @@ -61,35 +51,7 @@ bool PhysicalAddColumns::Execute(QueryContext *query_context, OperatorState *ope Txn *txn = query_context->GetTxn(); - ExpressionBinder tmp_binder(nullptr); - Vector values; - for (const auto &column_def : column_defs_) { - if (!column_def->has_default_value()) { - UnrecoverableError(fmt::format("Column {} has no default value", column_def->name())); - } - SharedPtr default_expr = column_def->default_value(); - auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false); - auto *value_expr = static_cast(expr.get()); - - const SharedPtr &column_type = column_def->type(); - if (value_expr->Type() == *column_type) { - values.push_back(value_expr->GetValue()); - } else { - const SharedPtr &column_type = column_def->type(); - - BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type); - SharedPtr cast_expr = MakeShared(cast, expr, *column_type); - SharedPtr expr_state = ExpressionState::CreateState(cast_expr); - SharedPtr output_column_vector = ColumnVector::Make(column_type); - output_column_vector->Initialize(ColumnVectorType::kConstant, 1); - ExpressionEvaluator evaluator; - evaluator.Init(nullptr); - 
evaluator.Execute(cast_expr, expr_state, output_column_vector); - - values.push_back(output_column_vector->GetValue(0)); - } - } - auto status = txn->AddColumns(table_entry_, column_defs_, values); + auto status = txn->AddColumns(table_entry_, column_defs_); if (!status.ok()) { RecoverableError(status); } diff --git a/src/storage/buffer/buffer_obj.cpp b/src/storage/buffer/buffer_obj.cpp index 86f7ae4b22..51a857bd3f 100644 --- a/src/storage/buffer/buffer_obj.cpp +++ b/src/storage/buffer/buffer_obj.cpp @@ -272,7 +272,11 @@ void BufferObj::SubObjRc() { if (obj_rc_ == 0) { UnrecoverableError(fmt::format("SubObjRc: obj_rc_ is 0, buffer: {}", GetFilename())); } - obj_rc_--; + --obj_rc_; + if (obj_rc_ == 0) { + status_ = BufferStatus::kClean; + buffer_mgr_->AddToCleanList(this, false/*do_free*/); + } } void BufferObj::CheckState() const { diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 5b68f7084a..e47cf48fe3 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -629,7 +629,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe auto db_name = String(decodes[0]); auto table_name = MakeShared(decodes[1]); const auto &table_entry_dir = add_table_entry_op->table_entry_dir_; - auto column_defs = add_table_entry_op->column_defs_; + const auto &column_defs = add_table_entry_op->column_defs_; auto entry_type = add_table_entry_op->table_entry_type_; auto row_count = add_table_entry_op->row_count_; SegmentID unsealed_id = add_table_entry_op->unsealed_id_; @@ -792,9 +792,21 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe auto *table_entry = db_entry->GetTableReplay(table_name, txn_id, begin_ts); auto *segment_entry = table_entry->segment_map_.at(segment_id).get(); auto *block_entry = segment_entry->GetBlockEntryByID(block_id).get(); - block_entry->AddColumnReplay( - BlockColumnEntry::NewReplayBlockColumnEntry(block_entry, column_id, buffer_mgr, next_outline_idx, last_chunk_offset, commit_ts), - 
column_id); + if (merge_flag == MergeFlag::kDelete) { + block_entry->DropColumnReplay(column_id); + } else if (merge_flag == MergeFlag::kNew) { + block_entry->AddColumnReplay(BlockColumnEntry::NewReplayBlockColumnEntry(block_entry, + column_id, + buffer_mgr, + next_outline_idx, + last_chunk_offset, + commit_ts), + column_id); + } else if (merge_flag == MergeFlag::kUpdate) { + // do nothing + } else { + UnrecoverableError(fmt::format("Unsupported merge flag {} for column entry {}", (i8)merge_flag, column_id)); + } break; } diff --git a/src/storage/meta/entry/block_column_entry.cpp b/src/storage/meta/entry/block_column_entry.cpp index ee818e9c9d..e939fc99fb 100644 --- a/src/storage/meta/entry/block_column_entry.cpp +++ b/src/storage/meta/entry/block_column_entry.cpp @@ -278,7 +278,7 @@ void BlockColumnEntry::FlushColumn(TxnTimeStamp checkpoint_ts) { BlockColumnEntry::Flush(this, 0, row_cnt); } -void BlockColumnEntry::DropColumn() { +void BlockColumnEntry::DropColumn() { buffer_->SubObjRc(); for (auto *outline_buffer : outline_buffers_) { outline_buffer->SubObjRc(); diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp index a547004845..b8c4d86567 100644 --- a/src/storage/meta/entry/block_entry.cpp +++ b/src/storage/meta/entry/block_entry.cpp @@ -89,6 +89,7 @@ UniquePtr BlockEntry::Clone(SegmentEntry *segment_entry) const { UniquePtr BlockEntry::NewBlockEntry(const SegmentEntry *segment_entry, BlockID block_id, TxnTimeStamp checkpoint_ts, u64 column_count, Txn *txn) { + const TableEntry *table_entry = segment_entry->GetTableEntry(); auto block_entry = MakeUnique(segment_entry, block_id, checkpoint_ts); block_entry->begin_ts_ = txn->BeginTS(); @@ -96,7 +97,9 @@ BlockEntry::NewBlockEntry(const SegmentEntry *segment_entry, BlockID block_id, T block_entry->block_dir_ = BlockEntry::DetermineDir(*segment_entry->segment_dir(), block_id); block_entry->columns_.reserve(column_count); - for (SizeT column_id = 0; column_id < column_count; 
++column_id) { + for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) { + const SharedPtr column_def = table_entry->column_defs()[column_idx]; + ColumnID column_id = column_def->id(); auto column_entry = BlockColumnEntry::NewBlockColumnEntry(block_entry.get(), column_id, txn); block_entry->columns_.emplace_back(std::move(column_entry)); } @@ -594,6 +597,17 @@ void BlockEntry::AddColumnReplay(UniquePtr column_entry, Colum } } +void BlockEntry::DropColumnReplay(ColumnID column_id) { + auto iter = std::find_if(columns_.begin(), columns_.end(), [&](const auto &column) { return column->column_id() == column_id; }); + if (iter == columns_.end()) { + String error_message = fmt::format("BlockEntry::AddColumnReplay: column_id {} not found", column_id); + UnrecoverableError(error_message); + } + BlockColumnEntry *entry = iter->get(); + entry->DropColumn(); + columns_.erase(iter); +} + void BlockEntry::AppendBlock(const Vector &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr) { if (read_size + block_row_count_ > row_capacity_) { String error_message = "BlockEntry::AppendBlock: read_size + row_count_ > row_capacity_"; diff --git a/src/storage/meta/entry/block_entry.cppm b/src/storage/meta/entry/block_entry.cppm index e53c1d9883..43798056c4 100644 --- a/src/storage/meta/entry/block_entry.cppm +++ b/src/storage/meta/entry/block_entry.cppm @@ -95,6 +95,8 @@ public: void AddColumnReplay(UniquePtr column_entry, ColumnID column_id); + void DropColumnReplay(ColumnID column_id); + void AppendBlock(const Vector &column_vectors, SizeT row_begin, SizeT read_size, BufferManager *buffer_mgr); void Cleanup(CleanupInfoTracer *info_tracer = nullptr, bool dropped = true) override; @@ -146,7 +148,7 @@ public: // Relative to the `data_dir` config item const SharedPtr &block_dir() const { return block_dir_; } - BlockColumnEntry *GetColumnBlockEntry(SizeT column_id) const { return columns_[column_id].get(); } + BlockColumnEntry *GetColumnBlockEntry(SizeT 
column_idx) const { return columns_[column_idx].get(); } FastRoughFilter *GetFastRoughFilter() { return fast_rough_filter_.get(); } diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 6dd24ce006..d4ec74ce77 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -58,6 +58,16 @@ import infinity_context; import persistence_manager; import bg_task; import defer_op; +import bind_context; +import value_expression; +import expression_binder; +import cast_function; +import bound_cast_func; +import base_expression; +import cast_expression; +import expression_evaluator; +import column_vector; +import expression_state; namespace infinity { @@ -256,6 +266,7 @@ void TableEntry::UpdateEntryReplay(const SharedPtr &table_entry) { txn_id_ = table_entry->txn_id_; begin_ts_ = table_entry->begin_ts_; commit_ts_.store(table_entry->commit_ts_); + columns_ = table_entry->columns_; row_count_ = table_entry->row_count(); unsealed_id_ = table_entry->unsealed_id(); next_segment_id_ = table_entry->next_segment_id(); @@ -1222,7 +1233,7 @@ UniquePtr TableEntry::Deserialize(const nlohmann::json &table_entry_ SegmentID unsealed_id = table_entry_json["unsealed_id"]; SegmentID next_segment_id = table_entry_json["next_segment_id"]; - if(!table_entry_json.contains("next_column_id")) { + if (!table_entry_json.contains("next_column_id")) { String error_message = "No 'next_column_id in table entry of catalog file, maybe your catalog is generated before 0.4.0.'"; UnrecoverableError(error_message); } @@ -1451,7 +1462,35 @@ void TableEntry::SetUnlock() { locked_ = false; } -void TableEntry::AddColumns(const Vector> &column_defs, const Vector &default_values, TxnTableStore *txn_table_store) { +void TableEntry::AddColumns(const Vector> &column_defs, TxnTableStore *txn_table_store) { + ExpressionBinder tmp_binder(nullptr); + Vector default_values; + for (const auto &column_def : column_defs) { + if 
(!column_def->has_default_value()) { + UnrecoverableError(fmt::format("Column {} has no default value", column_def->name())); + } + SharedPtr default_expr = column_def->default_value(); + auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false); + auto *value_expr = static_cast(expr.get()); + + const SharedPtr &column_type = column_def->type(); + if (value_expr->Type() == *column_type) { + default_values.push_back(value_expr->GetValue()); + } else { + const SharedPtr &column_type = column_def->type(); + + BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *column_type); + SharedPtr cast_expr = MakeShared(cast, expr, *column_type); + SharedPtr expr_state = ExpressionState::CreateState(cast_expr); + SharedPtr output_column_vector = ColumnVector::Make(column_type); + output_column_vector->Initialize(ColumnVectorType::kConstant, 1); + ExpressionEvaluator evaluator; + evaluator.Init(nullptr); + evaluator.Execute(cast_expr, expr_state, output_column_vector); + + default_values.push_back(output_column_vector->GetValue(0)); + } + } Vector> columns_info; for (SizeT idx = 0; idx < column_defs.size(); ++idx) { const auto &column_def = column_defs[idx]; diff --git a/src/storage/meta/entry/table_entry.cppm b/src/storage/meta/entry/table_entry.cppm index db7f43bf5d..83220cab66 100644 --- a/src/storage/meta/entry/table_entry.cppm +++ b/src/storage/meta/entry/table_entry.cppm @@ -354,7 +354,7 @@ private: SizeT write_txn_num_ = 0; public: - void AddColumns(const Vector> &columns, const Vector &default_values, TxnTableStore *txn_store); + void AddColumns(const Vector> &columns, TxnTableStore *txn_store); void DropColumns(const Vector &column_names, TxnTableStore *txn_store); }; diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 747fa0dc76..c82285db67 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -87,7 +87,7 @@ Status Txn::Import(TableEntry *table_entry, SharedPtr segment_entr // build WalCmd WalSegmentInfo 
segment_info(segment_entry.get()); - AddWalCmd(MakeShared(db_name, table_name, std::move(segment_info))); + wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, std::move(segment_info))); TxnTableStore *table_store = this->GetTxnTableStore(table_entry); table_store->Import(std::move(segment_entry), this); @@ -102,7 +102,7 @@ Status Txn::Append(TableEntry *table_entry, const SharedPtr &input_bl this->CheckTxn(db_name); TxnTableStore *table_store = this->GetTxnTableStore(table_entry); - AddWalCmd(MakeShared(db_name, table_name, input_block)); + wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, input_block)); auto [err_msg, append_status] = table_store->Append(input_block); return append_status; } @@ -120,7 +120,7 @@ Status Txn::Delete(TableEntry *table_entry, const Vector &row_ids, bool c TxnTableStore *table_store = this->GetTxnTableStore(table_entry); - AddWalCmd(MakeShared(db_name, table_name, row_ids)); + wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, row_ids)); auto [err_msg, delete_status] = table_store->Delete(row_ids); return delete_status; } @@ -141,7 +141,7 @@ Status Txn::OptIndex(TableIndexEntry *table_index_entry, VectorGetTableName(); table_index_entry->OptIndex(txn_table_store, init_params, false /*replay*/); - AddWalCmd(MakeShared(db_name_, table_name, index_name, std::move(init_params))); + wal_entry_->cmds_.push_back(MakeShared(db_name_, table_name, index_name, std::move(init_params))); return Status::OK(); } @@ -178,7 +178,7 @@ Status Txn::CreateDatabase(const String &db_name, ConflictType conflict_type) { } txn_store_.AddDBStore(db_entry); - AddWalCmd(MakeShared(std::move(db_name), db_entry->GetPathNameTail())); + wal_entry_->cmds_.push_back(MakeShared(std::move(db_name), db_entry->GetPathNameTail())); return Status::OK(); } @@ -192,7 +192,7 @@ Status Txn::DropDatabase(const String &db_name, ConflictType conflict_type) { } txn_store_.DropDBStore(dropped_db_entry.get()); - AddWalCmd(MakeShared(db_name)); + 
wal_entry_->cmds_.push_back(MakeShared(db_name)); return Status::OK(); } @@ -244,7 +244,7 @@ Status Txn::CreateTable(const String &db_name, const SharedPtr &table_ } txn_store_.AddTableStore(table_entry); - AddWalCmd(MakeShared(std::move(db_name), table_entry->GetPathNameTail(), table_def)); + wal_entry_->cmds_.push_back(MakeShared(std::move(db_name), table_entry->GetPathNameTail(), table_def)); LOG_TRACE("Txn::CreateTable created table entry is inserted."); return Status::OK(); @@ -255,7 +255,7 @@ Status Txn::RenameTable(TableEntry *old_table_entry, const String &new_table_nam return Status::OK(); } -Status Txn::AddColumns(TableEntry *table_entry, const Vector> &column_defs, const Vector &default_values) { +Status Txn::AddColumns(TableEntry *table_entry, const Vector> &column_defs) { TxnTimeStamp begin_ts = txn_context_.GetBeginTS(); auto [db_entry, db_status] = catalog_->GetDatabase(*table_entry->GetDBName(), txn_id_, begin_ts); @@ -264,13 +264,13 @@ Status Txn::AddColumns(TableEntry *table_entry, const Vector new_table_entry = table_entry->Clone(); TxnTableStore *txn_table_store = txn_store_.GetTxnTableStore(new_table_entry.get()); - new_table_entry->AddColumns(column_defs, default_values, txn_table_store); + new_table_entry->AddColumns(column_defs, txn_table_store); auto add_status = db_entry->AddTable(std::move(new_table_entry), txn_id_, begin_ts, txn_mgr_, true/*add_if_found*/); if (!add_status.ok()) { return add_status; } - AddWalCmd(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs)); + wal_entry_->cmds_.push_back(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_defs)); return Status::OK(); } @@ -289,7 +289,7 @@ Status Txn::DropColumns(TableEntry *table_entry, const Vector &column_na return drop_status; } - AddWalCmd(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), column_names)); + wal_entry_->cmds_.push_back(MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), 
column_names)); return Status::OK(); } @@ -306,7 +306,7 @@ Status Txn::DropTableCollectionByName(const String &db_name, const String &table } txn_store_.DropTableStore(table_entry.get()); - AddWalCmd(MakeShared(db_name, table_name)); + wal_entry_->cmds_.push_back(MakeShared(db_name, table_name)); LOG_TRACE("Txn::DropTableCollectionByName dropped table entry is inserted."); return Status::OK(); @@ -324,7 +324,7 @@ Tuple Txn::CreateIndexDef(TableEntry *table_entry, co txn_table_store->AddIndexStore(table_index_entry); String index_dir_tail = table_index_entry->GetPathNameTail(); - AddWalCmd( + wal_entry_->cmds_.push_back( MakeShared(*table_entry->GetDBName(), *table_entry->GetTableName(), std::move(index_dir_tail), index_base)); return {table_index_entry, index_status}; } @@ -412,7 +412,7 @@ Status Txn::DropIndexByName(const String &db_name, const String &table_name, con auto *txn_table_store = this->GetTxnTableStore(table_entry); txn_table_store->DropIndexStore(table_index_entry.get()); - AddWalCmd(MakeShared(db_name, table_name, index_name)); + wal_entry_->cmds_.push_back(MakeShared(db_name, table_name, index_name)); return index_status; } @@ -583,13 +583,12 @@ void Txn::Rollback() { } void Txn::AddWalCmd(const SharedPtr &cmd) { - // std::lock_guard guard(txn_store_.mtx_); + std::lock_guard guard(txn_store_.mtx_); auto state = txn_context_.GetTxnState(); if (state != TxnState::kStarted) { auto begin_ts = BeginTS(); UnrecoverableError(fmt::format("Should add wal cmd in started state, begin_ts: {}", begin_ts)); } - // LOG_TRACE(fmt::format("Add wal cmd {} to txn {}", cmd->ToString(), txn_id_)); wal_entry_->cmds_.push_back(cmd); } @@ -601,7 +600,7 @@ bool Txn::DeltaCheckpoint(TxnTimeStamp last_ckp_ts, TxnTimeStamp &max_commit_ts) if (!catalog_->SaveDeltaCatalog(last_ckp_ts, max_commit_ts, delta_path, delta_name)) { return false; } - AddWalCmd(MakeShared(max_commit_ts, false, delta_path, delta_name)); + wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, false, 
delta_path, delta_name)); return true; } @@ -611,7 +610,7 @@ void Txn::FullCheckpoint(const TxnTimeStamp max_commit_ts) { String full_path, full_name; catalog_->SaveFullCatalog(max_commit_ts, full_path, full_name); - AddWalCmd(MakeShared(max_commit_ts, true, full_path, full_name)); + wal_entry_->cmds_.push_back(MakeShared(max_commit_ts, true, full_path, full_name)); } void Txn::AddWriteTxnNum(TableEntry *table_entry) { diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm index 8c3292dfe2..7c91a099f2 100644 --- a/src/storage/txn/txn.cppm +++ b/src/storage/txn/txn.cppm @@ -123,7 +123,7 @@ public: Status RenameTable(TableEntry *old_table_entry, const String &new_table_name); - Status AddColumns(TableEntry *table_entry, const Vector> &column_defs, const Vector &default_values); + Status AddColumns(TableEntry *table_entry, const Vector> &column_defs); Status DropColumns(TableEntry *table_entry, const Vector &column_names); diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index 258e0cdfa7..80b1bb18f2 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -923,7 +923,10 @@ void AddColumnEntryOp::Merge(CatalogDeltaOperation &other) { String error_message = fmt::format("Merge failed, other type: {}", other.GetTypeStr()); UnrecoverableError(error_message); } - *this = std::move(static_cast(other)); + auto &add_column_op = static_cast(other); + MergeFlag flag = this->NextDeleteFlag(add_column_op.merge_flag_); + *this = std::move(add_column_op); + this->merge_flag_ = flag; } void AddTableIndexEntryOp::Merge(CatalogDeltaOperation &other) { diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index 486c2184bf..f23c503354 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -55,11 +55,6 @@ import default_values; import defer_op; import index_base; import base_table_ref; -import constant_expr; -import bind_context; 
-import value_expression; -import expression_binder; -import value; module wal_manager; @@ -1235,20 +1230,7 @@ void WalManager::WalCmdAddColumnsReplay(WalCmdAddColumns &cmd, TransactionID txn SharedPtr new_table_entry = table_entry->Clone(); auto fake_txn = Txn::NewReplayTxn(storage_->buffer_manager(), storage_->txn_manager(), storage_->catalog(), txn_id, commit_ts); auto *txn_table_store = fake_txn->GetTxnTableStore(table_entry); - Vector default_values; - ExpressionBinder tmp_binder(nullptr); - for (const auto &column_def : cmd.column_defs_) { - if (!column_def->has_default_value()) { - UnrecoverableError(fmt::format("Wal Replay: Add column {} without default value", column_def->name())); - } - - auto default_expr = column_def->default_value(); - auto expr = tmp_binder.BuildValueExpr(*default_expr, nullptr, 0, false); - auto value_expr = std::dynamic_pointer_cast(expr); - - default_values.push_back(value_expr->GetValue()); - } - new_table_entry->AddColumns(cmd.column_defs_, default_values, txn_table_store); + new_table_entry->AddColumns(cmd.column_defs_, txn_table_store); new_table_entry->commit_ts_ = commit_ts; db_entry->CreateTableReplay( table_entry->GetTableName(), diff --git a/test/data/config/restart_test/test_alter/1.toml b/test/data/config/restart_test/test_alter/1.toml index 5653818e8b..d80908845e 100644 --- a/test/data/config/restart_test/test_alter/1.toml +++ b/test/data/config/restart_test/test_alter/1.toml @@ -5,6 +5,7 @@ time_zone = "utc-8" [network] [log] log_to_stdout = true +log_level = "trace" [storage] data_dir = "/var/infinity/data" diff --git a/test/data/config/restart_test/test_alter/2.toml b/test/data/config/restart_test/test_alter/2.toml index 23b77f84dd..0a5d439ba9 100644 --- a/test/data/config/restart_test/test_alter/2.toml +++ b/test/data/config/restart_test/test_alter/2.toml @@ -5,6 +5,7 @@ time_zone = "utc-8" [network] [log] log_to_stdout = true +log_level = "trace" [storage] data_dir = "/var/infinity/data" diff --git 
a/test/sql/ddl/alter/drop_columns.slt b/test/sql/ddl/alter/drop_columns.slt index 5d89441d18..b091d37c07 100644 --- a/test/sql/ddl/alter/drop_columns.slt +++ b/test/sql/ddl/alter/drop_columns.slt @@ -33,6 +33,6 @@ SELECT * FROM table1; 1 test 2 test2 -# statement ok -# DROP TABLE table1; +statement ok +DROP TABLE table1;