
Commit

wip
margon8 committed Oct 5, 2023
1 parent e4ff739 commit 847be02
Showing 4 changed files with 101 additions and 8 deletions.
@@ -65,7 +65,7 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
       .option("subscribe", topic)
       .option("kafka.session.timeout.ms", 45000)
       .option("kafka.client.dns.lookup","use_all_dns_ips")
-      .option("startingOffsets", "latest")
+      .option("startingOffsets", "earliest")
       .option("failOnDataLoss", false)
 
 
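For context, a minimal sketch of how a Structured Streaming Kafka source with "earliest" starting offsets is typically assembled. The option names follow the Spark Kafka connector; the broker, topic and surrounding builder are illustrative placeholders, not the project's actual KafkaReader.

// Illustrative only: a Spark Structured Streaming Kafka source configured to
// replay from the earliest retained offsets, mirroring the option changed above.
import org.apache.spark.sql.SparkSession

object KafkaEarliestOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-earliest-offsets-sketch")
      .getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "example-topic")                // placeholder topic
      .option("kafka.session.timeout.ms", 45000)
      .option("kafka.client.dns.lookup", "use_all_dns_ips")
      .option("startingOffsets", "earliest") // read the full retained topic, not only new records
      .option("failOnDataLoss", false)
      .load()

    raw.printSchema()
  }
}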
19 changes: 13 additions & 6 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
@@ -5,7 +5,7 @@ import com.metabolic.data.core.services.spark.reader.file.{CSVReader, DeltaReade
 import com.metabolic.data.core.services.spark.reader.stream.KafkaReader
 import com.metabolic.data.core.services.spark.reader.table.TableReader
 import com.metabolic.data.core.services.spark.transformations._
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.io.EngineMode.{Batch, EngineMode}
 import com.metabolic.data.mapper.domain.io._
 import com.metabolic.data.mapper.domain.ops._
 import com.metabolic.data.mapper.domain.ops.source._
@@ -18,7 +18,7 @@ object MetabolicReader extends Logging {
 
     val input = readSource(source, mode, spark)
 
-    val prepared = prepareSource(source, historical, input)
+    val prepared = prepareSource(source, mode, historical, input)
 
     prepared.createOrReplaceTempView(source.name)
 
@@ -65,16 +65,23 @@ object MetabolicReader extends Logging {
     }
   }
 
-  private def prepareSource(source: Source, historical: Boolean, input: DataFrame) = {
+  private def prepareSource(source: Source, mode: EngineMode, historical: Boolean, input: DataFrame) = {
     source.ops
       .foldLeft(input) { (df: DataFrame, op: SourceOp) =>
 
         op match {
           case filter: FilterSourceOp => {
             if (!historical) {
-              df
-                .transform(new DateFieldUpToReader(filter.onColumn, filter.toDate).filter())
-                .transform(new DateFieldFromReader(filter.onColumn, filter.fromDate).filter())
+              mode match {
+                case Batch =>
+                  df.
+                    transform(new DateFieldUpToReader(filter.onColumn, filter.toDate).filter())
+                    .transform(new DateFieldFromReader(filter.onColumn, filter.fromDate).filter())
+                case Stream =>
+                  df
+                    .transform(new DateFieldFromReader(filter.onColumn, filter.fromDate).filter())
+
+              }
             } else df
           }
           case prune: PruneDateComponentsSourceOp => {
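A standalone sketch of the behaviour this hunk introduces: in Batch mode both the upper and lower date bounds are applied, while in Stream mode only the lower bound is, since an upper bound would cap an unbounded stream. The EngineMode values, column name and filter below are simplified stand-ins for the project's DateFieldUpToReader and DateFieldFromReader, not the actual classes.

// Illustrative stand-ins for the project's EngineMode and date-filter readers.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ModeAwareDateFilterSketch {

  sealed trait EngineMode
  case object Batch  extends EngineMode
  case object Stream extends EngineMode

  // Batch: bounded window [fromDate, toDate]; Stream: only the lower bound.
  def applyDateFilter(df: DataFrame, mode: EngineMode,
                      onColumn: String, fromDate: String, toDate: String): DataFrame =
    mode match {
      case Batch =>
        df.filter(col(onColumn) >= fromDate && col(onColumn) <= toDate)
      case Stream =>
        df.filter(col(onColumn) >= fromDate)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mode-aware-filter").getOrCreate()
    import spark.implicits._

    val df = Seq(("A", "2022-02-05"), ("B", "2021-02-02")).toDF("name", "date")

    applyDateFilter(df, Batch,  "date", "2022-01-01", "2022-12-31").show()
    applyDateFilter(df, Stream, "date", "2022-01-01", "2022-12-31").show()
  }
}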
@@ -0,0 +1,86 @@
package com.metabolic.data.mapper.app

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import com.metabolic.data.RegionedTest
import com.metabolic.data.core.domain.{Defaults, Environment}
import com.metabolic.data.mapper.domain.Config
import com.metabolic.data.mapper.domain.io.{EngineMode, FileSink, FileSource, IOFormat}
import com.metabolic.data.mapper.domain.ops.SQLStatmentMapping
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

class MultiStreamingEMIT extends AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext
with BeforeAndAfterAll
with RegionedTest {

override def conf: SparkConf = super.conf
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")


val firstBatchData = Seq(
Row("A", "a", 2022, 2, 5, "2022-02-05"),
Row("B", "b", 2022, 2, 4, "2022-02-04"),
Row("C", "c", 2022, 2, 3, "2022-02-03"),
Row("D", "d", 2022, 2, 2, "2022-02-02")
)

val secondBatchData = Seq(
Row("E", "e", 2022, 2, 1, "2022-02-01"),
Row("F", "f", 2022, 1, 5, "2022-01-05"),
Row("G", "g", 2021, 2, 2, "2021-02-02"),
Row("H", "h", 2020, 2, 5, "2020-02-05")
)

val inputSchema = List(
StructField("name", StringType, true),
StructField("data", StringType, true),
StructField("yyyy", IntegerType, true),
StructField("mm", IntegerType, true),
StructField("dd", IntegerType, true),
StructField("date", StringType, true),
)

val checkpoints_path = "src/test/tmp/checkpoints"
val input_path = "src/test/tmp/delta/letters"
val output_path = "src/test/tmp/delta/all_letters"


test("Single Job") {

val sqlCtx = sqlContext

val firstInput = spark.createDataFrame(
spark.sparkContext.parallelize(firstBatchData),
StructType(inputSchema)
)

firstInput
.write
.format("delta")
.save(input_path)

val source = FileSource(input_path, "letters", IOFormat.DELTA)
val sql = "SELECT * FROM letters"

val singleStreamingJobConfig = Config(
"My Streaming Test",
List(source),
List(SQLStatmentMapping(sql)),
FileSink("all_letters", output_path, SaveMode.Overwrite, IOFormat.DELTA),
Defaults(ConfigFactory.load()), Environment("", EngineMode.Stream, checkpoints_path, false, "test", "", Option.empty)
)

MetabolicApp()
.transformAll(List(singleStreamingJobConfig))(region, spark)


}

}
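The test above drives the streaming job but stops short of asserting on its output. A minimal check one might append at the end of the test body, assuming the job materialises a Delta table at output_path, could read the result back and compare it against the written input (illustrative, not part of the commit):

// Hypothetical follow-up assertion: read the Delta output back and
// verify the streamed rows match the rows written to the input table.
val result = spark.read
  .format("delta")
  .load(output_path)

assert(result.count() == firstInput.count())
assert(result.select("name").collect().map(_.getString(0)).toSet ==
       firstInput.select("name").collect().map(_.getString(0)).toSet)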
@@ -10,7 +10,7 @@ import org.scalatest.funsuite.AnyFunSuite
 class GlueCatalogServiceTest extends AnyFunSuite
   with RegionedTest {
 
-  ignore("Works") {
+  test("Works") {
 
     val s3Path = Seq("s3://factorial-etl/entity_mapper/dl/clean/subs/")
     val config = Environment("[Test] ", EngineMode.Batch, "", true, "test_data_lake", "AWSGlueServiceRoleDefault", Option.apply("fooBarAtlan"))
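For reference, ScalaTest's ignore registers a test without running it, so this one-word change re-enables the Glue catalog test. A minimal illustration of the two registration styles (class and test names are illustrative):

import org.scalatest.funsuite.AnyFunSuite

class RegistrationSketch extends AnyFunSuite {
  test("runs as part of the suite") {
    assert(1 + 1 == 2)
  }

  ignore("is reported as ignored and never executed") {
    assert(false) // never evaluated
  }
}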
