Iceberg Reader Test with Batch
braislchao committed Sep 11, 2024
1 parent e7e976c commit 7e630da
Showing 3 changed files with 48 additions and 18 deletions.
26 changes: 14 additions & 12 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
@@ -27,15 +27,6 @@ object MetabolicReader extends Logging {
private def readSource(source: Source, mode: EngineMode, spark: SparkSession, enableJDBC: Boolean, queryOutputLocation: String, jobName: String) = {
source match {

case streamSource: StreamSource => {
logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")

streamSource.format match {
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, jobName)
.read(spark, mode)
}
}

case fileSource: FileSource => {
logger.info(s"Reading file source ${fileSource.name} from ${fileSource.inputPath}")

@@ -55,11 +46,22 @@
}
}

case meta: TableSource => {
logger.info(s"Reading source ${meta.fqn} already in metastore")
case table: TableSource => {
logger.info(s"Reading source ${table.fqn} already in metastore")

new IcebergReader(meta.fqn).read(spark, mode)
//TODO: pick the reader depending on the type of table
new IcebergReader(table.fqn).read(spark, mode)
}

case streamSource: StreamSource => {
logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")

streamSource.format match {
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, jobName)
.read(spark, mode)
}
}

}
}
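
For reference, a minimal standalone sketch of what the IcebergReader path above amounts to in batch mode: reading a metastore-registered table by its fully qualified name. This is not the project's implementation; spark and fqn are illustrative, and the example name mirrors the one used in the new test further down.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch: batch-read an Iceberg table registered in the catalog.
def readIcebergTable(spark: SparkSession, fqn: String): DataFrame =
  spark.read
    .format("iceberg") // resolve through the Iceberg data source
    .load(fqn)         // e.g. "local.data_lake.fake_employee_delta"

// Once the table is registered, a plain catalog lookup works as well:
// val df = spark.table("local.data_lake.fake_employee_delta")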

@@ -18,13 +18,13 @@ class FileFormatsIT extends AnyFunSuite
with RegionedTest {

override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
.set("spark.sql.defaultCatalog", "local")
.set("spark.sql.defaultCatalog", "spark_catalog")

test("Write parquet") {

@@ -134,7 +134,7 @@ class FileFormatsIT extends AnyFunSuite
StructType(someSchema)
)

inputEmployeesDF.write.saveAsTable("fake_json_employees")
inputEmployeesDF.write.mode("overwrite").saveAsTable("fake_json_employees")

print(spark.catalog.tableExists("fake_json_employees"))

@@ -11,7 +11,7 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

import com.metabolic.data.mapper.domain.ops.source._

class MetabolicReaderIT extends AnyFunSuite
with DataFrameSuiteBase
@@ -20,11 +20,16 @@ class MetabolicReaderIT extends AnyFunSuite
with RegionedTest {

override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.databricks.delta.optimize.repartition.enabled","true")
.set("spark.databricks.delta.vacuum.parallelDelete.enabled","true")
.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "src/test/tmp/it_formats")
.set("spark.sql.defaultCatalog", "spark_catalog")

def getFakeEmployeesDataframe(): DataFrame = {

@@ -169,5 +174,28 @@

}

test("Reader Table Batch") {

val fqn = "local.data_lake.fake_employee_delta"
val tableName = "fake_employee_delta"

val expected = getFakeEmployeesDataframe()

expected
.write
.format("iceberg")
.mode("overwrite")
.saveAsTable(fqn)

val source = TableSource(fqn, tableName)

MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark)

val result = spark.table(tableName)

assertDataFrameNoOrderEquals(expected, result)

}


}
