diff --git a/script.spark b/script.spark
index a34c81c..c9de25f 100755
--- a/script.spark
+++ b/script.spark
@@ -1,4 +1,3 @@
-
 import com.amazon.deequ.{VerificationSuite, VerificationResult}
 import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
 import com.amazon.deequ.checks.{Check, CheckLevel}
@@ -8,21 +7,12 @@ import com.amazon.deequ.schema
 import com.amazon.deequ.schema.{RowLevelSchema,RowLevelSchemaValidator}
 
 var demo = """
-
-BasicEvaluation
-StatisticalEvaluation
-ReportViolations
-
-var suggestions = SuggestConstraints()
-suggestions.select("_1", "_2").show(100, 50)
-
-
+suggestions.select("_1", "_2", "_3").show(100, 100)
 showEvaluation
 EvaluateDataQuality
-
 """
 
 val args = sc.getConf.get("spark.driver.args").split("\\s+")
@@ -76,13 +66,13 @@ def StatisticalEvaluation(): Unit = {
 def SuggestConstraints(): org.apache.spark.sql.Dataset[(String, String, String)] = {
   val suggestionResult = { ConstraintSuggestionRunner().onData(dataset).addConstraintRules(Rules.DEFAULT).run()}
-  suggestionResult.constraintSuggestions.flatMap { 
-    case (column, suggestions) => 
+  suggestionResult.constraintSuggestions.flatMap {
+    case (column, suggestions) =>
     suggestions.map { constraint =>
       (column, constraint.description, constraint.codeForConstraint)
-    } 
+    }
   }.toSeq.toDS()
- 
+
 }
 var suggestions = SuggestConstraints()
 // suggestions.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").save(output_bucket)
@@ -90,37 +80,31 @@ var suggestions = SuggestConstraints()
 
 def showEvaluation = """
 def EvaluateDataQuality(): Unit = {
-
-
 val verificationResult: VerificationResult = { VerificationSuite().onData(dataset).addCheck(
 Check(CheckLevel.Error, "Review Check")
-
-.isUnique("id") 
+.isUnique("id")
 .isComplete("purpose_cat")
 .isComplete("annual_inc")
 .isComplete("total_pymnt")
 .isComplete("grade_cat")
-
-.isComplete("interest_rate") 
+.isComplete("interest_rate")
 .isNonNegative("interest_rate")
-.hasMax("loan_amount", _ == 5000)
-
-.isContainedIn("term", Array(" 36 months", " 60 x months"))
-.isContainedIn("loan_condition", Array("Good Loan"), _ >= 0.95, Some("It should be above 0.95!")) 
+.isContainedIn("term", Array(" 36 months", " 60 months"))
+.isContainedIn("loan_condition", Array("Good Loan"), _ >= 0.95, Some("It should be above 0.95!"))
-).run() 
+).run()
 }
 """
 
 def EvaluateDataQuality(): Unit = {
- 
+
 val verificationResult: VerificationResult = { VerificationSuite().onData(dataset).addCheck(
 Check(CheckLevel.Error, "Review Check")
- 
+
 .isUnique("id")
 .isComplete("purpose_cat")
@@ -128,23 +112,21 @@ def EvaluateDataQuality(): Unit = {
 .isComplete("total_pymnt")
 .isComplete("grade_cat")
 
-.isComplete("interest_rate") 
+.isComplete("interest_rate")
 .isNonNegative("interest_rate")
 .hasMax("loan_amount", _ == 5000)
- 
+
 .isContainedIn("term", Array(" 36 months", " 60 months"))
-.isContainedIn("loan_condition", Array("Good Loan"), _ >= 0.95, Some("It should be above 0.95!")) 
+.isContainedIn("loan_condition", Array("Good Loan"), _ >= 0.95, Some("It should be above 0.95!"))
 
-).run() 
+).run()
 }
 
 val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
 // resultDataFrame.show(20, 100)
-resultDataFrame.select("constraint", "constraint_status", "constraint_message").show()
-resultDataFrame.select("constraint_message").show(100,100)
-
+resultDataFrame.select("constraint", "constraint_status", "constraint_message").show(100,100)
 
 sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
 sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
@@ -163,5 +145,3 @@ def ReportViolations(): Unit = {
 println("Rows: ")
 result.invalidRows.show(truncate=true)
 }
-
-
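
Reviewer note: for anyone who wants to exercise the cleaned-up checks outside the full script, below is a minimal, self-contained sketch of the verification flow this patch consolidates. It assumes the com.amazon.deequ artifact is on the classpath and a SparkSession named `spark` is in scope; the toy rows and the column subset are illustrative stand-ins for the real loan dataset, not part of the patch.

    import com.amazon.deequ.{VerificationSuite, VerificationResult}
    import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
    import com.amazon.deequ.checks.{Check, CheckLevel}
    import spark.implicits._

    // Toy stand-in for the loan dataset the script loads.
    val dataset = Seq(
      (1, "debt_consolidation", 13.5, " 36 months", "Good Loan"),
      (2, "credit_card",        11.2, " 60 months", "Good Loan")
    ).toDF("id", "purpose_cat", "interest_rate", "term", "loan_condition")

    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Review Check")
            .isUnique("id")                 // loan ids must not repeat
            .isComplete("purpose_cat")      // column must contain no nulls
            .isNonNegative("interest_rate")
            .isContainedIn("term", Array(" 36 months", " 60 months"))
            .isContainedIn("loan_condition", Array("Good Loan"),
              _ >= 0.95, Some("It should be above 0.95!")))
        .run()

    // One row per constraint with its status and failure message,
    // mirroring the consolidated select(...).show(100, 100) in the patch.
    checkResultsAsDataFrame(spark, verificationResult)
      .select("constraint", "constraint_status", "constraint_message")
      .show(100, 100)

The ReportViolations path in the final hunk uses Deequ's row-level schema validation; a hedged sketch of that flow follows. The columns declared here are assumptions for illustration, not the schema the script actually defines.

    import com.amazon.deequ.schema.{RowLevelSchema, RowLevelSchemaValidator}

    // String-typed input, as row-level validation checks castability;
    // the second row fails both the int cast and the null check.
    val rawRows = Seq(
      ("1", "debt_consolidation"),
      ("x", null)
    ).toDF("id", "purpose_cat")

    // Rows violating the declared expectations land in invalidRows
    // instead of failing the whole job.
    val schema = RowLevelSchema()
      .withIntColumn("id", isNullable = false)
      .withStringColumn("purpose_cat", isNullable = false)

    val result = RowLevelSchemaValidator.validate(rawRows, schema)
    println("Rows: ")
    result.invalidRows.show(truncate = true)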