@@ -100,30 +100,31 @@ public void execute() throws Exception {
LOG.warn("Problem in MergedStream distcp PULL..skipping commit for this run");
skipCommit = true;
}
Map<String, Set<Path>> committedPaths = null;
Map<String, Set<Path>> tobeCommittedPaths = null;

// if success
if (!skipCommit) {
Map<String, List<Path>> categoriesToCommit = prepareForCommit(tmpOut);
tobeCommittedPaths = new HashMap<String, Set<Path>>();
synchronized (getDestCluster()) {
long commitTime = getDestCluster().getCommitTime();
addPublishMissingPaths(missingDirsCommittedPaths, commitTime,
categoriesToCommit.keySet());

// category, Set of Paths to commit
committedPaths = doLocalCommit(commitTime, categoriesToCommit);

Map<Path, Path> commitPaths = createLocalCommitPaths(tmpOut, commitTime,
categoriesToCommit, tobeCommittedPaths);
for (Map.Entry<String, Set<Path>> entry : missingDirsCommittedPaths
.entrySet()) {
Set<Path> filesList = committedPaths.get(entry.getKey());
Set<Path> filesList = tobeCommittedPaths.get(entry.getKey());
if (filesList != null)
filesList.addAll(entry.getValue());
else
committedPaths.put(entry.getKey(), entry.getValue());
else
tobeCommittedPaths.put(entry.getKey(), entry.getValue());
}
// Prepare paths for MirrorStreamConsumerService
commitMirroredConsumerPaths(tobeCommittedPaths, tmp);
// category, Set of Paths to commit
doLocalCommit(commitPaths);
}
// Prepare paths for MirrorStreamConsumerService
commitMirroredConsumerPaths(committedPaths, tmp);
// Cleanup happens in parallel without sync;
// there is no race on consumePaths or tmpOut
doFinalCommit(consumePaths);
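
For orientation, a condensed sketch of the commit flow as this diff leaves it: the rename plan is computed first, and the renames are executed afterwards by a shared helper. The scaffolding below is simplified for illustration (the missing-path bookkeeping is omitted) and is not the service's real surrounding code.

    // Sketch of the two-step commit introduced by this change (method names
    // from the diff; local variables are condensed placeholders).
    Map<String, Set<Path>> tobeCommittedPaths = new HashMap<String, Set<Path>>();
    synchronized (getDestCluster()) {
      long commitTime = getDestCluster().getCommitTime();
      // Step 1: compute source -> destination pairs; no filesystem mutation yet.
      Map<Path, Path> commitPaths = createLocalCommitPaths(tmpOut, commitTime,
          categoriesToCommit, tobeCommittedPaths);
      // Step 2: commit the mirror-consumer paths, then execute the
      // merged-stream renames through the same doLocalCommit(Map) helper.
      commitMirroredConsumerPaths(tobeCommittedPaths, tmp);
      doLocalCommit(commitPaths);
    }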
@@ -141,34 +142,34 @@ private void addPublishMissingPaths(
Map<String, Set<Path>> missingDirsCommittedPaths, long commitTime,
Set<String> categoriesToCommit)
throws Exception {
Map<String, Set<Path>> missingDirsforCategory = null;

if (categoriesToCommit != null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}
Map<String, Set<Path>> missingDirsforCategory = null;

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
if (categoriesToCommit != null) {
missingDirsforCategory = new HashMap<String, Set<Path>>();
for (String category : categoriesToCommit) {
Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
getDestCluster().getFinalDestDirRoot(), commitTime, category);
missingDirsforCategory.put(category, missingDirectories);
}
} else {
missingDirsforCategory = publishMissingPaths(
getDestFs(), getDestCluster().getFinalDestDirRoot());
}

if (missingDirsforCategory != null) {
for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
.entrySet()) {
LOG.debug("Add Missing Directories to Commit Path: "
+ entry.getValue().size());
if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
.getKey());
missingPaths.addAll(entry.getValue());
} else {
missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
}
}
}
}
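
The tail of addPublishMissingPaths merges the freshly discovered per-category directories into the accumulator map. This is the usual map-of-sets merge idiom; a self-contained sketch (the class and method names are illustrative, not from this codebase):

    import java.util.Map;
    import java.util.Set;

    final class MergeSketch {
      // Same shape as the loop above: union into the existing set when the
      // category is already present, otherwise adopt the incoming set as-is.
      static <K, V> void mergeInto(Map<K, Set<V>> acc, Map<K, Set<V>> delta) {
        for (Map.Entry<K, Set<V>> e : delta.entrySet()) {
          Set<V> existing = acc.get(e.getKey());
          if (existing != null) {
            existing.addAll(e.getValue());
          } else {
            acc.put(e.getKey(), e.getValue());
          }
        }
      }
    }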

/*
@@ -230,28 +231,7 @@ private void commitMirroredConsumerPaths(
consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath);
} // for each consumer
} // for each stream

if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) {
LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit");
missingDirsCommittedPaths.clear();
return;
}
// Do the final mirrorCommit
LOG.info("Committing [" + consumerCommitPaths.size() + "] paths for " +
"mirrored Stream");
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
for (Map.Entry<Path, Path> entry : consumerCommitPaths.entrySet()) {
LOG.info("Renaming [" + entry.getKey() + "] to [" + entry.getValue() +
"]");
fs.mkdirs(entry.getValue().getParent());
if (fs.rename(entry.getKey(), entry.getValue()) == false) {
LOG.warn("Failed to Commit for Mirrored Path. Aborting Transaction " +
"to avoid DATA LOSS, " +
"Partial data replay can happen for merged and mirror stream");
throw new Exception("Rename failed from ["+ entry.getKey() +"] to ["
+ entry.getValue() +"]");
}
}
doLocalCommit(consumerCommitPaths);
missingDirsCommittedPaths.clear();
}

@@ -268,7 +248,7 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
if (!getDestFs().exists(intermediatePath))
getDestFs().mkdirs(intermediatePath);
Path source = allFiles[i].getPath().makeQualified(getDestFs());

Path intermediateFilePath = new Path(intermediatePath.makeQualified(
getDestFs()).toString()
+ File.separator + fileName);
@@ -297,44 +277,53 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
}

/*
* @returns Map<String, Set<Path>> - Map of StreamName, Set of paths committed
* @returns Map<Path, Path> - map of source filePath to its destinationPath,
* to be committed for the stream
*/
private Map<String, Set<Path>> doLocalCommit(long commitTime,
Map<String, List<Path>> categoriesToCommit) throws Exception {
Map<String, Set<Path>> comittedPaths = new HashMap<String, Set<Path>>();
Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));
if (!getDestFs().exists(destParentPath)) {
getDestFs().mkdirs(destParentPath);
}
LOG.debug("Moving from intermediatePath [" + filePath + "] to ["
+ destParentPath + "]");
if (getDestFs().rename(filePath, destParentPath) == false) {
LOG.warn("Rename failed, aborting transaction COMMIT to avoid " +
"dataloss. Partial data replay could happen in next run");
throw new Exception("Abort transaction Commit. Rename failed from ["
+ filePath + "] to [" + destParentPath + "]");
}
Path commitPath = new Path(destParentPath, filePath.getName());
Set<Path> commitPaths = comittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
comittedPaths.put(category, commitPaths);
}
}
return comittedPaths;
public Map<Path, Path> createLocalCommitPaths(Path tmpOut, long commitTime,

> Review comment (Contributor): Can you add some documentation stating what
> each Path means? I was wondering whether we should have SourcePath and
> DestinationPath classes abstracting this out, but we can do that later;
> just some documentation around this would be helpful.

Map<String, List<Path>> categoriesToCommit, Map<String, Set<Path>>
tobeCommittedPaths) throws Exception {
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());

// find final destination paths
Map<Path, Path> mvPaths = new LinkedHashMap<Path, Path>();
Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
.entrySet();
Iterator it = commitEntries.iterator();
while (it.hasNext()) {
Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
.next();
String category = entry.getKey();
List<Path> filesInCategory = entry.getValue();
for (Path filePath : filesInCategory) {
Path destParentPath = new Path(getDestCluster().getFinalDestDir(
category, commitTime));
Path commitPath = new Path(destParentPath, filePath.getName());
mvPaths.put(filePath, commitPath);
Set<Path> commitPaths = tobeCommittedPaths.get(category);
if (commitPaths == null) {
commitPaths = new HashSet<Path>();
}
commitPaths.add(commitPath);
tobeCommittedPaths.put(category, commitPaths);
}
}
return mvPaths;
}
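
Addressing the review comment above, a possible Javadoc sketch for createLocalCommitPaths; the semantics of each Path are inferred from the code in this diff, so treat them as assumptions rather than the author's wording:

    /**
     * Builds the rename plan for a local commit without mutating the
     * filesystem. (Doc sketch only; semantics inferred from the code.)
     *
     * @param tmpOut             root of the intermediate (tmp) output directory
     * @param commitTime         commit timestamp used to derive the final
     *                           destination directory per category
     * @param categoriesToCommit stream category -> intermediate file paths
     *                           staged under tmpOut
     * @param tobeCommittedPaths out-parameter: filled with category -> final
     *                           destination paths, later handed to
     *                           commitMirroredConsumerPaths
     * @return insertion-ordered map of intermediate source path -> final
     *         destination path, executed by doLocalCommit(Map)
     */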

private void doLocalCommit(Map<Path, Path> commitPaths) throws Exception {
LOG.info("Committing " + commitPaths.size() + " paths.");
FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
for (Map.Entry<Path, Path> entry : commitPaths.entrySet()) {
LOG.info("Renaming " + entry.getKey() + " to " + entry.getValue());
fs.mkdirs(entry.getValue().getParent());
if (fs.rename(entry.getKey(), entry.getValue()) == false) {
LOG.warn("Rename failed, aborting transaction COMMIT to avoid "
+ "dataloss. Partial data replay could happen in next run");
throw new Exception("Abort transaction Commit. Rename failed from ["
+ entry.getKey() + "] to [" + entry.getValue() + "]");
}
}
}
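
The commit itself is just ordered mkdirs-plus-rename with fail-fast abort. A minimal, runnable usage sketch against Hadoop's local filesystem (all paths and the class name are illustrative assumptions):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocalCommitSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path src = new Path("/tmp/commit-sketch/tmp-out/category1/part-0");
        Path dest = new Path("/tmp/commit-sketch/streams/category1/part-0");
        fs.mkdirs(src.getParent());
        fs.create(src).close(); // stage a fake intermediate file

        // LinkedHashMap keeps insertion order, so renames replay in the order
        // the plan was built -- mirroring createLocalCommitPaths/doLocalCommit.
        Map<Path, Path> commitPaths = new LinkedHashMap<Path, Path>();
        commitPaths.put(src, dest);

        for (Map.Entry<Path, Path> entry : commitPaths.entrySet()) {
          fs.mkdirs(entry.getValue().getParent());
          if (!fs.rename(entry.getKey(), entry.getValue())) {
            // Abort the whole commit on the first failed rename, as the
            // service does, to avoid silent partial commits.
            throw new Exception("Rename failed from [" + entry.getKey()
                + "] to [" + entry.getValue() + "]");
          }
        }
      }
    }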

protected Path getInputPath() throws IOException {