Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Refine MutableSupport and DeltaMergeDefines #9746

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace DB
Block AddExtraTableIDColumnTransformAction::buildHeader(const Block & inner_header_, int extra_table_id_index)
{
auto header = inner_header_.cloneEmpty();
if (extra_table_id_index != InvalidColumnID)
if (extra_table_id_index != MutSup::invalid_col_id)
{
const auto & extra_table_id_col_define = DM::getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{
Expand Down Expand Up @@ -67,7 +67,7 @@ bool AddExtraTableIDColumnTransformAction::transform(Block & block, TableID phys
if (unlikely(!block))
return true;

if (extra_table_id_index != InvalidColumnID)
if (extra_table_id_index != MutSup::invalid_col_id)
{
const auto & extra_table_id_col_define = DM::getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <IO/WriteHelpers.h>
#include <Storages/FormatVersion.h>

#include <magic_enum.hpp>

#if __SSE2__
#include <emmintrin.h>
#endif
Expand Down Expand Up @@ -517,4 +519,14 @@ String DataTypeString::getNullableDefaultName()
return fmt::format("Nullable({})", getDefaultName());
}

std::span<const std::pair<String, DataTypePtr>> DataTypeString::getTiDBPkColumnStringNameAndTypes()
{
static const auto name_and_types = std::array{
std::make_pair(NameV2, DataTypeFactory::instance().getOrSet(NameV2)),
std::make_pair(LegacyName, DataTypeFactory::instance().getOrSet(LegacyName)),
};
// Minus one for ignoring SerdesFormat::None.
static_assert(magic_enum::enum_count<SerdesFormat>() - 1 == name_and_types.size());
return name_and_types;
}
} // namespace DB
5 changes: 4 additions & 1 deletion dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

#pragma once

#include <DataTypes/IDataType.h>

#include <span>

namespace DB
{
class DataTypeString final : public IDataType
Expand Down Expand Up @@ -98,6 +99,8 @@ class DataTypeString final : public IDataType
static String getDefaultName();
static String getNullableDefaultName();

static std::span<const std::pair<String, DataTypePtr>> getTiDBPkColumnStringNameAndTypes();

explicit DataTypeString(SerdesFormat serdes_fmt_ = SerdesFormat::None);

private:
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -257,7 +257,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -272,7 +272,7 @@ try

auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -373,7 +373,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -391,7 +391,7 @@ try

auto storage = db2->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -581,7 +581,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand All @@ -597,7 +597,7 @@ try

auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -632,7 +632,7 @@ try
// Get storage from database
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -744,7 +744,7 @@ try
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);

EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down Expand Up @@ -793,7 +793,7 @@ try
const auto * tbl_name = "t_45";
auto storage = db->tryGetTable(*ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getName(), MutSup::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/MockExecutor/TableScanBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TableID TableScanBinder::getTableId() const
void TableScanBinder::setTipbColumnInfo(tipb::ColumnInfo * ci, const DAGColumnInfo & dag_column_info) const
{
auto names = splitQualifiedName(dag_column_info.first);
if (names.column_name == MutableSupport::tidb_pk_column_name)
if (names.column_name == MutSup::extra_handle_column_name)
ci->set_column_id(-1);
else
ci->set_column_id(table_info.getColumnID(names.column_name));
Expand Down Expand Up @@ -142,10 +142,10 @@ ExecutorBinderPtr compileTableScan(
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.id = TiDBPkColumnID;
ci.id = MutSup::extra_handle_id;
ci.setPriKeyFlag();
ci.setNotNullFlag();
ts_output.emplace_back(std::make_pair(MutableSupport::tidb_pk_column_name, std::move(ci)));
ts_output.emplace_back(std::make_pair(MutSup::extra_handle_column_name, std::move(ci)));
}

return std::make_shared<mock::TableScanBinder>(executor_index, ts_output, table_info, keep_order);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Debug/dbgQueryCompiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ std::pair<ExecutorBinderPtr, bool> compileQueryBlock(
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(expr.get()))
{
if (identifier->getColumnName() == MutableSupport::tidb_pk_column_name)
if (identifier->getColumnName() == MutSup::extra_handle_column_name)
{
append_pk_column = true;
}
Expand Down Expand Up @@ -179,7 +179,7 @@ std::pair<ExecutorBinderPtr, bool> compileQueryBlock(
if (auto * identifier = typeid_cast<ASTIdentifier *>(expr.get()))
{
auto [db_name, table_name, column_name] = splitQualifiedName(identifier->getColumnName());
if (column_name == MutableSupport::tidb_pk_column_name)
if (column_name == MutSup::extra_handle_column_name)
{
if (table_name.empty())
{
Expand Down Expand Up @@ -423,4 +423,4 @@ QueryTasks queryPlanToQueryTasks(
}
return tasks;
}
} // namespace DB
} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ std::tuple<bool, String> compareColumns(

for (const auto & column : columns)
{
// Exclude virtual columns, including EXTRA_HANDLE_COLUMN_ID, VERSION_COLUMN_ID,TAG_COLUMN_ID,EXTRA_TABLE_ID_COLUMN_ID
// Exclude virtual columns, including MutSup::extra_handle_id, MutSup::version_col_id,MutSup::delmark_col_id,MutSup::extra_table_id_col_id
if (column.id < 0)
{
continue;
Expand Down Expand Up @@ -1482,7 +1482,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableScan()
{
// Get handle column name.
String handle_column_name = MutableSupport::tidb_pk_column_name;
String handle_column_name = MutSup::extra_handle_column_name;
if (auto pk_handle_col = storage_for_logical_table->getTableInfo().getPKHandleColumn())
handle_column_name = pk_handle_col->get().name;

Expand All @@ -1504,10 +1504,10 @@ std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableSc
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
if (cid == MutSup::extra_handle_id)
name = handle_column_name;
else if (cid == ExtraTableIDColumnID)
name = MutableSupport::extra_table_id_column_name;
else if (cid == MutSup::extra_table_id_col_id)
name = MutSup::extra_table_id_column_name;
else
name = storage_for_logical_table->getTableInfo().getColumnName(cid);
required_columns_tmp.emplace_back(std::move(name));
Expand Down
36 changes: 17 additions & 19 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ namespace
DataTypePtr getPkType(const TiDB::ColumnInfo & column_info)
{
const auto & pk_data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
/// primary key type must be tidb_pk_column_int_type or tidb_pk_column_string_type.
/// primary key type must be getTiDBPkColumnIntType or getTiDBPkColumnStringType.
RUNTIME_CHECK(
pk_data_type->equals(*MutableSupport::tidb_pk_column_int_type)
|| pk_data_type->equals(*MutableSupport::tidb_pk_column_string_type),
pk_data_type->equals(*MutSup::getExtraHandleColumnIntType())
|| pk_data_type->equals(*MutSup::getExtraHandleColumnStringType()),
pk_data_type->getName(),
MutableSupport::tidb_pk_column_int_type->getName(),
MutableSupport::tidb_pk_column_string_type->getName());
MutSup::getExtraHandleColumnIntType()->getName(),
MutSup::getExtraHandleColumnStringType()->getName());
return pk_data_type;
}
} // namespace
Expand Down Expand Up @@ -70,13 +70,11 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str
const auto & column_info = column_infos[i];
switch (column_info.id)
{
case TiDBPkColumnID:
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getPkType(column_info));
case MutSup::extra_handle_id:
names_and_types.emplace_back(MutSup::extra_handle_column_name, getPkType(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(
MutableSupport::extra_table_id_column_name,
MutableSupport::extra_table_id_column_type);
case MutSup::extra_table_id_col_id:
names_and_types.emplace_back(MutSup::extra_table_id_column_name, MutSup::getExtraTableIdColumnType());
break;
default:
names_and_types.emplace_back(
Expand All @@ -94,7 +92,7 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
{
auto column_defines = std::make_shared<DM::ColumnDefines>();
int extra_table_id_index = InvalidColumnID;
int extra_table_id_index = MutSup::invalid_col_id;
column_defines->reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
Expand All @@ -106,18 +104,18 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
const auto output_name = genNameForExchangeReceiver(i);
switch (column_info.id)
{
case TiDBPkColumnID:
case MutSup::extra_handle_id:
column_defines->emplace_back(DM::ColumnDefine{
TiDBPkColumnID,
output_name, // MutableSupport::tidb_pk_column_name
MutSup::extra_handle_id,
output_name, // MutSup::extra_handle_column_name
getPkType(column_info)});
break;
case ExtraTableIDColumnID:
case MutSup::extra_table_id_col_id:
{
column_defines->emplace_back(DM::ColumnDefine{
ExtraTableIDColumnID,
output_name, // MutableSupport::extra_table_id_column_name
MutableSupport::extra_table_id_column_type});
MutSup::extra_table_id_col_id,
output_name, // MutSup::extra_table_id_column_name
MutSup::getExtraTableIdColumnType()});
extra_table_id_index = i;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ RemoteRequest RemoteRequest::build(
auto * mutable_table_scan = ts_exec->mutable_tbl_scan();
table_scan.constructTableScanForRemoteRead(mutable_table_scan, table_info.id);

String handle_column_name = MutableSupport::tidb_pk_column_name;
String handle_column_name = MutSup::extra_handle_column_name;
if (auto pk_handle_col = table_info.getPKHandleColumn())
handle_column_name = pk_handle_col->get().name;

Expand All @@ -53,19 +53,19 @@ RemoteRequest RemoteRequest::build(
const auto & col = table_scan.getColumns()[i];
auto col_id = col.id;

if (col_id == DB::TiDBPkColumnID)
if (col_id == MutSup::extra_handle_id)
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.setPriKeyFlag();
ci.setNotNullFlag();
schema.emplace_back(std::make_pair(handle_column_name, std::move(ci)));
}
else if (col_id == ExtraTableIDColumnID)
else if (col_id == MutSup::extra_table_id_col_id)
{
TiDB::ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
schema.emplace_back(std::make_pair(MutableSupport::extra_table_id_column_name, std::move(ci)));
schema.emplace_back(std::make_pair(MutSup::extra_table_id_column_name, std::move(ci)));
}
else
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/tests/gtest_trigger_pipeline_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataTypes/DataTypesNumber.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/WaitResult.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/tests/bench_aggregation_hash_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Columns/ColumnDecimal.h>
#include <Common/Stopwatch.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDecimal.h>
#include <DataTypes/DataTypeString.h>
Expand Down Expand Up @@ -310,7 +311,7 @@ try
LOG_DEBUG(log, "build_side_watch: {}, hashmap size: {}", build_side_watch.elapsed(), data_variants->size());

std::vector<AggregatedDataVariantsPtr> variants{data_variants};
auto merging_buckets = aggregator->mergeAndConvertToBlocks(variants, /*final=*/true, /*max_thread=*/1);
auto merging_buckets = aggregator->mergeAndConvertToBlocks(variants, /*final=*/true, /*max_threads=*/1);
std::vector<Block> res_block;

Stopwatch probe_side_watch;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
NameSet unique_keys;
ASTs & group_asts = select_query->group_expression_list->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
for (ssize_t i = 0; i < static_cast<ssize_t>(group_asts.size()); ++i)
{
ssize_t size = group_asts.size();
getRootActions(group_asts[i], true, false, temp_actions);
Expand Down Expand Up @@ -1037,7 +1037,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
if (storage && select_query && !select_query->raw_for_mutable)
{
// LOG_DEBUG(&Poco::Logger::get("ExpressionAnalyzer"), "Filter hidden columns for mutable table.");
filtered_names = MutableSupport::instance().hiddenColumns(storage->getName());
filtered_names = MutSup::instance().hiddenColumns(storage->getName());
}
}

Expand Down Expand Up @@ -1392,7 +1392,7 @@ void ExpressionAnalyzer::optimizeGroupBy()
}

select_query->group_expression_list = std::make_shared<ASTExpressionList>();
select_query->group_expression_list->children.emplace_back(std::make_shared<ASTLiteral>(UInt64(unused_column)));
select_query->group_expression_list->children.emplace_back(std::make_shared<ASTLiteral>(unused_column));
}
}

Expand Down
Loading