From 112ccc8af628c2047a6bd835fd793d93b79f5876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A1n=20Bern=C3=A1rdez=20Cornes?= Date: Tue, 14 May 2024 15:18:28 +0200 Subject: [PATCH] DeltaWriter: drop mergeSchema option from non-optimized stream start; update example.conf test fixture --- .../spark/writer/file/DeltaWriter.scala | 2 +- src/test/resources/example.conf | 61 ++++++++++++------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala index 9575da3..dd80bbd 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala @@ -153,7 +153,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, if (optimize) { baseQuery.foreachBatch(appendToDelta _).start } else { - baseQuery.option("mergeSchema", "true").start + baseQuery.start } case WriteMode.Overwrite => diff --git a/src/test/resources/example.conf b/src/test/resources/example.conf index b80129e..f582fec 100644 --- a/src/test/resources/example.conf +++ b/src/test/resources/example.conf @@ -1,26 +1,43 @@ entities: [ -{ - name: Bronze job_catalog_levels - sources: [ - { - inputPath = ${dp.dl_raw_bucket}"/rds/accesses/" - name = "raw_deduped_accesses" - format = "PARQUET" - op.dedupe = { - idColumns = ["id"] - orderColumns = ["yyyy","mm","dd","updated_at"] + { + name: Bronze surveys_question_group_configs + sources: [ + { + inputPath = "s3a://factorial-datalake-raw/debezium_json_v1/production.pub.raw.rds.factorial.surveys_question_configs/" + name = "bronze_table" + format = "JSON" + ops = [ + { + op: "expr" + expressions = ["timestamp(from_unixtime(payload.source.ts_ms/1000,'yyyy-MM-dd HH:mm:ss')) as metadata_updated_at","year as yyyy", "month as mm", "day as dd"] + }, + { + op: "prune" + from = ${df.start_of_yesterday} + to = ${df.now} + }, + { + op = "filter" + onColumn = 
"metadata_updated_at" + from = ${df.start_of_yesterday} + to = ${df.now} + } + ] + } + ] + mapping { + file = "src/test/resources/example.sql" + } + sink { + outputPath = ${dp.dl_bronze_bucket}"/surveys_question_group_configs/" + writeMode = "append" + ops: [ + { + op: flatten + column = "record" + } + ] + format = "DELTA" } } - ] - mapping { - sql = "select * from raw_deduped_accesses" - } - sink { - name = "attendance_deleted_shifts" - format = "kafka" - kafkaSecret = "production/kafka_credentials_sr" - topic = "test_2" - schemaRegistry = "avro" - } -} ] \ No newline at end of file