From d4b6b5d3a092778f30794a464fe68dc4f0b86451 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Fri, 22 Mar 2024 18:39:48 -0400
Subject: [PATCH] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors

---
 .../hadoop/AbstractHadoopProcessor.java       |  27 +-
 .../nifi/processors/hadoop/DeleteHDFS.java    |  35 ++-
 .../nifi/processors/hadoop/FetchHDFS.java     | 110 +++---
 .../nifi/processors/hadoop/GetHDFS.java       |  37 ++-
 .../processors/hadoop/GetHDFSFileInfo.java    |  25 +-
 .../hadoop/GetHDFSSequenceFile.java           |  24 +-
 .../nifi/processors/hadoop/MoveHDFS.java      |  27 +-
 .../nifi/processors/hadoop/PutHDFS.java       | 135 ++++-----
 .../hadoop/GetHDFSSequenceFileTest.java       |  57 +++-
 .../nifi/processors/hadoop/GetHDFSTest.java   |  41 ++-
 .../nifi/processors/hadoop/MoveHDFSTest.java  |  62 +++-
 .../nifi/processors/hadoop/PutHDFSTest.java   | 180 +++---------
 .../processors/hadoop/TestDeleteHDFS.java     |  18 +-
 .../nifi/processors/hadoop/TestFetchHDFS.java |  62 +++-
 .../hadoop/TestGetHDFSFileInfo.java           | 164 ++---------
 .../hadoop/util/MockFileSystem.java           | 273 ++++++++++++++++++
 16 files changed, 795 insertions(+), 482 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 447f198226191..df4ef86c3b5a3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +63,9 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
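
Every processor touched in the hunks below repeats one recovery pattern in its IOException handling: walk the exception's causal chain for a GSSException whose major code is NO_CRED (an expired or missing Kerberos ticket) and, if one is found, rebuild the HDFS resources, roll the session back, and yield the context so the work is retried after a fresh login. A condensed sketch of that pattern, assembled from the hunks that follow; doHdfsOperation() stands in for whichever HDFS call the processor makes, and the sketch unwraps the Optional before logging, whereas several hunks pass the Optional itself to the logger:

    try {
        doHdfsOperation();                 // placeholder for the processor's HDFS call (exists/open/create/delete/...)
    } catch (final IOException e) {
        final Optional<GSSException> causeOptional =
                findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
        if (causeOptional.isPresent()) {
            getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional.get());
            try {
                // drop the cached Configuration/FileSystem/UGI so the next trigger performs a fresh Kerberos login
                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
            } catch (IOException resetResourcesException) {
                getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException);
            }
            session.rollback();            // keep the FlowFile for retry instead of routing it to failure
            context.yield();               // back off before the next attempt
        } else {
            // non-authentication IOExceptions keep their existing routing (failure / comms.failure)
        }
    }
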
@@ -171,7 +174,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen // variables shared by all threads of this processor // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) - private final AtomicReference hdfsResources = new AtomicReference<>(); + final AtomicReference hdfsResources = new AtomicReference<>(); // Holder of cached Configuration information so validation does not reload the same config over and over private final AtomicReference validationResourceHolder = new AtomicReference<>(); @@ -532,12 +535,7 @@ protected FileSystem getFileSystem(final Configuration config) throws IOExceptio protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { try { - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws Exception { - return FileSystem.get(config); - } - }); + return ugi.doAs((PrivilegedExceptionAction) () -> FileSystem.get(config)); } catch (InterruptedException e) { throw new IOException("Unable to create file system: " + e.getMessage()); } @@ -703,4 +701,19 @@ private Path getNormalizedPath(final String rawPath, final Optional prop return new Path(path.replaceAll("/+", "/")); } + + /** + * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, + * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. + * @param t The throwable to inspect for the cause. + * @return Throwable Cause + */ + protected Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { + Stream causalChain = Throwables.getCausalChain(t).stream(); + return causalChain + .filter(expectedCauseType::isInstance) + .map(expectedCauseType::cast) + .filter(causePredicate) + .findFirst(); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index 0cab0b0bdc7ec..bebcdf5314dd8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -39,6 +39,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.ietf.jgss.GSSException; import java.io.IOException; import java.security.PrivilegedAction; @@ -47,6 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -177,16 +179,29 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); } catch (IOException ioe) { - // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible - // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. 
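
The generic type parameters of the relocated findCause helper do not survive this plain-text rendering (angle brackets are stripped throughout the patch). Restoring them, the signature presumably reads as below, with Guava's Throwables.getCausalChain supplying the chain to walk; the call site shown after it is the form repeated in the DeleteHDFS hunk below and in every other processor in this patch:

    protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();   // com.google.common.base.Throwables
        return causalChain
                .filter(expectedCauseType::isInstance)   // keep only causes of the expected type
                .map(expectedCauseType::cast)
                .filter(causePredicate)                  // e.g. "is this a NO_CRED failure?"
                .findFirst();
    }

    // Typical call site:
    final Optional<GSSException> causeOptional =
            findCause(ioe, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
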
- getLogger().warn("Failed to delete file or directory", ioe); - - Map attributes = Maps.newHashMapWithExpectedSize(1); - // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) - attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage()); - - session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); - failedPath++; + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(ioe, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException resetResourcesException) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException); + } + session.rollback(); + context.yield(); + } else { + // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible + // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. + getLogger().warn("Failed to delete file or directory", ioe); + + Map attributes = Maps.newHashMapWithExpectedSize(1); + // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) + attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage()); + + session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship()); + failedPath++; + } } } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 2a4986cadd82a..0316b578647e8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -46,6 +46,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; import java.io.FileNotFoundException; import java.io.IOException; @@ -54,6 +55,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -141,60 +143,70 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final StopWatch stopWatch = new StopWatch(true); final FlowFile finalFlowFile = flowFile; - ugi.doAs(new PrivilegedAction() { - @Override - public Object run() { - InputStream stream = null; - CompressionCodec codec = null; - Configuration conf = getConfiguration(); - final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf); - final CompressionType compressionType = getCompressionType(context); - final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; - - if(inferCompressionCodec) { - codec = compressionCodecFactory.getCodec(path); - } 
else if (compressionType != CompressionType.NONE) { - codec = getCompressionCodec(context, getConfiguration()); + ugi.doAs((PrivilegedAction) () -> { + InputStream stream = null; + CompressionCodec codec = null; + Configuration conf = getConfiguration(); + final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf); + final CompressionType compressionType = getCompressionType(context); + final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC; + + if (inferCompressionCodec) { + codec = compressionCodecFactory.getCodec(path); + } else if (compressionType != CompressionType.NONE) { + codec = getCompressionCodec(context, getConfiguration()); + } + + FlowFile flowFile1 = finalFlowFile; + final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory()); + try { + final String outputFilename; + final String originalFilename = path.getName(); + stream = hdfs.open(path, 16384); + + // Check if compression codec is defined (inferred or otherwise) + if (codec != null) { + stream = codec.createInputStream(stream); + outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension()); + } else { + outputFilename = originalFilename; } - FlowFile flowFile = finalFlowFile; - final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory()); - try { - final String outputFilename; - final String originalFilename = path.getName(); - stream = hdfs.open(path, 16384); - - // Check if compression codec is defined (inferred or otherwise) - if (codec != null) { - stream = codec.createInputStream(stream); - outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension()); - } else { - outputFilename = originalFilename; + flowFile1 = session.importFrom(stream, finalFlowFile); + flowFile1 = session.putAttribute(flowFile1, CoreAttributes.FILENAME.key(), outputFilename); + + stopWatch.stop(); + getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, flowFile1, stopWatch.getDuration()}); + flowFile1 = session.putAttribute(flowFile1, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); + session.getProvenanceReporter().fetch(flowFile1, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile1, getSuccessRelationship()); + } catch (final FileNotFoundException | AccessControlException e) { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, flowFile1, e}); + flowFile1 = session.putAttribute(flowFile1, getAttributePrefix() + ".failure.reason", e.getMessage()); + flowFile1 = session.penalize(flowFile1); + session.transfer(flowFile1, getFailureRelationship()); + } catch (final IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException resetResourcesException) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException); } - - flowFile = session.importFrom(stream, finalFlowFile); - flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename); - - 
stopWatch.stop(); - getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()}); - flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); - session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, getSuccessRelationship()); - } catch (final FileNotFoundException | AccessControlException e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e}); - flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage()); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, getFailureRelationship()); - } catch (final IOException e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, getCommsFailureRelationship()); - } finally { - IOUtils.closeQuietly(stream); + session.rollback(); + context.yield(); + } else { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[]{qualifiedPath, flowFile1, e}); + flowFile1 = session.penalize(flowFile1); + session.transfer(flowFile1, getCommsFailureRelationship()); } - - return null; + } finally { + IOUtils.closeQuietly(stream); } + + return null; }); } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index f1e86613665f5..6f2a6ed327dd1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -50,6 +50,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; import java.io.IOException; import java.io.InputStream; @@ -59,6 +60,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -294,12 +296,22 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } if (logEmptyListing.getAndDecrement() > 0) { getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", - new Object[]{millis, listedFiles.size(), newItems}); + millis, listedFiles.size(), newItems); } } } catch (IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException ioe) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor."); + } + } context.yield(); - 
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e}); + getLogger().warn("Error while retrieving list of files due to {}", e.getMessage(), e); return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -388,18 +400,29 @@ protected void processBatchOfFiles(final List files, final ProcessContext flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename); if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> hdfs.delete(file, false))) { - getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", - new Object[]{file}); + getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", file); session.remove(flowFile); continue; } session.getProvenanceReporter().receive(flowFile, file.toString()); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", - new Object[]{flowFile, file, millis, dataRate}); + getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate); + } catch (final IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException resetResourcesException) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException); + } + } + session.rollback(); + context.yield(); } catch (final Throwable t) { - getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t}); + getLogger().error("Error retrieving file {} from HDFS due to {}", file, t); session.rollback(); context.yield(); } finally { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java index 8383732f77813..ef7e85fdad287 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; @@ -55,6 +56,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.ietf.jgss.GSSException; import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL; import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR; @@ -75,10 +77,10 @@ @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the object in HDFS"), @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the object in HDFS"), @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the object in HDFS was 
last modified, as milliseconds since midnight Jan 1, 1970 UTC"), - @WritesAttribute(attribute = "hdfs.length", description = "" - + "In case of files: The number of bytes in the file in HDFS. " + @WritesAttribute(attribute = "hdfs.length", description = + "In case of files: The number of bytes in the file in HDFS. " + "In case of dirs: Retuns storage space consumed by directory. " - + ""), + ), @WritesAttribute(attribute = "hdfs.count.files", description = "In case of type='directory' will represent total count of files under this dir. " + "Won't be populated to other types of HDFS objects. "), @WritesAttribute(attribute = "hdfs.count.dirs", description = "In case of type='directory' will represent total count of directories under this dir (including itself). " @@ -326,6 +328,23 @@ public void onTrigger(final ProcessContext context, final ProcessSession session getLogger().error("Interrupted while performing listing of HDFS", e); ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); session.transfer(ff, REL_FAILURE); + } catch (final IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException resetResourcesException) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException); + } + session.rollback(); + context.yield(); + } else { + getLogger().error("Interrupted while performing listing of HDFS", e); + ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); + session.transfer(ff, REL_FAILURE); + } } catch (final Exception e) { getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e}); ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java index 86fe7917335fa..79eba76e850ce 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java @@ -32,11 +32,14 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; +import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -108,6 +111,20 @@ protected void processBatchOfFiles(final List files, final ProcessContext if (!keepSourceFiles && !hdfs.delete(file, false)) { logger.warn("Unable to delete path " + file.toString() + " from HDFS. 
Will likely be picked up over and over..."); } + } catch (final IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException resetResourcesException) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException); + } + + } + session.rollback(); + context.yield(); } catch (Throwable t) { logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t}); session.rollback(); @@ -132,12 +149,7 @@ protected void processBatchOfFiles(final List files, final ProcessContext } protected Set getFlowFiles(final Configuration conf, final FileSystem hdfs, final SequenceFileReader> reader, final Path file) throws Exception { - PrivilegedExceptionAction> privilegedExceptionAction = new PrivilegedExceptionAction>() { - @Override - public Set run() throws Exception { - return reader.readSequenceFile(file, conf, hdfs); - } - }; + PrivilegedExceptionAction> privilegedExceptionAction = () -> reader.readSequenceFile(file, conf, hdfs); UserGroupInformation userGroupInformation = getUserGroupInformation(); if (userGroupInformation == null) { return privilegedExceptionAction.run(); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java index 33e1fac44c751..7acb04e37b29b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java @@ -47,6 +47,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; import java.io.FileNotFoundException; import java.io.IOException; @@ -57,6 +58,7 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -254,8 +256,29 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (!directoryExists) { throw new IOException("Input Directory or File does not exist in HDFS"); } + } catch (final IOException e) { + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("An error occurred while connecting to HDFS. 
" + + "Rolling back session, resetting HDFS resources, and penalizing flow file {}", + flowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException ioe) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor."); + } + session.rollback(); + context.yield(); + return; + } else { + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e); + flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + return; } catch (Exception e) { - getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, flowFile, e}); + getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e); flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage()); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); @@ -294,7 +317,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (logEmptyListing.getAndDecrement() > 0) { getLogger().info( "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", - new Object[]{millis, listedFiles.size(), newItems}); + millis, listedFiles.size(), newItems); } } } catch (IOException e) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 91e91ff7b1b00..6aea17fd2ca34 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -18,7 +18,6 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.base.Throwables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileStatus; @@ -54,7 +53,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; @@ -63,7 +61,6 @@ import java.io.BufferedInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; import java.security.PrivilegedAction; @@ -76,8 +73,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.stream.Stream; /** * This processor copies FlowFiles to HDFS. 
@@ -295,7 +290,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro return; } - ugi.doAs(new PrivilegedAction() { + ugi.doAs(new PrivilegedAction<>() { @Override public Object run() { Path tempDotCopyFile = null; @@ -352,18 +347,18 @@ public Object run() { case REPLACE_RESOLUTION: if (hdfs.delete(copyFile, false)) { getLogger().info("deleted {} in order to replace with the contents of {}", - new Object[]{copyFile, putFlowFile}); + copyFile, putFlowFile); } break; case IGNORE_RESOLUTION: session.transfer(putFlowFile, getSuccessRelationship()); getLogger().info("transferring {} to success because file with same name already exists", - new Object[]{putFlowFile}); + putFlowFile); return null; case FAIL_RESOLUTION: session.transfer(session.penalize(putFlowFile), getFailureRelationship()); getLogger().warn("penalizing {} and routing to failure because file with same name already exists", - new Object[]{putFlowFile}); + putFlowFile); return null; default: break; @@ -372,63 +367,68 @@ public Object run() { // Write FlowFile to temp file on HDFS final StopWatch stopWatch = new StopWatch(true); - session.read(putFlowFile, new InputStreamCallback() { - - @Override - public void process(InputStream in) throws IOException { - OutputStream fos = null; - Path createdFile = null; - try { - if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { - fos = hdfs.append(copyFile, bufferSize); - } else { - final EnumSet cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - - if (shouldIgnoreLocality(context, session)) { - cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); - } + session.read(putFlowFile, in -> { + OutputStream fos = null; + Path createdFile = null; + try { + if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) { + fos = hdfs.append(copyFile, bufferSize); + } else { + final EnumSet cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), - FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, - null, null); + if (shouldIgnoreLocality(context, session)) { + cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY); } - if (codec != null) { - fos = codec.createOutputStream(fos); + fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(), + FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize, + null, null); + } + + if (codec != null) { + fos = codec.createOutputStream(fos); + } + createdFile = actualCopyFile; + BufferedInputStream bis = new BufferedInputStream(in); + StreamUtils.copy(bis, fos); + bis = null; + fos.flush(); + } catch (IOException e) { + // Catch GSSExceptions and reset the resources + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional); + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + throw new ProcessException(causeOptional.get()); + } else { + throw new ProcessException(e); + } + } finally { + try { + if (fos != null) { + fos.close(); } - createdFile = actualCopyFile; - BufferedInputStream bis = new BufferedInputStream(in); - StreamUtils.copy(bis, fos); - bis = null; - fos.flush(); - } finally { - try { - if (fos != null) { - fos.close(); + } catch (Throwable t) { + // when talking to remote HDFS 
clusters, we don't notice problems until fos.close() + if (createdFile != null) { + try { + hdfs.delete(createdFile, false); + } catch (Throwable ignore) { } - } catch (Throwable t) { - // when talking to remote HDFS clusters, we don't notice problems until fos.close() - if (createdFile != null) { - try { - hdfs.delete(createdFile, false); - } catch (Throwable ignore) { - } - } - throw t; } - fos = null; + throw t; } + fos = null; } - }); stopWatch.stop(); final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize()); final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); tempDotCopyFile = tempCopyFile; - if ( - writingStrategy.equals(WRITE_AND_RENAME) - && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) + if ( + writingStrategy.equals(WRITE_AND_RENAME) + && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) ) { boolean renamed = false; @@ -449,7 +449,7 @@ public void process(InputStream in) throws IOException { } getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", - new Object[]{putFlowFile, copyFile, millis, dataRate}); + putFlowFile, copyFile, millis, dataRate); final String newFilename = copyFile.getName(); final String hdfsPath = copyFile.getParent().toString(); @@ -465,10 +465,16 @@ public void process(InputStream in) throws IOException { } catch (final IOException e) { Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); if (causeOptional.isPresent()) { - getLogger().warn("An error occurred while connecting to HDFS. " - + "Rolling back session, and penalizing flow file {}", - new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()}); - session.rollback(true); + getLogger().error("An error occurred while connecting to HDFS. " + + "Rolling back session, resetting HDFS resources, and penalizing flow file {}", + putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()); + try { + hdfsResources.set(resetHDFSResources(getConfigLocations(context), context)); + } catch (IOException ioe) { + getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor."); + } + session.rollback(false); + context.yield(); } else { getLogger().error("Failed to access HDFS due to {}", new Object[]{e}); session.transfer(putFlowFile, getFailureRelationship()); @@ -548,21 +554,6 @@ protected String getGroup(final ProcessContext context, final FlowFile flowFile) return group == null || group.isEmpty() ? null : group; } - /** - * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, - * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. - * @param t The throwable to inspect for the cause. 
- * @return Throwable Cause - */ - private Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { - Stream causalChain = Throwables.getCausalChain(t).stream(); - return causalChain - .filter(expectedCauseType::isInstance) - .map(expectedCauseType::cast) - .filter(causePredicate) - .findFirst(); - } - protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { try { // Change owner and group of file if configured to do so diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java index af1df3d56b369..7b59817d212bd 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java @@ -27,6 +27,10 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.ietf.jgss.GSSException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -37,13 +41,14 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class GetHDFSSequenceFileTest { - private HdfsResources hdfsResources; + private HdfsResources hdfsResourcesLocal; private GetHDFSSequenceFile getHDFSSequenceFile; private Configuration configuration; private FileSystem fileSystem; @@ -55,9 +60,8 @@ public void setup() throws IOException { configuration = mock(Configuration.class); fileSystem = mock(FileSystem.class); userGroupInformation = mock(UserGroupInformation.class); - hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null); - getHDFSSequenceFile = new TestableGetHDFSSequenceFile(); - getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class); + hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, userGroupInformation, null); + getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), userGroupInformation); reloginTried = false; init(); } @@ -75,6 +79,7 @@ private void init() throws IOException { public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception { SequenceFileReader reader = mock(SequenceFileReader.class); Path file = mock(Path.class); + getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class); getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file); ArgumentCaptor privilegedExceptionActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedExceptionAction.class); verifyNoMoreInteractions(reader); @@ -86,7 +91,8 @@ public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws @Test public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception { - hdfsResources = new 
HdfsResources(configuration, fileSystem, null, null); + getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), null); + hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, null, null); init(); SequenceFileReader reader = mock(SequenceFileReader.class); Path file = mock(Path.class); @@ -94,10 +100,45 @@ public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception { verify(reader).readSequenceFile(file, configuration, fileSystem); } + @Test + public void testGSSExceptionOnDoAs() throws Exception { + NiFiProperties mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + GetHDFSSequenceFile testSubject = new TestableGetHDFSSequenceFile(getHDFSSequenceFile.kerberosProperties, userGroupInformation, true); + TestRunner runner = TestRunners.newTestRunner(testSubject); + runner.setProperty(GetHDFSSequenceFile.DIRECTORY, "path/does/not/exist"); + runner.run(); + // assert no flowfiles transferred to outgoing relationships + runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0); + } + public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile { + + UserGroupInformation userGroupInformation; + private KerberosProperties kerberosProperties; + + + public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi) throws IOException { + this(kerberosProperties, ugi, false); + } + + public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi, boolean failOnDoAs) throws IOException { + this.kerberosProperties = kerberosProperties; + this.userGroupInformation = ugi; + if(failOnDoAs && userGroupInformation != null) { + try { + when(userGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13))); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + + @Override HdfsResources resetHDFSResources(final List resourceLocations, ProcessContext context) throws IOException { - return hdfsResources; + return hdfsResourcesLocal; } @Override @@ -109,5 +150,9 @@ public void onScheduled(ProcessContext context) throws IOException { protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { return kerberosProperties; } + + protected UserGroupInformation getUserGroupInformation() { + return userGroupInformation; + } } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java index 903e38d750d67..35683e13d5dd0 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java @@ -31,6 +31,7 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.ietf.jgss.GSSException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; @@ -58,12 +59,11 @@ @DisabledOnOs(OS.WINDOWS) public class GetHDFSTest { - private NiFiProperties mockNiFiProperties; private KerberosProperties kerberosProperties; @BeforeEach public void setup() { - 
mockNiFiProperties = mock(NiFiProperties.class); + NiFiProperties mockNiFiProperties = mock(NiFiProperties.class); when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); } @@ -179,7 +179,7 @@ public void testAutomaticDecompression() throws IOException { assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1")); + assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1"); flowFile.assertContentEquals(expected); } @@ -198,7 +198,7 @@ public void testInferCompressionCodecDisabled() throws IOException { assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz")); + assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz"); flowFile.assertContentEquals(expected); } @@ -217,7 +217,7 @@ public void testFileExtensionNotACompressionCodec() throws IOException { assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip")); + assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip"); flowFile.assertContentEquals(expected); } @@ -236,7 +236,7 @@ public void testDirectoryUsesValidEL() throws IOException { assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip")); + assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip"); flowFile.assertContentEquals(expected); final List provenanceEvents = runner.getProvenanceEvents(); @@ -297,19 +297,19 @@ private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOE runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata"); // WHEN - Answer answer = new Answer() { + Answer answer = new Answer<>() { private int callCounter = 0; @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { final Object result; if (callCounter == 0) { when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists); - result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run(); + result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run(); verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class)); verify(mockFileSystem).exists(any(Path.class)); } else { when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]); - result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run(); + result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run(); verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class)); verify(mockFileSystem).listStatus(any(Path.class)); } @@ -322,7 +322,24 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { // THEN verify(mockFileSystem).getUri(); - 
verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation); + verifyNoMoreInteractions(mockUserGroupInformation); + } + + @Test + public void testGSSExceptionOnExists() throws Exception { + FileSystem mockFileSystem = mock(FileSystem.class); + UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class); + + GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation); + TestRunner runner = TestRunners.newTestRunner(testSubject); + runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata"); + when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13))); + runner.run(); + + // Assert session rollback + runner.assertTransferCount(GetHDFS.REL_SUCCESS, 0); + // assert that no files were penalized + runner.assertPenalizeCount(0); } private static class TestableGetHDFS extends GetHDFS { @@ -340,8 +357,8 @@ protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { } private static class TestableGetHDFSForUGI extends TestableGetHDFS { - private FileSystem mockFileSystem; - private UserGroupInformation mockUserGroupInformation; + private final FileSystem mockFileSystem; + private final UserGroupInformation mockUserGroupInformation; public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) { super(testKerberosProperties); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java index 8e14507571d1b..6a95d9d66d01e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java @@ -18,27 +18,38 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.hadoop.util.MockFileSystem; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.ietf.jgss.GSSException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import javax.security.sasl.SaslException; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,7 +64,6 @@ public class MoveHDFSTest { private static final 
String OUTPUT_DIRECTORY = "target/test-data-output"; private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata"; private static final String INPUT_DIRECTORY = "target/test-data-input"; - private NiFiProperties mockNiFiProperties; private KerberosProperties kerberosProperties; @BeforeAll @@ -63,7 +73,7 @@ public static void setUpSuite() { @BeforeEach public void setup() { - mockNiFiProperties = mock(NiFiProperties.class); + NiFiProperties mockNiFiProperties = mock(NiFiProperties.class); when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); } @@ -244,6 +254,37 @@ public void testEmptyInputDirectory() throws IOException { assertEquals(0, flowFiles.size()); } + @Test + public void testPutFileWithGSSException() throws IOException { + MockFileSystem noCredentialsFileSystem = new MockFileSystem() { + @Override + public FileStatus getFileStatus(Path path) throws IOException { + throw new IOException("ioe", new SaslException("sasle", new GSSException(13))); + } + }; + noCredentialsFileSystem.setFailOnExists(true); + TestRunner runner = TestRunners.newTestRunner(new TestableMoveHDFS(kerberosProperties, noCredentialsFileSystem)); + runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "input/does/not/exist"); + runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "target/test-classes"); + runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, "replace"); + + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + + // assert no flowfiles transferred to outgoing relationships + runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0); + // assert the processor's queue is not empty (session rollback) + assertFalse(runner.isQueueEmpty()); + // assert that no files were penalized + runner.assertPenalizeCount(0); + noCredentialsFileSystem.setFailOnExists(false); + } + @Test public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION() throws IOException { testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION, MoveHDFS.REL_FAILURE, "randombytes-1"); @@ -292,15 +333,30 @@ private void testPutWhenAlreadyExisting(String conflictResolution, Relationship private static class TestableMoveHDFS extends MoveHDFS { - private KerberosProperties testKerberosProperties; + private final KerberosProperties testKerberosProperties; + private final FileSystem fileSystem; public TestableMoveHDFS(KerberosProperties testKerberosProperties) { + this(testKerberosProperties, null); + } + + public TestableMoveHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) { this.testKerberosProperties = testKerberosProperties; + this.fileSystem = fileSystem; } @Override protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { return testKerberosProperties; } + + @Override + protected FileSystem getFileSystem(Configuration config) throws IOException { + return fileSystem == null ? super.getFileSystem(config) : fileSystem; + } + @Override + protected FileSystem getFileSystem() { + return fileSystem == null ? 
super.getFileSystem() : fileSystem; + } } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index ab33df270ef5d..d0bc337f4b906 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -17,16 +17,12 @@ package org.apache.nifi.processors.hadoop; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.util.Progressable; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -35,6 +31,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.hadoop.util.MockFileSystem; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; @@ -47,13 +44,9 @@ import org.mockito.Mockito; import javax.security.sasl.SaslException; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -75,7 +68,7 @@ public class PutHDFSTest { private final static String FILE_NAME = "randombytes-1"; private KerberosProperties kerberosProperties; - private FileSystem mockFileSystem; + private MockFileSystem mockFileSystem; @BeforeEach public void setup() { @@ -359,10 +352,6 @@ public FileStatus getFileStatus(Path path) throws IOException { // assert no flowfiles transferred to outgoing relationships runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0); runner.assertTransferCount(PutHDFS.REL_FAILURE, 0); - // assert the input flowfile was penalized - List penalizedFlowFiles = runner.getPenalizedFlowFiles(); - assertEquals(1, penalizedFlowFiles.size()); - assertEquals("randombytes-1", penalizedFlowFiles.iterator().next().getAttribute(CoreAttributes.FILENAME.key())); // assert the processor's queue is not empty assertFalse(runner.isQueueEmpty()); assertEquals(1, runner.getQueueSize().getObjectCount()); @@ -610,7 +599,8 @@ public void testPutFileWithDefaultACL() { @Test public void testPutFileWithCloseException() throws IOException { - mockFileSystem = new MockFileSystem(true); + mockFileSystem = new MockFileSystem(); + mockFileSystem.setFailOnClose(true); String dirName = "target/testPutFileCloseException"; File file = new File(dirName); file.mkdirs(); @@ -635,10 +625,38 @@ public void testPutFileWithCloseException() throws IOException { mockFileSystem.delete(p, true); } + @Test + public void testPutFileWithCreateException() throws 
IOException { + mockFileSystem = new MockFileSystem(); + mockFileSystem.setFailOnCreate(true); + String dirName = "target/testPutFileCreateException"; + File file = new File(dirName); + file.mkdirs(); + Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory()); + + TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem)); + runner.setProperty(PutHDFS.DIRECTORY, dirName); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + + List failedFlowFiles = runner + .getFlowFilesForRelationship(PutHDFS.REL_FAILURE); + assertFalse(failedFlowFiles.isEmpty()); + assertTrue(failedFlowFiles.get(0).isPenalized()); + + mockFileSystem.delete(p, true); + } + private class TestablePutHDFS extends PutHDFS { - private KerberosProperties testKerberosProperties; - private FileSystem fileSystem; + private final KerberosProperties testKerberosProperties; + private final FileSystem fileSystem; TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) { this.testKerberosProperties = testKerberosProperties; @@ -661,134 +679,4 @@ protected FileSystem getFileSystem() { return fileSystem; } } - - private static class MockFileSystem extends FileSystem { - private final Map pathToStatus = new HashMap<>(); - private final Map> pathToAcl = new HashMap<>(); - private final boolean failOnClose; - - public MockFileSystem() { - failOnClose = false; - } - - public MockFileSystem(boolean failOnClose) { - this.failOnClose = failOnClose; - } - - public void setAcl(final Path path, final List aclSpec) { - pathToAcl.put(path, aclSpec); - } - - @Override - public AclStatus getAclStatus(final Path path) { - return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build(); - } - - @Override - public URI getUri() { - return URI.create("file:///"); - } - - @Override - public FSDataInputStream open(final Path f, final int bufferSize) { - return null; - } - - @Override - public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, - final long blockSize, final Progressable progress) { - pathToStatus.put(f, newFile(f, permission)); - if(failOnClose) { - return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) { - @Override - public void close() throws IOException { - super.close(); - throw new IOException("Fail on close"); - } - }; - } else { - return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); - } - } - - @Override - public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) { - return null; - } - - @Override - public boolean rename(final Path src, final Path dst) { - if (pathToStatus.containsKey(src)) { - pathToStatus.put(dst, pathToStatus.remove(src)); - } else { - return false; - } - return true; - } - - @Override - public boolean delete(final Path f, final boolean recursive) { - if (pathToStatus.containsKey(f)) { - pathToStatus.remove(f); - } else { - return false; - } - return true; - } - - @Override - public FileStatus[] listStatus(final Path f) { - return null; - } - - @Override - public void setWorkingDirectory(final Path new_dir) { - - } - - 
@Override - public Path getWorkingDirectory() { - return new Path(new File(".").getAbsolutePath()); - } - - @Override - public boolean mkdirs(final Path f, final FsPermission permission) { - return false; - } - - @Override - public boolean mkdirs(Path f) { - pathToStatus.put(f, newDir(f)); - return true; - } - - @Override - public FileStatus getFileStatus(final Path f) throws IOException { - final FileStatus fileStatus = pathToStatus.get(f); - if (fileStatus == null) throw new FileNotFoundException(); - return fileStatus; - } - - @Override - public boolean exists(Path f) { - return pathToStatus.containsKey(f); - } - - private FileStatus newFile(Path p, FsPermission permission) { - return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p); - } - - private FileStatus newDir(Path p) { - return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false); - } - - @Override - public long getDefaultBlockSize(Path f) { - return 33554432L; - } - } - - static FsPermission perms(short p) { - return new FsPermission(p); - } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java index 6b64cca7674b2..0023af5a1a3ef 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java @@ -27,6 +27,7 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.ietf.jgss.GSSException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,13 +46,12 @@ import static org.mockito.Mockito.when; public class TestDeleteHDFS { - private NiFiProperties mockNiFiProperties; private FileSystem mockFileSystem; private KerberosProperties kerberosProperties; @BeforeEach public void setup() throws Exception { - mockNiFiProperties = mock(NiFiProperties.class); + NiFiProperties mockNiFiProperties = mock(NiFiProperties.class); when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); mockFileSystem = mock(FileSystem.class); @@ -114,6 +114,20 @@ public void testIOException() throws Exception { runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); } + @Test + public void testGSSException() throws Exception { + Path filePath = new Path("/some/path/to/file.txt"); + when(mockFileSystem.exists(any(Path.class))).thenThrow(new IOException(new GSSException(13))); + DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}"); + Map attributes = Maps.newHashMap(); + attributes.put("hdfs.file", filePath.toString()); + runner.enqueue("foo", attributes); + runner.run(); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); + } + @Test public void testPermissionIOException() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java index 43f0c3ce6d5ab..036904312c381 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java @@ -16,8 +16,10 @@ */ package org.apache.nifi.processors.hadoop; +import org.apache.hadoop.fs.FileSystem; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.processors.hadoop.util.MockFileSystem; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; @@ -44,17 +46,15 @@ public class TestFetchHDFS { private TestRunner runner; - private TestableFetchHDFS proc; - private NiFiProperties mockNiFiProperties; private KerberosProperties kerberosProperties; @BeforeEach public void setup() { - mockNiFiProperties = mock(NiFiProperties.class); + NiFiProperties mockNiFiProperties = mock(NiFiProperties.class); when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); - proc = new TestableFetchHDFS(kerberosProperties); + TestableFetchHDFS proc = new TestableFetchHDFS(kerberosProperties); runner = TestRunners.newTestRunner(proc); } @@ -63,7 +63,7 @@ public void testFetchStaticFileThatExists() { final String file = "src/test/resources/testdata/randombytes-1"; final String fileWithMultipliedSeparators = "src/test////resources//testdata/randombytes-1"; runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1); final List provenanceEvents = runner.getProvenanceEvents(); @@ -83,7 +83,7 @@ public void testFetchStaticFileThatExistsWithAbsolutePath() { final String file = destination.getAbsolutePath(); final String fileWithMultipliedSeparators = "/" + file; runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1); final List provenanceEvents = runner.getProvenanceEvents(); @@ -98,7 +98,7 @@ public void testFetchStaticFileThatExistsWithAbsolutePath() { public void testFetchStaticFileThatDoesNotExist() { final String file = "src/test/resources/testdata/doesnotexist"; runner.setProperty(FetchHDFS.FILENAME, file); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1); } @@ -111,7 +111,7 @@ public void testFetchFileThatExistsFromIncomingFlowFile() { final Map attributes = new HashMap<>(); attributes.put("my.file", file); - runner.enqueue(new String("trigger flow file"), attributes); + runner.enqueue("trigger flow file", attributes); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1); } @@ -120,7 +120,7 @@ public void testFetchFileThatExistsFromIncomingFlowFile() { public void testFilenameWithValidEL() { final String file = 
"src/test/resources/testdata/${literal('randombytes-1')}"; runner.setProperty(FetchHDFS.FILENAME, file); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1); } @@ -136,7 +136,7 @@ public void testFilenameWithInvalidEL() { public void testFilenameWithUnrecognizedEL() { final String file = "data_${literal('testing'):substring(0,4)%7D"; runner.setProperty(FetchHDFS.FILENAME, file); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1); } @@ -147,14 +147,14 @@ public void testAutomaticDecompression() throws IOException { TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz"); runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC"); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); List flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1")); + assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1"); flowFile.assertContentEquals(expected); } @@ -165,14 +165,14 @@ public void testInferCompressionCodecDisabled() throws IOException { TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz"); runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE"); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); List flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz")); + assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz"); flowFile.assertContentEquals(expected); } @@ -183,28 +183,58 @@ public void testFileExtensionNotACompressionCodec() throws IOException { TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip"); runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC"); - runner.enqueue(new String("trigger flow file")); + runner.enqueue("trigger flow file"); runner.run(); List flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip")); + assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key())); InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip"); flowFile.assertContentEquals(expected); } + @Test + public void testGSSException() throws IOException { + MockFileSystem fileSystem = new MockFileSystem(); + fileSystem.setFailOnOpen(true); + FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem); + TestRunner runner = TestRunners.newTestRunner(proc); + 
runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz"); + runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE"); + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 0); + // assert that no files were penalized + runner.assertPenalizeCount(0); + fileSystem.setFailOnOpen(false); + } + private static class TestableFetchHDFS extends FetchHDFS { private final KerberosProperties testKerberosProps; + private final FileSystem fileSystem; public TestableFetchHDFS(KerberosProperties testKerberosProps) { this.testKerberosProps = testKerberosProps; + this.fileSystem = null; + } + public TestableFetchHDFS(KerberosProperties testKerberosProps, final FileSystem fileSystem) { + this.testKerberosProps = testKerberosProps; + this.fileSystem = fileSystem; } @Override protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { return testKerberosProps; } + + @Override + protected FileSystem getFileSystem() { + return fileSystem == null ? super.getFileSystem() : fileSystem; + } } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java index 3474fd68b053a..2b23e02939c91 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java @@ -18,30 +18,22 @@ import com.google.common.collect.Maps; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.hadoop.util.MockFileSystem; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; @@ -256,6 +248,28 @@ public void testRunWithIOException() throws InterruptedException { mff.assertAttributeEquals("hdfs.status", "Failed due to: java.io.InterruptedIOException"); } + @Test + public void testWithGSSException() { + proc.fileSystem.setFailOnExists(true); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + 
runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + // Assert session rollback + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + + proc.fileSystem.setFailOnExists(false); + } + @Test public void testRunWithPermissionsExceptionAttributes() throws InterruptedException { @@ -788,136 +802,4 @@ protected FileSystem getFileSystem(final Configuration config) throws IOExceptio return fileSystem; } } - - private class MockFileSystem extends FileSystem { - private final Map> fileStatuses = new HashMap<>(); - private final Map pathToStatus = new HashMap<>(); - - public void addFileStatus(final FileStatus parent, final FileStatus child) { - Set children = fileStatuses.computeIfAbsent(parent.getPath(), k -> new HashSet<>()); - if (child != null) { - children.add(child); - if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) { - fileStatuses.put(child.getPath(), new HashSet()); - } - } - - pathToStatus.put(parent.getPath(), parent); - pathToStatus.put(child.getPath(), child); - } - - @Override - @SuppressWarnings("deprecation") - public long getDefaultBlockSize() { - return 1024L; - } - - @Override - @SuppressWarnings("deprecation") - public short getDefaultReplication() { - return 1; - } - - @Override - public URI getUri() { - return null; - } - - @Override - public FSDataInputStream open(final Path f, final int bufferSize) { - return null; - } - - @Override - public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, - final long blockSize, final Progressable progress) { - return null; - } - - @Override - public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) { - return null; - } - - @Override - public boolean rename(final Path src, final Path dst) throws IOException { - return false; - } - - @Override - public boolean delete(final Path f, final boolean recursive) { - return false; - } - - @Override - public FileStatus[] listStatus(final Path f) throws IOException { - if (!fileStatuses.containsKey(f)) { - throw new FileNotFoundException(); - } - - if (f.getName().startsWith("list_exception_")) { - final String className = f.getName().substring("list_exception_".length()); - final IOException exception; - try { - exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - throw exception; - } - - final Set statuses = fileStatuses.get(f); - if (statuses == null) { - return new FileStatus[0]; - } - - for (FileStatus s : statuses) { - getFileStatus(s.getPath()); //support exception handling only. 
- } - - return statuses.toArray(new FileStatus[0]); - } - - @Override - public void setWorkingDirectory(final Path new_dir) { - - } - - @Override - public Path getWorkingDirectory() { - return new Path(new File(".").getAbsolutePath()); - } - - @Override - public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { - return false; - } - - @Override - public FileStatus getFileStatus(final Path path) throws IOException { - if (path != null && path.getName().startsWith("exception_")) { - final String className = path.getName().substring("exception_".length()); - final IOException exception; - try { - exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - throw exception; - } - - final FileStatus fileStatus = pathToStatus.get(path); - if (fileStatus == null) { - throw new FileNotFoundException(); - } - return fileStatus; - } - - public FileStatus newFile(String p) { - return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p)); - } - public FileStatus newDir(String p) { - return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p)); - } - } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java new file mode 100644 index 0000000000000..3a3477502e01e --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Progressable;
+import org.ietf.jgss.GSSException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MockFileSystem extends FileSystem {
+    private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+    private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
+    private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+    private boolean failOnOpen;
+    private boolean failOnClose;
+    private boolean failOnCreate;
+    private boolean failOnFileStatus;
+    private boolean failOnExists;
+
+
+    public void setFailOnClose(final boolean failOnClose) {
+        this.failOnClose = failOnClose;
+    }
+
+    public void setFailOnCreate(final boolean failOnCreate) {
+        this.failOnCreate = failOnCreate;
+    }
+
+    public void setFailOnFileStatus(final boolean failOnFileStatus) {
+        this.failOnFileStatus = failOnFileStatus;
+    }
+
+    public void setFailOnExists(final boolean failOnExists) {
+        this.failOnExists = failOnExists;
+    }
+
+    public void setFailOnOpen(final boolean failOnOpen) {
+        this.failOnOpen = failOnOpen;
+    }
+
+    public void setAcl(final Path path, final List<AclEntry> aclSpec) {
+        pathToAcl.put(path, aclSpec);
+    }
+
+    @Override
+    public AclStatus getAclStatus(final Path path) {
+        return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create("file:///");
+    }
+
+    @Override
+    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+        if (failOnOpen) {
+            throw new IOException(new GSSException(13));
+        }
+        return null;
+    }
+
+    @Override
+    public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+                                     final long blockSize, final Progressable progress) throws IOException {
+        if (failOnCreate) {
+            // Simulate an AuthenticationException wrapped in an IOException
+            throw new IOException(new AuthenticationException("test auth error"));
+        }
+        pathToStatus.put(f, newFile(f, permission));
+        if(failOnClose) {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("")) {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    throw new IOException("Fail on close");
+                }
+            };
+        } else {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+        }
+    }
+
+    @Override
+    public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
+        return null;
+    }
+
+    @Override
+    public boolean rename(final Path src, final Path dst) {
+        if (pathToStatus.containsKey(src)) {
+            pathToStatus.put(dst, pathToStatus.remove(src));
+        } else {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean delete(final Path f, final boolean recursive) {
+        if (pathToStatus.containsKey(f)) {
+            pathToStatus.remove(f);
+        } else {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setWorkingDirectory(final Path new_dir) {
+
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return new Path(new File(".").getAbsolutePath());
+    }
+
+    @Override
+    public boolean mkdirs(final Path f, final FsPermission permission) {
+        return false;
+    }
+
+    @Override
+    public boolean mkdirs(Path f) {
+        pathToStatus.put(f, newDir(f));
+        return true;
+    }
+
+    @Override
+    public FileStatus getFileStatus(final Path path) throws IOException {
+        if (failOnFileStatus) {
+            throw new IOException(new GSSException(13));
+        }
+        if (path != null && path.getName().startsWith("exception_")) {
+            final String className = path.getName().substring("exception_".length());
+            final IOException exception;
+            try {
+                exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+            throw exception;
+        }
+
+        final FileStatus fileStatus = pathToStatus.get(path);
+        if (fileStatus == null) {
+            throw new FileNotFoundException();
+        }
+        return fileStatus;
+    }
+
+    @Override
+    public boolean exists(Path f) throws IOException {
+        if (failOnExists) {
+            throw new IOException(new GSSException(13));
+        }
+        return pathToStatus.containsKey(f);
+    }
+
+    public FileStatus newFile(Path p, FsPermission permission) {
+        return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
+    }
+
+    public FileStatus newDir(Path p) {
+        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
+    }
+
+    public FileStatus newFile(String p) {
+        return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+    }
+    public FileStatus newDir(String p) {
+        return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path f) {
+        return 33554432L;
+    }
+
+    public void addFileStatus(final FileStatus parent, final FileStatus child) {
+        Set<FileStatus> children = fileStatuses.computeIfAbsent(parent.getPath(), k -> new HashSet<>());
+        if (child != null) {
+            children.add(child);
+            if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+                fileStatuses.put(child.getPath(), new HashSet<>());
+            }
+        }
+
+        pathToStatus.put(parent.getPath(), parent);
+        pathToStatus.put(child.getPath(), child);
+    }
+
+    @Override
+    public FileStatus[] listStatus(final Path f) throws IOException {
+        if (!fileStatuses.containsKey(f)) {
+            throw new FileNotFoundException();
+        }
+
+        if (f.getName().startsWith("list_exception_")) {
+            final String className = f.getName().substring("list_exception_".length());
+            final IOException exception;
+            try {
+                exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+            throw exception;
+        }
+
+        final Set<FileStatus> statuses = fileStatuses.get(f);
+        if (statuses == null) {
+            return new FileStatus[0];
+        }
+
+        for (FileStatus s : statuses) {
+            getFileStatus(s.getPath()); //support exception handling only.
+        }
+
+        return statuses.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public long getDefaultBlockSize() {
+        return 1024L;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public short getDefaultReplication() {
+        return 1;
+    }
+
+
+    private static FsPermission perms(short p) {
+        return new FsPermission(p);
+    }
+}
\ No newline at end of file
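
Reviewer note (not part of the patch): the new auth-failure tests above all follow the same recipe — flip one of the shared MockFileSystem's fail-on flags, run the processor once, and assert the new outcome. Below is a minimal sketch of the MockFileSystem half of that recipe in isolation, assuming JUnit 5 is on the test classpath; the test class and method names are illustrative only, everything else mirrors code added in this patch.

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.util.MockFileSystem;
    import org.ietf.jgss.GSSException;
    import org.junit.jupiter.api.Test;

    import java.io.IOException;

    import static org.junit.jupiter.api.Assertions.assertInstanceOf;
    import static org.junit.jupiter.api.Assertions.assertThrows;

    // Hypothetical test class, used only to demonstrate the shared mock's failure flags.
    class MockFileSystemGssSketchTest {

        @Test
        void existsWrapsGssExceptionWhenFailOnExistsIsSet() {
            final MockFileSystem fileSystem = new MockFileSystem();
            fileSystem.setFailOnExists(true);

            // exists() now fails the way an expired Kerberos ticket does on a real cluster:
            // an IOException whose cause is GSSException(13), i.e. NO_CRED.
            final IOException ioe = assertThrows(IOException.class,
                    () -> fileSystem.exists(new Path("/any/qualified/path")));
            assertInstanceOf(GSSException.class, ioe.getCause());

            // Reset the flag so the same instance can be reused by later tests.
            fileSystem.setFailOnExists(false);
        }
    }

In the FetchHDFS, MoveHDFS and GetHDFSFileInfo tests this wrapped GSSException is what the new assertions key off: zero transfers to every relationship and no penalization, i.e. the session is rolled back for a retry rather than routed to failure, while TestDeleteHDFS still expects its flow file on REL_FAILURE.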