Commit 7a655d9: Fixes delta test

margon8 committed Mar 19, 2024
1 parent d34ac0d commit 7a655d9

Showing 4 changed files with 51 additions and 135 deletions.
@@ -17,7 +17,7 @@ class GlueCrawlerAction extends AfterAction with Logging {
val options = config.environment

val region = options.region
val name = s"${options.name} EM ${config.name}"
val crawlerName = s"${options.name} EM ${config.name}"

val dbName = options.dbName
val iamRole = options.iamRole
@@ -28,11 +28,11 @@ class GlueCrawlerAction extends AfterAction with Logging {
case sink: FileSink =>
sink.format match {
case com.metabolic.data.mapper.domain.io.IOFormat.CSV =>
-            runCrawler(config, options, name, dbName, iamRole, glue, sink)
+            runCrawler(config, options, crawlerName, dbName, iamRole, glue, sink)
case com.metabolic.data.mapper.domain.io.IOFormat.PARQUET =>
-            runCrawler(config, options, name, dbName, iamRole, glue, sink)
+            runCrawler(config, options, crawlerName, dbName, iamRole, glue, sink)
case com.metabolic.data.mapper.domain.io.IOFormat.JSON =>
-            runCrawler(config, options, name, dbName, iamRole, glue, sink)
+            runCrawler(config, options, crawlerName, dbName, iamRole, glue, sink)
case com.metabolic.data.mapper.domain.io.IOFormat.DELTA =>
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for DeltaSink")
case com.metabolic.data.mapper.domain.io.IOFormat.DELTA_PARTITION =>
@@ -43,17 +43,17 @@ class GlueCrawlerAction extends AfterAction with Logging {
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for DeltaSink")
}
case _ =>
logger.warn(f"After Action: Skipping $name for ${config.name} as it is not a FileSink")
logger.warn(f"After Action: Skipping $crawlerName for ${config.name} as it is not a FileSink")
}

}

-  private def runCrawler(config: Config, options: Environment, name: String, dbName: String, iamRole: String, glue: GlueCrawlerService, sink: FileSink): Unit = {
+  private def runCrawler(config: Config, options: Environment, crawlerName: String, dbName: String, iamRole: String, glue: GlueCrawlerService, sink: FileSink): Unit = {
val s3Path = sink.path.replaceAll("version=\\d+", "")
val prefix = ConfigUtilsService.getTablePrefix(options.namespaces, s3Path)

logger.info(f"After Action $name: Running Glue Crawler for ${config.name}")

-    glue.createAndRunCrawler(iamRole, Seq(s3Path), dbName, name, prefix)
+    glue.createAndRunCrawler(iamRole, Seq(s3Path), dbName, crawlerName, prefix)
}
}
@@ -16,8 +16,8 @@ class GlueCrawlerService(implicit val region: Regions) extends Logging {
.build


-  def createAndRunCrawler(iam: String, s3Paths: Seq[String], dbName: String, crawlerName: String,
-                          prefix: String): Unit = {
+  def createAndRunCrawler(iam: String, s3Paths: Seq[String], dbName: String, crawlerName: String, prefix: String): Unit = {

try {
createCrawler(iam, s3Paths, dbName, crawlerName, prefix)
runCrawler(crawlerName)
@@ -56,6 +56,7 @@ class GlueCrawlerService(implicit val region: Regions) extends Logging {

private def createCrawler(iam: String, s3Paths: Seq[String], dbName: String, crawlerName: String, prefix: String) = {

+
val s3TargetList = s3Paths
.map {
new S3Target().withPath(_)
@@ -80,7 +81,6 @@
.withRole(iam)
.withRecrawlPolicy(recrawlPolicy)
.withSchemaChangePolicy(schemaChangePolicy)
.withTags(Map("Owner" -> "Data", "Environment" -> environment).asJava)

glueClient.createCrawler(crawlerCreateRequest)
logger.info(crawlerName + " was successfully created")
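For context, a minimal sketch of how the consolidated createAndRunCrawler signature above might be invoked; the region, role ARN, bucket path, database name, crawler name, and prefix below are illustrative placeholders, not values taken from this repository:

import com.amazonaws.regions.Regions

implicit val region: Regions = Regions.US_EAST_1 // hypothetical region
val glue = new GlueCrawlerService()

// Every argument below is a placeholder chosen for illustration.
glue.createAndRunCrawler(
  iam = "arn:aws:iam::123456789012:role/glue-crawler-role",
  s3Paths = Seq("s3://example-bucket/data/letters/"),
  dbName = "example_db",
  crawlerName = "example EM letters",
  prefix = "letters_"
)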
src/test/scala/DeltaOptimizeTest.scala: 0 additions & 125 deletions

This file was deleted.

@@ -621,6 +621,47 @@ class DeltaWriterTest extends AnyFunSuite
assertDataFrameNoOrderEquals(expectedDF, outputDf)
}

test("Tests Delta Optimize Batch") {
val path = "src/test/tmp/delta/letters_optimize"
val sqlCtx = sqlContext

val inputDF = spark.createDataFrame(
spark.sparkContext.parallelize(inputData),
StructType(someSchema)
)

//Create table
val emptyRDD = spark.sparkContext.emptyRDD[Row]
val emptyDF = spark.createDataFrame(emptyRDD, inputDF.schema)
emptyDF
.write
.format("delta")
.mode(SaveMode.Append)
.save(path)

val firstWriter = new DeltaWriter(
path,
WriteMode.Overwrite,
Option("date"),
Option("name"),
"default",
"",
Seq.empty[String],
0)(region, spark)


firstWriter.write(inputDF, EngineMode.Batch)

val deltaTable = DeltaTable.forPath(path)

//Get last operation
val lastChange = deltaTable.history(1)
val operation = lastChange.head().getAs[String]("operation")

assert(operation == "VACUUM END")

}

}
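The new test asserts that the most recent entry in the table's commit history is "VACUUM END", i.e. that the batch write path finishes by vacuuming the table. As a minimal standalone sketch of that history check, assuming Delta Lake is on the classpath and registered with the session (the table path simply reuses the one from the test above):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-history-check")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// history(1) returns a one-row DataFrame describing the most recent commit;
// its "operation" column names the action (e.g. WRITE, OPTIMIZE, VACUUM END).
val lastOperation = DeltaTable.forPath(spark, "src/test/tmp/delta/letters_optimize")
  .history(1)
  .head()
  .getAs[String]("operation")

println(lastOperation)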

