Skip to content

Commit

Permalink
Clean Metabolic functions
Browse files: browse the repository at this point in the history
  • Loading branch information
braislchao committed Sep 12, 2024
1 parent 74bd63a commit 9e5da27
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -34,12 +34,15 @@ object MetabolicReader extends Logging {
case IOFormat.CSV =>
new CSVReader(fileSource.inputPath)
.read(spark, mode)

case IOFormat.JSON =>
new JSONReader(fileSource.inputPath, fileSource.useStringPrimitives)
.read(spark, mode)

case IOFormat.PARQUET =>
new ParquetReader(fileSource.inputPath)
.read(spark, mode)

case IOFormat.DELTA =>
new DeltaReader(fileSource.inputPath)
.read(spark, mode)
Expand Down
31 changes: 14 additions & 17 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicWriter.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,7 +30,6 @@ object MetabolicWriter extends Logging {
}
}


private def prepareSink(sink: Sink)(implicit spark: SparkSession): Repartitioner = {

sink.ops
Expand All @@ -40,15 +39,13 @@ object MetabolicWriter extends Logging {
case schema: ManageSchemaSinkOp => {

val schemaPartitioner = new SchemaManagerPartitioner("default", sink.name)

r.addColumnsWithBuilder(schemaPartitioner.partitionColumnNames, schemaPartitioner)

}

case date: DatePartitionSinkOp => {

val datePartitioner = new DatePartitioner(Option.apply(date.eventTimeColumnName), date.depth)

r.addColumnsWithBuilder(datePartitioner.partitionColumnNames, datePartitioner)

}
Expand All @@ -75,19 +72,8 @@ object MetabolicWriter extends Logging {

sink match {

case streamSink: StreamSink => {
streamSink.format match {

case IOFormat.KAFKA =>
logger.info(s"Writing Kafka sink ${streamSink.topic}")

new KafkaWriter(streamSink.servers, streamSink.apiKey, streamSink.apiSecret,
streamSink.topic, streamSink.idColumnName, checkpointPath)
.write(_df, mode)

}
}
case fileSink: FileSink => {
logger.info(s"Writing file sink ${fileSink.name}")

val path = if (autoSchema) {
val versionRegex = """(.*)/(version=\d+/)""".r
Expand Down Expand Up @@ -123,16 +109,27 @@ object MetabolicWriter extends Logging {
new DeltaZOrderWriter(repartitioner.partitionColumnNames, path, fileWriteMode, fileSink.eventTimeColumnName,
fileSink.idColumnName, fileSink.dbName, checkpointPath, namespaces, fileSink.optimize, fileSink.optimizeEvery)
.write(_output, mode)

}
}

case table: TableSink => {
logger.info(s"Writing Iceberg/Table sink ${table.catalog}")
logger.info(s"Writing Table sink ${table.catalog}")

new IcebergWriter(table.catalog, table.writeMode, checkpointPath)
.write(_df, mode)

}

case streamSink: StreamSink => {
streamSink.format match {
case IOFormat.KAFKA =>
logger.info(s"Writing Kafka sink ${streamSink.topic}")

new KafkaWriter(streamSink.servers, streamSink.apiKey, streamSink.apiSecret,
streamSink.topic, streamSink.idColumnName, checkpointPath)
.write(_df, mode)
}
}
}
}

Expand Down

0 comments on commit 9e5da27

Please sign in to comment.