Commit 2a0971a

Merge branch 'main' into arnavb/day-time-support

ArnavBalyan authored Mar 4, 2025
2 parents 44c06c7 + ca2ab6a
Showing 143 changed files with 6,787 additions and 1,521 deletions.
26 changes: 12 additions & 14 deletions .github/workflows/util/license-header.py
@@ -2,22 +2,20 @@
# Copyright(c) 2021-2023 Intel Corporation.
# Copyright (c) Facebook, Inc. and its affiliates.
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

import argparse
from collections import OrderedDict
11 changes: 7 additions & 4 deletions .github/workflows/util/util.py
@@ -2,11 +2,14 @@
# Copyright(c) 2021-2023 Intel Corporation.
# Copyright (c) Facebook, Inc. and its affiliates.
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
10 changes: 10 additions & 0 deletions DISCLAIMER
@@ -0,0 +1,10 @@
Apache Gluten (Incubating) is an effort undergoing incubation at the Apache
Software Foundation (ASF), sponsored by the Apache Incubator PMC.

Incubation is required of all newly accepted projects until a further review
indicates that the infrastructure, communications, and decision making process
have stabilized in a manner consistent with other successful ASF projects.

While incubation status is not necessarily a reflection of the completeness
or stability of the code, it does indicate that the project has yet to be
fully endorsed by the ASF.
18 changes: 0 additions & 18 deletions DISCLAIMER-WIP

This file was deleted.

@@ -121,6 +121,7 @@ object CHRuleApi {
injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, CHBatch))
injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
injector.injectPostTransform(c => AddPreProjectionForHashJoin.apply(c.session))
injector.injectPostTransform(c => ReplaceSubStringComparison.apply(c.session))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(c => p => ExpandFallbackPolicy(c.caller.isAqe(), p))
@@ -933,6 +933,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

override def genColumnarCollectLimitExec(
limit: Int,
child: SparkPlan): ColumnarCollectLimitBaseExec =
throw new GlutenNotSupportException("ColumnarCollectLimit is not supported in ch backend.")

override def genColumnarRangeExec(
start: Long,
end: Long,
@@ -17,12 +17,15 @@
package org.apache.gluten.extension

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.execution._
import org.apache.gluten.expression._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -31,9 +34,21 @@ import org.apache.spark.unsafe.types.UTF8String
*/

// Try to replace `from_json` with `get_json_object` if possible.

object PlanResolvedChecker {
def check(plan: LogicalPlan): Boolean = {
plan match {
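// An InsertIntoStatement may not report itself as resolved even when its
// inner query is, so check the resolution state of the query directly.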
case insert: InsertIntoStatement => insert.query.resolved
case _ => plan.resolved
}
}
}

class RepalceFromJsonWithGetJsonObject(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
-if (!CHBackendSettings.enableReplaceFromJsonWithGetJsonObject || !plan.resolved) {
+if (
+  !CHBackendSettings.enableReplaceFromJsonWithGetJsonObject || !PlanResolvedChecker.check(plan)
+) {
plan
} else {
visitPlan(plan)
@@ -75,3 +90,106 @@ class RepalceFromJsonWithGetJsonObject(spark: SparkSession) extends Rule[Logical
}
}
}

/**
 * We don't apply this rule on LogicalPlan because the query may contain fallback nodes; if the
 * replacement landed inside a fallback node, it could cause the query to fail.
 */
case class ReplaceSubStringComparison(spark: SparkSession) extends Rule[SparkPlan] with Logging {
val udfFunctionName = "compare_substrings"
override def apply(plan: SparkPlan): SparkPlan = {
if (UDFMappings.scalaUDFMap.get(udfFunctionName).isDefined) {
visitPlan(plan)
} else {
plan
}
}

def visitPlan(plan: SparkPlan): SparkPlan = {
plan match {
case project: ProjectExecTransformer =>
val newProjectList =
project.projectList.map(expr => visitExpression(expr).asInstanceOf[NamedExpression])
project.copy(projectList = newProjectList, child = visitPlan(project.child))
case chFilter: CHFilterExecTransformer =>
val newCondition = visitExpression(chFilter.condition)
CHFilterExecTransformer(newCondition, visitPlan(chFilter.child))
case filter: FilterExecTransformer =>
val newCondition = visitExpression(filter.condition)
FilterExecTransformer(newCondition, visitPlan(filter.child))
case other =>
other.withNewChildren(other.children.map(visitPlan))
}
}

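// Only rewrite substring() calls whose pos and len are literals and whose pos is
// positive; Spark treats a non-positive pos specially (a negative pos counts from
// the end of the string), so those cases are left untouched.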
def validateSubstring(expr: Expression): Boolean = {
expr match {
case substring: Substring =>
substring.pos.isInstanceOf[Literal] && substring.len.isInstanceOf[Literal] &&
substring.pos.asInstanceOf[Literal].value.asInstanceOf[Int] > 0
case _ => false
}
}

def visitExpression(expr: Expression): Expression = {
expr match {
case binary: BinaryComparison
if validateSubstring(binary.left) || validateSubstring(binary.right) =>
rewriteSubStringComparison(binary)
case other =>
other.withNewChildren(other.children.map(visitExpression))
}
}

def buildSubstringComparison(
leftExpression: Expression,
rightExpression: Expression): Option[Expression] = {
def getSubStringPositionAndLength(e: Expression): (Integer, Integer, Expression) = {
e match {
case subString: Substring =>
val pos = subString.pos.asInstanceOf[Literal].value.asInstanceOf[Int]
val len = subString.len.asInstanceOf[Literal].value.asInstanceOf[Int]
(pos - 1, len, subString.str)

case literal: Literal if literal.dataType.isInstanceOf[StringType] =>
val str = literal.value.toString
(0, str.length, e)
case _ => (0, Integer.MAX_VALUE, e)
}
}
val (leftPos, leftLen, leftInnerExpr) = getSubStringPositionAndLength(leftExpression)
val (rightPos, rightLen, rightInnerExpr) = getSubStringPositionAndLength(rightExpression)
if (leftLen != rightLen) {
return None
}

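// The ScalaUDF is created with a null function body: compare_substrings is expected
// to be registered in UDFMappings and executed by the native backend rather than
// evaluated on the JVM side.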
val udf = ScalaUDF(
null,
IntegerType,
Seq(leftInnerExpr, rightInnerExpr, Literal(leftPos), Literal(rightPos), Literal(rightLen)),
Nil,
None,
Some(udfFunctionName),
leftExpression.nullable || rightExpression.nullable
)
Some(udf)
}

def rewriteSubStringComparison(binaryExpression: BinaryComparison): Expression = {
val zeroLiteral = Literal(0, IntegerType)
val subStringCompareExpression =
buildSubstringComparison(binaryExpression.left, binaryExpression.right)
subStringCompareExpression match {
case Some(expr) =>
binaryExpression match {
case _: EqualTo => EqualTo(expr, zeroLiteral)
case _: GreaterThan => GreaterThan(expr, zeroLiteral)
case _: GreaterThanOrEqual => GreaterThanOrEqual(expr, zeroLiteral)
case _: LessThan => LessThan(expr, zeroLiteral)
case _: LessThanOrEqual => LessThanOrEqual(expr, zeroLiteral)
case _ => binaryExpression
}
case _ => binaryExpression
}
}
}
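
For illustration, here is a minimal sketch of the contract this rule assumes for the compare_substrings UDF. The object and method names below are hypothetical; the real function is registered through UDFMappings and runs in the native backend. The sketch only captures the expected semantics: compare len characters of each input starting at the given 0-based offsets and return a negative, zero, or positive integer. Under that contract, substring(a, 2, 3) = substring(b, 1, 3) rewrites to compare_substrings(a, b, 1, 0, 3) = 0.

object CompareSubstringsSketch {
  // Hypothetical JVM-side semantics for compare_substrings(left, right, leftPos,
  // rightPos, len): compare the two substrings lexicographically and return the
  // sign, matching how rewriteSubStringComparison compares the result against 0.
  def compareSubstrings(left: String, right: String, leftPos: Int, rightPos: Int, len: Int): Int = {
    val l = left.substring(leftPos, math.min(leftPos + len, left.length))
    val r = right.substring(rightPos, math.min(rightPos + len, right.length))
    Integer.signum(l.compareTo(r))
  }

  def main(args: Array[String]): Unit = {
    // substring("xabc", 2, 3) and substring("abcz", 1, 3) both yield "abc".
    assert(compareSubstrings("xabc", "abcz", 1, 0, 3) == 0)
    // "abd" sorts after "abc", so the sign is positive.
    assert(compareSubstrings("abd", "abc", 0, 0, 3) > 0)
  }
}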
@@ -17,7 +17,7 @@
package org.apache.gluten.execution

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row, TestUtils}
+import org.apache.spark.sql.{DataFrame, GlutenTestUtils, Row}
import org.apache.spark.sql.execution.InputIteratorTransformer
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
@@ -590,7 +590,7 @@ class GlutenClickHouseTPCHBucketSuite
case o => o
})
}
-TestUtils.compareAnswers(sortedRes, exceptedResult)
+GlutenTestUtils.compareAnswers(sortedRes, exceptedResult)
}

val SQL =
@@ -17,7 +17,7 @@
package org.apache.gluten.execution

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Row, TestUtils}
+import org.apache.spark.sql.{GlutenTestUtils, Row}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.types.{DecimalType, StructType}

@@ -342,7 +342,7 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
assert(result.size == 7)
val expected =
Seq(Row(465.0), Row(67.0), Row(160.0), Row(371.0), Row(732.0), Row(138.0), Row(785.0))
-TestUtils.compareAnswers(result, expected)
+GlutenTestUtils.compareAnswers(result, expected)
}

test("test 'order by' two keys") {
@@ -358,7 +358,7 @@
val result = df.take(3)
val expected =
Seq(Row(0, "ALGERIA", 0), Row(1, "ARGENTINA", 1), Row(2, "BRAZIL", 1))
-TestUtils.compareAnswers(result, expected)
+GlutenTestUtils.compareAnswers(result, expected)
}
}

@@ -373,7 +373,7 @@
assert(sortExec.size == 1)
val result = df.collect()
val expectedResult = Seq(Row(0), Row(1), Row(2), Row(3), Row(4))
-TestUtils.compareAnswers(result, expectedResult)
+GlutenTestUtils.compareAnswers(result, expectedResult)
}
}

@@ -416,7 +416,7 @@
new java.math.BigDecimal("123456789.223456789012345678901234567"),
Seq(new java.math.BigDecimal("123456789.123456789012345678901234567"))
))
-TestUtils.compareAnswers(result, expectedResult)
+GlutenTestUtils.compareAnswers(result, expectedResult)
}

test("test decimal128") {
@@ -434,8 +434,8 @@
.add("b1", DecimalType(38, 27)))

val df2 = spark.createDataFrame(data, schema)
-TestUtils.compareAnswers(df2.select("b").collect(), Seq(Row(struct)))
-TestUtils.compareAnswers(
+GlutenTestUtils.compareAnswers(df2.select("b").collect(), Seq(Row(struct)))
+GlutenTestUtils.compareAnswers(
df2.select("a").collect(),
Seq(Row(new java.math.BigDecimal("123456789.123456789012345678901234566"))))
}