diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/pom.xml b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/pom.xml
new file mode 100644
index 0000000..a5e4c67
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/pom.xml
@@ -0,0 +1,240 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.stratio.rocket.poc</groupId>
+    <artifactId>rocket-synapseml</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <file.encoding>UTF-8</file.encoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <scala.binary.version>2.12</scala.binary.version>
+        <apache.spark.version>3.1.1</apache.spark.version>
+        <junit.version>4.13.1</junit.version>
+        <!-- 2.2.5: property name lost in extraction -->
+        <rocket.version>3.0.0-SNAPSHOT</rocket.version>
+        <crossdata.version>3.4.0-M6</crossdata.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>mmlspark.azureedge.net/</id>
+            <name>mmlspark.azureedge.net/</name>
+            <url>https://mmlspark.azureedge.net/maven</url>
+        </repository>
+        <repository>
+            <id>spark-packages</id>
+            <name>spark-packages</name>
+            <url>https://repos.spark-packages.org</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <!-- Spark dependencies -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${apache.spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${apache.spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.12</artifactId>
+            <version>${apache.spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Rocket / Stratio dependencies -->
+        <dependency>
+            <groupId>com.stratio.sparta</groupId>
+            <artifactId>sdk-lite-xd</artifactId>
+            <version>${rocket.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.stratio.crossdata</groupId>
+            <artifactId>crossdata-core_${scala.binary.version}</artifactId>
+            <version>${crossdata.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.stratio.sparta</groupId>
+            <artifactId>rocket-ml-client</artifactId>
+            <version>3.1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.stratio.sparta</groupId>
+            <artifactId>ml-pipeline-core</artifactId>
+            <version>3.1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.stratio.sparta</groupId>
+            <artifactId>core</artifactId>
+            <version>3.0.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-core_${scala.binary.version}</artifactId>
+            <version>3.6.10</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- SynapseML -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>synapseml_2.12</artifactId>
+            <version>0.9.5-13-d1b51517-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-reflect</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-tags_2.12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-avro_2.12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scalatest</groupId>
+                    <artifactId>scalatest_2.12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-azure</artifactId>
+            <version>3.3.1</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <configuration>
+                    <useZincServer>false</useZincServer>
+                    <recompileMode>incremental</recompileMode>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                            <goal>doc</goal>
+                            <goal>doc-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>rocket-synapseml</finalName>
+                            <shadedArtifactAttached>false</shadedArtifactAttached>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <minimizeJar>false</minimizeJar>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>log4j2.xml</exclude>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>logback.xml</exclude>
+                                        <exclude>**/Log4j2Plugins.dat</exclude>
+                                        <exclude>**/models/*.json</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>classworlds:classworlds</exclude>
+                                    <exclude>junit:junit</exclude>
+                                    <exclude>jmock:*</exclude>
+                                    <exclude>*:xml-apis</exclude>
+                                    <exclude>org.apache.maven:lib:tests</exclude>
+                                    <exclude>com.fasterxml.jackson.*</exclude>
+                                    <exclude>org.scala-lang</exclude>
+                                    <exclude>org.slf4j</exclude>
+                                </excludes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/resources/log4j2.xml b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..9feb47c
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/resources/log4j2.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration>
+    <Properties>
+        <Property name="commonPatternCentralizedLogging">%d{yyyy-MM-dd'T'HH:mm:ss.SSS}%replace{%d{XXX}}{^Z$}{+00:00} %level %replace{%X{[MDC_USER_KEY]}}{^.{0}$}{-} %replace{%X{[MDC_AUDIT_KEY]}}{^.{0}$}{0} %replace{%X{[MDC_PROCESS_KEY]}}{^.{0}$}{-} %c %encode{%m}{CRLF}%n</Property>
+        <Property name="unformattedJsonPatternCentralizedLogging">%d{yyyy-MM-dd'T'HH:mm:ss.SSS}%replace{%d{XXX}}{^Z$}{+00:00} %level %replace{%X{[MDC_USER_KEY]}}{^.{0}$}{-} %replace{%X{[MDC_AUDIT_KEY]}}{^.{0}$}{0} %replace{%X{[MDC_PROCESS_KEY]}}{^.{0}$}{-} %c {"@message":"Unformatted message. Provided log has been added to @data","@data": %encode{%m}{CRLF},"@exception":"%enc{%throwable}{JSON}"}%n</Property>
+        <Property name="formattedJsonPatternCentralizedLogging">%d{yyyy-MM-dd'T'HH:mm:ss.SSS}%replace{%d{XXX}}{^Z$}{+00:00} %level %replace{%X{[MDC_USER_KEY]}}{^.{0}$}{-} %replace{%X{[MDC_AUDIT_KEY]}}{^.{0}$}{0} %replace{%X{[MDC_PROCESS_KEY]}}{^.{0}$}{-} %c {"@message":"%enc{%m}{JSON}","@data":%replace{%X{[MDC_DATA-JSON_KEY]}}{^.{0}$}{{}},"@exception":"%enc{%throwable}{JSON}"}%n</Property>
+        <Property name="basePattern">%level | %d{dd-MM-yyyy HH:mm:ss,SSS} | %c | %encode{%m}{CRLF}%n</Property>
+    </Properties>
+    <!-- Appender and Logger element markup was lost in extraction; the appenders
+         select among ${formattedJsonPatternCentralizedLogging},
+         ${unformattedJsonPatternCentralizedLogging}, ${commonPatternCentralizedLogging}
+         and ${basePattern} defined above. -->
+</Configuration>
\ No newline at end of file
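The shade configuration above bundles SynapseML into the fat jar while excluding the Scala, Jackson and SLF4J artifacts that the Rocket runtime already provides. A minimal sketch for checking the resulting classpath split; the object name is illustrative, and the two class names simply stand for "bundled" versus "provided":

object ShadedJarCheck extends App {
  // Run with the shaded rocket-synapseml jar on the classpath:
  // TabularSHAP must resolve from the jar, SparkSession from the runtime.
  Seq(
    "com.microsoft.azure.synapse.ml.explainers.TabularSHAP",
    "org.apache.spark.sql.SparkSession"
  ).foreach { className =>
    val loaded = scala.util.Try(Class.forName(className)).isSuccess
    println(s"$className -> ${if (loaded) "resolvable" else "missing"}")
  }
}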
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExample.scala b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExample.scala
new file mode 100644
index 0000000..51b5db0
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExample.scala
@@ -0,0 +1,154 @@
+package com.stratio.rocket.poc
+
+// Spark ML imports
+import org.apache.spark.ml.classification._
+import org.apache.spark.ml.feature._
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+// SynapseML imports
+import com.microsoft.azure.synapse.ml.explainers._
+
+
+object SynapseMlExample extends App {
+
+  val sparkMasterIp = System.getProperty("spark.master", "local[2]")
+  val spark = SparkSession
+    .builder().master(sparkMasterIp)
+    .appName("SynapseMlExample")
+    .getOrCreate()
+  val sc = spark.sparkContext
+
+
+  // --------------------------------
+  // => Reading input dataset
+  // --------------------------------
+
+  val df = spark.read.parquet(
+    "wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet"
+  ).cache()
+
+
+  // ------------------------------------
+  // => Target engineering
+  // ------------------------------------
+
+  // => Indexing target column
+  val targetColName = "income"
+  val labelIndexerModel: StringIndexerModel = new StringIndexer()
+    .setInputCol(targetColName).setOutputCol("label").setStringOrderType("alphabetAsc").fit(df)
+
+  val trainingDf = labelIndexerModel.transform(df)
+
+
+  val featuresToPredictCsv = trainingDf.orderBy(rand()).limit(5).repartition(200)
+
+  /*
+  // Saving samples to CSV; note that some fields contain trailing whitespace
+  featuresToPredictCsv.coalesce(1).write
+    .option("header", true)
+    .option("ignoreLeadingWhiteSpace", false)
+    .option("ignoreTrailingWhiteSpace", false)
+    .csv("/tmp/AdultCensusIncome_5samples.csv")
+  */
+
+
+  // ------------------------------------
+  // => Training process setup
+  // ------------------------------------
+
+  // => Feature engineering
+  // ************************
+
+  val categoricalFeatures = Array(
+    "workclass", "education", "marital-status", "occupation", "relationship", "race", "sex", "native-country"
+  )
+
+  val numericFeatures = Array("age", "education-num", "capital-gain", "capital-loss", "hours-per-week")
+
+  val weightCol = "fnlwgt"
+
+  // - Pre-processing categorical features: StringIndexer + OneHotEncoder
+  val categoricalFeaturesIdx = categoricalFeatures.map(c => s"${c}_idx")
+  val strIndexer = new StringIndexer()
+    .setInputCols(categoricalFeatures)
+    .setOutputCols(categoricalFeaturesIdx)
+
+  val categoricalFeaturesEnc = categoricalFeatures.map(c => s"${c}_enc")
+  val onehotEnc = new OneHotEncoder()
+    .setInputCols(categoricalFeaturesIdx)
+    .setOutputCols(categoricalFeaturesEnc)
+
+  // - Assembling all features into one vector
+  val vectAssem = new VectorAssembler()
+    .setInputCols(categoricalFeaturesEnc ++ numericFeatures)
+    .setOutputCol("features")
+
+
+  // => Trainer definition
+  // ************************
+  val lr = new LogisticRegression()
+    .setFeaturesCol("features")
+    .setLabelCol("label")
+    .setWeightCol(weightCol)
+
+  // => Complete pipeline
+  // ************************
+  val pipeline = new Pipeline()
+    .setStages(Array(strIndexer, onehotEnc, vectAssem, lr))
+
+
+  // ------------------------------------
+  // => Executing training process
+  // ------------------------------------
+
+  val model = pipeline.fit(trainingDf)
+
+
+  // ------------------------------------
+  // => Executing inference process
+  // ------------------------------------
+
+  val predictions = model.transform(trainingDf).cache()
+
+
+  // ------------------------------------
+  // => Explainability
+  // ------------------------------------
+
+  val featuresToPredict = trainingDf.orderBy(rand()).limit(5).repartition(200)
+
+  // => Inference
+  val predictionsToExplain = model.transform(featuresToPredict)
+
+
+  // => Explainer setup
+  // ************************
+
+  val shapExplainer = new TabularSHAP()
+    .setInputCols(categoricalFeatures ++ numericFeatures)
+    .setOutputCol("shapValues")
+    .setNumSamples(5000)
+    .setModel(model)
+    .setTargetCol("probability")
+    .setTargetClasses(Array(1))
+    .setBackgroundData(broadcast(trainingDf.orderBy(rand()).limit(100).cache()))
+
+  // · Explain predictions
+  val explanationsDf = shapExplainer.transform(predictionsToExplain)
+  explanationsDf.show()
+
+  // · Predict and explain directly from raw features
+  val explanationsFromFeaturesDf = shapExplainer.transform(featuresToPredict)
+  explanationsFromFeaturesDf.show()
+
+
+  // => Saving the explainer wrapped in a PipelineModel
+  val shapExplainerPipelineModel = new Pipeline().setStages(Array(shapExplainer)).fit(trainingDf)
+  shapExplainerPipelineModel.save("/tmp/shapExplainerModel")
+
+}
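TabularSHAP, as configured above, writes its explanations into the shapValues column. Assuming SynapseML's documented layout (one vector per requested target class, with the base value first and then one SHAP value per input column), the values can be unpacked with Spark's vector_to_array. A sketch that could be appended to SynapseMlExample:

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col

// One vector per entry of setTargetClasses; getItem(0) selects class 1 here.
val shapsDf = explanationsDf
  .select(vector_to_array(col("shapValues").getItem(0)).alias("shap"))

// shap(0) is the base value; shap(i + 1) aligns with (categoricalFeatures ++ numericFeatures)(i)
shapsDf.select(col("shap").getItem(0).alias("baseValue")).show()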
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExampleLoad.scala b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExampleLoad.scala
new file mode 100644
index 0000000..fd9073a
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/SynapseMlExampleLoad.scala
@@ -0,0 +1,68 @@
+package com.stratio.rocket.poc
+
+// Rocket ML client imports
+import com.stratio.rocket.ml.serverclient.RocketMlClientBuilder
+
+// Spark ML imports
+import org.apache.spark.ml.PipelineModel
+import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.rand
+
+
+object SynapseMlExampleLoad extends App {
+
+  val sparkMasterIp = System.getProperty("spark.master", "local[2]")
+  val spark = SparkSession
+    .builder().master(sparkMasterIp)
+    .appName("SynapseMlExampleLoad")
+    .getOrCreate()
+  val sc = spark.sparkContext
+
+
+  // --------------------------------
+  // => Load pipelineModel
+  // --------------------------------
+
+  val shapExplainerPipelineModel = PipelineModel.load("/tmp/shapExplainerModel")
+
+
+  // --------------------------------
+  // => Reading input dataset
+  // --------------------------------
+
+  val df = spark.read.parquet(
+    "wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet"
+  ).cache()
+
+
+  // ------------------------------------
+  // => Target engineering
+  // ------------------------------------
+
+  val targetColName = "income"
+  val labelIndexerModel: StringIndexerModel = new StringIndexer()
+    .setInputCol(targetColName).setOutputCol("label").setStringOrderType("alphabetAsc").fit(df)
+  val trainingDf = labelIndexerModel.transform(df)
+
+  val featuresToPredict = trainingDf.orderBy(rand()).limit(5).repartition(200)
+  val explanationsDf = shapExplainerPipelineModel.transform(featuresToPredict)
+
+
+  // ------------------------------------
+  // => Registering the model in Rocket
+  // ------------------------------------
+
+  val rocketMlClient = new RocketMlClientBuilder()
+    .withRocketURL("http://localhost:9090")
+    .withSparkSession(spark)
+    .build()
+
+  rocketMlClient.mlModel.spark.saveModelAsNewAsset(
+    pipelineModel = shapExplainerPipelineModel,
+    assetPath = "synapse/fromlocal",
+    trainDf = featuresToPredict,
+    trainingTransformedDf = explanationsDf,
+    evalDf = None
+  )
+
+}
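Before registering the reloaded pipeline it can be worth asserting that deserialization preserved the explainer stage; a small sketch, assuming the pipeline saved by SynapseMlExample above:

import com.microsoft.azure.synapse.ml.explainers.TabularSHAP

// stages of a fitted PipelineModel are Transformers; TabularSHAP is itself one
require(
  shapExplainerPipelineModel.stages.exists(_.isInstanceOf[TabularSHAP]),
  "expected a TabularSHAP stage in the loaded pipeline"
)
require(explanationsDf.columns.contains("shapValues"), "expected a shapValues output column")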
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerModel.scala b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerModel.scala
new file mode 100644
index 0000000..5df9036
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerModel.scala
@@ -0,0 +1,80 @@
+/*
+ * © 2017 Stratio Big Data Inc., Sucursal en España. All rights reserved.
+ *
+ * This software – including all its source code – contains proprietary information of Stratio Big Data Inc., Sucursal en España and may not be revealed, sold, transferred, modified, distributed or otherwise made available, licensed or sublicensed to third parties; nor reverse engineered, disassembled or decompiled, without express written authorization from Stratio Big Data Inc., Sucursal en España.
+ */
+
+package com.stratio.rocket.poc.plugins
+
+import com.microsoft.azure.synapse.ml.explainers.TabularSHAP
+import com.stratio.rocket.poc.plugins.properties.ValidatingPropertyMap._
+import com.stratio.sparta.sdk.lite.batch.models.{OutputBatchTransformData, ResultBatchData}
+import com.stratio.sparta.sdk.lite.validation.ValidationResult
+import com.stratio.sparta.sdk.lite.xd.batch.LiteCustomXDBatchTransform
+import org.apache.spark.ml.Transformer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.crossdata.XDSession
+import org.apache.spark.sql.types.StructType
+
+import scala.util.Try
+
+// Fully qualified name: com.stratio.rocket.poc.plugins.ShapExplainerModel
+
+class ShapExplainerModel(
+    xdSession: XDSession,
+    properties: Map[String, String]
+  ) extends LiteCustomXDBatchTransform(xdSession, properties) {
+
+  // ------------------------------
+  // => Input properties
+  // ------------------------------
+
+  // Path of the saved TabularSHAP explainer
+  lazy val shapExplainerPath: Option[String] =
+    Try(Option(properties.getString("shapExplainerPath"))).getOrElse(None).notBlank
+
+
+  // ------------------------------
+  // => Validating properties
+  // ------------------------------
+
+  override def validate(): ValidationResult = {
+    var validation = ValidationResult(valid = true, messages = Seq.empty, warnings = Seq.empty)
+
+    if (shapExplainerPath.isEmpty) {
+      validation = ValidationResult(
+        valid = false,
+        messages = validation.messages :+ "'shapExplainerPath' is empty",
+        warnings = validation.warnings)
+    }
+    validation
+  }
+
+
+  // ------------------------------
+  // => Transform logic
+  // ------------------------------
+
+  override def transform(inputData: Map[String, ResultBatchData]): OutputBatchTransformData = {
+
+    // => Load TabularSHAP model
+    val shapExplainerModel = TabularSHAP.load(shapExplainerPath.get)
+
+    // => Getting MLModel == Spark PipelineModel
+    val pipelineModel: Transformer = shapExplainerModel.getModel
+
+    // => Getting input data
+    val inputRDD: RDD[Row] = inputData.head._2.data
+    val inputSchema = inputData.head._2.schema.getOrElse(new StructType())
+    val inputDf = xdSession.createDataFrame(inputRDD, inputSchema)
+
+    // => Inference: making predictions
+    val predictionDf = pipelineModel.transform(inputDf)
+
+    // => Explainability: explaining predictions
+    val explanationsDF = shapExplainerModel.transform(predictionDf)
+
+    // => Output data
+    OutputBatchTransformData(explanationsDF.rdd, Option(explanationsDF.schema))
+  }
+}
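A hypothetical wiring of this transform plugin outside the Rocket runtime; the XDSession would normally be injected by the platform, and the path value is illustrative:

import com.stratio.rocket.poc.plugins.ShapExplainerModel
import org.apache.spark.sql.crossdata.XDSession

def buildShapPlugin(xdSession: XDSession): ShapExplainerModel = {
  // "shapExplainerPath" is the only property the plugin validates
  val plugin = new ShapExplainerModel(xdSession, Map("shapExplainerPath" -> "/tmp/shapExplainer"))
  val validation = plugin.validate()
  require(validation.valid, validation.messages.mkString("; "))
  plugin
}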
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerTrainerOutput.scala b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerTrainerOutput.scala
new file mode 100644
index 0000000..e6651ce
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/ShapExplainerTrainerOutput.scala
@@ -0,0 +1,134 @@
+/*
+ * © 2017 Stratio Big Data Inc., Sucursal en España. All rights reserved.
+ *
+ * This software – including all its source code – contains proprietary information of Stratio Big Data Inc., Sucursal en España and may not be revealed, sold, transferred, modified, distributed or otherwise made available, licensed or sublicensed to third parties; nor reverse engineered, disassembled or decompiled, without express written authorization from Stratio Big Data Inc., Sucursal en España.
+ */
+
+package com.stratio.rocket.poc.plugins
+
+import com.microsoft.azure.synapse.ml.explainers.TabularSHAP
+import com.stratio.rocket.poc.plugins.properties.ValidatingPropertyMap._
+import com.stratio.sparta.sdk.lite.common.models.OutputOptions
+import com.stratio.sparta.sdk.lite.validation.ValidationResult
+import com.stratio.sparta.sdk.lite.xd.common.LiteCustomXDOutput
+import org.apache.spark.ml.PipelineModel
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.crossdata.XDSession
+import org.apache.spark.sql.functions.{broadcast, rand}
+
+import scala.util.Try
+
+
+// TODO - Custom output
+
+class ShapExplainerTrainerOutput(
+    xdSession: XDSession,
+    properties: Map[String, String]
+  ) extends LiteCustomXDOutput(xdSession, properties) {
+
+  // ------------------------------
+  // => Input properties
+  // ------------------------------
+
+  // Path of the trained Spark PipelineModel to explain
+  lazy val pipelineModelInputPath: Option[String] =
+    Try(Option(properties.getString("pipelineModelInputPath"))).getOrElse(None).notBlank
+
+  // Path where the created SHAP explainer will be saved
+  lazy val shapExplainerOutputPath: Option[String] =
+    Try(Option(properties.getString("shapExplainerOutputPath"))).getOrElse(None).notBlank
+
+  // Input columns (comma-separated)
+  lazy val inputColumnsOpt: Option[String] =
+    Try(Option(properties.getString("inputColumns"))).getOrElse(None).notBlank
+  lazy val inputColumns: Array[String] = inputColumnsOpt.get.split(",")
+
+  // Target column
+  lazy val targetColumn: Option[String] =
+    Try(Option(properties.getString("targetColumn"))).getOrElse(None).notBlank
+
+  // Target classes (comma-separated class indices)
+  lazy val targetClassesOpt: Option[String] =
+    Try(Option(properties.getString("targetClasses"))).getOrElse(None).notBlank
+  lazy val targetClasses: Array[Int] = targetClassesOpt.get.split(",").map(_.toInt)
+
+  // Output column
+  lazy val outputColumn: Option[String] =
+    Try(Option(properties.getString("outputColumn"))).getOrElse(None).notBlank
+
+  // Number of samples for the background data
+  lazy val numSamplesBackgroundDataOpt: Option[String] =
+    Try(Option(properties.getString("numSamplesBackgroundData"))).getOrElse(None).notBlank
+  lazy val numSamplesBackgroundData: Int = numSamplesBackgroundDataOpt.get.toInt
+
+  // Number of samples for the internal generator
+  lazy val numSamplesGeneratorOpt: Option[String] =
+    Try(Option(properties.getString("numSamplesGenerator"))).getOrElse(None).notBlank
+  lazy val numSamplesGenerator: Int = numSamplesGeneratorOpt.get.toInt
+
+  lazy val mandatoryProperties: Seq[(Option[String], String)] = Seq(
+    pipelineModelInputPath -> "pipelineModelInputPath",
+    shapExplainerOutputPath -> "shapExplainerOutputPath",
+    inputColumnsOpt -> "inputColumns",
+    targetColumn -> "targetColumn",
+    targetClassesOpt -> "targetClasses",
+    outputColumn -> "outputColumn",
+    numSamplesBackgroundDataOpt -> "numSamplesBackgroundData",
+    numSamplesGeneratorOpt -> "numSamplesGenerator"
+  )
+
+  // ------------------------------
+  // => Validating properties
+  // ------------------------------
+
+  private def validateNonEmpty(property: Option[_], propertyName: String, validationResult: ValidationResult): ValidationResult =
+    if (property.isEmpty)
+      ValidationResult(
+        valid = false,
+        messages = validationResult.messages :+ s"'$propertyName' is empty",
+        warnings = validationResult.warnings)
+    else validationResult
+
+  override def validate(): ValidationResult =
+    mandatoryProperties.foldLeft(ValidationResult(valid = true, messages = Seq.empty, warnings = Seq.empty)) {
+      case (validation, (prop, propName)) => validateNonEmpty(prop, propName, validation)
+    }
+
+
+  // ------------------------------
+  // => Save logic
+  // ------------------------------
+
+  override def save(data: DataFrame, outputOptions: OutputOptions): Unit = {
+    val trainingDf = data
+
+    // => Load SparkML pipelineModel
+    val pipelineModel = PipelineModel.load(pipelineModelInputPath.get)
+
+    // => Get background data as a broadcast DataFrame
+    val backgroundDataDf = broadcast(trainingDf.orderBy(rand()).limit(numSamplesBackgroundData).cache())
+
+    // => Create the SHAP explainer
+    val shapExplainer = new TabularSHAP()
+      .setInputCols(inputColumns)
+      .setOutputCol(outputColumn.get)
+      .setNumSamples(numSamplesGenerator)
+      .setModel(pipelineModel)
+      .setTargetCol(targetColumn.get)
+      .setTargetClasses(targetClasses)
+      .setBackgroundData(backgroundDataDf)
+
+    // => Save the explainer
+    shapExplainer.save(shapExplainerOutputPath.get)
+  }
+
+  override def save(data: DataFrame, saveMode: String, saveOptions: Map[String, String]): Unit = ()
+}
\ No newline at end of file
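For reference, a property map that would drive this output plugin, reusing the column names, target and sample counts from SynapseMlExample above; the two paths are assumptions for a local run:

val trainerOutputProperties: Map[String, String] = Map(
  "pipelineModelInputPath"   -> "/tmp/censusPipelineModel",   // assumed location of the fitted pipeline
  "shapExplainerOutputPath"  -> "/tmp/shapExplainer",         // where the TabularSHAP explainer is written
  "inputColumns"             -> ("workclass,education,marital-status,occupation,relationship," +
                                 "race,sex,native-country,age,education-num,capital-gain," +
                                 "capital-loss,hours-per-week"),
  "targetColumn"             -> "probability",
  "targetClasses"            -> "1",
  "outputColumn"             -> "shapValues",
  "numSamplesBackgroundData" -> "100",
  "numSamplesGenerator"      -> "5000"
)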
diff --git a/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/properties/ValidatePropertiesMap.scala b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/properties/ValidatePropertiesMap.scala
new file mode 100644
index 0000000..0e3b9bd
--- /dev/null
+++ b/rocket-ml-extensions/rocket-3.0.0-SDK/synapseML/src/main/scala/com/stratio/rocket/poc/plugins/properties/ValidatePropertiesMap.scala
@@ -0,0 +1,30 @@
+/*
+ * © 2017 Stratio Big Data Inc., Sucursal en España. All rights reserved.
+ *
+ * This software – including all its source code – contains proprietary information of Stratio Big Data Inc., Sucursal en España and may not be revealed, sold, transferred, modified, distributed or otherwise made available, licensed or sublicensed to third parties; nor reverse engineered, disassembled or decompiled, without express written authorization from Stratio Big Data Inc., Sucursal en España.
+ */
+package com.stratio.rocket.poc.plugins.properties
+
+class ValidatePropertiesMap[K, V](val m: Map[K, V]) {
+
+  // Returns the property value as a String, failing fast when the key is missing
+  def getString(key: K): String =
+    m.get(key) match {
+      case Some(value: String) => value
+      case Some(value) => value.toString
+      case None =>
+        throw new IllegalStateException(s"$key is mandatory")
+    }
+
+  // True when the option holds a non-blank value
+  def notBlank(option: Option[String]): Boolean =
+    option.map(_.trim).exists(_.nonEmpty)
+}
+
+class NotBlankOption(s: Option[String]) {
+  // Normalizes blank or whitespace-only strings to None
+  def notBlank: Option[String] = s.map(_.trim).filterNot(_.isEmpty)
+}
+
+object ValidatingPropertyMap {
+  implicit def map2ValidatingPropertyMap[K, V](m: Map[K, V]): ValidatePropertiesMap[K, V] =
+    new ValidatePropertiesMap[K, V](m)
+
+  implicit def option2NotBlankOption(s: Option[String]): NotBlankOption = new NotBlankOption(s)
+}
\ No newline at end of file
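Usage sketch for these implicit helpers, mirroring how the plugins above read their properties; the map contents are illustrative:

import com.stratio.rocket.poc.plugins.properties.ValidatingPropertyMap._
import scala.util.Try

val props = Map("shapExplainerPath" -> "  /tmp/shapExplainer  ")

// getString comes from ValidatePropertiesMap (throws on a missing key),
// notBlank from NotBlankOption (trims and maps blank values to None)
val path: Option[String] =
  Try(Option(props.getString("shapExplainerPath"))).getOrElse(None).notBlank
// path == Some("/tmp/shapExplainer"); a missing or blank key would yield None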