2 changes: 2 additions & 0 deletions .github/workflows/velox_backend_x86.yml
@@ -1375,6 +1375,8 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
+ $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
+   -Pspark-ut -Pdelta -DskipTests -Dmaven.source.skip
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

- import org.apache.spark.internal.{LoggingShims, MDC}
+ import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
tablePath: Option[String] = None,
isCDCRead: Boolean = false)
extends GlutenParquetFileFormat
- with LoggingShims {
+ with Logging {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
case AlwaysTrue() => Some(AlwaysTrue())
case AlwaysFalse() => Some(AlwaysFalse())
case _ =>
logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
None
}
}
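
For reference, here is a minimal sketch (not taken from this PR) of the stock Spark 4.x structured-logging API the change switches to: the plain Logging trait, its log"..." interpolator, and MDC.of(...) for typed key/value fields. DeltaLogKeys is Delta's own log-key registry; its import path below is an assumption.

import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.sql.delta.logging.DeltaLogKeys // assumed location of Delta's LogKey registry
import org.apache.spark.sql.sources.Filter

object FilterTranslationLogging extends Logging {
  def reportUntranslatable(filter: Filter): Unit = {
    // log"..." builds a structured message; SparkMDC.of attaches a LogKey/value pair to it.
    logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
  }
}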
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}

import org.apache.spark._
- import org.apache.spark.internal.{LoggingShims, MDC}
+ import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
* values to data files. Specifically L123-126, L132, and L140 where it adds option
* WRITE_PARTITION_COLUMNS
*/
- object GlutenDeltaFileFormatWriter extends LoggingShims {
+ object GlutenDeltaFileFormatWriter extends Logging {

/**
* A variable used in tests to check whether the output ordering of the query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val ret = f
val commitMsgs = ret.map(_.commitMsg)

logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")

processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(log"Finished processing stats for write job " +
log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")

// return a set of all the partition paths that were updated during this job
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
} catch {
case cause: Throwable =>
logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
committer.abortJob(job)
throw cause
}
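
The block above follows Spark's FileCommitProtocol contract: collect the per-task commit messages, call commitJob exactly once, and abort the job if anything throws. A stripped-down sketch of that shape, with illustrative names:

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

object CommitHelpers {
  def commitOrAbort(committer: FileCommitProtocol, job: Job, commitMsgs: Seq[TaskCommitMessage]): Unit = {
    try {
      committer.commitJob(job, commitMsgs) // finalizes everything the tasks wrote
    } catch {
      case cause: Throwable =>
        committer.abortJob(job) // best-effort cleanup of partial output before rethrowing
        throw cause
    }
  }
}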
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
}, finallyBlock = {
dataWriter.close()
})
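
The catchBlock/finallyBlock arguments above come from Spark's internal Utils.tryWithSafeFinallyAndFailureCallbacks helper: run the task body, abort the data writer if it throws, and close the writer either way. A hedged sketch of that pattern; the writer trait and package are illustrative, and Utils is package-private, so this only compiles under an org.apache.spark package as the file above does.

package org.apache.spark.sql.example // hypothetical package under org.apache.spark

import org.apache.spark.util.Utils

object WriteTaskGuard {
  trait AbortableWriter {
    def abort(): Unit
    def close(): Unit
  }

  def runWriteTask[T](writer: AbortableWriter)(body: => T): T =
    Utils.tryWithSafeFinallyAndFailureCallbacks(body)(
      catchBlock = writer.abort(),   // discard partially written output on failure
      finallyBlock = writer.close()) // always release the writer's resources
}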
@@ -17,8 +17,19 @@
package org.apache.spark.sql.delta

object DeltaInsertIntoTableSuiteShims {
val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")

- // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
- val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+ val INSERT_INTO_TMP_VIEW_ERROR_MSG =
+   if (isSpark41) {
+     "[TABLE_OR_VIEW_NOT_FOUND]"
+   } else {
+     "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+   }
+
+ val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
+   if (isSpark41) {
+     "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+   } else {
+     "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+   }
}
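
A hypothetical ScalaTest-style use of these shim constants, for illustration only (the SQL and the base classes from Spark's sql test-jar are assumptions): suites assert against whichever error class the running Spark version reports.

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims._
import org.apache.spark.sql.test.SharedSparkSession

class InsertIntoShimExample extends QueryTest with SharedSparkSession {
  test("insert into a temp view reports the version-specific error class") {
    spark.range(1).createOrReplaceTempView("tmp_view")
    val e = intercept[AnalysisException] {
      // Rejected on both versions, but Spark 4.0 and 4.1 use different error classes.
      sql("INSERT INTO tmp_view VALUES (1)")
    }
    assert(e.getMessage.contains(INSERT_INTO_TMP_VIEW_ERROR_MSG))
  }
}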
4 changes: 2 additions & 2 deletions pom.xml
@@ -1332,8 +1332,8 @@
<sparkshim.artifactId>spark-sql-columnar-shims-spark41</sparkshim.artifactId>
<spark.version>4.1.1</spark.version>
<iceberg.version>1.10.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>4.0.0</delta.version>
<delta.package.name>delta-spark_4.1</delta.package.name>
<delta.version>4.1.0</delta.version>
<delta.binary.version>40</delta.binary.version>
<hudi.version>1.1.0</hudi.version>
<fasterxml.version>2.18.2</fasterxml.version>
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter}

import java.io.OutputStream

/**
* Binary compatibility shim for Delta 4.0, which was compiled against Spark 4.0's
* CheckpointFileManager package before Spark 4.1 moved it under streaming.checkpointing.
*/
trait CheckpointFileManager {
def createAtomic(
path: Path,
overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream

def open(path: Path): FSDataInputStream

def list(path: Path, filter: PathFilter): Array[FileStatus]

def list(path: Path): Array[FileStatus] = {
list(
path,
new PathFilter {
override def accept(path: Path): Boolean = true
})
}

def mkdirs(path: Path): Unit

def exists(path: Path): Boolean

def delete(path: Path): Unit

def isLocal: Boolean

def createCheckpointDirectory(): Path
}

object CheckpointFileManager {
def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
new Spark41CheckpointFileManagerAdapter(
Spark41CheckpointFileManager.create(path, hadoopConf))
}

abstract class CancellableFSDataOutputStream(outputStream: OutputStream)
extends org.apache.hadoop.fs.FSDataOutputStream(
outputStream,
null.asInstanceOf[FileSystem.Statistics]) {
def cancel(): Unit
}

private class Spark41CheckpointFileManagerAdapter(
delegate: Spark41CheckpointFileManager)
extends CheckpointFileManager {
override def createAtomic(
path: Path,
overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible))
}

override def open(path: Path): FSDataInputStream = delegate.open(path)

override def list(path: Path, filter: PathFilter): Array[FileStatus] =
delegate.list(path, filter)

override def mkdirs(path: Path): Unit = delegate.mkdirs(path)

override def exists(path: Path): Boolean = delegate.exists(path)

override def delete(path: Path): Unit = delegate.delete(path)

override def isLocal: Boolean = delegate.isLocal

override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory()
}

private class CancellableFSDataOutputStreamAdapter(
delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream)
extends CancellableFSDataOutputStream(delegate) {
override def close(): Unit = delegate.close()

override def cancel(): Unit = delegate.cancel()
}
}
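
A hedged usage sketch of the shim above (paths are illustrative): code compiled against org.apache.spark.sql.execution.streaming.CheckpointFileManager keeps resolving, while create() delegates to the implementation Spark 4.1 relocated under streaming.checkpointing.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

object CheckpointShimDemo {
  def writeAtomically(): Unit = {
    val manager = CheckpointFileManager.create(new Path("/tmp/ckpt"), new Configuration())
    val out = manager.createAtomic(new Path("/tmp/ckpt/offsets/0"), overwriteIfPossible = false)
    try {
      out.write("v1".getBytes("UTF-8"))
      out.close() // close() publishes the file atomically
    } catch {
      case t: Throwable =>
        out.cancel() // cancel() discards the partially written file
        throw t
    }
  }
}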
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}

object MemoryStream {
def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext)
}

def apply[A: Encoder](
numPartitions: Int)(
implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
}
}
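
And a hedged example of how Delta's streaming tests would reach this shim (the session setup is illustrative): the apply methods hand back Spark 4.1's runtime MemoryStream, so addData and toDF behave as before for callers compiled against the old package.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamShimDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("memory-stream-shim").getOrCreate()
    import spark.implicits._ // supplies Encoder[Int]
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int] // resolves to the shim, which returns the runtime MemoryStream
    input.addData(1, 2, 3)
    val df = input.toDF() // a streaming DataFrame backed by the in-memory source
    df.printSchema()
    spark.stop()
  }
}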