diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java
index ac49b93..1d1b828 100644
--- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java
+++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java
@@ -100,30 +100,32 @@ public void execute() throws Exception {
         LOG.warn("Problem in MergedStream distcp PULL..skipping commit for this run");
         skipCommit = true;
       }
-      Map<String, Set<Path>> committedPaths = null;
-
+      Map<String, Set<Path>> tobeCommittedPaths = null;
+      Map<Path, Path> commitPaths = new HashMap<Path, Path>();
       // if success
       if (!skipCommit) {
         Map<String, List<Path>> categoriesToCommit = prepareForCommit(tmpOut);
+        tobeCommittedPaths = new HashMap<String, Set<Path>>();
         synchronized (getDestCluster()) {
           long commitTime = getDestCluster().getCommitTime();
           addPublishMissingPaths(missingDirsCommittedPaths, commitTime,
               categoriesToCommit.keySet());
-
-          // category, Set of Paths to commit
-          committedPaths = doLocalCommit(commitTime, categoriesToCommit);
-
+          commitPaths = createLocalCommitPaths(tmpOut, commitTime,
+              categoriesToCommit, tobeCommittedPaths);
           for (Map.Entry<String, Set<Path>> entry : missingDirsCommittedPaths
               .entrySet()) {
-            Set<Path> filesList = committedPaths.get(entry.getKey());
+            Set<Path> filesList = tobeCommittedPaths.get(entry.getKey());
             if (filesList != null)
               filesList.addAll(entry.getValue());
-            else
-              committedPaths.put(entry.getKey(), entry.getValue());
+            else
+              tobeCommittedPaths.put(entry.getKey(), entry.getValue());
           }
         }
         // Prepare paths for MirrorStreamConsumerService
-        commitMirroredConsumerPaths(committedPaths, tmp);
+        commitMirroredConsumerPaths(tobeCommittedPaths, tmp);
+        // category, Set of Paths to commit
+        doLocalCommit(commitPaths);
+
         // Cleanup happens in parallel without sync
         // no race is there in consumePaths, tmpOut
         doFinalCommit(consumePaths);
@@ -141,39 +143,53 @@ private void addPublishMissingPaths(
       Map<String, Set<Path>> missingDirsCommittedPaths, long commitTime,
       Set<String> categoriesToCommit) throws Exception {
-    Map<String, Set<Path>> missingDirsforCategory = null;
-
-    if(categoriesToCommit!=null) {
-      missingDirsforCategory = new HashMap<String, Set<Path>>();
-      for (String category : categoriesToCommit) {
-        Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
-            getDestCluster().getFinalDestDirRoot(), commitTime, category);
-        missingDirsforCategory.put(category, missingDirectories);
-      }
-    } else {
-      missingDirsforCategory = publishMissingPaths(
-          getDestFs(), getDestCluster().getFinalDestDirRoot());
-    }
+    Map<String, Set<Path>> missingDirsforCategory = null;
 
-    if (missingDirsforCategory != null) {
-      for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
-          .entrySet()) {
-        LOG.debug("Add Missing Directories to Commit Path: "
-            + entry.getValue().size());
-        if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
-          Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
-              .getKey());
-          missingPaths.addAll(entry.getValue());
-        } else {
-          missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
-        }
-      }
-    }
+    if(categoriesToCommit!=null) {
+      missingDirsforCategory = new HashMap<String, Set<Path>>();
+      for (String category : categoriesToCommit) {
+        Set<Path> missingDirectories = publishMissingPaths(getDestFs(),
+            getDestCluster().getFinalDestDirRoot(), commitTime, category);
+        missingDirsforCategory.put(category, missingDirectories);
+      }
+    } else {
+      missingDirsforCategory = publishMissingPaths(
+          getDestFs(), getDestCluster().getFinalDestDirRoot());
+    }
+
+    if (missingDirsforCategory != null) {
+      for (Map.Entry<String, Set<Path>> entry : missingDirsforCategory
+          .entrySet()) {
+        LOG.debug("Add Missing Directories to Commit Path: "
+            + entry.getValue().size());
+        if (missingDirsCommittedPaths.get(entry.getKey()) != null) {
+          Set<Path> missingPaths = missingDirsCommittedPaths.get(entry
+              .getKey());
+          missingPaths.addAll(entry.getValue());
+        } else {
+          missingDirsCommittedPaths.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
   }
 
   /*
    * @param Map<String, Set<Path>> commitedPaths - Stream Name, It's committed
    * Path.
+   * tmpConsumePath: hdfsUrl/rootDir/system/tmp/
+   * distcp_mergedStream_databusdev1_databusdev2/tmp/src_sourceCluster_via_
+   * destinationCluster_mirrorto_consumername_streamname
+   * final Mirror Path: hdfsUrl/rootDir/system/mirrors/
+   * databusdev1/src_sourceCluster_via_destinationCluster_mirrorto_consumerName_
+   * benchmark_merge_filestatus
+   * Example paths:
+   * tmpConsumePath:hdfs://databusdev2.mkhoj.com:9000/databus/system/tmp/
+   * distcp_mergedStream_databusdev1_databusdev2/tmp/
+   * src_databusdev1_via_databusdev2_mirrorto_databusdev1_benchmark_merge
+   *
+   * finalMirrorPath: hdfs://databusdev2.mkhoj.com:9000/databus/system/mirrors/
+   * databusdev1/src_databusdev1_via_databusdev2_mirrorto_databusdev1_
+   * benchmark_merge_1352687040907
    */
   private void commitMirroredConsumerPaths(
       Map<String, Set<Path>> committedPaths, Path tmp) throws Exception {
@@ -230,28 +246,7 @@ private void commitMirroredConsumerPaths(
         consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath);
       } // for each consumer
     } // for each stream
-
-    if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) {
-      LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit");
-      missingDirsCommittedPaths.clear();
-      return;
-    }
-    // Do the final mirrorCommit
-    LOG.info("Committing [" + consumerCommitPaths.size() + "] paths for " +
-        "mirrored Stream");
-    FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
-    for (Map.Entry<Path, Path> entry : consumerCommitPaths.entrySet()) {
-      LOG.info("Renaming [" + entry.getKey() + "] to [" + entry.getValue() +
-          "]");
-      fs.mkdirs(entry.getValue().getParent());
-      if (fs.rename(entry.getKey(), entry.getValue()) == false) {
-        LOG.warn("Failed to Commit for Mirrored Path. Aborting Transaction " +
-            "to avoid DATA LOSS, " +
-            "Partial data replay can happen for merged and mirror stream");
-        throw new Exception("Rename failed from ["+ entry.getKey() +"] to ["
-            + entry.getValue() +"]");
-      }
-    }
+    doLocalCommit(consumerCommitPaths);
     missingDirsCommittedPaths.clear();
   }
@@ -268,7 +263,7 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
       if (!getDestFs().exists(intermediatePath))
         getDestFs().mkdirs(intermediatePath);
       Path source = allFiles[i].getPath().makeQualified(getDestFs());
-
+
       Path intermediateFilePath = new Path(intermediatePath.makeQualified(
           getDestFs()).toString() + File.separator + fileName);
@@ -297,44 +292,57 @@ private Map<String, List<Path>> prepareForCommit(Path tmpOut)
   }
 
   /*
-   * @returns Map<String, Set<Path>> - Map of StreamName, Set of paths committed
+   * @returns Map<Path, Path> - Map of filePath, destinationPath committed
    * for stream
+   * destinationPath : hdfsUrl/rootdir/streams/category/YYYY/MM/HH/MN/filename
+   * Example path:
+   * hdfs://databusdev2.mkhoj.com:9000/databus/streams/test-topic/
+   * 2012/10/00/00/filename
    */
-  private Map<String, Set<Path>> doLocalCommit(long commitTime,
-      Map<String, List<Path>> categoriesToCommit) throws Exception {
-    Map<String, Set<Path>> comittedPaths = new HashMap<String, Set<Path>>();
-    Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
-        .entrySet();
-    Iterator it = commitEntries.iterator();
-    while (it.hasNext()) {
-      Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
-          .next();
-      String category = entry.getKey();
-      List<Path> filesInCategory = entry.getValue();
-      for (Path filePath : filesInCategory) {
-        Path destParentPath = new Path(getDestCluster().getFinalDestDir(
-            category, commitTime));
-        if (!getDestFs().exists(destParentPath)) {
-          getDestFs().mkdirs(destParentPath);
-        }
-        LOG.debug("Moving from intermediatePath [" + filePath + "] to ["
-            + destParentPath + "]");
-        if (getDestFs().rename(filePath, destParentPath) == false) {
-          LOG.warn("Rename failed, aborting transaction COMMIT to avoid " +
-              "dataloss. Partial data replay could happen in next run");
-          throw new Exception("Abort transaction Commit. Rename failed from ["
-              + filePath + "] to [" + destParentPath + "]");
-        }
-        Path commitPath = new Path(destParentPath, filePath.getName());
-        Set<Path> commitPaths = comittedPaths.get(category);
-        if (commitPaths == null) {
-          commitPaths = new HashSet<Path>();
-        }
-        commitPaths.add(commitPath);
-        comittedPaths.put(category, commitPaths);
-      }
-    }
-    return comittedPaths;
+  public Map<Path, Path> createLocalCommitPaths(Path tmpOut, long commitTime,
+      Map<String, List<Path>> categoriesToCommit, Map<String, Set<Path>>
+      tobeCommittedPaths) throws Exception {
+    FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
+
+    // find final destination paths
+    Map<Path, Path> mvPaths = new LinkedHashMap<Path, Path>();
+    Set<Map.Entry<String, List<Path>>> commitEntries = categoriesToCommit
+        .entrySet();
+    Iterator it = commitEntries.iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, List<Path>> entry = (Map.Entry<String, List<Path>>) it
+          .next();
+      String category = entry.getKey();
+      List<Path> filesInCategory = entry.getValue();
+      for (Path filePath : filesInCategory) {
+        Path destParentPath = new Path(getDestCluster().getFinalDestDir(
+            category, commitTime));
+        Path commitPath = new Path(destParentPath, filePath.getName());
+        mvPaths.put(filePath, commitPath);
+        Set<Path> commitPaths = tobeCommittedPaths.get(category);
+        if (commitPaths == null) {
+          commitPaths = new HashSet<Path>();
+        }
+        commitPaths.add(commitPath);
+        tobeCommittedPaths.put(category, commitPaths);
+      }
+    }
+    return mvPaths;
+  }
+
+  private void doLocalCommit(Map<Path, Path> commitPaths) throws Exception {
+    LOG.info("Committing " + commitPaths.size() + " paths.");
+    FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf());
+    for (Map.Entry<Path, Path> entry : commitPaths.entrySet()) {
+      LOG.info("Renaming " + entry.getKey() + " to " + entry.getValue());
+      fs.mkdirs(entry.getValue().getParent());
+      if (fs.rename(entry.getKey(), entry.getValue()) == false) {
+        LOG.warn("Rename failed, aborting transaction COMMIT to avoid " +
+            "dataloss. Partial data replay could happen in next run");
+        throw new Exception("Abort transaction Commit. Rename failed from ["
+            + entry.getKey() + "] to [" + entry.getValue() + "]");
+      }
+    }
   }
 
   protected Path getInputPath() throws IOException {
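
---

Reviewer note: the patch splits the old doLocalCommit(commitTime, categoriesToCommit) into a plan phase and an execute phase. createLocalCommitPaths() only computes the source-to-destination rename map (a LinkedHashMap, so renames later run in a deterministic order) and fills tobeCommittedPaths per category for commitMirroredConsumerPaths(); the new doLocalCommit(Map<Path, Path>) then performs the renames, and the same method is reused for the mirrored-consumer commit in place of the duplicated rename loop removed above. Below is a minimal standalone sketch of that plan/execute split against the stock Hadoop FileSystem API; the class and method names (TwoPhaseCommitSketch, planRenames, executeRenames) are illustrative assumptions, not part of this patch.

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TwoPhaseCommitSketch {

  // Phase 1: plan. Compute a source -> destination map without touching the
  // filesystem. A LinkedHashMap (as in createLocalCommitPaths) preserves
  // insertion order, so the later renames and their logs are deterministic.
  public static Map<Path, Path> planRenames(List<Path> files, Path destDir) {
    Map<Path, Path> plan = new LinkedHashMap<Path, Path>();
    for (Path src : files) {
      plan.put(src, new Path(destDir, src.getName()));
    }
    return plan;
  }

  // Phase 2: execute. Create each destination's parent lazily and abort on
  // the first failed rename, matching the patch's policy of failing the
  // whole transaction rather than risking data loss.
  public static void executeRenames(FileSystem fs, Map<Path, Path> plan)
      throws IOException {
    for (Map.Entry<Path, Path> entry : plan.entrySet()) {
      fs.mkdirs(entry.getValue().getParent());
      if (!fs.rename(entry.getKey(), entry.getValue())) {
        throw new IOException("Rename failed from [" + entry.getKey()
            + "] to [" + entry.getValue() + "]");
      }
    }
  }
}

Because the plan phase mutates nothing, a failure while computing destinations leaves the filesystem untouched; this is what lets execute() prepare the mirror-consumer paths from tobeCommittedPaths before any local rename has happened. Only the execute phase can stop part-way, and it throws on the first failed rename, preserving the original "abort to avoid data loss / partial replay" behavior.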