Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ public void execute() throws Exception {
// if success
if (!skipCommit) {
Map<String, List<Path>> categoriesToCommit = prepareForCommit(tmpOut);
committedPaths = new HashMap<String, Set<Path>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we rename it to toBeCommittedPaths ?

synchronized (getDestCluster()) {
long commitTime = getDestCluster().getCommitTime();
addPublishMissingPaths(missingDirsCommittedPaths, commitTime,
categoriesToCommit.keySet());

// category, Set of Paths to commit
committedPaths = doLocalCommit(commitTime, categoriesToCommit);

Map<Path, Path> commitPaths = findCommitedPaths(tmpOut, commitTime,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a proper name something like createLocalCommitPaths() might be more meaningful here

categoriesToCommit, committedPaths);
for (Map.Entry<String, Set<Path>> entry : missingDirsCommittedPaths
.entrySet()) {
Set<Path> filesList = committedPaths.get(entry.getKey());
Expand All @@ -121,9 +120,13 @@ public void execute() throws Exception {
else
committedPaths.put(entry.getKey(), entry.getValue());
}
// Prepare paths for MirrorStreamConsumerService
Map<Path, Path> consumepaths = commitMirroredConsumerPaths(committedPaths, tmp);
commitPaths.putAll(consumepaths);
// category, Set of Paths to commit
doLocalCommit(commitPaths);
}
// Prepare paths for MirrorStreamConsumerService
commitMirroredConsumerPaths(committedPaths, tmp);

// Cleanup happens in parallel without sync
// no race is there in consumePaths, tmpOut
doFinalCommit(consumePaths);
Expand All @@ -141,41 +144,41 @@ private void addPublishMissingPaths(
Map<String, Set<Path>> missingDirsCommittedPaths, long commitTime,
Set<String> categoriesToCommit)
throws Exception {
Map<String, Set<Path>> missingDirsforCategory = null;

if(categoriesToCommit!=null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}
Map<String, Set<Path>> missingDirsforCategory = null;

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
if(categoriesToCommit!=null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
}

/*
* @param Map<String, Set<Path>> commitedPaths - Stream Name, It's committed
* Path.
*/
private void commitMirroredConsumerPaths(
private Map<Path, Path> commitMirroredConsumerPaths(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not need to return any path.

Map<String, Set<Path>> committedPaths, Path tmp) throws Exception {
// Map of Stream and clusters where it's mirrored
Map<String, Set<Cluster>> mirrorStreamConsumers = new HashMap<String, Set<Cluster>>();
Expand Down Expand Up @@ -230,29 +233,8 @@ private void commitMirroredConsumerPaths(
consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath);
} // for each consumer
} // for each stream

if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) {
LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit");
missingDirsCommittedPaths.clear();
return;
}
// Do the final mirrorCommit
LOG.info("Committing [" + consumerCommitPaths.size() + "] paths for " +
"mirrored Stream");
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
for (Map.Entry<Path, Path> entry : consumerCommitPaths.entrySet()) {
LOG.info("Renaming [" + entry.getKey() + "] to [" + entry.getValue() +
"]");
fs.mkdirs(entry.getValue().getParent());
if (fs.rename(entry.getKey(), entry.getValue()) == false) {
LOG.warn("Failed to Commit for Mirrored Path. Aborting Transaction " +
"to avoid DATA LOSS, " +
"Partial data replay can happen for merged and mirror stream");
throw new Exception("Rename failed from ["+ entry.getKey() +"] to ["
+ entry.getValue() +"]");
}
}
missingDirsCommittedPaths.clear();

return consumerCommitPaths;
}

private Map<String, List<Path>> prepareForCommit(Path tmpOut)
Expand All @@ -261,14 +243,15 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
FileStatus[] allFiles = getDestFs().listStatus(tmpOut);
for (int i = 0; i < allFiles.length; i++) {
String fileName = allFiles[i].getPath().getName();
LOG.info("added filename in merged " + fileName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the debug test log

if (fileName != null) {
String category = getCategoryFromFileName(fileName);
if (category != null) {
Path intermediatePath = new Path(tmpOut, category);
if (!getDestFs().exists(intermediatePath))
getDestFs().mkdirs(intermediatePath);
Path source = allFiles[i].getPath().makeQualified(getDestFs());

Path intermediateFilePath = new Path(intermediatePath.makeQualified(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clear should be present irrespective of whether consumerCommitPaths is empty or not.

getDestFs()).toString()
+ File.separator + fileName);
Expand All @@ -293,48 +276,64 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
}
}
}

return categoriesToCommit;
}

public Map<Path, Path> findCommitedPaths(Path tmpOut, long commitTime,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parameter name being used for tobeCommittedPaths and argument name doesn't match here...please fix to ensure code readability

Map<String, List<Path>> categoriesToCommit, Map<String, Set<Path>>
comittedPaths) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again this argument can be tobeCommittedPaths

FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());

// find final destination paths
Map<Path, Path> mvPaths = new LinkedHashMap<Path, Path>();

// Map<String, Set<Path>> comittedPaths = new HashMap<String, Set<Path>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this

Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));

LOG.debug("Moving from intermediatePath [" + filePath + "] to ["
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this log from here

+ destParentPath + "]");

Path commitPath = new Path(destParentPath, filePath.getName());
mvPaths.put(filePath, commitPath);
Set<Path> commitPaths = comittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
comittedPaths.put(category, commitPaths);
}
}
return mvPaths;
}

/*
* @returns Map<String, Set<Path>> - Map of StreamName, Set of paths committed
* for stream
*/
private Map<String, Set<Path>> doLocalCommit(long commitTime,
Map<String, List<Path>> categoriesToCommit) throws Exception {
Map<String, Set<Path>> comittedPaths = new HashMap<String, Set<Path>>();
Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));
if (!getDestFs().exists(destParentPath)) {
getDestFs().mkdirs(destParentPath);
}
LOG.debug("Moving from intermediatePath [" + filePath + "] to ["
+ destParentPath + "]");
if (getDestFs().rename(filePath, destParentPath) == false) {
LOG.warn("Rename failed, aborting transaction COMMIT to avoid " +
"dataloss. Partial data replay could happen in next run");
throw new Exception("Abort transaction Commit. Rename failed from ["
+ filePath + "] to [" + destParentPath + "]");
}
Path commitPath = new Path(destParentPath, filePath.getName());
Set<Path> commitPaths = comittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
comittedPaths.put(category, commitPaths);
}
}
return comittedPaths;
private void doLocalCommit(Map<Path, Path> commitPaths) throws Exception {
LOG.info("Committing " + commitPaths.size() + " paths.");
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
for (Map.Entry<Path, Path> entry : commitPaths.entrySet()) {
LOG.info("Renaming " + entry.getKey() + " to " + entry.getValue());
fs.mkdirs(entry.getValue().getParent());
if (fs.rename(entry.getKey(), entry.getValue()) == false) {
LOG.warn("Rename failed, aborting transaction COMMIT to avoid "
+ "dataloss. Partial data replay could happen in next run");
throw new Exception("Abort transaction Commit. Rename failed from ["
+ entry.getKey() + "] to [" + entry.getValue() + "]");
}
}
}

protected Path getInputPath() throws IOException {
Expand Down