Skip to content

Commit

Permalink
Working delta+iceberg side by side
Browse files Browse the repository at this point in the history
  • Loading branch information
margon8 committed Sep 10, 2024
1 parent 5a2645a commit a167fa9
Showing 1 changed file with 62 additions and 49 deletions.
111 changes: 62 additions & 49 deletions src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class FileFormatsIT extends AnyFunSuite
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "./warehouse")
.set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
.set("spark.sql.defaultCatalog", "local")

test("Write parquet") {
Expand Down Expand Up @@ -115,52 +115,6 @@ class FileFormatsIT extends AnyFunSuite
)
}

ignore("Write delta") {

  // Round-trip test (currently disabled): write a small employees dataset as
  // Delta, run it through a Metabolic pipeline with an identity SQL mapping,
  // and assert the output equals the input row-for-row (order ignored).
  // Fix: dropped the unused `val sqlCtx = sqlContext` local and the two
  // debug-only `.show()` calls, which only printed to stdout in CI.

  val fakeEmployeesData = Seq(
    Row("Marc", 33, 1),
    Row("Pau", 30, 1)
  )

  // All fields nullable, matching how the pipeline reads them back.
  val someSchema = List(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("version", IntegerType, true)
  )

  val fakeEmployeesDF = spark.createDataFrame(
    spark.sparkContext.parallelize(fakeEmployeesData),
    StructType(someSchema)
  )

  // Seed the source Delta table on local disk.
  fakeEmployeesDF.write
    .mode("overwrite")
    .format("delta")
    .save("src/test/tmp_formats/fake_delta_employees")

  // Identity mapping: select everything from the registered "employees" source.
  val multilineSQL = """|select *
                        |from employees""".stripMargin

  val testingConfig = Config(
    "",
    List(io.FileSource("src/test/tmp_formats/fake_delta_employees", "employees", format = IOFormat.DELTA)),
    List(SQLStatmentMapping(multilineSQL)),
    io.FileSink("test", "src/test/tmp_formats/il_fake_delta_employees_t", WriteMode.Overwrite, IOFormat.DELTA,
      Option("name"), Option("age"))
  )

  MetabolicApp()
    .transformAll(List(testingConfig))(region, spark)

  // Read the pipeline's Delta output back and compare, ignoring row order.
  val NT_fakeEmployeesDF = spark.read.format("delta")
    .load("src/test/tmp_formats/il_fake_delta_employees_t")
  assertDataFrameNoOrderEquals(fakeEmployeesDF, NT_fakeEmployeesDF)
}

test("Read json table") {

val sqlCtx = sqlContext
Expand Down Expand Up @@ -238,7 +192,7 @@ class FileFormatsIT extends AnyFunSuite
StructType(inputSchema)
)

val fqnInput = "local.data_lake.employees"
val fqnInput = "local.data_lake.iceberg_employees"

inputDf
.write
Expand All @@ -257,7 +211,7 @@ class FileFormatsIT extends AnyFunSuite
StructField("age", LongType, true)
)

val fqnOutput = "local.data_lake.employees_long"
val fqnOutput = "local.data_lake.iceberg_employees_long"

val multilineSQL = "select name, cast(age as long) as age from employees"

Expand Down Expand Up @@ -285,4 +239,63 @@ class FileFormatsIT extends AnyFunSuite
)
}

test("Write delta") {

  // End-to-end Delta check: seed a Delta table whose ages are strings, run a
  // Metabolic pipeline that casts them to long, and verify the Delta output.

  // Source rows — ages deliberately stored as strings so the mapping must cast.
  val sourceRows = Seq(
    Row("Marc", "33"),
    Row("Pau", "30")
  )

  val sourceSchema = StructType(List(
    StructField("name", StringType, true),
    StructField("age", StringType, true)
  ))

  val sourceDf = spark.createDataFrame(
    spark.sparkContext.parallelize(sourceRows),
    sourceSchema
  )

  val inputPath = "src/test/tmp/it_formats/data_lake/delta_employees"

  // Seed the source Delta table on local disk.
  sourceDf
    .write
    .mode("overwrite")
    .format("delta")
    .save(inputPath)

  val outputPath = "src/test/tmp/it_formats/data_lake/delta_employees_long"
  val multilineSQL = "select name, cast(age as long) as age from employees"

  // Pipeline: read the Delta source as "employees", apply the cast, write Delta.
  val testingConfig = Config(
    "",
    List(FileSource(inputPath, "employees", format = IOFormat.DELTA)),
    List(SQLStatmentMapping(multilineSQL)),
    FileSink("test", outputPath, WriteMode.Overwrite, IOFormat.DELTA, Option("name"), Option("age"), ops = List.empty)
  )

  MetabolicApp()
    .transformAll(List(testingConfig))(region, spark)

  // What we expect after the cast: same names, ages widened to long.
  val expectedDf = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(
      Row("Marc", 33L),
      Row("Pau", 30L)
    )),
    StructType(List(
      StructField("name", StringType, true),
      StructField("age", LongType, true)
    ))
  )

  val actualDf = spark.read.format("delta").load(outputPath)

  assertDataFrameNoOrderEquals(expectedDf, actualDf)
}

}

0 comments on commit a167fa9

Please sign in to comment.