Replace Spark SQL isNull check with Spark Scala based DSL #493

Merged
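
Summary of the change: MinLength and MaxLength previously expressed their null check as a SQL string, s"${column} IS NULL", parsed via expr(...). That breaks for column names that are not plain SQL identifiers, such as names containing spaces, because the generated string fails to parse. Building the predicate with the Column DSL, col(column).isNull, avoids SQL parsing altogether. A minimal sketch of the failure mode, using the space-containing column names from the test fixture added in this PR (the snippet is illustrative, not code from the change):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, expr}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq(("ab", "abc1"), ("bc", null)).toDF("att 1", "att 2")

    // String predicate: "att 1 IS NULL" is not parseable, because the
    // unquoted name "att 1" is not a valid SQL identifier.
    // df.filter(expr("att 1 IS NULL"))   // throws a ParseException

    // Column DSL predicate: col() resolves the name directly, spaces included.
    df.filter(col("att 2").isNull).show()   // keeps the row ("bc", null)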
18 changes: 12 additions & 6 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -465,20 +465,26 @@ private[deequ] object Analyzers {
     conditionalSelection(col(selection), where)
   }
 
-  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
-    val conditionColumn = where.map(expr)
-    conditionColumn
+  def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: Double): Column = {
+    where
       .map { condition => when(condition, replaceWith).otherwise(selection) }
       .getOrElse(selection)
   }
 
-  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
-    val conditionColumn = where.map(expr)
-    conditionColumn
+  def conditionSelectionGivenColumn(selection: Column, where: Option[Column], replaceWith: String): Column = {
+    where
       .map { condition => when(condition, replaceWith).otherwise(selection) }
       .getOrElse(selection)
   }
 
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: Double): Column = {
+    conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
+  }
+
+  def conditionalSelection(selection: Column, where: Option[String], replaceWith: String): Column = {
+    conditionSelectionGivenColumn(selection, where.map(expr), replaceWith)
+  }
+
   def conditionalSelection(selection: Column, condition: Option[String]): Column = {
     val conditionColumn = condition.map { expression => expr(expression) }
     conditionalSelectionFromColumns(selection, conditionColumn)
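
The string-based conditionalSelection overloads are kept as thin wrappers around the new Column-based helper, so existing call sites compile unchanged while callers that already hold a Column predicate can bypass expr entirely. A rough usage sketch (note that Analyzers is private[deequ], so this only compiles from within the deequ package; the column and predicate are made up for illustration):

    import org.apache.spark.sql.functions.{col, expr}
    import com.amazon.deequ.analyzers.Analyzers._

    // Old path: a String predicate, parsed with expr() inside the wrapper.
    val viaString = conditionalSelection(col("price"), Some("price IS NULL"), replaceWith = 0.0)

    // New path: a Column predicate built with the DSL; no SQL parsing happens.
    val viaColumn = conditionSelectionGivenColumn(col("price"), Option(col("price").isNull), replaceWith = 0.0)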
5 changes: 3 additions & 2 deletions src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
@@ -49,12 +49,13 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio
   override def filterCondition: Option[String] = where
 
   private def criterion(nullBehavior: NullBehavior): Column = {
+    val isNullCheck = col(column).isNull
     nullBehavior match {
       case NullBehavior.Fail =>
         val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
-        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MaxValue)
+        conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MaxValue)
       case NullBehavior.EmptyString =>
-        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+        length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
       case _ => length(conditionalSelection(column, where)).cast(DoubleType)
     }
   }
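
For context on the sentinel value: under NullBehavior.Fail the length of a null cell is replaced with Double.MaxValue, so a single null row drives max(length) to MaxValue and makes the constraint fail loudly. A comment-level sketch of the three branches (assuming deequ's NullBehavior modes Ignore, EmptyString and Fail, with Ignore handled by the case _ default):

    // NullBehavior.Fail:        null -> Double.MaxValue, poisoning max(length)
    // NullBehavior.EmptyString: null -> "", contributing length 0
    // NullBehavior.Ignore:      null flows through length(), yields null,
    //                           and null values are skipped by the max aggregation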
9 changes: 6 additions & 3 deletions src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
@@ -22,7 +22,9 @@ import com.amazon.deequ.analyzers.Preconditions.hasColumn
 import com.amazon.deequ.analyzers.Preconditions.isString
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.functions.{col, length, min}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.length
+import org.apache.spark.sql.functions.min
 import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.sql.types.StructType
 
@@ -47,12 +49,13 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio
   override def filterCondition: Option[String] = where
 
   private[deequ] def criterion(nullBehavior: NullBehavior): Column = {
+    val isNullCheck = col(column).isNull
     nullBehavior match {
       case NullBehavior.Fail =>
         val colLengths: Column = length(conditionalSelection(column, where)).cast(DoubleType)
-        conditionalSelection(colLengths, Option(s"${column} IS NULL"), replaceWith = Double.MinValue)
+        conditionSelectionGivenColumn(colLengths, Option(isNullCheck), replaceWith = Double.MinValue)
       case NullBehavior.EmptyString =>
-        length(conditionalSelection(col(column), Option(s"${column} IS NULL"), replaceWith = "")).cast(DoubleType)
+        length(conditionSelectionGivenColumn(col(column), Option(isNullCheck), replaceWith = "")).cast(DoubleType)
       case _ => length(conditionalSelection(column, where)).cast(DoubleType)
     }
   }
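
MinLength mirrors this with the opposite sentinel: under NullBehavior.Fail a null length becomes Double.MinValue, so the failure surfaces through the min aggregation instead of max. In short:

    // MaxLength + Fail: null -> Double.MaxValue  (poisons max(length))
    // MinLength + Fail: null -> Double.MinValue  (poisons min(length))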
24 changes: 22 additions & 2 deletions src/test/scala/com/amazon/deequ/profiles/ColumnProfilerTest.scala
@@ -78,6 +78,27 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
       assert(actualColumnProfile == expectedColumnProfile)
     }
 
+    "return correct StringColumnProfile for column names with spaces" in withSparkSession { session =>
+      val data = getDfCompleteAndInCompleteColumnsWithSpacesInNames(session)
+      val columnNames = data.columns.toSeq
+
+      val lengthMap = Map(
+        "att 1" -> (1, 3),
+        "att 2" -> (0, 7)
+      )
+
+      lengthMap.foreach { case (columnName, (minLength, maxLength)) =>
+        val actualColumnProfile = ColumnProfiler.profile(data, Option(columnNames), false, 1)
+          .profiles(columnName)
+
+        assert(actualColumnProfile.isInstanceOf[StringColumnProfile])
+        val actualStringColumnProfile = actualColumnProfile.asInstanceOf[StringColumnProfile]
+
+        assert(actualStringColumnProfile.minLength.contains(minLength))
+        assert(actualStringColumnProfile.maxLength.contains(maxLength))
+      }
+    }
+
     "return correct columnProfiles with predefined dataType" in withSparkSession { session =>
 
       val data = getDfCompleteAndInCompleteColumns(session)
@@ -131,7 +152,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
       assert(actualColumnProfile == expectedColumnProfile)
     }
 
-
     "return correct NumericColumnProfiles for numeric String DataType columns" in
       withSparkSession { session =>
 
@@ -171,6 +191,7 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
       assertProfilesEqual(expectedColumnProfile,
         actualColumnProfile.asInstanceOf[NumericColumnProfile])
     }
+
     "return correct NumericColumnProfiles for numeric String DataType columns when " +
       "kllProfiling disabled" in withSparkSession { session =>
 
@@ -562,7 +583,6 @@ class ColumnProfilerTest extends WordSpec with Matchers with SparkContextSpec
     )
 
     assertSameColumnProfiles(columnProfiles.profiles, expectedProfiles)
-
   }
 
   private[this] def assertSameColumnProfiles(
13 changes: 13 additions & 0 deletions src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
@@ -159,6 +159,19 @@ trait FixtureSupport {
     ).toDF("item", "att1", "att2")
   }
 
+  def getDfCompleteAndInCompleteColumnsWithSpacesInNames(sparkSession: SparkSession): DataFrame = {
+    import sparkSession.implicits._
+
+    Seq(
+      ("1", "ab", "abc1"),
+      ("2", "bc", null),
+      ("3", "ab", "def2ghi"),
+      ("4", "ab", null),
+      ("5", "bcd", "ab"),
+      ("6", "a", "pqrs")
+    ).toDF("some item", "att 1", "att 2")
+  }
+
   def getDfCompleteAndInCompleteColumnsAndVarLengthStrings(sparkSession: SparkSession): DataFrame = {
     import sparkSession.implicits._
 