Skip to content

Commit

Permalink
Clean Writer Test
Browse files Browse the repository at this point in the history
  • Loading branch information
braislchao committed Sep 12, 2024
1 parent 0997930 commit 74bd63a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.metabolic.data.core.services.spark.writer

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import com.metabolic.data.core.services.spark.reader.table.GenericReader
import com.metabolic.data.core.services.spark.writer.file.IcebergWriter
import com.metabolic.data.mapper.domain.io.{EngineMode, WriteMode}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import com.metabolic.data.mapper.domain.io.{EngineMode, WriteMode}
import org.apache.spark.sql.streaming.Trigger

import java.io.File
import scala.reflect.io.Directory
Expand All @@ -19,16 +17,7 @@ class IcebergWriterTest extends AnyFunSuite
with SharedSparkContext
with BeforeAndAfterAll {

override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "./warehouse")
.set("spark.sql.defaultCatalog", "local")


val testDir = "src/test/tmp/iw_test/"
private val inputData = Seq(
Row("A", "a", 2022, 2, 5, "2022-02-05"),
Row("B", "b", 2022, 2, 4, "2022-02-04"),
Expand All @@ -49,7 +38,14 @@ class IcebergWriterTest extends AnyFunSuite
StructField("date", StringType, true),
)

val testDir = "src/test/tmp/iw_test/"
override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hive")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "./warehouse")
.set("spark.sql.defaultCatalog", "local")

test("Iceberg batch append") {

Expand Down Expand Up @@ -98,7 +94,7 @@ class IcebergWriterTest extends AnyFunSuite

}

test("Iceberg streaming append"){
ignore("Iceberg streaming append") {

new Directory(new File(testDir)).deleteRecursively()
new Directory(new File("./warehouse/data_lake")).deleteRecursively()
Expand All @@ -123,12 +119,12 @@ class IcebergWriterTest extends AnyFunSuite
.option("stream-from-timestamp", streamStartTimestamp.toString)
.load(expected)

val wm = WriteMode.Append
val wm = WriteMode.Complete
val cpl = testDir + "checkpoints"
val iceberg = new IcebergWriter(result, wm, cpl)(spark)
iceberg.write(streamDf, EngineMode.Stream)

assertDataFrameNoOrderEquals(spark.table(expected),spark.table(result))
assertDataFrameNoOrderEquals(expectedDf, spark.table(result))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ class FileFormatsIT extends AnyFunSuite

inputEmployeesDF.write.mode("overwrite").saveAsTable("fake_json_employees")

print(spark.catalog.tableExists("fake_json_employees"))

val multilineSQL = "select name, cast(age as string) as new_age from employees"

val testingConfig = Config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ class MetabolicReaderIT extends AnyFunSuite

val result = spark.table(tableName)


print("hola")
print(result.toString())
print(MetabolicReader.read(source, historical = true, EngineMode.Batch, enableJDBC = false, "", "")(spark))

assertDataFrameNoOrderEquals(expected, result)

}
Expand Down

0 comments on commit 74bd63a

Please sign in to comment.