Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.{BackendsApiManager, ValidatorApi}
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution.ValidationResult
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.`type`.TypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.ExpressionNode
Expand Down Expand Up @@ -107,6 +108,7 @@ object VeloxValidatorApi {
private def isPrimitiveType(dataType: DataType): Boolean = {
val enableTimestampNtzValidation = VeloxConfig.get.enableTimestampNtzValidation
dataType match {
case dt if ConverterUtils.isSupportedTimeType(dt) => true
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
StringType | BinaryType | _: DecimalType | DateType | TimestampType |
YearMonthIntervalType.DEFAULT | NullType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.vectorized.{NativeColumnarToRowInfo, NativeColumnarToRowJniWrapper}
Expand All @@ -41,6 +42,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
// Depending on the input type, VeloxColumnarToRowConverter.
for (field <- schema.fields) {
field.dataType match {
case dt if ConverterUtils.isSupportedTimeType(dt) =>
case _: BooleanType =>
case _: ByteType =>
case _: ShortType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class VeloxLiteralSuite extends VeloxWholeStageTransformerSuite {
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.ansi.enabled", "false")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.sources.useV1SourceList", "avro")
}
Expand All @@ -56,6 +57,16 @@ class VeloxLiteralSuite extends VeloxWholeStageTransformerSuite {
}
}

/**
 * Checks that the given query is planned with a native projection and that its native plan can be
 * rendered, without collecting any result rows.
 *
 * @param sql
 *   the query text; also used as the assertion clue so a failure names the offending query
 */
def validateOffloadPlan(sql: String): Unit = {
  val executedPlan = spark.sql(sql).queryExecution.executedPlan

  // A native projection must be present ...
  val offloadedProject = executedPlan.collectFirst { case p: ProjectExecTransformer => p }
  assert(offloadedProject.isDefined, sql)

  // ... and no vanilla Spark projection may remain.
  val vanillaProject = executedPlan.collectFirst { case p: ProjectExec => p }
  assert(vanillaProject.isEmpty, sql)

  val stages = executedPlan.collect { case stage: WholeStageTransformer => stage }
  assert(stages.nonEmpty, sql)
  // Rendering the native plan string exercises the native plan conversion for every stage.
  stages.foreach(stage => stage.nativePlanString())
}

test("Struct Literal") {
validateOffloadResult("SELECT struct('Spark', 5)")
validateOffloadResult("SELECT struct(7, struct(5, 'test'))")
Expand Down Expand Up @@ -135,6 +146,14 @@ class VeloxLiteralSuite extends VeloxWholeStageTransformerSuite {
validateOffloadResult("SELECT DATE'2020-12-31', DATE'2020-12-30'")
}

testWithMinSparkVersion("Time Literal", "4.1") {
  // Spark 4.1 cannot collect TIME through Row encoders yet, so only planning and native plan
  // conversion are validated here; collected results are not compared.
  Seq(
    "SELECT TIME'12:34:56.123456', TIME'00:00:00'",
    "SELECT array(TIME'12:34:56.123456', TIME'00:00:00')",
    "SELECT struct(TIME'12:34:56.123456')"
  ).foreach(validateOffloadPlan)
}

test("Literal Fallback") {
validateFallbackResult("SELECT struct(cast(null as struct<a: string>))")
validateFallbackResult("SELECT array(struct(1, 'a'), null)")
Expand Down
160 changes: 156 additions & 4 deletions cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,172 @@
#include "memory/VeloxColumnarBatch.h"
#include "utils/Exception.h"
#include "velox/row/UnsafeRowFast.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/LazyVector.h"

using namespace facebook;

namespace gluten {
namespace {

constexpr int64_t kMicrosToNanos = 1000;

/// Returns true when `type` is equivalent to Velox's TIME_MICRO_UTC type.
bool isTimeMicroUtc(const velox::TypePtr& type) {
  const auto timeType = velox::TIME_MICRO_UTC();
  return type->equivalent(*timeType);
}

/// Returns true when `type` is TIME_MICRO_UTC itself or nests it anywhere inside
/// ARRAY elements, MAP keys/values or ROW children. Other kinds never contain it.
bool containsTimeMicroUtc(const velox::TypePtr& type) {
  if (isTimeMicroUtc(type)) {
    return true;
  }

  const auto kind = type->kind();
  if (kind == velox::TypeKind::ARRAY) {
    return containsTimeMicroUtc(type->asArray().elementType());
  }
  if (kind == velox::TypeKind::MAP) {
    const auto& mapType = type->asMap();
    // Keys are inspected first; values are only visited when the keys hold no time type.
    return containsTimeMicroUtc(mapType.keyType()) || containsTimeMicroUtc(mapType.valueType());
  }
  if (kind == velox::TypeKind::ROW) {
    for (const auto& fieldType : type->asRow().children()) {
      if (containsTimeMicroUtc(fieldType)) {
        return true;
      }
    }
  }
  return false;
}

velox::VectorPtr normalizeTimeForSparkUnsafeRow(const velox::VectorPtr& vector, velox::memory::MemoryPool* pool);

/// Builds a flat BIGINT vector holding every non-null value of `vector` multiplied by
/// kMicrosToNanos; null rows stay null. The input is read through a DecodedVector, so it
/// does not have to be flat.
///
/// @param vector Source vector whose values are read as int64_t.
/// @param pool Memory pool used to allocate the result.
/// @return A newly created BIGINT vector of the same size as `vector`.
velox::VectorPtr normalizeTimeScalarForSparkUnsafeRow(const velox::VectorPtr& vector, velox::memory::MemoryPool* pool) {
  velox::DecodedVector decoded(*vector);
  const auto numRows = vector->size();
  auto result = velox::BaseVector::create(velox::BIGINT(), numRows, pool);
  auto* out = result->asFlatVector<int64_t>();

  for (velox::vector_size_t i = 0; i < numRows; ++i) {
    if (!decoded.isNullAt(i)) {
      out->set(i, decoded.valueAt<int64_t>(i) * kMicrosToNanos);
      continue;
    }
    out->setNull(i, true);
  }
  return result;
}

/// Returns `vector` with lazy wrappers loaded and flattenVector() applied.
///
/// The caller must keep the returned pointer alive while using anything obtained from it,
/// because the result is not guaranteed to be the same object as `vector`.
velox::VectorPtr loadedFlatVector(const velox::VectorPtr& vector) {
  velox::VectorPtr result = velox::BaseVector::loadedVectorShared(vector);
  velox::BaseVector::flattenVector(result);
  if (!result->isLazy()) {
    return result;
  }
  // Still lazy after flattening: unwrap it once more.
  return result->as<velox::LazyVector>()->loadedVectorShared();
}

/// Normalizes the elements of an ARRAY vector via normalizeTimeForSparkUnsafeRow.
///
/// @param vector Vector of ARRAY kind; it is loaded and flattened before its elements are read.
/// @param pool Memory pool used for any newly created vectors.
/// @return `vector` itself when the elements came back unchanged; otherwise a new ArrayVector
///         that reuses the nulls/offsets/sizes of the flattened input and wraps the normalized
///         elements.
velox::VectorPtr normalizeArrayForSparkUnsafeRow(const velox::VectorPtr& vector, velox::memory::MemoryPool* pool) {
  // Keep the flattened vector owned for the whole function. loadedFlatVector() returns by value
  // and its result may be the only owner of the vector (e.g. when flattening materializes a new
  // one), so taking a raw pointer straight from the temporary would leave `array` dangling.
  const auto flattened = loadedFlatVector(vector);
  auto* array = flattened->as<velox::ArrayVector>();

  auto elements = normalizeTimeForSparkUnsafeRow(array->elements(), pool);
  if (elements == array->elements()) {
    // Nothing inside the elements needed rewriting.
    return vector;
  }
  return std::make_shared<velox::ArrayVector>(
      pool,
      velox::ARRAY(elements->type()),
      array->nulls(),
      array->size(),
      array->offsets(),
      array->sizes(),
      elements,
      array->getNullCount());
}

/// Normalizes the keys and values of a MAP vector via normalizeTimeForSparkUnsafeRow.
///
/// @param vector Vector of MAP kind; it is loaded and flattened before its children are read.
/// @param pool Memory pool used for any newly created vectors.
/// @return `vector` itself when both keys and values came back unchanged; otherwise a new
///         MapVector that reuses the nulls/offsets/sizes of the flattened input and wraps the
///         normalized keys and values.
velox::VectorPtr normalizeMapForSparkUnsafeRow(const velox::VectorPtr& vector, velox::memory::MemoryPool* pool) {
  // Keep the flattened vector owned for the whole function. loadedFlatVector() returns by value
  // and its result may be the only owner of the vector (e.g. when flattening materializes a new
  // one), so taking a raw pointer straight from the temporary would leave `map` dangling.
  const auto flattened = loadedFlatVector(vector);
  auto* map = flattened->as<velox::MapVector>();

  auto keys = normalizeTimeForSparkUnsafeRow(map->mapKeys(), pool);
  auto values = normalizeTimeForSparkUnsafeRow(map->mapValues(), pool);
  if (keys == map->mapKeys() && values == map->mapValues()) {
    // Neither side needed rewriting.
    return vector;
  }
  return std::make_shared<velox::MapVector>(
      pool,
      velox::MAP(keys->type(), values->type()),
      map->nulls(),
      map->size(),
      map->offsets(),
      map->sizes(),
      keys,
      values,
      map->getNullCount(),
      map->hasSortedKeys());
}

/// Normalizes every child of `rowVector` via normalizeTimeForSparkUnsafeRow.
///
/// @param rowVector Row vector whose children are inspected.
/// @param pool Memory pool used for any newly created vectors.
/// @return `rowVector` itself when no child was replaced; otherwise a new RowVector with the
///         original field names, nulls, size and null count, and the normalized children.
velox::RowVectorPtr normalizeRowForSparkUnsafeRow(
    const velox::RowVectorPtr& rowVector,
    velox::memory::MemoryPool* pool) {
  const auto numChildren = rowVector->childrenSize();
  std::vector<velox::VectorPtr> normalizedChildren;
  normalizedChildren.reserve(numChildren);

  bool anyChildReplaced = false;
  for (const auto& original : rowVector->children()) {
    auto converted = normalizeTimeForSparkUnsafeRow(original, pool);
    if (converted != original) {
      anyChildReplaced = true;
    }
    normalizedChildren.emplace_back(std::move(converted));
  }

  if (!anyChildReplaced) {
    return rowVector;
  }

  // The row type has to be rebuilt because replaced children carry a different type.
  std::vector<velox::TypePtr> normalizedTypes;
  normalizedTypes.reserve(normalizedChildren.size());
  for (size_t i = 0; i < normalizedChildren.size(); ++i) {
    normalizedTypes.emplace_back(normalizedChildren[i]->type());
  }

  return std::make_shared<velox::RowVector>(
      pool,
      velox::ROW(velox::asRowType(rowVector->type())->names(), std::move(normalizedTypes)),
      rowVector->nulls(),
      rowVector->size(),
      std::move(normalizedChildren),
      rowVector->getNullCount());
}

/// Entry point of the recursive normalization: dispatches on the vector's type.
///
/// Vectors whose type holds no TIME_MICRO_UTC are returned untouched. A TIME_MICRO_UTC vector is
/// converted by normalizeTimeScalarForSparkUnsafeRow; ARRAY, MAP and ROW vectors are handed to
/// the matching container helper. Any other kind is returned as is.
velox::VectorPtr normalizeTimeForSparkUnsafeRow(const velox::VectorPtr& vector, velox::memory::MemoryPool* pool) {
  const auto& type = vector->type();
  if (!containsTimeMicroUtc(type)) {
    return vector;
  }
  if (isTimeMicroUtc(type)) {
    return normalizeTimeScalarForSparkUnsafeRow(vector, pool);
  }

  const auto kind = vector->typeKind();
  if (kind == velox::TypeKind::ARRAY) {
    return normalizeArrayForSparkUnsafeRow(vector, pool);
  }
  if (kind == velox::TypeKind::MAP) {
    return normalizeMapForSparkUnsafeRow(vector, pool);
  }
  if (kind == velox::TypeKind::ROW) {
    auto rowVector = std::dynamic_pointer_cast<velox::RowVector>(loadedFlatVector(vector));
    return normalizeRowForSparkUnsafeRow(rowVector, pool);
  }
  return vector;
}

} // namespace

void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow) {
auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();
rowVectorForUnsafeRow_ = normalizeRowForSparkUnsafeRow(rowVector, veloxPool_.get());

auto vectorLength = rowVectorForUnsafeRow_->size();
numCols_ = rowVectorForUnsafeRow_->childrenSize();

fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVectorForUnsafeRow_);

int64_t totalMemorySize;

if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVectorForUnsafeRow_->type()))) {
auto rowSize = fixedRowSize.value();
// make sure it has at least one row
numRows_ = std::max<int32_t>(1, std::min<int64_t>(memThreshold_ / rowSize, vectorLength - startRow));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
facebook::velox::BufferPtr veloxBuffers_;
facebook::velox::RowVectorPtr rowVectorForUnsafeRow_;
int64_t memThreshold_;
};

Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/substrait/SubstraitParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ TypePtr SubstraitParser::parseType(const ::substrait::Type& substraitType, bool
return UNKNOWN();
case ::substrait::Type::KindCase::kDate:
return DATE();
case ::substrait::Type::KindCase::kTime:
return TIME_MICRO_UTC();
case ::substrait::Type::KindCase::kTimestampTz:
return TIMESTAMP();
case ::substrait::Type::KindCase::kDecimal: {
Expand Down Expand Up @@ -356,6 +358,9 @@ int64_t SubstraitParser::getLiteralValue(const ::substrait::Expression::Literal&
memcpy(&decimalValue, decimal.c_str(), 16);
return static_cast<int64_t>(decimalValue);
}
if (literal.has_time()) {
return literal.time();
}
return literal.i64();
}

Expand Down Expand Up @@ -431,6 +436,7 @@ const std::unordered_map<std::string, std::string> SubstraitParser::typeMap_ = {
{"fp32", "REAL"},
{"fp64", "DOUBLE"},
{"date", "DATE"},
{"time", "TIME MICRO UTC"},
{"ts", "TIMESTAMP"},
{"str", "VARCHAR"},
{"vbin", "VARBINARY"},
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxExpr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ TypePtr getScalarType(const ::substrait::Expression::Literal& literal) {
}
case ::substrait::Expression_Literal::LiteralTypeCase::kDate:
return DATE();
case ::substrait::Expression_Literal::LiteralTypeCase::kTime:
return TIME_MICRO_UTC();
case ::substrait::Expression_Literal::LiteralTypeCase::kTimestampTz:
return TIMESTAMP();
case ::substrait::Expression_Literal::LiteralTypeCase::kString:
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/substrait/VeloxSubstraitSignature.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ std::string VeloxSubstraitSignature::toSubstraitSignature(const TypePtr& type) {
if (type->isDate()) {
return "date";
}
if (type->equivalent(*TIME_MICRO_UTC())) {
return "time";
}

switch (type->kind()) {
case TypeKind::BOOLEAN:
Expand Down Expand Up @@ -159,6 +162,10 @@ TypePtr VeloxSubstraitSignature::fromSubstraitSignature(const std::string& signa
return DATE();
}

if (signature == "time") {
return TIME_MICRO_UTC();
}

if (signature == "nothing") {
return UNKNOWN();
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/substrait/VeloxToSubstraitType.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const ::substrait::Type& VeloxToSubstraitTypeConvertor::toSubstraitType(
substraitType->set_allocated_date(substraitDate);
return *substraitType;
}
if (type->equivalent(*velox::TIME_MICRO_UTC())) {
auto substraitTime = google::protobuf::Arena::CreateMessage<::substrait::Type_Time>(&arena);
substraitTime->set_nullability(::substrait::Type_Nullability_NULLABILITY_NULLABLE);
substraitType->set_allocated_time(substraitTime);
return *substraitType;
}

switch (type->kind()) {
case velox::TypeKind::BOOLEAN: {
Expand Down
17 changes: 17 additions & 0 deletions cpp/velox/tests/FunctionTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/vector/tests/utils/VectorTestBase.h"

#include "substrait/SubstraitParser.h"
#include "substrait/SubstraitToVeloxExpr.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/TypeUtils.h"
#include "substrait/VariantToVectorConverter.h"
Expand Down Expand Up @@ -77,6 +78,22 @@ TEST_F(FunctionTest, getIdxFromNodeName) {
ASSERT_EQ(index, 0);
}

// Covers the Substrait `time` handling in three places: type parsing, literal value
// extraction, and literal-to-constant-expression conversion.
TEST_F(FunctionTest, substraitTime) {
// A Substrait `time` type must parse to TIME_MICRO_UTC.
::substrait::Type type;
type.mutable_time()->set_nullability(::substrait::Type_Nullability_NULLABILITY_NULLABLE);
auto parsedType = SubstraitParser::parseType(type);
ASSERT_TRUE(parsedType->equivalent(*TIME_MICRO_UTC()));

// The literal's time payload must be returned unchanged by getLiteralValue.
// 45'296'123'456 corresponds to 12:34:56.123456 if read as microseconds since midnight.
::substrait::Expression_Literal literal;
literal.set_time(45'296'123'456L);
ASSERT_EQ(SubstraitParser::getLiteralValue<int64_t>(literal), 45'296'123'456L);

// Converting the literal must yield a constant typed TIME_MICRO_UTC that carries the same
// value as a BIGINT variant.
SubstraitVeloxExprConverter exprConverter(pool(), {});
auto constant = exprConverter.toVeloxExpr(literal);
ASSERT_TRUE(constant->type()->equivalent(*TIME_MICRO_UTC()));
ASSERT_EQ(constant->value().value<TypeKind::BIGINT>(), 45'296'123'456L);
}

TEST_F(FunctionTest, getNameBeforeDelimiter) {
std::string functionSpec = "lte:fp64_fp64";
auto funcName = SubstraitParser::getNameBeforeDelimiter(functionSpec);
Expand Down
9 changes: 9 additions & 0 deletions cpp/velox/tests/VeloxColumnarToRowTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,13 @@ TEST_F(VeloxColumnarToRowTest, Buffer_int64_int64_with_null) {
testRowBufferAddr(vector, expectArr, sizeof(expectArr));
}

// Checks the row buffer produced for a single TIME_MICRO_UTC column holding the values 1 and 2.
TEST_F(VeloxColumnarToRowTest, Buffer_time_micro_utc) {
auto vector = makeRowVector({makeFlatVector<int64_t>({1, 2}, TIME_MICRO_UTC())});

// Two 16-byte rows. Each starts with 8 zero bytes (presumably the UnsafeRow null-bitset word;
// no value is null here) followed by the value as a little-endian int64:
// {232, 3} = 1000 and {208, 7} = 2000, i.e. the inputs 1 and 2 scaled by 1000.
uint8_t expectArr[] = {
0, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 208, 7, 0, 0, 0, 0, 0, 0,
};
testRowBufferAddr(vector, expectArr, sizeof(expectArr));
}

} // namespace gluten
2 changes: 2 additions & 0 deletions cpp/velox/tests/VeloxSubstraitSignatureTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ TEST_F(VeloxSubstraitSignatureTest, toSubstraitSignatureWithType) {
ASSERT_EQ(toSubstraitSignature(VARBINARY()), "vbin");
ASSERT_EQ(toSubstraitSignature(TIMESTAMP()), "ts");
ASSERT_EQ(toSubstraitSignature(DATE()), "date");
ASSERT_EQ(toSubstraitSignature(TIME_MICRO_UTC()), "time");
ASSERT_EQ(toSubstraitSignature(ARRAY(BOOLEAN())), "list");
ASSERT_EQ(toSubstraitSignature(ARRAY(INTEGER())), "list");
ASSERT_EQ(toSubstraitSignature(MAP(INTEGER(), BIGINT())), "map");
Expand Down Expand Up @@ -107,6 +108,7 @@ TEST_F(VeloxSubstraitSignatureTest, fromSubstraitSignature) {
ASSERT_EQ(fromSubstraitSignature("vbin")->kind(), TypeKind::VARBINARY);
ASSERT_EQ(fromSubstraitSignature("ts")->kind(), TypeKind::TIMESTAMP);
ASSERT_EQ(fromSubstraitSignature("date")->kind(), TypeKind::INTEGER);
ASSERT_TRUE(fromSubstraitSignature("time")->equivalent(*TIME_MICRO_UTC()));
ASSERT_EQ(fromSubstraitSignature("dec<18,2>")->kind(), TypeKind::BIGINT);
ASSERT_EQ(fromSubstraitSignature("dec<19,2>")->kind(), TypeKind::HUGEINT);

Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/tests/VeloxToSubstraitTypeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,11 @@ TEST_F(VeloxToSubstraitTypeTest, basic) {
testTypeConversion(ROW({}, {}));
}

// Round-trips TIME_MICRO_UTC: Velox -> Substrait must produce the `time` kind, and parsing
// that Substrait type back must give a type equivalent to TIME_MICRO_UTC.
TEST_F(VeloxToSubstraitTypeTest, time) {
// The arena owns the protobuf messages allocated by the conversion.
google::protobuf::Arena arena;
auto substraitType = typeConvertor_->toSubstraitType(arena, TIME_MICRO_UTC());
ASSERT_EQ(substraitType.kind_case(), ::substrait::Type::KindCase::kTime);
ASSERT_TRUE(SubstraitParser::parseType(substraitType)->equivalent(*TIME_MICRO_UTC()));
}

} // namespace gluten
Loading
Loading