Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20260425-25.12.10.7
CH_COMMIT=54c5bf9a97b
CH_BRANCH=rebase_ch/20260425-25.12.10.7-complextype
CH_COMMIT=b1510a2
15 changes: 11 additions & 4 deletions cpp-ch/local-engine/Parser/ExpressionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ ExpressionParser::addConstColumn(DB::ActionsDAG & actions_dag, const DB::DataTyp
{
String name = DB::fieldToString(field).substr(0, 10);
name = getUniqueName(name);
const auto * res_node = &actions_dag.addColumn(DB::ColumnWithTypeAndName(type->createColumnConst(1, field), type, name));
/// Substrait null literals carry a concrete type; CH `createColumnConst` inserts into the nested column (e.g. ColumnArray),
/// which cannot accept `Field::Null` unless the type is Nullable(...).
DB::DataTypePtr const_type = type;
if (field.isNull() && !type->isNullable())
const_type = makeNullable(type);
const auto * res_node = &actions_dag.addColumn(
DB::ColumnWithTypeAndName(const_type->createColumnConst(1, field), const_type, name));
if (reuseCSE())
{
// The new node res_node will remain in the ActionsDAG, but it will not affect the execution.
Expand Down Expand Up @@ -307,6 +313,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
const auto & input_type = args[0]->result_type;
DataTypePtr denull_input_type = removeNullable(input_type);
DataTypePtr output_type = TypeParser::parseType(substrait_type);
DataTypePtr cast_output_type = input_type->isNullable() && !output_type->isNullable() ? makeNullable(output_type) : output_type;
DataTypePtr denull_output_type = removeNullable(output_type);
const ActionsDAG::Node * result_node = nullptr;
if (substrait_type.has_binary())
Expand Down Expand Up @@ -351,7 +358,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
else if (isString(denull_input_type) && substrait_type.has_bool_())
{
/// cast(string to boolean)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "accurateCastOrNull", args);
}
else if (isString(denull_input_type) && isInt(denull_output_type))
Expand All @@ -360,13 +367,13 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
/// Refer to https://github.com/apache/gluten/issues/4956 and https://github.com/apache/gluten/issues/8598
const auto * trim_str_arg = addConstColumn(actions_dag, std::make_shared<DataTypeString>(), " \t\n\r\f");
args[0] = toFunctionNode(actions_dag, "trimBothSpark", {args[0], trim_str_arg});
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}
else
{
/// Common process: CAST(input, type)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(source_pipe), "substrait local files");
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex)
source_step->setStepDescription("ParquetReaderV3");
else
source_step->setStepDescription("ParquetReader");
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <Core/Names.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
Expand Down Expand Up @@ -148,6 +149,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
const auto & origin_column = origin_columns[i];
const auto & origin_type = origin_column.type;
auto final_type = TypeParser::parseType(output_schema.types(i));
if (origin_type->isNullable() && !final_type->isNullable())
final_type = makeNullable(final_type);

/// Intermediate aggregate data is special, no check here.
if (typeid_cast<const DataTypeAggregateFunction *>(origin_column.type.get()) || origin_type->equals(*final_type))
Expand Down
9 changes: 8 additions & 1 deletion cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr>
columns[i]->insert(spark_row_reader.getField(i)); // read decimal128
}
else
columns[i]->insert(spark_row_reader.getField(i));
{
DB::Field field = spark_row_reader.getField(i);
/// Spark UnsafeRow marks null top-level values; CH non-Nullable columns cannot insert Null (e.g. Array/Map/Tuple).
if (field.isNull() && !spark_row_reader.getFieldTypes()[i]->isNullable())
columns[i]->insertDefault();
else
columns[i]->insert(std::move(field));
}
}
}

Expand Down
22 changes: 15 additions & 7 deletions cpp-ch/local-engine/Parser/TypeParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ DB::DataTypePtr TypeParser::getCHTypeByName(const String & spark_type_name)
return DB::DataTypeFactory::instance().get(ch_type_name);
}

DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, std::list<String> * field_names)
DB::DataTypePtr TypeParser::parseType(
const substrait::Type & substrait_type, std::list<String> * field_names, bool keep_list_nullability)
{
DB::DataTypePtr ch_type = nullptr;

Expand Down Expand Up @@ -185,14 +186,14 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
for (int i = 0; i < types.size(); ++i)
{
struct_field_names.push_back(field_names->front());
struct_field_types[i] = parseType(types[i], field_names);
struct_field_types[i] = parseType(types[i], field_names, keep_list_nullability);
}
}
else
{
/// Construct CH tuple type without DFS rule.
for (int i = 0; i < types.size(); ++i)
struct_field_types[i] = parseType(types[i]);
struct_field_types[i] = parseType(types[i], nullptr, keep_list_nullability);

const auto & names = substrait_type.struct_().names();
for (const auto & name : names)
Expand All @@ -209,9 +210,16 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else if (substrait_type.has_list())
{
auto ch_nested_type = parseType(substrait_type.list().type());
auto ch_nested_type = parseType(substrait_type.list().type(), nullptr, true);
ch_type = std::make_shared<DB::DataTypeArray>(ch_nested_type);
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
/// ClickHouse doesn't support Nullable(Array(...)) well in many execution paths.
/// In our parquet reader path, null arrays are represented as empty arrays (no null map).
/// So for top-level Substrait LIST, we intentionally drop outer nullability and keep Array(...).
///
/// Note: element nullability is still preserved via ch_nested_type; if the element is also a LIST,
/// its own nullability must be preserved to represent Array(Nullable(Array(...))).
if (keep_list_nullability)
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
}
else if (substrait_type.has_map())
{
Expand All @@ -223,8 +231,8 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else
{
auto ch_key_type = parseType(substrait_type.map().key());
auto ch_val_type = parseType(substrait_type.map().value());
auto ch_key_type = parseType(substrait_type.map().key(), nullptr, keep_list_nullability);
auto ch_val_type = parseType(substrait_type.map().value(), nullptr, keep_list_nullability);
ch_type = std::make_shared<DB::DataTypeMap>(ch_key_type, ch_val_type);
ch_type = tryWrapNullable(substrait_type.map().nullability(), ch_type);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Parser/TypeParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ namespace local_engine
static DB::DataTypePtr getCHTypeByName(const String& spark_type_name);

/// When parsing named structure, we need the field names.
static DB::DataTypePtr parseType(const substrait::Type& substrait_type, std::list<String>* field_names);
static DB::DataTypePtr
parseType(const substrait::Type& substrait_type, std::list<String>* field_names, bool keep_list_nullability = false);

inline static DB::DataTypePtr parseType(const substrait::Type& substrait_type)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr

// TODO: rebase-25.12, support complex types when there is a nullable type
// for example: parquet type is Array, requested type is Nullable(Array(Nullable(String)))
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex)
{
LOG_TRACE(
&Poco::Logger::get("ParquetFormatFile"),
Expand Down
Loading