From 7e630da342859aa762033e5a363782440d3f75d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Brais=20L=C3=B3pez=20Chao?=
Date: Wed, 11 Sep 2024 16:26:17 +0200
Subject: [PATCH] Iceberg Reader Test with Batch

---
 .../data/mapper/app/MetabolicReader.scala   | 26 ++++++++-------
 .../data/mapper/app/FileFormatsIT.scala     |  8 ++---
 .../data/mapper/app/MetabolicReaderIT.scala | 32 +++++++++++++++++--
 3 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
index 0bbffe6..3609ec2 100644
--- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
+++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
@@ -27,15 +27,6 @@ object MetabolicReader extends Logging {
   private def readSource(source: Source, mode: EngineMode, spark: SparkSession, enableJDBC: Boolean, queryOutputLocation: String, jobName: String) = {
     source match {

-      case streamSource: StreamSource => {
-        logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")
-
-        streamSource.format match {
-          case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, jobName)
-            .read(spark, mode)
-        }
-      }
-
       case fileSource: FileSource => {
         logger.info(s"Reading file source ${fileSource.name} from ${fileSource.inputPath}")

@@ -55,11 +46,22 @@ object MetabolicReader extends Logging {
         }
       }

-      case meta: TableSource => {
-        logger.info(s"Reading source ${meta.fqn} already in metastore")
+      case table: TableSource => {
+        logger.info(s"Reading source ${table.fqn} already in metastore")

-        new IcebergReader(meta.fqn).read(spark, mode)
+        // TODO: pick the reader depending on the type of table
+        new IcebergReader(table.fqn).read(spark, mode)
       }
+
+      case streamSource: StreamSource => {
+        logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")
+
+        streamSource.format match {
+          case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, jobName)
+            .read(spark, mode)
+        }
+      }
+
     }
   }

diff --git a/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala b/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
index 2d8e279..5205254 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/FileFormatsIT.scala
@@ -18,13 +18,13 @@ class FileFormatsIT extends AnyFunSuite
   with RegionedTest {

   override def conf: SparkConf = super.conf
-    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-    .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
+    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
     .set("spark.sql.catalog.spark_catalog.type", "hive")
     .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
     .set("spark.sql.catalog.local.type", "hadoop")
     .set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
-    .set("spark.sql.defaultCatalog", "local")
+    .set("spark.sql.defaultCatalog", "spark_catalog")

   test("Write parquet") {

@@ -134,7 +134,7 @@ class FileFormatsIT extends AnyFunSuite
       StructType(someSchema)
     )

-    inputEmployeesDF.write.saveAsTable("fake_json_employees")
+    inputEmployeesDF.write.mode("overwrite").saveAsTable("fake_json_employees")

     print(spark.catalog.tableExists("fake_json_employees"))

diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
index 2d6986d..b632661 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
@@ -11,7 +11,7 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
-
+import com.metabolic.data.mapper.domain.ops.source._

 class MetabolicReaderIT extends AnyFunSuite
   with DataFrameSuiteBase
@@ -20,11 +20,16 @@ class MetabolicReaderIT extends AnyFunSuite
   with RegionedTest {

   override def conf: SparkConf = super.conf
-    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
     .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+    .set("spark.sql.catalog.spark_catalog.type", "hive")
     .set("spark.databricks.delta.optimize.repartition.enabled","true")
     .set("spark.databricks.delta.vacuum.parallelDelete.enabled","true")
     .set("spark.databricks.delta.retentionDurationCheck.enabled","false")
+    .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
+    .set("spark.sql.catalog.local.type", "hadoop")
+    .set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
+    .set("spark.sql.defaultCatalog", "spark_catalog")


   def getFakeEmployeesDataframe(): DataFrame = {
@@ -169,5 +174,28 @@ class MetabolicReaderIT extends AnyFunSuite

   }

+  test("Reader Table Batch") {
+
+    val fqn = "local.data_lake.fake_employee_delta"
+    val tableName = "fake_employee_delta"
+
+    val expected = getFakeEmployeesDataframe()
+
+    expected
+      .write
+      .format("iceberg")
+      .mode("overwrite")
+      .saveAsTable(fqn)
+
+    val source = TableSource(fqn, tableName)
+
+    MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark)
+
+    val result = spark.table(tableName)
+
+    assertDataFrameNoOrderEquals(expected, result)
+
+  }
+
 }
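
Note on the "// TODO: pick the reader depending on the type of table" left in MetabolicReader: the intent is to stop hard-coding IcebergReader for every TableSource and instead choose a reader from the table's format. Below is a minimal, standalone sketch of one way that dispatch could look using plain Spark APIs only. It is not part of the patch: the object and method names (TableFormatDispatch, readTable) are hypothetical, and detecting the provider via DESCRIBE TABLE EXTENDED plus falling back to spark.table are illustrative choices, not the project's API.

// Sketch only, not part of the patch: format-aware read of a metastore table.
// Assumes the table is registered in the catalog configured for the session.
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableFormatDispatch {

  def readTable(fqn: String)(implicit spark: SparkSession): DataFrame = {
    // DESCRIBE TABLE EXTENDED exposes a "Provider" row (e.g. iceberg, delta)
    // for tables known to the catalog; default to iceberg if it is absent.
    val provider = spark.sql(s"DESCRIBE TABLE EXTENDED $fqn")
      .filter("col_name = 'Provider'")
      .select("data_type")
      .collect()
      .headOption
      .map(_.getString(0).toLowerCase)
      .getOrElse("iceberg")

    provider match {
      // Iceberg tables can be loaded through the Iceberg source by identifier.
      case "iceberg" => spark.read.format("iceberg").load(fqn)
      // Delta (and any other provider) resolves through the session catalog.
      case _         => spark.table(fqn)
    }
  }
}

Inside MetabolicReader itself, the same idea would amount to matching on the detected provider within the TableSource case and picking between IcebergReader and a Delta-capable reader with the same read(spark, mode) contract, if and when such a reader exists in the project.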