Skip to content

Commit

Permalink
*: Refine MutableSupport and DeltaMergeDefines (#9746)
Browse files Browse the repository at this point in the history
ref #9673
  • Loading branch information
JinheLin authored Dec 30, 2024
1 parent 9836c27 commit c807718
Show file tree
Hide file tree
Showing 99 changed files with 814 additions and 744 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace DB
Block AddExtraTableIDColumnTransformAction::buildHeader(const Block & inner_header_, int extra_table_id_index)
{
auto header = inner_header_.cloneEmpty();
if (extra_table_id_index != InvalidColumnID)
if (extra_table_id_index != MutSup::invalid_col_id)
{
const auto & extra_table_id_col_define = DM::getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{
Expand Down Expand Up @@ -67,7 +67,7 @@ bool AddExtraTableIDColumnTransformAction::transform(Block & block, TableID phys
if (unlikely(!block))
return true;

if (extra_table_id_index != InvalidColumnID)
if (extra_table_id_index != MutSup::invalid_col_id)
{
const auto & extra_table_id_col_define = DM::getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <IO/WriteHelpers.h>
#include <Storages/FormatVersion.h>

#include <magic_enum.hpp>

#if __SSE2__
#include <emmintrin.h>
#endif
Expand Down Expand Up @@ -517,4 +519,14 @@ String DataTypeString::getNullableDefaultName()
return fmt::format("Nullable({})", getDefaultName());
}

std::span<const std::pair<String, DataTypePtr>> DataTypeString::getTiDBPkColumnStringNameAndTypes()
{
static const auto name_and_types = std::array{
std::make_pair(NameV2, DataTypeFactory::instance().getOrSet(NameV2)),
std::make_pair(LegacyName, DataTypeFactory::instance().getOrSet(LegacyName)),
};
// Minus one for ignoring SerdesFormat::None.
static_assert(magic_enum::enum_count<SerdesFormat>() - 1 == name_and_types.size());
return name_and_types;
}
} // namespace DB
5 changes: 4 additions & 1 deletion dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

#pragma once

#include <DataTypes/IDataType.h>

#include <span>

namespace DB
{
class DataTypeString final : public IDataType
Expand Down Expand Up @@ -98,6 +99,8 @@ class DataTypeString final : public IDataType
static String getDefaultName();
static String getNullableDefaultName();

static std::span<const std::pair<String, DataTypePtr>> getTiDBPkColumnStringNameAndTypes();

explicit DataTypeString(SerdesFormat serdes_fmt_ = SerdesFormat::None);

private:
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -257,7 +257,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -272,7 +272,7 @@ try

auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -373,7 +373,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -391,7 +391,7 @@ try

auto storage = db2->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -581,7 +581,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -597,7 +597,7 @@ try

auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -632,7 +632,7 @@ try
// Get storage from database
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -744,7 +744,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -793,7 +793,7 @@ try
const auto * tbl_name = "t_45";
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/MockExecutor/TableScanBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TableID TableScanBinder::getTableId() const
void TableScanBinder::setTipbColumnInfo(tipb::ColumnInfo * ci, const DAGColumnInfo & dag_column_info) const
{
auto names = splitQualifiedName(dag_column_info.first);
if (names.column_name == MutableSupport::tidb_pk_column_name)
if (names.column_name == MutSup::extra_handle_column_name)
ci->set_column_id(-1);
else
ci->set_column_id(table_info.getColumnID(names.column_name));
Expand Down Expand Up @@ -142,10 +142,10 @@ ExecutorBinderPtr compileTableScan(
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.id = TiDBPkColumnID;
ci.id = MutSup::extra_handle_id;
ci.setPriKeyFlag();
ci.setNotNullFlag();
ts_output.emplace_back(std::make_pair(MutableSupport::tidb_pk_column_name, std::move(ci)));
ts_output.emplace_back(std::make_pair(MutSup::extra_handle_column_name, std::move(ci)));
}

return std::make_shared<mock::TableScanBinder>(executor_index, ts_output, table_info, keep_order);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgQueryCompiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ std::pair<ExecutorBinderPtr, bool> compileQueryBlock(
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(expr.get()))
{
if (identifier->getColumnName() == MutableSupport::tidb_pk_column_name)
if (identifier->getColumnName() == MutSup::extra_handle_column_name)
{
append_pk_column = true;
}
Expand Down Expand Up @@ -179,7 +179,7 @@ std::pair<ExecutorBinderPtr, bool> compileQueryBlock(
if (auto * identifier = typeid_cast<ASTIdentifier *>(expr.get()))
{
auto [db_name, table_name, column_name] = splitQualifiedName(identifier->getColumnName());
if (column_name == MutableSupport::tidb_pk_column_name)
if (column_name == MutSup::extra_handle_column_name)
{
if (table_name.empty())
{
Expand Down Expand Up @@ -423,4 +423,4 @@ QueryTasks queryPlanToQueryTasks(
}
return tasks;
}
} // namespace DB
} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ std::tuple<bool, String> compareColumns(

for (const auto & column : columns)
{
// Exclude virtual columns, including EXTRA_HANDLE_COLUMN_ID, VERSION_COLUMN_ID,TAG_COLUMN_ID,EXTRA_TABLE_ID_COLUMN_ID
// Exclude virtual columns, including MutSup::extra_handle_id, MutSup::version_col_id,MutSup::delmark_col_id,MutSup::extra_table_id_col_id
if (column.id < 0)
{
continue;
Expand Down Expand Up @@ -1482,7 +1482,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableScan()
{
// Get handle column name.
String handle_column_name = MutableSupport::tidb_pk_column_name;
String handle_column_name = MutSup::extra_handle_column_name;
if (auto pk_handle_col = storage_for_logical_table->getTableInfo().getPKHandleColumn())
handle_column_name = pk_handle_col->get().name;

Expand All @@ -1504,10 +1504,10 @@ std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableSc
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
if (cid == MutSup::extra_handle_id)
name = handle_column_name;
else if (cid == ExtraTableIDColumnID)
name = MutableSupport::extra_table_id_column_name;
else if (cid == MutSup::extra_table_id_col_id)
name = MutSup::extra_table_id_column_name;
else
name = storage_for_logical_table->getTableInfo().getColumnName(cid);
required_columns_tmp.emplace_back(std::move(name));
Expand Down
36 changes: 17 additions & 19 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ namespace
DataTypePtr getPkType(const TiDB::ColumnInfo & column_info)
{
const auto & pk_data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
/// primary key type must be tidb_pk_column_int_type or tidb_pk_column_string_type.
/// primary key type must be getTiDBPkColumnIntType or getTiDBPkColumnStringType.
RUNTIME_CHECK(
pk_data_type->equals(*MutableSupport::tidb_pk_column_int_type)
|| pk_data_type->equals(*MutableSupport::tidb_pk_column_string_type),
pk_data_type->equals(*MutSup::getExtraHandleColumnIntType())
|| pk_data_type->equals(*MutSup::getExtraHandleColumnStringType()),
pk_data_type->getName(),
MutableSupport::tidb_pk_column_int_type->getName(),
MutableSupport::tidb_pk_column_string_type->getName());
MutSup::getExtraHandleColumnIntType()->getName(),
MutSup::getExtraHandleColumnStringType()->getName());
return pk_data_type;
}
} // namespace
Expand Down Expand Up @@ -70,13 +70,11 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str
const auto & column_info = column_infos[i];
switch (column_info.id)
{
case TiDBPkColumnID:
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getPkType(column_info));
case MutSup::extra_handle_id:
names_and_types.emplace_back(MutSup::extra_handle_column_name, getPkType(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(
MutableSupport::extra_table_id_column_name,
MutableSupport::extra_table_id_column_type);
case MutSup::extra_table_id_col_id:
names_and_types.emplace_back(MutSup::extra_table_id_column_name, MutSup::getExtraTableIdColumnType());
break;
default:
names_and_types.emplace_back(
Expand All @@ -94,7 +92,7 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
{
auto column_defines = std::make_shared<DM::ColumnDefines>();
int extra_table_id_index = InvalidColumnID;
int extra_table_id_index = MutSup::invalid_col_id;
column_defines->reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
Expand All @@ -106,18 +104,18 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
const auto output_name = genNameForExchangeReceiver(i);
switch (column_info.id)
{
case TiDBPkColumnID:
case MutSup::extra_handle_id:
column_defines->emplace_back(DM::ColumnDefine{
TiDBPkColumnID,
output_name, // MutableSupport::tidb_pk_column_name
MutSup::extra_handle_id,
output_name, // MutSup::extra_handle_column_name
getPkType(column_info)});
break;
case ExtraTableIDColumnID:
case MutSup::extra_table_id_col_id:
{
column_defines->emplace_back(DM::ColumnDefine{
ExtraTableIDColumnID,
output_name, // MutableSupport::extra_table_id_column_name
MutableSupport::extra_table_id_column_type});
MutSup::extra_table_id_col_id,
output_name, // MutSup::extra_table_id_column_name
MutSup::getExtraTableIdColumnType()});
extra_table_id_index = i;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ RemoteRequest RemoteRequest::build(
auto * mutable_table_scan = ts_exec->mutable_tbl_scan();
table_scan.constructTableScanForRemoteRead(mutable_table_scan, table_info.id);

String handle_column_name = MutableSupport::tidb_pk_column_name;
String handle_column_name = MutSup::extra_handle_column_name;
if (auto pk_handle_col = table_info.getPKHandleColumn())
handle_column_name = pk_handle_col->get().name;

Expand All @@ -53,19 +53,19 @@ RemoteRequest RemoteRequest::build(
const auto & col = table_scan.getColumns()[i];
auto col_id = col.id;

if (col_id == DB::TiDBPkColumnID)
if (col_id == MutSup::extra_handle_id)
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.setPriKeyFlag();
ci.setNotNullFlag();
schema.emplace_back(std::make_pair(handle_column_name, std::move(ci)));
}
else if (col_id == ExtraTableIDColumnID)
else if (col_id == MutSup::extra_table_id_col_id)
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
schema.emplace_back(std::make_pair(MutableSupport::extra_table_id_column_name, std::move(ci)));
schema.emplace_back(std::make_pair(MutSup::extra_table_id_column_name, std::move(ci)));
}
else
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/tests/gtest_trigger_pipeline_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataTypes/DataTypesNumber.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/WaitResult.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/tests/bench_aggregation_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Columns/ColumnDecimal.h>
#include <Common/Stopwatch.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDecimal.h>
#include <DataTypes/DataTypeString.h>
Expand Down Expand Up @@ -310,7 +311,7 @@ try
LOG_DEBUG(log, "build_side_watch: {}, hashmap size: {}", build_side_watch.elapsed(), data_variants->size());

std::vector<AggregatedDataVariantsPtr> variants{data_variants};
auto merging_buckets = aggregator->mergeAndConvertToBlocks(variants, /*final=*/true, /*max_thread=*/1);
auto merging_buckets = aggregator->mergeAndConvertToBlocks(variants, /*final=*/true, /*max_threads=*/1);
std::vector<Block> res_block;

Stopwatch probe_side_watch;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
NameSet unique_keys;
ASTs & group_asts = select_query->group_expression_list->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
for (ssize_t i = 0; i < static_cast<ssize_t>(group_asts.size()); ++i)
{
ssize_t size = group_asts.size();
getRootActions(group_asts[i], true, false, temp_actions);
Expand Down Expand Up @@ -1037,7 +1037,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
if (storage && select_query && !select_query->raw_for_mutable)
{
// LOG_DEBUG(&Poco::Logger::get("ExpressionAnalyzer"), "Filter hidden columns for mutable table.");
filtered_names = MutableSupport::instance().hiddenColumns(storage->getName());
filtered_names = MutSup::instance().hiddenColumns(storage->getName());
}
}

Expand Down Expand Up @@ -1392,7 +1392,7 @@ void ExpressionAnalyzer::optimizeGroupBy()
}

select_query->group_expression_list = std::make_shared<ASTExpressionList>();
select_query->group_expression_list->children.emplace_back(std::make_shared<ASTLiteral>(UInt64(unused_column)));
select_query->group_expression_list->children.emplace_back(std::make_shared<ASTLiteral>(unused_column));
}
}

Expand Down
Loading

0 comments on commit c807718

Please sign in to comment.