From 858ef0dde60319508c472e2e006b1a8415b70aa2 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Fri, 5 Apr 2024 10:35:50 -0400
Subject: [PATCH] NIFI-12889: Refactored HDFS processors to use common method for handling GSS auth exceptions

---
 .../nifi/processors/hadoop/DeleteHDFS.java    |   8 +-
 .../nifi/processors/hadoop/MoveHDFS.java      | 165 +++++++++---------
 .../nifi/processors/hadoop/PutHDFS.java       |   2 -
 .../processors/hadoop/TestDeleteHDFS.java     |   4 +-
 4 files changed, 89 insertions(+), 90 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 50d86001721af..9cf0a165c1620 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -202,8 +202,12 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                         session.remove(flowFile);
                     }
                 } catch (IOException e) {
-                    getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
-                    session.transfer(flowFile, getFailureRelationship());
+                    if (handleAuthErrors(e, session, context)) {
+                        return null;
+                    } else {
+                        getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
+                        session.transfer(flowFile, getFailureRelationship());
+                    }
                 }

                 return null;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index cc78c59c096e6..60be5e1245d38 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -254,15 +254,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             if (!directoryExists) {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
-        } catch (final IOException e) {
-            if(!handleAuthErrors(e, session, context)) {
-                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
-                flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-            }
-            return;
         } catch (Exception e) {
+            if (handleAuthErrors(e, session, context)) {
+                return;
+            }
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
             flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
@@ -360,95 +355,95 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext

         for (final Path file : files) {

-            ugi.doAs(new PrivilegedAction<Object>() {
-                @Override
-                public Object run() {
-                    FlowFile flowFile = session.create(parentFlowFile);
-                    try {
-                        final String originalFilename = file.getName();
-                        final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
-                        final Path newFile = new Path(outputDirPath, originalFilename);
-                        final boolean destinationExists = hdfs.exists(newFile);
-                        // If destination file already exists, resolve that
-                        // based on processor configuration
-                        if (destinationExists) {
-                            switch (processorConfig.getConflictResolution()) {
-                                case REPLACE_RESOLUTION:
-                                    // Remove destination file (newFile) to replace
-                                    if (hdfs.delete(newFile, false)) {
-                                        getLogger().info("deleted {} in order to replace with the contents of {}",
-                                                new Object[]{newFile, flowFile});
-                                    }
-                                    break;
-                                case IGNORE_RESOLUTION:
-                                    session.transfer(flowFile, REL_SUCCESS);
-                                    getLogger().info(
-                                            "transferring {} to success because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                case FAIL_RESOLUTION:
-                                    session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                    getLogger().warn(
-                                            "penalizing {} and routing to failure because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                default:
-                                    break;
-                            }
+            ugi.doAs((PrivilegedAction<Object>) () -> {
+                FlowFile flowFile = session.create(parentFlowFile);
+                try {
+                    final String originalFilename = file.getName();
+                    final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
+                    final Path newFile = new Path(outputDirPath, originalFilename);
+                    final boolean destinationExists = hdfs.exists(newFile);
+                    // If destination file already exists, resolve that
+                    // based on processor configuration
+                    if (destinationExists) {
+                        switch (processorConfig.getConflictResolution()) {
+                            case REPLACE_RESOLUTION:
+                                // Remove destination file (newFile) to replace
+                                if (hdfs.delete(newFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{newFile, flowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(flowFile, REL_SUCCESS);
+                                getLogger().info(
+                                        "transferring {} to success because file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                                getLogger().warn(
+                                        "penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            default:
+                                break;
                         }
+                    }

-                        // Create destination directory if it does not exist
-                        try {
-                            if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
-                                throw new IOException(outputDirPath.toString()
-                                        + " already exists and is not a directory");
-                            }
-                        } catch (FileNotFoundException fe) {
-                            if (!hdfs.mkdirs(outputDirPath)) {
-                                throw new IOException(outputDirPath.toString() + " could not be created");
-                            }
-                            changeOwner(context, hdfs, outputDirPath);
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+                            throw new IOException(outputDirPath + " already exists and is not a directory");
                         }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(outputDirPath)) {
+                            throw new IOException(outputDirPath + " could not be created");
+                        }
+                        changeOwner(context, hdfs, outputDirPath);
+                    }

-                        boolean moved = false;
-                        for (int i = 0; i < 10; i++) { // try to rename multiple
-                            // times.
-                            if (processorConfig.getOperation().equals("move")) {
-                                if (hdfs.rename(file, newFile)) {
-                                    moved = true;
-                                    break;// rename was successful
-                                }
-                            } else {
-                                if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
-                                    moved = true;
-                                    break;// copy was successful
-                                }
+                    boolean moved = false;
+                    for (int i = 0; i < 10; i++) { // try to rename multiple
+                        // times.
+                        if (processorConfig.getOperation().equals("move")) {
+                            if (hdfs.rename(file, newFile)) {
+                                moved = true;
+                                break;// rename was successful
+                            }
+                        } else {
+                            if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+                                moved = true;
+                                break;// copy was successful
                             }
-                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-                        }
-                        if (!moved) {
-                            throw new ProcessException("Could not move file " + file + " to its final filename");
                         }
+                        Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                    }
+                    if (!moved) {
+                        throw new ProcessException("Could not move file " + file + " to its final filename");
+                    }

-                        changeOwner(context, hdfs, newFile);
-                        final String outputPath = newFile.toString();
-                        final String newFilename = newFile.getName();
-                        final String hdfsPath = newFile.getParent().toString();
-                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
-                        flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                        final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                        flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                        final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
-                        session.getProvenanceReporter().send(flowFile, transitUri);
-                        session.transfer(flowFile, REL_SUCCESS);
-
-                    } catch (final Throwable t) {
+                    changeOwner(context, hdfs, newFile);
+                    final String outputPath = newFile.toString();
+                    final String newFilename = newFile.getName();
+                    final String hdfsPath = newFile.getParent().toString();
+                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                    final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
+                    session.getProvenanceReporter().send(flowFile, transitUri);
+                    session.transfer(flowFile, REL_SUCCESS);
+
+                } catch (final Throwable t) {
+                    if (handleAuthErrors(t, session, context)) {
+                        return null;
+                    } else {
                         getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
                         session.transfer(session.penalize(flowFile), REL_FAILURE);
                         context.yield();
                     }
-                    return null;
                 }
+                return null;
             });
         }
     }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index bdf7aa09955c5..228031e96c535 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -391,8 +391,6 @@ public Object run() {
                             StreamUtils.copy(bis, fos);
                             bis = null;
                             fos.flush();
-                        } catch (IOException e) {
-                            throw new ProcessException(e);
                         } finally {
                             try {
                                 if (fos != null) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 0023af5a1a3ef..11b7c037defa6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -125,7 +125,9 @@ public void testGSSException() throws Exception {
         attributes.put("hdfs.file", filePath.toString());
         runner.enqueue("foo", attributes);
         runner.run();
-        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+        // GSS Auth exceptions should cause rollback
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }

     @Test
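
Note on the shared helper (illustrative, not part of the patch): every call site above relies on handleAuthErrors(e, session, context) returning true when the failure is a Kerberos/GSS authentication problem, presumably a protected method on AbstractHadoopProcessor since DeleteHDFS, MoveHDFS and PutHDFS all extend it. The three-argument boolean signature is taken from the calls in this diff; the body below is only an assumed sketch of the pattern (walk the cause chain, roll back, yield), not the actual NiFi implementation:

    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.ietf.jgss.GSSException;

    // Assumed sketch: would live in a processor base class that provides getLogger().
    protected boolean handleAuthErrors(final Throwable t, final ProcessSession session, final ProcessContext context) {
        // Walk the cause chain looking for a Kerberos/GSS authentication failure.
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof GSSException) {
                // Auth failures are environmental rather than data errors: keep the FlowFiles
                // in the incoming queue (rollback) and back off, instead of routing to failure.
                getLogger().warn("GSS/Kerberos authentication failed; rolling back session and yielding", cause);
                session.rollback();
                context.yield();
                return true;
            }
        }
        return false;
    }

This is also why the TestDeleteHDFS change expects zero transfers to both REL_SUCCESS and REL_FAILURE for a GSSException: the session is rolled back rather than routed.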