diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala index dd80bbd..319b3bb 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala @@ -84,7 +84,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .whenNotMatched().insertAll() .execute() - if (batchId % optimizeEvery == 0) { + if (batchId % optimizeEvery == 0 && optimize) { compactAndVacuum } } @@ -153,20 +153,24 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, if (optimize) { baseQuery.foreachBatch(appendToDelta _).start } else { - baseQuery.start + baseQuery.start(outputPath) } case WriteMode.Overwrite => DeltaTable.forPath(outputPath).delete() - df + val baseQuery = df .writeStream .outputMode("append") - .option("overwriteSchema", "true") + .option("mergeSchema", "true") .option("checkpointLocation", checkpointLocation) - .foreachBatch(appendToDelta _) - .start + + if (optimize) { + baseQuery.foreachBatch(appendToDelta _).start + } else { + baseQuery.start(outputPath) + } case WriteMode.Upsert => df .writeStream