diff --git a/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala b/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
index 888506e..2d8e279 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
@@ -23,7 +23,7 @@ class FileFormatsIT extends AnyFunSuite
     .set("spark.sql.catalog.spark_catalog.type", "hive")
     .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
     .set("spark.sql.catalog.local.type", "hadoop")
-    .set("spark.sql.catalog.local.warehouse", "./warehouse")
+    .set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
     .set("spark.sql.defaultCatalog", "local")
 
   test("Write parquet") {
@@ -115,52 +115,6 @@ class FileFormatsIT extends AnyFunSuite
     )
   }
 
-  ignore("Write delta") {
-
-    val sqlCtx = sqlContext
-
-    val fakeEmployeesData = Seq(
-      Row("Marc", 33, 1),
-      Row("Pau", 30, 1)
-    )
-
-    val someSchema = List(
-      StructField("name", StringType, true),
-      StructField("age", IntegerType, true),
-      StructField("version", IntegerType, true)
-    )
-
-    val fakeEmployeesDF = spark.createDataFrame(
-      spark.sparkContext.parallelize(fakeEmployeesData),
-      StructType(someSchema)
-    )
-
-    fakeEmployeesDF.write
-      .mode("overwrite")
-      .format("delta")
-      .save("src/test/tmp_formats/fake_delta_employees")
-
-    val multilineSQL = """|select *
-                          |from employees""".stripMargin
-
-    val testingConfig = Config(
-      "",
-      List(io.FileSource("src/test/tmp_formats/fake_delta_employees", "employees", format = IOFormat.DELTA)),
-      List(SQLStatmentMapping(multilineSQL)),
-      io.FileSink("test", "src/test/tmp_formats/il_fake_delta_employees_t", WriteMode.Overwrite, IOFormat.DELTA,
-        Option("name"), Option("age"))
-    )
-
-    MetabolicApp()
-      .transformAll(List(testingConfig))(region, spark)
-
-    val NT_fakeEmployeesDF = spark.read.format("delta")
-      .load("src/test/tmp_formats/il_fake_delta_employees_t")
-    fakeEmployeesDF.show()
-    NT_fakeEmployeesDF.show()
-    assertDataFrameNoOrderEquals(fakeEmployeesDF, NT_fakeEmployeesDF)
-  }
-
   test("Read json table") {
 
     val sqlCtx = sqlContext
@@ -238,7 +192,7 @@ class FileFormatsIT extends AnyFunSuite
       StructType(inputSchema)
     )
 
-    val fqnInput = "local.data_lake.employees"
+    val fqnInput = "local.data_lake.iceberg_employees"
 
     inputDf
       .write
@@ -257,7 +211,7 @@ class FileFormatsIT extends AnyFunSuite
       StructField("age", LongType, true)
    )
 
-    val fqnOutput = "local.data_lake.employees_long"
+    val fqnOutput = "local.data_lake.iceberg_employees_long"
 
     val multilineSQL = "select name, cast(age as long) as age from employees"
 
@@ -285,4 +239,63 @@ class FileFormatsIT extends AnyFunSuite
     )
   }
 
+  test("Write delta") {
+
+    val inputEmployeesData = Seq(
+      Row("Marc", "33"),
+      Row("Pau", "30")
+    )
+
+    val inputSchema = List(
+      StructField("name", StringType, true),
+      StructField("age", StringType, true)
+    )
+
+    val inputDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(inputEmployeesData),
+      StructType(inputSchema)
+    )
+
+    val inputPath = "src/test/tmp/it_formats/data_lake/delta_employees"
+
+    inputDf
+      .write
+      .mode("overwrite")
+      .format("delta")
+      .save(inputPath)
+
+
+    val expectedEmployeesData = Seq(
+      Row("Marc", 33L),
+      Row("Pau", 30L)
+    )
+
+    val expectedSchema = List(
+      StructField("name", StringType, true),
+      StructField("age", LongType, true)
+    )
+
+    val outputPath = "src/test/tmp/it_formats/data_lake/delta_employees_long"
+    val multilineSQL = "select name, cast(age as long) as age from employees"
+
+    val testingConfig = Config(
+      "",
+      List(FileSource(inputPath, "employees", format = IOFormat.DELTA)),
+      List(SQLStatmentMapping(multilineSQL)),
+      FileSink("test", outputPath, WriteMode.Overwrite, IOFormat.DELTA, Option("name"), Option("age"), ops = List.empty)
+    )
+
+    MetabolicApp()
+      .transformAll(List(testingConfig))(region, spark)
+
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedEmployeesData),
+      StructType(expectedSchema)
+    )
+
+    val outputDf = spark.read.format("delta").load(outputPath)
+
+    assertDataFrameNoOrderEquals(expectedDf, outputDf)
+  }
+
 }