Skip to content

Commit b393b2a

Browse files
committed
[core] Skip processed sequence group fields to improve performance of PartialUpdateMergeFunction
1 parent a9ffd30 commit b393b2a

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

Lines changed: 28 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -199,7 +199,7 @@ private void updateWithSequenceGroup(KeyValue kv) {
199199
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
200200
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;
201201

202-
boolean[] isEmptySequenceGroup = new boolean[getters.length];
202+
boolean[] isProcessedSequenceField = new boolean[getters.length];
203203
for (int i = 0; i < getters.length; i++) {
204204
FieldsComparator seqComparator = null;
205205
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -214,15 +214,13 @@ private void updateWithSequenceGroup(KeyValue kv) {
214214
}
215215

216216
Object accumulator = row.getField(i);
217-
if (seqComparator == null) {
218-
Object field = getters[i].getFieldOrNull(kv.value());
219-
if (aggregator != null) {
220-
row.setField(i, aggregator.agg(accumulator, field));
221-
} else if (field != null) {
222-
row.setField(i, field);
217+
if (seqComparator != null) {
218+
// Skip if this field has already been processed as part of a sequence group
219+
if (isProcessedSequenceField[i]) {
220+
continue;
223221
}
224-
} else {
225-
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
222+
223+
if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
226224
// skip null sequence group
227225
continue;
228226
}
@@ -237,6 +235,8 @@ private void updateWithSequenceGroup(KeyValue kv) {
237235
for (int fieldIndex : seqComparator.compareFields()) {
238236
row.setField(
239237
fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
238+
// Mark all fields in this sequence group as processed
239+
isProcessedSequenceField[fieldIndex] = true;
240240
}
241241
continue;
242242
}
@@ -245,27 +245,28 @@ private void updateWithSequenceGroup(KeyValue kv) {
245245
} else if (aggregator != null) {
246246
row.setField(i, aggregator.aggReversed(accumulator, field));
247247
}
248+
} else {
249+
Object field = getters[i].getFieldOrNull(kv.value());
250+
if (aggregator != null) {
251+
row.setField(i, aggregator.agg(accumulator, field));
252+
} else if (field != null) {
253+
row.setField(i, field);
254+
}
248255
}
249256
}
250257
}
251258

252259
private boolean isEmptySequenceGroup(
253-
KeyValue kv, FieldsComparator comparator, boolean[] isEmptySequenceGroup) {
254-
255-
// If any flag of the sequence fields is set, it means the sequence group is empty.
256-
if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
257-
return true;
258-
}
259-
260+
KeyValue kv, FieldsComparator comparator, boolean[] isProcessedSequenceField) {
260261
for (int fieldIndex : comparator.compareFields()) {
261262
if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
262263
return false;
263264
}
264265
}
265266

266-
// Set the flag of all the sequence fields of the sequence group.
267+
// Mark all fields in this sequence group as processed
267268
for (int fieldIndex : comparator.compareFields()) {
268-
isEmptySequenceGroup[fieldIndex] = true;
269+
isProcessedSequenceField[fieldIndex] = true;
269270
}
270271

271272
return true;
@@ -280,7 +281,7 @@ private void retractWithSequenceGroup(KeyValue kv) {
280281
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
281282
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;
282283

283-
boolean[] isEmptySequenceGroup = new boolean[getters.length];
284+
boolean[] isProcessedSequenceField = new boolean[getters.length];
284285
for (int i = 0; i < getters.length; i++) {
285286
FieldsComparator seqComparator = null;
286287
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -295,7 +296,12 @@ private void retractWithSequenceGroup(KeyValue kv) {
295296
}
296297

297298
if (seqComparator != null) {
298-
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
299+
// Skip if this field has already been processed as part of a sequence group
300+
if (isProcessedSequenceField[i]) {
301+
continue;
302+
}
303+
304+
if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
299305
// skip null sequence group
300306
continue;
301307
}
@@ -319,6 +325,8 @@ private void retractWithSequenceGroup(KeyValue kv) {
319325
updatedSequenceFields.add(field);
320326
}
321327
}
328+
// Mark all fields in this sequence group as processed
329+
isProcessedSequenceField[field] = true;
322330
}
323331
} else {
324332
// retract normal field

0 commit comments

Comments
 (0)