Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,31 @@ public void execute() throws Exception {
LOG.warn("Problem in MergedStream distcp PULL..skipping commit for this run");
skipCommit = true;
}
Map<String, Set<Path>> committedPaths = null;
Map<String, Set<Path>> tobeCommittedPaths = null;

// if success
if (!skipCommit) {
Map<String, List<Path>> categoriesToCommit = prepareForCommit(tmpOut);
tobeCommittedPaths = new HashMap<String, Set<Path>>();
synchronized (getDestCluster()) {
long commitTime = getDestCluster().getCommitTime();
addPublishMissingPaths(missingDirsCommittedPaths, commitTime,
categoriesToCommit.keySet());

// category, Set of Paths to commit
committedPaths = doLocalCommit(commitTime, categoriesToCommit);

Map<Path, Path> commitPaths = createLocalCommitPaths(tmpOut, commitTime,
categoriesToCommit, tobeCommittedPaths);
for (Map.Entry<String, Set<Path>> entry : missingDirsCommittedPaths
.entrySet()) {
Set<Path> filesList = committedPaths.get(entry.getKey());
Set<Path> filesList = tobeCommittedPaths.get(entry.getKey());
if (filesList != null)
filesList.addAll(entry.getValue());
else
committedPaths.put(entry.getKey(), entry.getValue());
else
tobeCommittedPaths.put(entry.getKey(), entry.getValue());
}
// Prepare paths for MirrorStreamConsumerService
commitMirroredConsumerPaths(tobeCommittedPaths, tmp);
// category, Set of Paths to commit
doLocalCommit(commitPaths);
}
// Prepare paths for MirrorStreamConsumerService
commitMirroredConsumerPaths(committedPaths, tmp);
// Cleanup happens in parallel without sync
// no race is there in consumePaths, tmpOut
doFinalCommit(consumePaths);
Expand All @@ -141,41 +142,41 @@ private void addPublishMissingPaths(
Map<String, Set<Path>> missingDirsCommittedPaths, long commitTime,
Set<String> categoriesToCommit)
throws Exception {
Map<String, Set<Path>> missingDirsforCategory = null;

if(categoriesToCommit!=null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}
Map<String, Set<Path>> missingDirsforCategory = null;

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
if(categoriesToCommit!=null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
}

/*
* @param Map<String, Set<Path>> committedPaths - stream name mapped to the
* set of paths committed for that stream.
*/
private void commitMirroredConsumerPaths(
private Map<Path, Path> commitMirroredConsumerPaths(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not need to return any path.

Map<String, Set<Path>> committedPaths, Path tmp) throws Exception {
// Map of Stream and clusters where it's mirrored
Map<String, Set<Cluster>> mirrorStreamConsumers = new HashMap<String, Set<Cluster>>();
Expand Down Expand Up @@ -230,29 +231,9 @@ private void commitMirroredConsumerPaths(
consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath);
} // for each consumer
} // for each stream

if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) {
LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit");
missingDirsCommittedPaths.clear();
return;
}
// Do the final mirrorCommit
LOG.info("Committing [" + consumerCommitPaths.size() + "] paths for " +
"mirrored Stream");
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
for (Map.Entry<Path, Path> entry : consumerCommitPaths.entrySet()) {
LOG.info("Renaming [" + entry.getKey() + "] to [" + entry.getValue() +
"]");
fs.mkdirs(entry.getValue().getParent());
if (fs.rename(entry.getKey(), entry.getValue()) == false) {
LOG.warn("Failed to Commit for Mirrored Path. Aborting Transaction " +
"to avoid DATA LOSS, " +
"Partial data replay can happen for merged and mirror stream");
throw new Exception("Rename failed from ["+ entry.getKey() +"] to ["
+ entry.getValue() +"]");
}
}
doLocalCommit(consumerCommitPaths);
missingDirsCommittedPaths.clear();
return consumerCommitPaths;
}

private Map<String, List<Path>> prepareForCommit(Path tmpOut)
Expand All @@ -268,7 +249,7 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
if (!getDestFs().exists(intermediatePath))
getDestFs().mkdirs(intermediatePath);
Path source = allFiles[i].getPath().makeQualified(getDestFs());

Path intermediateFilePath = new Path(intermediatePath.makeQualified(
getDestFs()).toString()
+ File.separator + fileName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clear should be present irrespective of whether consumerCommitPaths is empty or not.

Expand All @@ -295,46 +276,55 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
}
return categoriesToCommit;
}

/*
 * Builds the rename plan for a local commit: a map from each file listed in
 * categoriesToCommit to the location it should be moved to. The loop below
 * only constructs Path objects and fills maps; it performs no rename itself.
 *
 * Meaning of the paths:
 *  - key of the returned map: a file path exactly as it appears in
 *    categoriesToCommit (the move source).
 *  - value of the returned map: getFinalDestDir(category, commitTime) of the
 *    destination cluster joined with the source file's name (the move
 *    destination).
 *
 * @param tmpOut not read anywhere in this method body
 * @param commitTime passed to getFinalDestDir(...) when building destinations
 * @param categoriesToCommit category name -> files belonging to that category
 * @param tobeCommittedPaths output parameter; for every category processed,
 * the destination paths are added to that category's set (the set is created
 * when absent)
 * @return LinkedHashMap of source path -> destination path, in the order the
 * files were visited
 */
public Map<Path, Path> createLocalCommitPaths(Path tmpOut, long commitTime,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some documentation stating what each Path means..
i was wondering may be we should have SourcePath and DestinationPath classes abtracting out this..but we can do that later...just some docum around this would be helpful...

Map<String, List<Path>> categoriesToCommit, Map<String, Set<Path>>
tobeCommittedPaths) throws Exception {
// NOTE(review): fs is not referenced again in this method.
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());

// find final destination paths
// LinkedHashMap keeps the entries in insertion order.
Map<Path, Path> mvPaths = new LinkedHashMap<Path, Path>();
Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
// Raw Iterator, which is why each element is cast below.
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
// Destination directory depends only on category and commitTime.
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));
// Destination keeps the source file's name.
Path commitPath = new Path(destParentPath, filePath.getName());
mvPaths.put(filePath, commitPath);
// Get-or-create the per-category set, then record the destination in it.
Set<Path> commitPaths = tobeCommittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
tobeCommittedPaths.put(category, commitPaths);
}
}
return mvPaths;
}

/*
* @returns Map<String, Set<Path>> - Map of StreamName, Set of paths committed
* for stream
*/
private Map<String, Set<Path>> doLocalCommit(long commitTime,
Map<String, List<Path>> categoriesToCommit) throws Exception {
Map<String, Set<Path>> comittedPaths = new HashMap<String, Set<Path>>();
Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));
if (!getDestFs().exists(destParentPath)) {
getDestFs().mkdirs(destParentPath);
}
LOG.debug("Moving from intermediatePath [" + filePath + "] to ["
+ destParentPath + "]");
if (getDestFs().rename(filePath, destParentPath) == false) {
LOG.warn("Rename failed, aborting transaction COMMIT to avoid " +
"dataloss. Partial data replay could happen in next run");
throw new Exception("Abort transaction Commit. Rename failed from ["
+ filePath + "] to [" + destParentPath + "]");
}
Path commitPath = new Path(destParentPath, filePath.getName());
Set<Path> commitPaths = comittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
comittedPaths.put(category, commitPaths);
}
}
return comittedPaths;
/*
 * Renames every key of commitPaths to its mapped value, using the file system
 * obtained from the destination cluster's Hadoop configuration. The parent of
 * each destination is created with mkdirs before the rename; the result of
 * mkdirs is not inspected.
 *
 * @param commitPaths source path -> destination path, processed in the map's
 * iteration order
 * @throws Exception on the first rename that returns false; entries already
 * renamed are left as they are and the remaining entries are not attempted
 */
private void doLocalCommit(Map<Path, Path> commitPaths) throws Exception {
  LOG.info("Committing " + commitPaths.size() + " paths.");
  FileSystem targetFs = FileSystem.get(getDestCluster().getHadoopConf());
  for (Map.Entry<Path, Path> move : commitPaths.entrySet()) {
    Path source = move.getKey();
    Path destination = move.getValue();
    LOG.info("Renaming " + source + " to " + destination);
    targetFs.mkdirs(destination.getParent());
    if (targetFs.rename(source, destination)) {
      // Rename succeeded; move on to the next pair.
      continue;
    }
    // A failed rename aborts the whole commit.
    LOG.warn("Rename failed, aborting transaction COMMIT to avoid "
        + "dataloss. Partial data replay could happen in next run");
    throw new Exception("Abort transaction Commit. Rename failed from ["
        + source + "] to [" + destination + "]");
  }
}

protected Path getInputPath() throws IOException {
Expand Down