Skip to content

Commit

Permalink
emd
Browse files Browse the repository at this point in the history
  • Loading branch information
berna396 committed Jan 24, 2024
1 parent 67ddfc7 commit 116b326
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 45 deletions.
76 changes: 72 additions & 4 deletions src/test/scala/DeltaOptimizeTest.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,48 @@
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import com.metabolic.data.RegionedTest
import com.metabolic.data.core.services.spark.writer.file.DeltaWriter
import com.metabolic.data.mapper.app.MetabolicApp
import org.apache.spark.sql.SparkSession
import com.metabolic.data.mapper.domain.io.{EngineMode, WriteMode}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.scalatest.funsuite.AnyFunSuite
import io.delta.tables._
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DeltaOptimizeTest
extends AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext {
with SharedSparkContext
with RegionedTest {


override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.set("spark.databricks.delta.retentionDurationCheck.enabled","false")

test("Delta Optimze") {

// Sample rows for the Delta table: (name, data, yyyy, mm, dd, date).
// The "date" string mirrors yyyy/mm/dd and is used as the partition column by the tests.
val inputData = Seq(
Row("A", "a", 2022, 2, 5, "2022-02-05"),
Row("B", "b", 2022, 2, 4, "2022-02-04"),
Row("C", "c", 2022, 2, 3, "2022-02-03"),
Row("D", "d", 2022, 2, 2, "2022-02-02"),
Row("E", "e", 2022, 2, 1, "2022-02-01"),
Row("F", "f", 2022, 1, 5, "2022-01-05"),
Row("G", "g", 2021, 2, 2, "2021-02-02"),
Row("H", "h", 2020, 2, 5, "2020-02-05")
)

// Schema matching inputData, field-for-field; every column nullable.
// NOTE(review): "date" is kept as StringType rather than DateType — presumably
// intentional so it can double as a plain partition value; confirm with DeltaWriter.
val someSchema = List(
StructField("name", StringType, true),
StructField("data", StringType, true),
StructField("yyyy", IntegerType, true),
StructField("mm", IntegerType, true),
StructField("dd", IntegerType, true),
StructField("date", StringType, true),
)

ignore("Delta Optimize") {

val pathToTable = "src/test/tmp/delta/letters_overwrite"

Expand Down Expand Up @@ -52,6 +78,48 @@ class DeltaOptimizeTest
// If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`
//deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

//deltaTable.vacuum(0.0)
deltaTable.vacuum(0.0)
}

test("Tests Delta Optimize Batch") {
  // After a batch write through DeltaWriter, the most recent entry in the Delta
  // transaction log should be an OPTIMIZE (compaction) operation.
  val path = "src/test/tmp/delta/letters_optimize"

  val inputDF = spark.createDataFrame(
    spark.sparkContext.parallelize(inputData),
    StructType(someSchema)
  )

  // Pre-create an empty Delta table at `path` (same schema, zero rows) so the
  // writer operates against an existing table rather than a fresh location.
  val emptyRDD = spark.sparkContext.emptyRDD[Row]
  val emptyDF = spark.createDataFrame(emptyRDD, inputDF.schema)
  emptyDF
    .write
    .format("delta")
    .mode(SaveMode.Append)
    .save(path)

  // NOTE(review): assuming Option("date") is the partition column and
  // Option("name") the merge/id column, per DeltaWriter's constructor — confirm.
  val firstWriter = new DeltaWriter(
    path,
    WriteMode.Overwrite,
    Option("date"),
    Option("name"),
    "default",
    "",
    Seq.empty[String],
    0)(region, spark)

  firstWriter.write(inputDF, EngineMode.Batch)

  // Inspect only the latest history entry; it must be the compaction.
  val deltaTable = DeltaTable.forPath(path)
  val lastChange = deltaTable.history(1)
  val operation = lastChange.head().getAs[String]("operation")

  assert(operation == "OPTIMIZE")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -621,47 +621,6 @@ class DeltaWriterTest extends AnyFunSuite
assertDataFrameNoOrderEquals(expectedDF, outputDf)
}

test("Tests Delta Optimize Batch") {
  // Verifies that a batch write via DeltaWriter finishes with an OPTIMIZE
  // (compaction) as the last operation recorded in the Delta table history.
  val path = "src/test/tmp/delta/letters_optimize"

  val inputDF = spark.createDataFrame(
    spark.sparkContext.parallelize(inputData),
    StructType(someSchema)
  )

  // Seed an empty Delta table at `path` with the input schema so the writer
  // targets an existing table.
  val emptyRDD = spark.sparkContext.emptyRDD[Row]
  val emptyDF = spark.createDataFrame(emptyRDD, inputDF.schema)
  emptyDF
    .write
    .format("delta")
    .mode(SaveMode.Append)
    .save(path)

  // NOTE(review): assuming Option("date") = partition column and
  // Option("name") = merge/id column per DeltaWriter's signature — confirm.
  val firstWriter = new DeltaWriter(
    path,
    WriteMode.Overwrite,
    Option("date"),
    Option("name"),
    "default",
    "",
    Seq.empty[String],
    0)(region, spark)

  firstWriter.write(inputDF, EngineMode.Batch)

  // Only the newest history entry matters for this check.
  val deltaTable = DeltaTable.forPath(path)
  val lastChange = deltaTable.history(1)
  val operation = lastChange.head().getAs[String]("operation")

  assert(operation == "OPTIMIZE")
}

}


0 comments on commit 116b326

Please sign in to comment.