Skip to content

Commit

Permalink
GLUTEN-8836 Fix partition values with escape char
Browse files Browse the repository at this point in the history
  • Loading branch information
lwz9103 committed Feb 27, 2025
1 parent e82c2ae commit d5cc5fb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,21 @@
*/
package org.apache.spark.sql.delta.util

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types.StringType

import org.apache.hadoop.fs.Path

/**
* `OptimizeTableCommandOverwrites` does not use `DelayedCommitProtocol`, so we can't use
* `DelayedCommitProtocol.parsePartitions`. This is a copied version. </br> TODO: Remove it.
*/
object MergeTreePartitionUtils {

private val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"

def parsePartitions(dir: String): Map[String, String] = {
// TODO: timezones?
// TODO: enable validatePartitionColumns?
val dateFormatter = DateFormatter()
val timestampFormatter =
TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
val parsedPartition =
PartitionUtils
.parsePartition(
new Path(dir),
typeInference = false,
Set.empty,
Map.empty,
validatePartitionColumns = false,
java.util.TimeZone.getDefault,
dateFormatter,
timestampFormatter)
._1
.get
parsedPartition.columnNames
.zip(
parsedPartition.literals
.map(l => Cast(l, StringType).eval())
.map(Option(_).map(_.toString).orNull))
.toMap
import scala.collection.mutable
val partitions = dir.split("/").toList
val partitionValues = mutable.Map.empty[String, String]
for (partition <- partitions) {
val keyValue = partition.split("=")
require(keyValue.length == 2, s"Expected key=value format, but got: $partition")
partitionValues += (keyValue(0) -> keyValue(1))
}
partitionValues.toMap
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}

import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types._

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -359,6 +360,52 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}

test("test partition values with escape chars") {

val schema = StructType(
Seq(
StructField.apply("id", IntegerType, nullable = true),
StructField.apply("escape", StringType, nullable = true)
))

// scalastyle:off nonascii
val data: Seq[Row] = Seq(
Row(1, "="),
Row(2, "/"),
Row(3, "#"),
Row(4, ":"),
Row(5, "\\"),
Row(6, "\u0001"),
Row(7, "中文"),
Row(8, " ")
)
// scalastyle:on nonascii

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("origin_table")

spark.sql(s"""
|DROP TABLE IF EXISTS partition_escape;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS partition_escape
|(
| c1 int,
| c2 string
|)
|USING clickhouse
|PARTITIONED BY (c2)
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='c1',
| primaryKey='c1')
|LOCATION '$HDFS_URL/test/partition_escape'
|""".stripMargin)

spark.sql("insert into partition_escape select * from origin_table")
spark.sql("select * from partition_escape").show()
}

testSparkVersionLE33("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) : Merge
while (!in.eof())
{
MergeTreePart part;
std::string encoded_name;
readString(encoded_name, in);
Poco::URI::decode(encoded_name, part.name);
readString(part.name, in);
assertChar('\n', in);
readIntText(part.begin, in);
assertChar('\n', in);
Expand Down

0 comments on commit d5cc5fb

Please sign in to comment.