
Commit

Simplify GenericReader
braislchao committed Sep 12, 2024
1 parent 876bdeb commit 5667f12
Showing 2 changed files with 39 additions and 39 deletions.
@@ -10,40 +10,14 @@ class GenericReader(fqn: String) extends DataframeUnifiedReader with Logging{

override val input_identifier: String = fqn

- private def getTableProvider(spark: SparkSession): String = {
- spark.sql(s"DESCRIBE FORMATTED $input_identifier")
- .filter(col("col_name").contains("Provider"))
- .select("data_type")
- .as[String]
- .first()
- }

override def readBatch(spark: SparkSession): DataFrame = {
//Generic for Delta Lake and Iceberg tables using fqn
spark.table(input_identifier)
}

override def readStream(spark: SparkSession): DataFrame = {

- val provider = getTableProvider(spark)
- provider match {
- case "iceberg" =>
- logger.info(s"Reading Iceberg Table source ${input_identifier}")
- spark.readStream
- .format("iceberg")
- .option("stream-from-timestamp", (System.currentTimeMillis() - 3600000).toString)
- .load(input_identifier)
-
- case "delta" =>
- logger.info(s"Reading Delta Table source $input_identifier")
- spark.readStream
- .format("delta")
- .table(input_identifier)
-
- case unknown =>
- logger.warn(s"Table source $provider not supported for table $input_identifier")
- spark.emptyDataFrame
- }
+ //Generic for Delta Lake and Iceberg tables using fqn
+ spark.readStream.table(input_identifier)
}

}
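
For orientation (not part of the commit), a rough usage sketch of the simplified reader. It assumes EngineMode.Batch exists alongside the EngineMode.Stream used in the tests below, and that DataframeUnifiedReader.read(spark, mode) dispatches to readBatch/readStream; package imports for GenericReader and EngineMode depend on the project layout.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("generic-reader-sketch").getOrCreate()

// Fully qualified table name resolved through the session catalog.
val reader = new GenericReader("spark_catalog.data_lake.letters")

// Batch: readBatch is just spark.table(fqn), so Delta and Iceberg tables behave the same.
// EngineMode.Batch is assumed; the tests below only exercise EngineMode.Stream.
val batchDf = reader.read(spark, EngineMode.Batch)

// Stream: readStream is now spark.readStream.table(fqn); the provider lookup is gone.
val streamDf = reader.read(spark, EngineMode.Stream)
val query = streamDf.writeStream
  .format("parquet")
  .outputMode("append")
  .trigger(Trigger.Once())
  .option("checkpointLocation", "src/test/tmp/gr_test/checkpoints")
  .option("path", "src/test/tmp/gr_test/out")
  .start()
query.awaitTermination()

If the Iceberg-specific stream-from-timestamp behaviour from the removed readStream is still wanted, it should be possible to supply it as a reader option before the table call (spark.readStream.option("stream-from-timestamp", ...).table(fqn)), though that path is not exercised by these tests.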
@@ -38,16 +38,16 @@ class GenericReaderTest extends AnyFunSuite
StructField("date", StringType, true),
)

+ //TODO: check iceberg catalog generalization
+ //TODO: use same table for all tests
override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", "src/test/tmp/gr_test/catalog")
.set("spark.databricks.delta.optimize.repartition.enabled","true")
.set("spark.databricks.delta.vacuum.parallelDelete.enabled","true")
.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "src/test/tmp/gr_test/catalog")
.set("spark.sql.defaultCatalog", "spark_catalog")

val testDir = "src/test/tmp/gr_test/"
@@ -67,9 +67,10 @@ class GenericReaderTest extends AnyFunSuite

cleanUpTestDir()

val fqn = "local.data_lake.letters"
val fqn = "spark_catalog.data_lake.letters"
spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")

val expectedDf = createExpectedDataFrame()
expectedDf
.write
.format("iceberg")
@@ -86,7 +87,7 @@ class GenericReaderTest extends AnyFunSuite

cleanUpTestDir()

val fqn = "data_lake.letters"
val fqn = "data_lake.letters2"
spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")

val expectedDf = createExpectedDataFrame()
@@ -120,11 +121,11 @@ class GenericReaderTest extends AnyFunSuite
.saveAsTable(fqn)

val iceberg = new GenericReader(fqn)
- val inputDf = iceberg.read(spark, EngineMode.Stream)
+ val readDf = iceberg.read(spark, EngineMode.Stream)

val checkpointPath = testDir + "checkpoints"

- val query = inputDf.writeStream
+ val query = readDf.writeStream
.format("parquet") // or "csv", "json", etc.
.outputMode("append") // Ensure the output mode is correct for your use case
.trigger(Trigger.Once()) // Process only one batch
@@ -140,6 +141,28 @@ class GenericReaderTest extends AnyFunSuite

assertDataFrameEquals(expectedDf, resultDf)

+ expectedDf
+ .write
+ .format("iceberg")
+ .mode("append")
+ .saveAsTable(fqn)
+
+ val query2 = readDf.writeStream
+ .format("parquet") // or "csv", "json", etc.
+ .outputMode("append") // Ensure the output mode is correct for your use case
+ .trigger(Trigger.Once()) // Process only one batch
+ .option("checkpointLocation", checkpointPath)
+ .option("path", testDir + table) // Specify the output path for the file
+ .start()
+
+ query2.awaitTermination()
+
+ val resultDf2 = spark.read
+ .format("parquet")
+ .load(testDir + table)
+
+ assertDataFrameEquals(expectedDf.union(expectedDf), resultDf2)

}

test("Delta stream read") {
@@ -149,9 +172,10 @@ class GenericReaderTest extends AnyFunSuite
new Directory(new File(testDir)).deleteRecursively()

val fqn = "data_lake.letters_stream"
val database = "data_lake"
val table = "letters_stream"

spark.sql(s"CREATE DATABASE IF NOT EXISTS ${table}")
spark.sql(s"CREATE DATABASE IF NOT EXISTS ${database}")

val expectedDf = createExpectedDataFrame()
expectedDf
@@ -184,4 +208,6 @@ class GenericReaderTest extends AnyFunSuite
assertDataFrameEquals(sortedExpectedDf, sortedResultDf)
}

+ //TODO: test other formats and glue catalog compatibility

}
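
For the glue-catalog TODO above, one possible starting point is an additional Iceberg catalog backed by AWS Glue. This is only a sketch: the catalog name "glue" and the S3 warehouse path are placeholders, and it assumes the iceberg-aws and S3 filesystem dependencies are on the test classpath.

import org.apache.spark.SparkConf

// Hypothetical Glue-backed Iceberg catalog; "glue" and the warehouse path are placeholders.
val glueConf = new SparkConf()
  .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .set("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
  .set("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .set("spark.sql.catalog.glue.warehouse", "s3://example-bucket/warehouse")
  .set("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

// A GenericReader would then address tables as "glue.<database>.<table>", e.g.
// new GenericReader("glue.data_lake.letters").read(spark, EngineMode.Stream)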
