NIFI-12889: Retry Kerberos login on auth failure in HDFS processors
mattyb149 committed Mar 22, 2024
1 parent 884dd85 commit d4b6b5d
Showing 16 changed files with 795 additions and 482 deletions.
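Taken together, the changes below apply one recurring pattern to each processor's IOException handling: when the causal chain contains a GSSException with major code NO_CRED, the processor treats its Kerberos credentials as stale, rebuilds its HDFS resources, rolls back the session so the FlowFile stays queued for a retry, and yields. The following condensed sketch of that pattern is assembled from the diffs below; doHdfsWork and handleOtherIoFailure are placeholders invented for the sketch, while findCause, resetHDFSResources, getConfigLocations, and hdfsResources come from AbstractHadoopProcessor and its subclasses as shown in the changed files.

    try {
        doHdfsWork();    // placeholder for the processor's HDFS call (delete, open, listing, ...)
    } catch (final IOException e) {
        // A GSSException with major code NO_CRED in the causal chain is treated as a lost Kerberos ticket
        final Optional<GSSException> causeOptional =
                findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
        if (causeOptional.isPresent()) {
            getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional.get());
            try {
                // Rebuild the Hadoop Configuration, FileSystem and UserGroupInformation held by the processor
                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
            } catch (final IOException resetResourcesException) {
                getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException);
            }
            session.rollback();   // keep the FlowFile for a retry instead of routing it to failure
            context.yield();
        } else {
            handleOtherIoFailure(e);   // placeholder for each processor's pre-existing IOException handling
        }
    }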
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +63,9 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/**
* This is a base class that is helpful when building processors interacting with HDFS.
@@ -171,7 +174,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen

// variables shared by all threads of this processor
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();

// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@@ -532,12 +535,7 @@ protected FileSystem getFileSystem(final Configuration config) throws IOExceptio

protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage());
}
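For context on the lambda conversion just above: UserGroupInformation.doAs is overloaded for both PrivilegedAction and PrivilegedExceptionAction, and an implicitly typed lambda can match either shape, so the cast is what selects the overload whose action may throw checked exceptions. A minimal illustration of the same call outside the processor; the class and method names here are made up for the example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    class DoAsSketch {
        // The cast picks doAs(PrivilegedExceptionAction) over doAs(PrivilegedAction), matching
        // the rewritten getFileSystemAsUser above; FileSystem.get may throw IOException.
        static FileSystem openAsUser(final UserGroupInformation ugi, final Configuration config)
                throws IOException, InterruptedException {
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
        }
    }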
@@ -703,4 +701,19 @@ private Path getNormalizedPath(final String rawPath, final Optional<String> prop

return new Path(path.replaceAll("/+", "/"));
}

/**
* Returns an {@link Optional} containing the first throwable in the causal chain that is assignable to the provided cause type
* and satisfies the provided cause predicate, or {@link Optional#empty()} if no such cause is found.
* @param t The throwable whose causal chain is inspected.
* @param expectedCauseType The type of cause to look for in the causal chain.
* @param causePredicate An additional condition the matching cause must satisfy.
* @return the first matching cause, or an empty {@link Optional}
*/
protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
return causalChain
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}
}
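A usage sketch for the new findCause helper (not part of the commit): Guava's Throwables.getCausalChain returns the throwable itself followed by each nested cause in order, so the helper can pull a Kerberos GSSException out of however many IOException wrappers Hadoop puts around it. The nesting below is fabricated for illustration, and the fragment is assumed to run inside an AbstractHadoopProcessor subclass where the protected helper is visible.

    // Fabricated nesting for the example: a GSS failure buried under two IOException wrappers
    final GSSException gsse = new GSSException(GSSException.NO_CRED);
    final IOException wrapped = new IOException("Failed on local exception", new IOException(gsse));

    final Optional<GSSException> noCred =
            findCause(wrapped, GSSException.class, g -> GSSException.NO_CRED == g.getMajor());
    // noCred.isPresent() is true and noCred.get() is the original gsse

    // A different major code fails the predicate, so nothing is returned
    final Optional<GSSException> other =
            findCause(new IOException(new GSSException(GSSException.DEFECTIVE_TOKEN)),
                    GSSException.class, g -> GSSException.NO_CRED == g.getMajor());
    // other.isPresent() is false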
@@ -39,6 +39,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.ietf.jgss.GSSException;

import java.io.IOException;
import java.security.PrivilegedAction;
@@ -47,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -177,16 +179,29 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
} catch (IOException ioe) {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
// external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
getLogger().warn("Failed to delete file or directory", ioe);

Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());

session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
// Catch GSSExceptions and reset the resources
Optional<GSSException> causeOptional = findCause(ioe, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional);
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException resetResourcesException) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException);
}
session.rollback();
context.yield();
} else {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
// external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
getLogger().warn("Failed to delete file or directory", ioe);

Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());

session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
}
}
}
}
@@ -46,6 +46,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -54,6 +55,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@@ -141,60 +143,70 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final StopWatch stopWatch = new StopWatch(true);
final FlowFile finalFlowFile = flowFile;

ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;

if(inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(path);
} else if (compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}

FlowFile flowFile = finalFlowFile;
final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
final String originalFilename = path.getName();
stream = hdfs.open(path, 16384);

// Check if compression codec is defined (inferred or otherwise)
if (codec != null) {
stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
}

flowFile = session.importFrom(stream, finalFlowFile);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, getFailureRelationship());
} catch (final IOException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
}

return null;
}
});

ugi.doAs((PrivilegedAction<Object>) () -> {
InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;

if (inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(path);
} else if (compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}

FlowFile flowFile1 = finalFlowFile;
final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
final String originalFilename = path.getName();
stream = hdfs.open(path, 16384);

// Check if compression codec is defined (inferred or otherwise)
if (codec != null) {
stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
}

flowFile1 = session.importFrom(stream, finalFlowFile);
flowFile1 = session.putAttribute(flowFile1, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, flowFile1, stopWatch.getDuration()});
flowFile1 = session.putAttribute(flowFile1, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(flowFile1, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile1, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, flowFile1, e});
flowFile1 = session.putAttribute(flowFile1, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile1 = session.penalize(flowFile1);
session.transfer(flowFile1, getFailureRelationship());
} catch (final IOException e) {
// Catch GSSExceptions and reset the resources
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional);
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException resetResourcesException) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException);
}
session.rollback();
context.yield();
} else {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[]{qualifiedPath, flowFile1, e});
flowFile1 = session.penalize(flowFile1);
session.transfer(flowFile1, getCommsFailureRelationship());
}
} finally {
IOUtils.closeQuietly(stream);
}

return null;
});
}

@@ -50,6 +50,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

import java.io.IOException;
import java.io.InputStream;
@@ -59,6 +60,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -294,12 +296,22 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
if (logEmptyListing.getAndDecrement() > 0) {
getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[]{millis, listedFiles.size(), newItems});
millis, listedFiles.size(), newItems);
}
}
} catch (IOException e) {
// Catch GSSExceptions and reset the resources
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional);
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException ioe) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.");
}
}
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
getLogger().warn("Error while retrieving list of files due to {}", e.getMessage(), e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -388,18 +400,29 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
new Object[]{file});
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", file);
session.remove(flowFile);
continue;
}

session.getProvenanceReporter().receive(flowFile, file.toString());
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, file, millis, dataRate});
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate);
} catch (final IOException e) {
// Catch GSSExceptions and reset the resources
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().error("Error authenticating when performing file operation, resetting HDFS resources", causeOptional);
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException resetResourcesException) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.", resetResourcesException);
}
}
session.rollback();
context.yield();
} catch (final Throwable t) {
getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
session.rollback();
context.yield();
} finally {