Replace Spark SQL isNull check with Spark Scala based DSL (#493)
- This ensures that columns with spaces in their names are escaped correctly in the where condition; see the repro sketch below.
- Added a test to verify.
rdsharma26 authored Jul 5, 2023
1 parent 2eaeaed commit f53283e
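
For context, a minimal standalone repro sketch of the failure mode this commit fixes (not part of the commit itself; the session setup, DataFrame, and column names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

object EscapeRepro extends App {
  val spark = SparkSession.builder().master("local[1]").appName("escape-repro").getOrCreate()
  import spark.implicits._

  val df = Seq(("ab", "abc1"), ("bc", null)).toDF("att 1", "att 2")

  // The old analyzer code built the null check as a raw SQL string, so a
  // column name containing a space reached the parser unescaped:
  //   df.where(expr("att 2 IS NULL"))   // fails to parse
  // The Column-based DSL quotes the name internally, so this works:
  df.where(col("att 2").isNull).show()

  spark.stop()
}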
Showing 5 changed files with 56 additions and 13 deletions.
18 changes: 12 additions & 6 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -465,20 +465,26 @@ private[deequ] object Analyzers {
     conditionalSelection(col(selection), where)
   }
 
-  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
-    val conditionColumn = where.map(expr)
-    conditionColumn
+  def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: Double): Column = {
+    where
       .map { condition => when(condition, replaceWith).otherwise(selection) }
       .getOrElse(selection)
   }
 
-  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
-    val conditionColumn = where.map(expr)
-    conditionColumn
+  def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: String): Column = {
+    where
       .map { condition => when(condition, replaceWith).otherwise(selection) }
       .getOrElse(selection)
   }
+
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
+    conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
+  }
+
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
+    conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
+  }
 
   def conditionalSelection(selection: Column, condition: Option[String]): Column = {
     val conditionColumn = condition.map { expression => expr(expression) }
     conditionalSelectionFromColumns(selection, conditionColumn)
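
A hedged sketch of a call site for the new helper (the column name is illustrative, and since Analyzers is private[deequ] this only compiles inside the com.amazon.deequ package):

import org.apache.spark.sql.functions.{col, length}
import org.apache.spark.sql.types.DoubleType
import com.amazon.deequ.analyzers.Analyzers.conditionSelectionGivenColumn

val colLengths = length(col("att 1")).cast(DoubleType)

// The predicate is passed as a Column rather than a SQL string, so the
// space in the column name is escaped by Spark when the plan is built:
val criterion =
  conditionSelectionGivenColumn(colLengths, Option(col("att 1").isNull), replaceWith = Double.MaxValue)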
5 changes: 3 additions & 2 deletions src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
@@ -49,12 +49,13 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio
   override def filterCondition: Option[String] = where
 
   private def criterion(nullBehavior: NullBehavior): Column = {
+    val isNullCheck = col(column).isNull
     nullBehavior match {
       case NullBehavior.Fail =>
         val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
-        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue)
+        conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MaxValue)
       case NullBehavior.EmptyString =>
-        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+        length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
       case _ => length(conditionalSelection(column, where)).cast(DoubleType)
     }
   }
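
For intuition, under NullBehavior.Fail the criterion now evaluates to roughly the expression below (a sketch that ignores the optional where filter; the column name is illustrative). Any null cell becomes Double.MaxValue before the max aggregation, so a null reliably fails a maximum-length constraint:

import org.apache.spark.sql.functions.{col, length, when}
import org.apache.spark.sql.types.DoubleType

val column = "att 1"  // illustrative name with a space
val colLengths = length(col(column)).cast(DoubleType)

// Null rows get the sentinel instead of a length:
val failCriterion = when(col(column).isNull, Double.MaxValue).otherwise(colLengths)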
9 changes: 6 additions & 3 deletions src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
@@ -22,7 +22,9 @@ import com.amazon.deequ.analyzers.Preconditions.hasColumn
 import com.amazon.deequ.analyzers.Preconditions.isString
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.functions.{col, length, min}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.length
+import org.apache.spark.sql.functions.min
 import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.sql.types.StructType
 
@@ -47,12 +49,13 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio
   override def filterCondition: Option[String] = where
 
   private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
+    val isNullCheck = col(column).isNull
     nullBehavior match {
       case NullBehavior.Fail =>
         val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
-        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue)
+        conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MinValue)
       case NullBehavior.EmptyString =>
-        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+        length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
       case _ => length(conditionalSelection(column, where)).cast(DoubleType)
     }
   }
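
MinLength mirrors MaxLength here: the sentinel is Double.MinValue instead of Double.MaxValue, so under NullBehavior.Fail a null drags the reported minimum down and fails a minimum-length constraint the same way.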
24 changes: 22 additions & 2 deletions src/test/scala/com/amazon/deequ/profiles/ColumnProfilerTest.scala
@@ -78,6 +78,27 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
     assert(actualColumnProfile == expectedColumnProfile)
   }
 
+  "return correct StringColumnProfile for column names with spaces" in withSparkSession { session =>
+    val data = getDfCompleteAndInCompleteColumnsWithSpacesInNames(session)
+    val columnNames = data.columns.toSeq
+
+    val lengthMap = Map(
+      "att 1" -> (1, 3),
+      "att 2" -> (0, 7)
+    )
+
+    lengthMap.foreach { case (columnName, (minLength, maxLength)) =>
+      val actualColumnProfile = ColumnProfiler.profile(data, Option(columnNames), false, 1)
+        .profiles(columnName)
+
+      assert(actualColumnProfile.isInstanceOf[StringColumnProfile])
+      val actualStringColumnProfile = actualColumnProfile.asInstanceOf[StringColumnProfile]
+
+      assert(actualStringColumnProfile.minLength.contains(minLength))
+      assert(actualStringColumnProfile.maxLength.contains(maxLength))
+    }
+  }
+
   "return correct columnProfiles with predefined dataType" in withSparkSession { session =>
 
     val data = getDfCompleteAndInCompleteColumns(session)
@@ -131,7 +152,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
     assert(actualColumnProfile == expectedColumnProfile)
   }
 
-
   "return correct NumericColumnProfiles for numeric String DataType columns" in
     withSparkSession { session =>
 
@@ -171,6 +191,7 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
     assertProfilesEqual(expectedColumnProfile,
       actualColumnProfile.asInstanceOf[NumericColumnProfile])
   }
+
   "return correct NumericColumnProfiles for numeric String DataType columns when " +
     "kllProfiling disabled" in withSparkSession { session =>
 
@@ -562,7 +583,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
     )
 
     assertSameColumnProfiles(columnProfiles.profiles, expectedProfiles)
-
   }
 
   private[this] def assertSameColumnProfiles(
13 changes: 13 additions & 0 deletions src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
@@ -159,6 +159,19 @@ trait FixtureSupport {
     ).toDF("item", "att1", "att2")
   }
 
+  def getDfCompleteAndInCompleteColumnsWithSpacesInNames(sparkSession: SparkSession): DataFrame = {
+    import sparkSession.implicits._
+
+    Seq(
+      ("1", "ab", "abc1"),
+      ("2", "bc", null),
+      ("3", "ab", "def2ghi"),
+      ("4", "ab", null),
+      ("5", "bcd", "ab"),
+      ("6", "a", "pqrs")
+    ).toDF("some item", "att 1", "att 2")
+  }
+
   def getDfCompleteAndInCompleteColumnsAndVarLengthStrings(sparkSession: SparkSession): DataFrame = {
     import sparkSession.implicits._
 
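
For reference, this fixture backs the expectations in the new profiler test: "att 1" lengths run from 1 ("a") to 3 ("bcd"), and "att 2" from 0 (null entries, presumably treated as empty strings by the profiler's null handling) to 7 ("def2ghi").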
