diff --git a/src/main/scala/com/metabolic/data/core/services/spark/reader/table/IcebergReader.scala b/src/main/scala/com/metabolic/data/core/services/spark/reader/table/GenericReader.scala
similarity index 69%
rename from src/main/scala/com/metabolic/data/core/services/spark/reader/table/IcebergReader.scala
rename to src/main/scala/com/metabolic/data/core/services/spark/reader/table/GenericReader.scala
index 69a7dc6..ce4b337 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/reader/table/IcebergReader.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/reader/table/GenericReader.scala
@@ -3,19 +3,16 @@ package com.metabolic.data.core.services.spark.reader.table
 import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
-import scala.util.Random
-
-class IcebergReader(fqn : String) extends DataframeUnifiedReader {
+class GenericReader(fqn: String) extends DataframeUnifiedReader {
 
   override val input_identifier: String = fqn
 
   override def readBatch(spark: SparkSession): DataFrame = {
-    spark.table(fqn)
+    spark.table(input_identifier)
   }
 
   override def readStream(spark: SparkSession): DataFrame = {
-    spark.readStream
-      .table(input_identifier)
+    spark.readStream.table(input_identifier)
   }
 
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/metabolic/data/core/services/spark/reader/table/TableReader.scala b/src/main/scala/com/metabolic/data/core/services/spark/reader/table/TableReader.scala
index 8f4a243..5782146 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/reader/table/TableReader.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/reader/table/TableReader.scala
@@ -20,6 +20,7 @@ class TableReader(fqn : String, enableJDBC: Boolean, queryOutputLocation: String
         .option("S3OutputLocation", s"${queryOutputLocation}/${input_identifier}-${Random.nextInt(100000)}")
         .load()
     }else {
+      // Read table from catalog
      spark.read
        .table(input_identifier)
    }
diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
index 3609ec2..dc58a72 100644
--- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
+++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
@@ -3,7 +3,7 @@ package com.metabolic.data.mapper.app
 import com.metabolic.data.core.services.spark.filter.{DateComponentsFromReader, DateComponentsUpToReader, DateFieldFromReader, DateFieldUpToReader}
 import com.metabolic.data.core.services.spark.reader.file.{CSVReader, DeltaReader, JSONReader, ParquetReader}
 import com.metabolic.data.core.services.spark.reader.stream.KafkaReader
-import com.metabolic.data.core.services.spark.reader.table.{IcebergReader, TableReader}
+import com.metabolic.data.core.services.spark.reader.table.{GenericReader, TableReader}
 import com.metabolic.data.core.services.spark.transformations._
 import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
 import com.metabolic.data.mapper.domain.io._
@@ -16,7 +16,7 @@ object MetabolicReader extends Logging {
 
   def read(source: Source, historical: Boolean, mode: EngineMode, enableJDBC: Boolean, queryOutputLocation: String, jobName: String)(implicit spark: SparkSession) = {
 
-    val input = readSource(source, mode, spark, enableJDBC, queryOutputLocation, jobName)
+    val input: DataFrame = readSource(source, mode, spark, enableJDBC, queryOutputLocation, jobName)
 
     val prepared = prepareSource(source, historical, input)
 
@@ -47,10 +47,11 @@ object MetabolicReader extends Logging {
       }
 
       case table: TableSource => {
-        logger.info(s"Reading source ${table.fqn} already in metastore")
+        logger.info(s"Reading table source ${table.fqn}")
+
+        //TODO: choose the reader depending on the type of table
+        new GenericReader(table.fqn).read(spark, mode)
 
-        //TODO: reader repending of type of table
-        new IcebergReader(table.fqn).read(spark, mode)
       }
 
       case streamSource: StreamSource => {
diff --git a/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala b/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
index aa0ee1b..1fb7741 100644
--- a/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
+++ b/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
@@ -1,6 +1,6 @@
 package com.metabolic.data.mapper.domain.io
 
-import IOFormat.{DELTA, IOFormat}
+import com.metabolic.data.mapper.domain.io.IOFormat.{DELTA, IOFormat}
 import com.metabolic.data.mapper.domain.ops.SourceOp
 
 case class FileSource(inputPath: String,
@@ -8,5 +8,4 @@ case class FileSource(inputPath: String,
                       format: IOFormat = DELTA,
                       useStringPrimitives: Boolean = false,
                       ops: Seq[SourceOp] = Seq.empty
-                     )
-  extends Source
+                     ) extends Source
diff --git a/src/main/scala/com/metabolic/data/mapper/domain/io/TableSource.scala b/src/main/scala/com/metabolic/data/mapper/domain/io/TableSource.scala
index 9327850..cf2b8f2 100644
--- a/src/main/scala/com/metabolic/data/mapper/domain/io/TableSource.scala
+++ b/src/main/scala/com/metabolic/data/mapper/domain/io/TableSource.scala
@@ -3,7 +3,7 @@ package com.metabolic.data.mapper.domain.io
 import com.metabolic.data.mapper.domain.ops.SourceOp
 
 case class TableSource(
-                        fqn: String,
-                        name: String,
-                        ops: Seq[SourceOp] = Seq.empty
-                      ) extends Source
+                       fqn: String,
+                       name: String,
+                       ops: Seq[SourceOp] = Seq.empty
+                      ) extends Source
diff --git a/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala b/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
index 9ddd891..24be48b 100644
--- a/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
+++ b/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
@@ -23,7 +23,7 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
 
       case IOFormat.KAFKA => parseKafkaSource(name, config, ops)
 
-      case IOFormat.TABLE => parseMetastoreSource(name, config, ops)
+      case IOFormat.TABLE => parseTableSource(name, config, ops)
 
     }
   }
@@ -79,7 +79,7 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
     StreamSource(name, servers, apiKey, apiSecret, topic, IOFormat.KAFKA, ops)
   }
 
-  private def parseMetastoreSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
+  private def parseTableSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
     val fqdn = config.getString("catalog")
 
     TableSource(fqdn, name, ops)
diff --git a/src/test/scala/com/metabolic/data/core/services/spark/reader/IcebergReaderTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/reader/IcebergReaderTest.scala
index 68ab105..0e16bcc 100644
--- a/src/test/scala/com/metabolic/data/core/services/spark/reader/IcebergReaderTest.scala
+++ b/src/test/scala/com/metabolic/data/core/services/spark/reader/IcebergReaderTest.scala
@@ -1,7 +1,7 @@
 package com.metabolic.data.core.services.spark.reader
 
 import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
-import com.metabolic.data.core.services.spark.reader.table.IcebergReader
+import com.metabolic.data.core.services.spark.reader.table.GenericReader
 import com.metabolic.data.mapper.domain.io.EngineMode
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
@@ -59,7 +59,7 @@ class IcebergReaderTest extends AnyFunSuite
       .mode("overwrite")
       .saveAsTable(fqn)
 
-    val iceberg = new IcebergReader(fqn)
+    val iceberg = new GenericReader(fqn)
     val inputDf = iceberg.read(spark, EngineMode.Batch)
 
     assertDataFrameEquals(inputDf, expectedDf)
diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
index b632661..0d08950 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
@@ -12,6 +12,7 @@ import org.apache.spark.sql.types._
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 import com.metabolic.data.mapper.domain.ops.source._
+import org.apache.derby.impl.sql.compile.TableName
 
 class MetabolicReaderIT extends AnyFunSuite
   with DataFrameSuiteBase
@@ -33,8 +34,6 @@ class MetabolicReaderIT extends AnyFunSuite
 
   def getFakeEmployeesDataframe(): DataFrame = {
 
-    val sqlCtx = sqlContext
-
     val fakeEmployeesData = Seq(
       Row("Marc", 33L, 1),
       Row("Pau", 30L, 1)
@@ -170,14 +169,20 @@ class MetabolicReaderIT extends AnyFunSuite
 
 
     val result = spark.table(tableName)
 
+
+    print("hola")
+    print(result.toString())
+    print(MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark))
+
     assertDataFrameNoOrderEquals(expected, result)
   }
 
-  test("Reader Table Batch") {
+  test("Reader Table Iceberg Batch") {
 
-    val fqn = "local.data_lake.fake_employee_delta"
+    val catalog = "local.data_lake"
     val tableName = "fake_employee_delta"
+    val fqn = catalog + "." + tableName
 
     val expected = getFakeEmployeesDataframe()
 
@@ -197,5 +202,38 @@ class MetabolicReaderIT extends AnyFunSuite
 
   }
 
 
+  // This test is not doing a full comparison
+  ignore("Reader Table Delta Batch") {
+
+    val inputPath = "src/test/tmp/fake_employee_delta"
+    val catalog = "default"
+    val tableName = "fake_employee_delta"
+
+    val expected = getFakeEmployeesDataframe()
+
+    expected
+      .write
+      .format("delta")
+      .mode("overwrite")
+      .option("mergeSchema", "true")
+      .save(inputPath)
+
+    spark.sql(s"""
+      CREATE TABLE default.fake_employee_delta
+      USING DELTA
+      LOCATION '$inputPath'
+      """)
+
+    val fqn = catalog + "." + tableName
+
+    val source = TableSource(fqn,tableName)
+
+    MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark)
+
+    val result = spark.table(fqn)
+
+    assertDataFrameNoOrderEquals(expected, result)
+
+  }
 }
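
Usage sketch (illustrative only): the renamed GenericReader keeps the DataframeUnifiedReader contract exercised in IcebergReaderTest above, so reading a catalog table in batch mode looks roughly like the snippet below. The object name GenericReaderUsageSketch and the hard-coded FQN are assumptions made up for the example; the read(spark, mode) entry point, the EngineMode import, and the GenericReader constructor come from the code shown in this diff.

// Minimal sketch, assuming the table is already registered in the Spark catalog
// (as the tests above do via saveAsTable / CREATE TABLE ... LOCATION).
import com.metabolic.data.core.services.spark.reader.table.GenericReader
import com.metabolic.data.mapper.domain.io.EngineMode
import org.apache.spark.sql.{DataFrame, SparkSession}

object GenericReaderUsageSketch {
  def readEmployees(spark: SparkSession): DataFrame = {
    // The fully qualified name is resolved through spark.table(...) in readBatch.
    val reader = new GenericReader("local.data_lake.fake_employee_delta")
    reader.read(spark, EngineMode.Batch)
  }
}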