Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,16 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.TimeFormatter
import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.types.StringTypeWithCollation
import org.apache.spark.sql.types.{AbstractDataType, AnyTimeType, ByteType, DataType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, IntegralType, LongType, NumericType, ObjectType, TimeType}
import org.apache.spark.sql.types.{AbstractDataType, AnyTimeType, ByteType, DataType, DayTimeIntervalType, DecimalType, IntegerType, IntegralType, LongType, NumericType, ObjectType, TimeType}
import org.apache.spark.sql.types.DayTimeIntervalType.{HOUR, SECOND}
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -768,46 +766,20 @@ case class TimeTrunc(unit: Expression, time: Expression)
}
}

abstract class IntegralToTimeBase
extends UnaryExpression with ExpectsInputTypes with CodegenFallback
abstract class TimeFromBase extends UnaryExpression with RuntimeReplaceable with ExpectsInputTypes
with TimeExpression {
protected def upScaleFactor: Long
protected def timeConversionMethod: String

override def inputTypes: Seq[AbstractDataType] = Seq(IntegralType)
override def dataType: DataType = TimeType(TimeType.MICROS_PRECISION)
override def nullable: Boolean = true
override def nullIntolerant: Boolean = true

@inline
protected final def validateTimeNanos(nanos: Long): Any = {
if (nanos < 0 || nanos >= NANOS_PER_DAY) null else nanos
}

override protected def nullSafeEval(input: Any): Any = {
val nanos = Math.multiplyExact(input.asInstanceOf[Number].longValue(), upScaleFactor)
validateTimeNanos(nanos)
}
}

abstract class TimeToLongBase extends UnaryExpression with ExpectsInputTypes
with TimeExpression {
protected def scaleFactor: Long

override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimeType)
override def dataType: DataType = LongType
override def nullIntolerant: Boolean = true

override def nullSafeEval(input: Any): Any = {
Math.floorDiv(input.asInstanceOf[Number].longValue(), scaleFactor)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
if (scaleFactor == 1) {
defineCodeGen(ctx, ev, c => c)
} else {
defineCodeGen(ctx, ev, c => s"java.lang.Math.floorDiv($c, ${scaleFactor}L)")
}
}
override def replacement: Expression = StaticInvoke(
classOf[DateTimeUtils.type],
dataType,
timeConversionMethod,
Seq(child),
Seq(child.dataType)
)
}

// scalastyle:off line.size.limit
Expand All @@ -828,57 +800,14 @@ abstract class TimeToLongBase extends UnaryExpression with ExpectsInputTypes
14:30:00.5
> SELECT _FUNC_(86399.999999);
23:59:59.999999
> SELECT _FUNC_(90000);
NULL
> SELECT _FUNC_(-1);
NULL
""",
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeFromSeconds(child: Expression)
extends UnaryExpression with ExpectsInputTypes with CodegenFallback
with TimeExpression {
case class TimeFromSeconds(child: Expression) extends TimeFromBase {
override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
override def dataType: DataType = TimeType(TimeType.MICROS_PRECISION)
override def nullable: Boolean = true
override def nullIntolerant: Boolean = true

@inline
private def validateTimeNanos(nanos: Long): Any = {
if (nanos < 0 || nanos >= NANOS_PER_DAY) null else nanos
}

@transient
private lazy val evalFunc: Any => Any = child.dataType match {
case _: IntegralType => input =>
val nanos = Math.multiplyExact(input.asInstanceOf[Number].longValue(), NANOS_PER_SECOND)
validateTimeNanos(nanos)
case _: DecimalType => input =>
val operand = new java.math.BigDecimal(NANOS_PER_SECOND)
val nanos = input.asInstanceOf[Decimal].toJavaBigDecimal.multiply(operand).longValueExact()
validateTimeNanos(nanos)
case _: FloatType => input =>
val f = input.asInstanceOf[Float]
if (f.isNaN || f.isInfinite) {
null
} else {
val nanos = (f.toDouble * NANOS_PER_SECOND).toLong
validateTimeNanos(nanos)
}
case _: DoubleType => input =>
val d = input.asInstanceOf[Double]
if (d.isNaN || d.isInfinite) {
null
} else {
val nanos = (d * NANOS_PER_SECOND).toLong
validateTimeNanos(nanos)
}
}

override def nullSafeEval(input: Any): Any = evalFunc(input)

override def prettyName: String = "time_from_seconds"
override protected def timeConversionMethod: String = "timeFromSeconds"

override protected def withNewChildInternal(newChild: Expression): TimeFromSeconds =
copy(child = newChild)
Expand All @@ -905,12 +834,9 @@ case class TimeFromSeconds(child: Expression)
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeFromMillis(child: Expression)
extends IntegralToTimeBase {

override def upScaleFactor: Long = NANOS_PER_MILLIS

case class TimeFromMillis(child: Expression) extends TimeFromBase {
override def prettyName: String = "time_from_millis"
override protected def timeConversionMethod: String = "timeFromMillis"

override protected def withNewChildInternal(newChild: Expression): TimeFromMillis =
copy(child = newChild)
Expand All @@ -937,17 +863,30 @@ case class TimeFromMillis(child: Expression)
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeFromMicros(child: Expression)
extends IntegralToTimeBase {

override def upScaleFactor: Long = NANOS_PER_MICROS

case class TimeFromMicros(child: Expression) extends TimeFromBase {
override def prettyName: String = "time_from_micros"
override protected def timeConversionMethod: String = "timeFromMicros"

override protected def withNewChildInternal(newChild: Expression): TimeFromMicros =
copy(child = newChild)
}

abstract class TimeToBase extends UnaryExpression with RuntimeReplaceable with ExpectsInputTypes
with TimeExpression {
protected def timeConversionMethod: String

override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimeType)
override def dataType: DataType = LongType

override def replacement: Expression = StaticInvoke(
classOf[DateTimeUtils.type],
dataType,
timeConversionMethod,
Seq(child),
Seq(child.dataType)
)
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage =
Expand All @@ -970,21 +909,11 @@ case class TimeFromMicros(child: Expression)
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeToSeconds(child: Expression)
extends UnaryExpression with ImplicitCastInputTypes with CodegenFallback {
case class TimeToSeconds(child: Expression) extends TimeToBase {

override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimeType)
override def dataType: DataType = DecimalType(14, 6)
override def nullable: Boolean = true
override def nullIntolerant: Boolean = true

protected override def nullSafeEval(input: Any): Any = {
val nanos = input.asInstanceOf[Long]
val result = Decimal(nanos) / Decimal(NANOS_PER_SECOND)
if (result.changePrecision(14, 6)) result else null
}

override def prettyName: String = "time_to_seconds"
override protected def timeConversionMethod: String = "timeToSeconds"

override protected def withNewChildInternal(newChild: Expression): TimeToSeconds =
copy(child = newChild)
Expand Down Expand Up @@ -1012,12 +941,9 @@ case class TimeToSeconds(child: Expression)
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeToMillis(child: Expression)
extends TimeToLongBase {

override def scaleFactor: Long = NANOS_PER_MILLIS

case class TimeToMillis(child: Expression) extends TimeToBase {
override def prettyName: String = "time_to_millis"
override protected def timeConversionMethod: String = "timeToMillis"

override protected def withNewChildInternal(newChild: Expression): TimeToMillis =
copy(child = newChild)
Expand Down Expand Up @@ -1045,12 +971,9 @@ case class TimeToMillis(child: Expression)
since = "4.2.0",
group = "datetime_funcs")
// scalastyle:on line.size.limit
case class TimeToMicros(child: Expression)
extends TimeToLongBase {

override def scaleFactor: Long = NANOS_PER_MICROS

case class TimeToMicros(child: Expression) extends TimeToBase {
override def prettyName: String = "time_to_micros"
override protected def timeConversionMethod: String = "timeToMicros"

override protected def withNewChildInternal(newChild: Expression): TimeToMicros =
copy(child = newChild)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,102 @@ object DateTimeUtils extends SparkDateTimeUtils {
}
}

private def withTimeConversionErrorHandling(f: => Long): Long = {
try {
val nanos = f
if (nanos < 0 || nanos >= NANOS_PER_DAY) {
throw new DateTimeException(
s"Invalid TIME value: must be between 00:00:00 and 23:59:59.999999999, " +
s"but got $nanos nanoseconds")
}
nanos
} catch {
case e: DateTimeException =>
throw QueryExecutionErrors.ansiDateTimeArgumentOutOfRangeWithoutSuggestion(e)
case e: ArithmeticException =>
throw QueryExecutionErrors.ansiDateTimeArgumentOutOfRangeWithoutSuggestion(
new DateTimeException("Overflow in TIME conversion", e))
}
}

/**
* Creates a TIME value from seconds since midnight (integral types).
* @param seconds Seconds (0 to 86399)
* @return Nanoseconds since midnight
*/
def timeFromSeconds(seconds: Long): Long = withTimeConversionErrorHandling {
Math.multiplyExact(seconds, NANOS_PER_SECOND)
}

/**
* Creates a TIME value from seconds since midnight (decimal type).
* @param seconds Seconds (0 to 86399.999999)
* @return Nanoseconds since midnight
*/
def timeFromSeconds(seconds: Decimal): Long = withTimeConversionErrorHandling {
val operand = new java.math.BigDecimal(NANOS_PER_SECOND)
seconds.toJavaBigDecimal.multiply(operand).longValueExact()
}

/**
* Creates a TIME value from seconds since midnight (floating point type).
* @param seconds Seconds (0 to 86399.999999)
* @return Nanoseconds since midnight
*/
def timeFromSeconds(seconds: Double): Long = withTimeConversionErrorHandling {
if (seconds.isNaN || seconds.isInfinite) {
throw new DateTimeException("Cannot convert NaN or Infinite value to TIME")
}
(seconds * NANOS_PER_SECOND).toLong
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider overflow here when calling toLong?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the resolution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or overflow can never happen here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, overflow can happen, but it is already handled in withTimeConversionErrorHandling

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? Double.toLong never throws exception AFAIK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double.toLong can produce overflow results (when the Double is too large/small):
Large values → Long.MaxValue
Small values → Long.MinValue
How it's handled: The range check (nanos < 0 || nanos >= NANOS_PER_DAY) in withTimeConversionErrorHandling detects these overflow values and throws SparkDateTimeException.

I tested with -1e199 , 1e200

scala>spark.conf.set("spark.sql.timeType.enabled", "true")
scala>spark.sql("SELECT time_from_seconds(-1e199)").show()
org.apache.spark.SparkDateTimeException: [DATETIME_FIELD_OUT_OF_BOUNDS.WITHOUT_SUGGESTION] Invalid TIME value: must be between 00:00:00 and 23:59:59.999999999, but got -9223372036854775808 nanoseconds.  SQLSTATE: 22023
scala>spark.sql("SELECT time_from_seconds(1e200)").show()
org.apache.spark.SparkDateTimeException: [DATETIME_FIELD_OUT_OF_BOUNDS.WITHOUT_SUGGESTION] Invalid TIME value: must be between 00:00:00 and 23:59:59.999999999, but got 9223372036854775807 nanoseconds.  SQLSTATE: 22023

}

/**
* Creates a TIME value from milliseconds since midnight.
* @param millis Milliseconds (0 to 86399999)
* @return Nanoseconds since midnight
*/
def timeFromMillis(millis: Long): Long = withTimeConversionErrorHandling {
Math.multiplyExact(millis, NANOS_PER_MILLIS)
}

/**
* Creates a TIME value from microseconds since midnight.
* @param micros Microseconds (0 to 86399999999)
* @return Nanoseconds since midnight
*/
def timeFromMicros(micros: Long): Long = withTimeConversionErrorHandling {
Math.multiplyExact(micros, NANOS_PER_MICROS)
}

/**
* Converts a TIME value to seconds.
* @param nanos Nanoseconds since midnight
* @return Seconds as Decimal(14, 6)
*/
def timeToSeconds(nanos: Long): Decimal = {
val result = Decimal(nanos) / Decimal(NANOS_PER_SECOND)
result.changePrecision(14, 6)
result
}

/**
* Converts a TIME value to milliseconds.
* @param nanos Nanoseconds since midnight
* @return Milliseconds since midnight
*/
def timeToMillis(nanos: Long): Long = {
Math.floorDiv(nanos, NANOS_PER_MILLIS)
}

/**
* Converts a TIME value to microseconds.
* @param nanos Nanoseconds since midnight
* @return Microseconds since midnight
*/
def timeToMicros(nanos: Long): Long = {
Math.floorDiv(nanos, NANOS_PER_MICROS)
}

/**
* Makes a timestamp without time zone from a date and a local time.
*
Expand Down
Loading