NIFI-12700: refactored PutKudu to optimize memory handling for AUTO_FLUSH_SYNC flush mode (unbatched flush)

NIFI-12700: made changes based on PR comments. Simplified the statements that determine whether there are FlowFile failures or RowErrors. Separated the extraction of RowErrors from OperationResponses into its own function.

Signed-off-by: Matt Burgess <[email protected]>

This closes apache#8322
emiliosetiadarma authored and mattyb149 committed Mar 15, 2024
1 parent 42bd524 commit 3719fdd
Showing 5 changed files with 388 additions and 67 deletions.
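The core of the change: PutKudu previously accumulated an Operation-to-FlowFile map and a shared pending-RowError list on every run, regardless of flush mode. Because AUTO_FLUSH_SYNC returns an OperationResponse on every apply() call, row errors can be attributed to the FlowFile currently being processed, with no per-Operation bookkeeping. A minimal sketch of the resulting dispatch, using only names that appear in the diff below:

    // Sketch only: mirrors the selection made in processFlowFiles() below.
    // AUTO_FLUSH_SYNC resolves RowErrors to the current FlowFile as soon as
    // apply() returns, so it needs no Operation -> FlowFile map; the other
    // flush modes keep Operation origins until a flush surfaces the errors.
    final PutKuduResult putKuduResult = flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
            ? new AutoFlushSyncPutKuduResult()
            : new StandardPutKuduResult();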
@@ -22,6 +22,7 @@
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
@@ -35,6 +36,8 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -217,19 +220,33 @@ protected void executeOnKuduClient(Consumer<KuduClient> actionOnKuduClient) {
}
}

-protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
-final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
-
+/**
+ * Get the pending errors from the active {@link KuduSession}. This will only be applicable if the flushMode is
+ * {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}.
+ * @return a {@link List} of pending {@link RowError}s
+ */
+protected List<RowError> getPendingRowErrorsFromKuduSession(final KuduSession kuduSession) {
if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+return Arrays.asList(kuduSession.getPendingErrors().getRowErrors());
} else {
-responses.stream()
-.filter(OperationResponse::hasRowError)
-.map(OperationResponse::getRowError)
-.forEach(rowErrors::add);
+return Collections.EMPTY_LIST;
}
}

+protected List<RowError> flushKuduSession(final KuduSession kuduSession) throws KuduException {
+final List<OperationResponse> responses = kuduSession.flush();
+// RowErrors will only be present in the OperationResponses in this case if the flush mode
+// selected is MANUAL_FLUSH. It will be empty otherwise.
+return getRowErrors(responses);
+}
+
+protected List<RowError> closeKuduSession(final KuduSession kuduSession) throws KuduException {
+final List<OperationResponse> responses = kuduSession.close();
+// RowErrors will only be present in the OperationResponses in this case if the flush mode
+// selected is MANUAL_FLUSH, since the underlying implementation of kuduSession.close() returns
+// the OperationResponses from a flush() call.
+return getRowErrors(responses);
+}

@OnStopped
public void shutdown() throws Exception {
@@ -410,4 +427,11 @@ private String getName() {
return String.format("PutKudu[%s]-client-%d", identifier, threadCount.getAndIncrement());
}
}

+private List<RowError> getRowErrors(final List<OperationResponse> responses) {
+return responses.stream()
+.filter(OperationResponse::hasRowError)
+.map(OperationResponse::getRowError)
+.collect(Collectors.toList());
+}
}
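Taken together, the three helpers above (presumably in AbstractKuduProcessor.java) split the responsibilities of the old three-argument flushKuduSession(kuduSession, close, rowErrors). A hedged sketch of how a caller can combine them, assuming it lives in a subclass of this processor (collectErrorsAfterFlush is illustrative, not part of the commit):

    // Illustrative only: flush, then pick the error source matching the flush
    // mode, the same pattern PutKudu applies further down in this commit.
    private List<RowError> collectErrorsAfterFlush(final KuduSession kuduSession) throws KuduException {
        final List<RowError> errors = new ArrayList<>(flushKuduSession(kuduSession));
        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
            // MANUAL_FLUSH errors ride along in the flush responses, while
            // background-flush errors land in the session's pending-error buffer.
            errors.addAll(getPendingRowErrorsFromKuduSession(kuduSession));
        }
        return errors;
    }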
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;

import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.nifi.flowfile.FlowFile;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AutoFlushSyncPutKuduResult extends PutKuduResult {
private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;

public AutoFlushSyncPutKuduResult() {
super();
this.flowFileRowErrorsMap = new HashMap<>();
}

@Override
public void recordOperation(final Operation operation) {
// this should be a no-op because we don't need to record Operation's origins
// for buffered flush when using AUTO_FLUSH_SYNC
return;
}

@Override
public void addError(final RowError rowError) {
final List<RowError> rowErrors = flowFileRowErrorsMap.getOrDefault(flowFile, new ArrayList<>());
rowErrors.add(rowError);
flowFileRowErrorsMap.put(flowFile, rowErrors);
}

@Override
public void addErrors(final List<RowError> rowErrors) {
// This is a no-op because we would never be in a situation where we'd have to add a collection of RowError
// using this Flush Mode. Since we do not keep Operation to FlowFile mapping, it will also be impossible to resolve
// RowErrors to the FlowFile that caused them, hence this method should never be implemented for AUTO_FLUSH_SYNC
return;
}

@Override
public boolean hasRowErrorsOrFailures() {
if (!flowFileFailures.isEmpty()) {
return true;
}

for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrorsMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
return true;
}
}

return false;
}

@Override
public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
return flowFileRowErrorsMap.getOrDefault(flowFile, Collections.EMPTY_LIST);
}
}
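For context, AutoFlushSyncPutKuduResult extends PutKuduResult, which is among the five changed files but is not shown on this page. Below is a hedged reconstruction inferred purely from the call sites visible in this diff; the flowFile and flowFileFailures members are confirmed by their use above, while every body and modifier here is a guess:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.RowError;
    import org.apache.nifi.flowfile.FlowFile;

    // Hedged sketch; the real PutKuduResult is not shown in this excerpt.
    public abstract class PutKuduResult {
        protected FlowFile flowFile;
        protected final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
        private final Map<FlowFile, Integer> processedRecords = new HashMap<>();  // inferred

        public void setFlowFile(final FlowFile flowFile) {
            this.flowFile = flowFile;
            processedRecords.putIfAbsent(flowFile, 0);  // so getProcessedFlowFiles() sees every FlowFile
        }

        public void addFailure(final Object failure) {
            flowFileFailures.put(flowFile, failure);
        }

        public void incrementProcessedRecordsForFlowFile() {
            processedRecords.merge(flowFile, 1, Integer::sum);
        }

        public int getProcessedRecordsForFlowFile(final FlowFile flowFile) {
            return processedRecords.getOrDefault(flowFile, 0);
        }

        public boolean isFlowFileProcessedSuccessfully(final FlowFile flowFile) {
            return !flowFileFailures.containsKey(flowFile);
        }

        public Object getFailureForFlowFile(final FlowFile flowFile) {
            return flowFileFailures.get(flowFile);
        }

        public Set<FlowFile> getProcessedFlowFiles() {
            return processedRecords.keySet();
        }

        // StandardPutKuduResult presumably resolves buffered Operations back to
        // FlowFiles here; the AUTO_FLUSH_SYNC variant has nothing to resolve.
        public void resolveFlowFileToRowErrorAssociations() {
        }

        public abstract void recordOperation(Operation operation);
        public abstract void addError(RowError rowError);
        public abstract void addErrors(List<RowError> rowErrors);
        public abstract boolean hasRowErrorsOrFailures();
        public abstract List<RowError> getRowErrorsForFlowFile(FlowFile flowFile);
    }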
@@ -22,10 +22,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -357,53 +355,52 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

private void processFlowFiles(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, final KuduClient kuduClient) {
-final Map<FlowFile, Integer> processedRecords = new HashMap<>();
-final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
-final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
-final List<RowError> pendingRowErrors = new ArrayList<>();

final KuduSession kuduSession = createKuduSession(kuduClient);
+final PutKuduResult putKuduResult = flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
+? new AutoFlushSyncPutKuduResult() : new StandardPutKuduResult();
try {
processRecords(flowFiles,
-processedRecords,
-flowFileFailures,
-operationFlowFileMap,
-pendingRowErrors,
session,
context,
kuduClient,
-kuduSession);
+kuduSession,
+putKuduResult);
} finally {
try {
-flushKuduSession(kuduSession, true, pendingRowErrors);
+final List<RowError> rowErrors = closeKuduSession(kuduSession);
+if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+} else {
+putKuduResult.addErrors(rowErrors);
+}
} catch (final KuduException|RuntimeException e) {
getLogger().error("KuduSession.close() Failed", e);
}
}

-if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
-logFailures(pendingRowErrors, operationFlowFileMap);
+putKuduResult.resolveFlowFileToRowErrorAssociations();
+
+if (isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) {
+logFailures(putKuduResult);
session.rollback();
context.yield();
} else {
-transferFlowFiles(flowFiles, processedRecords, flowFileFailures, operationFlowFileMap, pendingRowErrors, session);
+transferFlowFiles(flowFiles, session, putKuduResult);
}
}

private void processRecords(final List<FlowFile> flowFiles,
-final Map<FlowFile, Integer> processedRecords,
-final Map<FlowFile, Object> flowFileFailures,
-final Map<Operation, FlowFile> operationFlowFileMap,
-final List<RowError> pendingRowErrors,
-final ProcessSession session,
-final ProcessContext context,
-final KuduClient kuduClient,
-final KuduSession kuduSession) {
+final ProcessSession session,
+final ProcessContext context,
+final KuduClient kuduClient,
+final KuduSession kuduSession,
+final PutKuduResult putKuduResult) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

int bufferedRecords = 0;
OperationType prevOperationType = OperationType.INSERT;
for (FlowFile flowFile : flowFiles) {
+putKuduResult.setFlowFile(flowFile);
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {

@@ -472,7 +469,12 @@ private void processRecords(final List<FlowFile> flowFiles,
// ignore operations.
if (!supportsInsertIgnoreOp && prevOperationType != operationType
&& (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
-flushKuduSession(kuduSession, false, pendingRowErrors);
+final List<RowError> rowErrors = flushKuduSession(kuduSession);
+if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+} else {
+putKuduResult.addErrors(rowErrors);
+}
kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
}
prevOperationType = operationType;
@@ -481,34 +483,35 @@ private void processRecords(final List<FlowFile> flowFiles,
Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
-operationFlowFileMap.put(operation, flowFile);
+putKuduResult.recordOperation(operation);

// Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen when flush mode is
// MANUAL_FLUSH and a FlowFile has more than one records.
if (bufferedRecords == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
bufferedRecords = 0;
-flushKuduSession(kuduSession, false, pendingRowErrors);
+final List<RowError> rowErrors = flushKuduSession(kuduSession);
+putKuduResult.addErrors(rowErrors);
}

// OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
-OperationResponse response = kuduSession.apply(operation);
+final OperationResponse response = kuduSession.apply(operation);
if (response != null && response.hasRowError()) {
// Stop processing the records on the first error.
// Note that Kudu does not support rolling back of previous operations.
-flowFileFailures.put(flowFile, response.getRowError());
+putKuduResult.addFailure(response.getRowError());
break recordReaderLoop;
}

bufferedRecords++;
-processedRecords.merge(flowFile, 1, Integer::sum);
+putKuduResult.incrementProcessedRecordsForFlowFile();
}

record = recordSet.next();
}
} catch (Exception ex) {
getLogger().error("Failed to push {} to Kudu", flowFile, ex);
-flowFileFailures.put(flowFile, ex);
+putKuduResult.addFailure(ex);
}
}
}
Expand Down Expand Up @@ -575,54 +578,43 @@ private boolean handleSchemaDrift(final KuduTable kuduTable, final KuduClient ku
}

private void transferFlowFiles(final List<FlowFile> flowFiles,
-final Map<FlowFile, Integer> processedRecords,
-final Map<FlowFile, Object> flowFileFailures,
-final Map<Operation, FlowFile> operationFlowFileMap,
-final List<RowError> pendingRowErrors,
-final ProcessSession session) {
-// Find RowErrors for each FlowFile
-final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream()
-.filter(e -> operationFlowFileMap.get(e.getOperation()) != null)
-.collect(
-Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))
-);
-
+final ProcessSession session,
+final PutKuduResult putKuduResult) {
long totalCount = 0L;
for (FlowFile flowFile : flowFiles) {
-final int count = processedRecords.getOrDefault(flowFile, 0);
+final int count = putKuduResult.getProcessedRecordsForFlowFile(flowFile);
totalCount += count;
-final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
+final List<RowError> rowErrors = putKuduResult.getRowErrorsForFlowFile(flowFile);

-if (rowErrors != null) {
+if (rowErrors != null && !rowErrors.isEmpty()) {
rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", rowError.toString()));
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(count - rowErrors.size()));
-totalCount -= rowErrors.size(); // Don't include error rows in the the counter.
+totalCount -= rowErrors.size(); // Don't include error rows in the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));

-if (flowFileFailures.containsKey(flowFile)) {
-getLogger().error("Failed to write due to {}", flowFileFailures.get(flowFile));
-session.transfer(flowFile, REL_FAILURE);
-} else {
+if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile)) {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu");
+} else {
+getLogger().error("Failed to write due to {}", putKuduResult.getFailureForFlowFile(flowFile));
+session.transfer(flowFile, REL_FAILURE);
}
}
}

session.adjustCounter("Records Inserted", totalCount, false);
}

-private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
-final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
-Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
-
-for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
-final FlowFile flowFile = entry.getKey();
-final List<RowError> errors = entry.getValue();
-
-getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors);
+private void logFailures(final PutKuduResult putKuduResult) {
+final Set<FlowFile> processedFlowFiles = putKuduResult.getProcessedFlowFiles();
+for (final FlowFile flowFile : processedFlowFiles) {
+final List<RowError> errors = putKuduResult.getRowErrorsForFlowFile(flowFile);
+if (!errors.isEmpty()) {
+getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors);
+}
}
}
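A quick worked example of the accounting in transferFlowFiles above, with hypothetical numbers:

    // A FlowFile with 10 processed records, 2 of which came back as RowErrors,
    // is routed to REL_FAILURE; its RECORD_COUNT_ATTR reflects only clean rows,
    // and the bad rows are also excluded from the "Records Inserted" counter.
    final int count = 10;        // putKuduResult.getProcessedRecordsForFlowFile(flowFile)
    final int errorCount = 2;    // putKuduResult.getRowErrorsForFlowFile(flowFile).size()
    final int recordCountAttr = count - errorCount;  // attribute value written: 8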

(Diffs for the remaining changed files did not load.)
