Generic Iceberg Reader
braislchao committed Sep 11, 2024
1 parent 7e630da commit 48e3cba
Showing 8 changed files with 63 additions and 27 deletions.
@@ -3,19 +3,16 @@ package com.metabolic.data.core.services.spark.reader.table
import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.util.Random

class IcebergReader(fqn : String) extends DataframeUnifiedReader {
class GenericReader(fqn: String) extends DataframeUnifiedReader {

override val input_identifier: String = fqn

override def readBatch(spark: SparkSession): DataFrame = {
spark.table(fqn)
spark.table(input_identifier)
}

override def readStream(spark: SparkSession): DataFrame = {
spark.readStream
.table(input_identifier)
spark.readStream.table(input_identifier)
}

}
}
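
For reference, a minimal usage sketch of the renamed reader (the table name is a hypothetical example, and an active SparkSession named spark is assumed); it mirrors how IcebergReaderTest invokes it further down:

import com.metabolic.data.core.services.spark.reader.table.GenericReader
import com.metabolic.data.mapper.domain.io.EngineMode

// Batch read goes through spark.table, as in readBatch above.
val reader = new GenericReader("local.data_lake.fake_employee")
val batchDf = reader.read(spark, EngineMode.Batch)

// Streaming read goes through spark.readStream.table, as in readStream above.
val streamDf = reader.readStream(spark)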
@@ -20,6 +20,7 @@ class TableReader(fqn : String, enableJDBC: Boolean, queryOutputLocation: String
.option("S3OutputLocation", s"${queryOutputLocation}/${input_identifier}-${Random.nextInt(100000)}")
.load()
}else {
// Read table from catalog
spark.read
.table(input_identifier)
}
@@ -3,7 +3,7 @@ package com.metabolic.data.mapper.app
import com.metabolic.data.core.services.spark.filter.{DateComponentsFromReader, DateComponentsUpToReader, DateFieldFromReader, DateFieldUpToReader}
import com.metabolic.data.core.services.spark.reader.file.{CSVReader, DeltaReader, JSONReader, ParquetReader}
import com.metabolic.data.core.services.spark.reader.stream.KafkaReader
import com.metabolic.data.core.services.spark.reader.table.{IcebergReader, TableReader}
import com.metabolic.data.core.services.spark.reader.table.{GenericReader, TableReader}
import com.metabolic.data.core.services.spark.transformations._
import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
import com.metabolic.data.mapper.domain.io._
@@ -16,7 +16,7 @@ object MetabolicReader extends Logging {

def read(source: Source, historical: Boolean, mode: EngineMode, enableJDBC: Boolean, queryOutputLocation: String, jobName: String)(implicit spark: SparkSession) = {

val input = readSource(source, mode, spark, enableJDBC, queryOutputLocation, jobName)
val input: DataFrame = readSource(source, mode, spark, enableJDBC, queryOutputLocation, jobName)

val prepared = prepareSource(source, historical, input)

@@ -47,10 +47,11 @@ object MetabolicReader extends Logging {
}

case table: TableSource => {
logger.info(s"Reading source ${table.fqn} already in metastore")
logger.info(s"Reading table source ${table.fqn}")

//TODO: reader depending of type of table
new GenericReader(table.fqn).read(spark, mode)

//TODO: reader repending of type of table
new IcebergReader(table.fqn).read(spark, mode)
}

case streamSource: StreamSource => {
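
A minimal sketch of the new table path (the names are hypothetical and an active SparkSession named spark is assumed); it follows the same call shape used in MetabolicReaderIT below, where the source is a table already registered in the catalog:

// A TABLE source is now read through GenericReader.
val source = TableSource("local.data_lake.fake_employee", "fake_employee")
MetabolicReader.read(source, historical = false, EngineMode.Batch,
  enableJDBC = false, queryOutputLocation = "", jobName = "example-job")(spark)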
@@ -1,12 +1,11 @@
package com.metabolic.data.mapper.domain.io

import IOFormat.{DELTA, IOFormat}
import com.metabolic.data.mapper.domain.io.IOFormat.{DELTA, IOFormat}
import com.metabolic.data.mapper.domain.ops.SourceOp

case class FileSource(inputPath: String,
name: String,
format: IOFormat = DELTA,
useStringPrimitives: Boolean = false,
ops: Seq[SourceOp] = Seq.empty
)
extends Source
) extends Source
@@ -3,7 +3,7 @@ package com.metabolic.data.mapper.domain.io
import com.metabolic.data.mapper.domain.ops.SourceOp

case class TableSource(
fqn: String,
name: String,
ops: Seq[SourceOp] = Seq.empty
) extends Source
fqn: String,
name: String,
ops: Seq[SourceOp] = Seq.empty
) extends Source
@@ -23,7 +23,7 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars

case IOFormat.KAFKA => parseKafkaSource(name, config, ops)

case IOFormat.TABLE => parseMetastoreSource(name, config, ops)
case IOFormat.TABLE => parseTableSource(name, config, ops)
}
}

@@ -79,7 +79,7 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
StreamSource(name, servers, apiKey, apiSecret, topic, IOFormat.KAFKA, ops)
}

private def parseMetastoreSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
private def parseTableSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
val fqdn = config.getString("catalog")

TableSource(fqdn, name, ops)
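
For context, parseTableSource only reads the catalog entry of a TABLE source block and passes it to TableSource as the fully qualified name; for a hypothetical source named "employees" whose config sets catalog = "local.data_lake.fake_employee", the parsed result would be:

// Hypothetical value built by parseTableSource for the config described above.
TableSource(
  fqn = "local.data_lake.fake_employee",
  name = "employees",
  ops = Seq.empty
)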
@@ -1,7 +1,7 @@
package com.metabolic.data.core.services.spark.reader

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import com.metabolic.data.core.services.spark.reader.table.IcebergReader
import com.metabolic.data.core.services.spark.reader.table.GenericReader
import com.metabolic.data.mapper.domain.io.EngineMode
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
@@ -59,7 +59,7 @@ class IcebergReaderTest extends AnyFunSuite
.mode("overwrite")
.saveAsTable(fqn)

val iceberg = new IcebergReader(fqn)
val iceberg = new GenericReader(fqn)
val inputDf = iceberg.read(spark, EngineMode.Batch)

assertDataFrameEquals(inputDf, expectedDf)
@@ -12,6 +12,7 @@ import org.apache.spark.sql.types._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import com.metabolic.data.mapper.domain.ops.source._
import org.apache.derby.impl.sql.compile.TableName

class MetabolicReaderIT extends AnyFunSuite
with DataFrameSuiteBase
@@ -33,8 +34,6 @@ class MetabolicReaderIT extends AnyFunSuite

def getFakeEmployeesDataframe(): DataFrame = {

val sqlCtx = sqlContext

val fakeEmployeesData = Seq(
Row("Marc", 33L, 1),
Row("Pau", 30L, 1)
@@ -170,14 +169,20 @@ class MetabolicReaderIT extends AnyFunSuite

val result = spark.table(tableName)


print("hola")
print(result.toString())
print(MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark))

assertDataFrameNoOrderEquals(expected, result)

}

test("Reader Table Batch") {
test("Reader Table Iceberg Batch") {

val fqn = "local.data_lake.fake_employee_delta"
val catalog = "local.data_lake"
val tableName = "fake_employee_delta"
val fqn = catalog + "." + tableName

val expected = getFakeEmployeesDataframe()

@@ -197,5 +202,38 @@ class MetabolicReaderIT extends AnyFunSuite

}

// This test is not doing a full comparison
ignore("Reader Table Delta Batch") {

val inputPath = "src/test/tmp/fake_employee_delta"
val catalog = "default"
val tableName = "fake_employee_delta"

val expected = getFakeEmployeesDataframe()

expected
.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", "true")
.save(inputPath)

spark.sql(s"""
CREATE TABLE default.fake_employee_delta
USING DELTA
LOCATION '$inputPath'
""")

val fqn = catalog + "." + tableName

val source = TableSource(fqn,tableName)

MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark)

val result = spark.table(fqn)

assertDataFrameNoOrderEquals(expected, result)

}

}
