Skip to content

Commit

Permalink
Fix no optimize in delta
Browse files Browse the repository at this point in the history
  • Loading branch information
berna396 committed May 16, 2024
1 parent 4542fd6 commit 8fb00c7
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.whenNotMatched().insertAll()
.execute()

if (batchId % optimizeEvery == 0) {
if (batchId % optimizeEvery == 0 && optimize) {
compactAndVacuum
}
}
Expand Down Expand Up @@ -153,20 +153,24 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
if (optimize) {
baseQuery.foreachBatch(appendToDelta _).start
} else {
baseQuery.start
baseQuery.start(outputPath)
}

case WriteMode.Overwrite =>

DeltaTable.forPath(outputPath).delete()

df
val baseQuery = df
.writeStream
.outputMode("append")
.option("overwriteSchema", "true")
.option("mergeSchema", "true")
.option("checkpointLocation", checkpointLocation)
.foreachBatch(appendToDelta _)
.start

if (optimize) {
baseQuery.foreachBatch(appendToDelta _).start
} else {
baseQuery.start(outputPath)
}

case WriteMode.Upsert => df
.writeStream
Expand Down

0 comments on commit 8fb00c7

Please sign in to comment.