From 9ec77fddecc2fade8e40549c36bda5d1ca43c4b3 Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Fri, 19 Oct 2012 10:01:12 +0530 Subject: [PATCH 1/8] changes in merged stream service --- .../databus/distcp/MergedStreamService.java | 185 +++++++++--------- 1 file changed, 92 insertions(+), 93 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index ac49b93..4962f71 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -105,14 +105,13 @@ public void execute() throws Exception { // if success if (!skipCommit) { Map> categoriesToCommit = prepareForCommit(tmpOut); + committedPaths = new HashMap>(); synchronized (getDestCluster()) { long commitTime = getDestCluster().getCommitTime(); addPublishMissingPaths(missingDirsCommittedPaths, commitTime, categoriesToCommit.keySet()); - - // category, Set of Paths to commit - committedPaths = doLocalCommit(commitTime, categoriesToCommit); - + Map commitPaths = findCommitedPaths(tmpOut, commitTime, + categoriesToCommit, committedPaths); for (Map.Entry> entry : missingDirsCommittedPaths .entrySet()) { Set filesList = committedPaths.get(entry.getKey()); @@ -121,9 +120,13 @@ public void execute() throws Exception { else committedPaths.put(entry.getKey(), entry.getValue()); } + // Prepare paths for MirrorStreamConsumerService + Map consumepaths = commitMirroredConsumerPaths(committedPaths, tmp); + commitPaths.putAll(consumepaths); + // category, Set of Paths to commit + doLocalCommit(commitPaths); } - // Prepare paths for MirrorStreamConsumerService - commitMirroredConsumerPaths(committedPaths, tmp); + // Cleanup happens in parallel without sync // no race is there in consumePaths, tmpOut doFinalCommit(consumePaths); @@ -141,41 +144,41 @@ private void addPublishMissingPaths( Map> missingDirsCommittedPaths, long commitTime, Set categoriesToCommit) throws Exception { - Map> missingDirsforCategory = null; - - if(categoriesToCommit!=null) { - missingDirsforCategory = new HashMap>(); - for (String category : categoriesToCommit) { - Set missingDirectories = publishMissingPaths(getDestFs(), - getDestCluster().getFinalDestDirRoot(), commitTime, category); - missingDirsforCategory.put(category, missingDirectories); - } - } else { - missingDirsforCategory = publishMissingPaths( - getDestFs(), getDestCluster().getFinalDestDirRoot()); - } + Map> missingDirsforCategory = null; - if (missingDirsforCategory != null) { - for (Map.Entry> entry : missingDirsforCategory - .entrySet()) { - LOG.debug("Add Missing Directories to Commit Path: " - + entry.getValue().size()); - if (missingDirsCommittedPaths.get(entry.getKey()) != null) { - Set missingPaths = missingDirsCommittedPaths.get(entry - .getKey()); - missingPaths.addAll(entry.getValue()); - } else { - missingDirsCommittedPaths.put(entry.getKey(), entry.getValue()); - } - } - } + if(categoriesToCommit!=null) { + missingDirsforCategory = new HashMap>(); + for (String category : categoriesToCommit) { + Set missingDirectories = publishMissingPaths(getDestFs(), + getDestCluster().getFinalDestDirRoot(), commitTime, category); + missingDirsforCategory.put(category, missingDirectories); + } + } else { + missingDirsforCategory = publishMissingPaths( + getDestFs(), getDestCluster().getFinalDestDirRoot()); + } + + if (missingDirsforCategory != null) { + for (Map.Entry> entry : missingDirsforCategory + .entrySet()) { + LOG.debug("Add Missing Directories to Commit Path: " + + entry.getValue().size()); + if (missingDirsCommittedPaths.get(entry.getKey()) != null) { + Set missingPaths = missingDirsCommittedPaths.get(entry + .getKey()); + missingPaths.addAll(entry.getValue()); + } else { + missingDirsCommittedPaths.put(entry.getKey(), entry.getValue()); + } + } + } } /* * @param Map> commitedPaths - Stream Name, It's committed * Path. */ - private void commitMirroredConsumerPaths( + private Map commitMirroredConsumerPaths( Map> committedPaths, Path tmp) throws Exception { // Map of Stream and clusters where it's mirrored Map> mirrorStreamConsumers = new HashMap>(); @@ -230,29 +233,8 @@ private void commitMirroredConsumerPaths( consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath); } // for each consumer } // for each stream - - if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) { - LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit"); - missingDirsCommittedPaths.clear(); - return; - } - // Do the final mirrorCommit - LOG.info("Committing [" + consumerCommitPaths.size() + "] paths for " + - "mirrored Stream"); - FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); - for (Map.Entry entry : consumerCommitPaths.entrySet()) { - LOG.info("Renaming [" + entry.getKey() + "] to [" + entry.getValue() + - "]"); - fs.mkdirs(entry.getValue().getParent()); - if (fs.rename(entry.getKey(), entry.getValue()) == false) { - LOG.warn("Failed to Commit for Mirrored Path. Aborting Transaction " + - "to avoid DATA LOSS, " + - "Partial data replay can happen for merged and mirror stream"); - throw new Exception("Rename failed from ["+ entry.getKey() +"] to [" - + entry.getValue() +"]"); - } - } - missingDirsCommittedPaths.clear(); + + return consumerCommitPaths; } private Map> prepareForCommit(Path tmpOut) @@ -261,6 +243,7 @@ private Map> prepareForCommit(Path tmpOut) FileStatus[] allFiles = getDestFs().listStatus(tmpOut); for (int i = 0; i < allFiles.length; i++) { String fileName = allFiles[i].getPath().getName(); + LOG.info("added filename in merged " + fileName); if (fileName != null) { String category = getCategoryFromFileName(fileName); if (category != null) { @@ -268,7 +251,7 @@ private Map> prepareForCommit(Path tmpOut) if (!getDestFs().exists(intermediatePath)) getDestFs().mkdirs(intermediatePath); Path source = allFiles[i].getPath().makeQualified(getDestFs()); - + Path intermediateFilePath = new Path(intermediatePath.makeQualified( getDestFs()).toString() + File.separator + fileName); @@ -293,48 +276,64 @@ private Map> prepareForCommit(Path tmpOut) } } } + return categoriesToCommit; } + + public Map findCommitedPaths(Path tmpOut, long commitTime, + Map> categoriesToCommit, Map> + comittedPaths) throws Exception { + FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); + + // find final destination paths + Map mvPaths = new LinkedHashMap(); + + // Map> comittedPaths = new HashMap>(); + Set>> commitEntries = categoriesToCommit + .entrySet(); + Iterator it = commitEntries.iterator(); + while (it.hasNext()) { + Map.Entry> entry = (Map.Entry>) it + .next(); + String category = entry.getKey(); + List filesInCategory = entry.getValue(); + for (Path filePath : filesInCategory) { + Path destParentPath = new Path(getDestCluster().getFinalDestDir( + category, commitTime)); + + LOG.debug("Moving from intermediatePath [" + filePath + "] to [" + + destParentPath + "]"); + + Path commitPath = new Path(destParentPath, filePath.getName()); + mvPaths.put(filePath, commitPath); + Set commitPaths = comittedPaths.get(category); + if (commitPaths == null) { + commitPaths = new HashSet(); + } + commitPaths.add(commitPath); + comittedPaths.put(category, commitPaths); + } + } + return mvPaths; + } /* * @returns Map> - Map of StreamName, Set of paths committed * for stream */ - private Map> doLocalCommit(long commitTime, - Map> categoriesToCommit) throws Exception { - Map> comittedPaths = new HashMap>(); - Set>> commitEntries = categoriesToCommit - .entrySet(); - Iterator it = commitEntries.iterator(); - while (it.hasNext()) { - Map.Entry> entry = (Map.Entry>) it - .next(); - String category = entry.getKey(); - List filesInCategory = entry.getValue(); - for (Path filePath : filesInCategory) { - Path destParentPath = new Path(getDestCluster().getFinalDestDir( - category, commitTime)); - if (!getDestFs().exists(destParentPath)) { - getDestFs().mkdirs(destParentPath); - } - LOG.debug("Moving from intermediatePath [" + filePath + "] to [" - + destParentPath + "]"); - if (getDestFs().rename(filePath, destParentPath) == false) { - LOG.warn("Rename failed, aborting transaction COMMIT to avoid " + - "dataloss. Partial data replay could happen in next run"); - throw new Exception("Abort transaction Commit. Rename failed from [" - + filePath + "] to [" + destParentPath + "]"); - } - Path commitPath = new Path(destParentPath, filePath.getName()); - Set commitPaths = comittedPaths.get(category); - if (commitPaths == null) { - commitPaths = new HashSet(); - } - commitPaths.add(commitPath); - comittedPaths.put(category, commitPaths); - } - } - return comittedPaths; + private void doLocalCommit(Map commitPaths) throws Exception { + LOG.info("Committing " + commitPaths.size() + " paths."); + FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); + for (Map.Entry entry : commitPaths.entrySet()) { + LOG.info("Renaming " + entry.getKey() + " to " + entry.getValue()); + fs.mkdirs(entry.getValue().getParent()); + if (fs.rename(entry.getKey(), entry.getValue()) == false) { + LOG.warn("Rename failed, aborting transaction COMMIT to avoid " + + "dataloss. Partial data replay could happen in next run"); + throw new Exception("Abort transaction Commit. Rename failed from [" + + entry.getKey() + "] to [" + entry.getValue() + "]"); + } + } } protected Path getInputPath() throws IOException { From bc75e12be13139b3a2df80d3aeacccf880da4177 Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Fri, 19 Oct 2012 13:39:31 +0530 Subject: [PATCH 2/8] hole in mirror stream --- .../databus/distcp/MergedStreamService.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index 4962f71..ab17f69 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -100,28 +100,28 @@ public void execute() throws Exception { LOG.warn("Problem in MergedStream distcp PULL..skipping commit for this run"); skipCommit = true; } - Map> committedPaths = null; + Map> toBeCommittedPaths = null; // if success if (!skipCommit) { Map> categoriesToCommit = prepareForCommit(tmpOut); - committedPaths = new HashMap>(); + toBeCommittedPaths = new HashMap>(); synchronized (getDestCluster()) { long commitTime = getDestCluster().getCommitTime(); addPublishMissingPaths(missingDirsCommittedPaths, commitTime, categoriesToCommit.keySet()); Map commitPaths = findCommitedPaths(tmpOut, commitTime, - categoriesToCommit, committedPaths); + categoriesToCommit, toBeCommittedPaths); for (Map.Entry> entry : missingDirsCommittedPaths .entrySet()) { - Set filesList = committedPaths.get(entry.getKey()); + Set filesList = toBeCommittedPaths.get(entry.getKey()); if (filesList != null) filesList.addAll(entry.getValue()); else - committedPaths.put(entry.getKey(), entry.getValue()); + toBeCommittedPaths.put(entry.getKey(), entry.getValue()); } // Prepare paths for MirrorStreamConsumerService - Map consumepaths = commitMirroredConsumerPaths(committedPaths, tmp); + Map consumepaths = commitMirroredConsumerPaths(toBeCommittedPaths, tmp); commitPaths.putAll(consumepaths); // category, Set of Paths to commit doLocalCommit(commitPaths); @@ -233,7 +233,12 @@ private Map commitMirroredConsumerPaths( consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath); } // for each consumer } // for each stream - + if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) { + LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit"); + missingDirsCommittedPaths.clear(); + return null; + } + return consumerCommitPaths; } @@ -243,7 +248,6 @@ private Map> prepareForCommit(Path tmpOut) FileStatus[] allFiles = getDestFs().listStatus(tmpOut); for (int i = 0; i < allFiles.length; i++) { String fileName = allFiles[i].getPath().getName(); - LOG.info("added filename in merged " + fileName); if (fileName != null) { String category = getCategoryFromFileName(fileName); if (category != null) { @@ -288,7 +292,6 @@ public Map findCommitedPaths(Path tmpOut, long commitTime, // find final destination paths Map mvPaths = new LinkedHashMap(); - // Map> comittedPaths = new HashMap>(); Set>> commitEntries = categoriesToCommit .entrySet(); Iterator it = commitEntries.iterator(); @@ -300,10 +303,6 @@ public Map findCommitedPaths(Path tmpOut, long commitTime, for (Path filePath : filesInCategory) { Path destParentPath = new Path(getDestCluster().getFinalDestDir( category, commitTime)); - - LOG.debug("Moving from intermediatePath [" + filePath + "] to [" - + destParentPath + "]"); - Path commitPath = new Path(destParentPath, filePath.getName()); mvPaths.put(filePath, commitPath); Set commitPaths = comittedPaths.get(category); From 6a0c367d72a7cca640605bfad429be9ee23e5855 Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Fri, 19 Oct 2012 15:11:40 +0530 Subject: [PATCH 3/8] indention change --- .../databus/distcp/MergedStreamService.java | 58 +++++++++---------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index ab17f69..ccaeaeb 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -233,12 +233,7 @@ private Map commitMirroredConsumerPaths( consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath); } // for each consumer } // for each stream - if (consumerCommitPaths == null || consumerCommitPaths.size() == 0) { - LOG.info("consumerCommitPaths is empty for all stream, skipping mirrorCommit"); - missingDirsCommittedPaths.clear(); - return null; - } - + missingDirsCommittedPaths.clear(); return consumerCommitPaths; } @@ -287,33 +282,32 @@ private Map> prepareForCommit(Path tmpOut) public Map findCommitedPaths(Path tmpOut, long commitTime, Map> categoriesToCommit, Map> comittedPaths) throws Exception { - FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); - - // find final destination paths - Map mvPaths = new LinkedHashMap(); + FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); - Set>> commitEntries = categoriesToCommit - .entrySet(); - Iterator it = commitEntries.iterator(); - while (it.hasNext()) { - Map.Entry> entry = (Map.Entry>) it - .next(); - String category = entry.getKey(); - List filesInCategory = entry.getValue(); - for (Path filePath : filesInCategory) { - Path destParentPath = new Path(getDestCluster().getFinalDestDir( - category, commitTime)); - Path commitPath = new Path(destParentPath, filePath.getName()); - mvPaths.put(filePath, commitPath); - Set commitPaths = comittedPaths.get(category); - if (commitPaths == null) { - commitPaths = new HashSet(); - } - commitPaths.add(commitPath); - comittedPaths.put(category, commitPaths); - } - } - return mvPaths; + // find final destination paths + Map mvPaths = new LinkedHashMap(); + Set>> commitEntries = categoriesToCommit + .entrySet(); + Iterator it = commitEntries.iterator(); + while (it.hasNext()) { + Map.Entry> entry = (Map.Entry>) it + .next(); + String category = entry.getKey(); + List filesInCategory = entry.getValue(); + for (Path filePath : filesInCategory) { + Path destParentPath = new Path(getDestCluster().getFinalDestDir( + category, commitTime)); + Path commitPath = new Path(destParentPath, filePath.getName()); + mvPaths.put(filePath, commitPath); + Set commitPaths = comittedPaths.get(category); + if (commitPaths == null) { + commitPaths = new HashSet(); + } + commitPaths.add(commitPath); + comittedPaths.put(category, commitPaths); + } + } + return mvPaths; } /* From 8062f26d90f4a7b4528e7832fbcb8483f56a348d Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Fri, 2 Nov 2012 14:39:52 +0530 Subject: [PATCH 4/8] committing missing directories --- .../databus/distcp/MergedStreamService.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index ccaeaeb..840ec78 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -83,7 +83,9 @@ public void execute() throws Exception { if (missingDirsCommittedPaths.size() > 0) { LOG.warn("Adding Missing Directories for Pull " + missingDirsCommittedPaths.size()); - commitMirroredConsumerPaths(missingDirsCommittedPaths, tmp); + Map missingConsumePaths = commitMirroredConsumerPaths( + missingDirsCommittedPaths, tmp); + doLocalCommit(missingConsumePaths); } return; } @@ -100,31 +102,32 @@ public void execute() throws Exception { LOG.warn("Problem in MergedStream distcp PULL..skipping commit for this run"); skipCommit = true; } - Map> toBeCommittedPaths = null; + Map> tobeCommittedPaths = null; // if success if (!skipCommit) { Map> categoriesToCommit = prepareForCommit(tmpOut); - toBeCommittedPaths = new HashMap>(); + tobeCommittedPaths = new HashMap>(); synchronized (getDestCluster()) { long commitTime = getDestCluster().getCommitTime(); addPublishMissingPaths(missingDirsCommittedPaths, commitTime, categoriesToCommit.keySet()); - Map commitPaths = findCommitedPaths(tmpOut, commitTime, - categoriesToCommit, toBeCommittedPaths); + Map commitPaths = createLocalCommitPaths(tmpOut, commitTime, + categoriesToCommit, tobeCommittedPaths); for (Map.Entry> entry : missingDirsCommittedPaths .entrySet()) { - Set filesList = toBeCommittedPaths.get(entry.getKey()); + Set filesList = tobeCommittedPaths.get(entry.getKey()); if (filesList != null) filesList.addAll(entry.getValue()); - else - toBeCommittedPaths.put(entry.getKey(), entry.getValue()); + else + tobeCommittedPaths.put(entry.getKey(), entry.getValue()); } // Prepare paths for MirrorStreamConsumerService - Map consumepaths = commitMirroredConsumerPaths(toBeCommittedPaths, tmp); - commitPaths.putAll(consumepaths); + Map consumepaths = commitMirroredConsumerPaths(tobeCommittedPaths, tmp); + doLocalCommit(consumepaths); // category, Set of Paths to commit doLocalCommit(commitPaths); + missingDirsCommittedPaths.clear(); } // Cleanup happens in parallel without sync @@ -233,7 +236,6 @@ private Map commitMirroredConsumerPaths( consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath); } // for each consumer } // for each stream - missingDirsCommittedPaths.clear(); return consumerCommitPaths; } @@ -279,9 +281,9 @@ private Map> prepareForCommit(Path tmpOut) return categoriesToCommit; } - public Map findCommitedPaths(Path tmpOut, long commitTime, + public Map createLocalCommitPaths(Path tmpOut, long commitTime, Map> categoriesToCommit, Map> - comittedPaths) throws Exception { + tobeCommittedPaths) throws Exception { FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); // find final destination paths @@ -299,12 +301,12 @@ public Map findCommitedPaths(Path tmpOut, long commitTime, category, commitTime)); Path commitPath = new Path(destParentPath, filePath.getName()); mvPaths.put(filePath, commitPath); - Set commitPaths = comittedPaths.get(category); + Set commitPaths = tobeCommittedPaths.get(category); if (commitPaths == null) { commitPaths = new HashSet(); } commitPaths.add(commitPath); - comittedPaths.put(category, commitPaths); + tobeCommittedPaths.put(category, commitPaths); } } return mvPaths; From c04afff5ca4bc46ae769e8d55d2bec251ebb67a8 Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Mon, 5 Nov 2012 14:50:04 +0530 Subject: [PATCH 5/8] clear all the missing dirs and do local commit in commitMirroredConsumerPaths method --- .../inmobi/databus/distcp/MergedStreamService.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index 840ec78..b4d1141 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -83,9 +83,7 @@ public void execute() throws Exception { if (missingDirsCommittedPaths.size() > 0) { LOG.warn("Adding Missing Directories for Pull " + missingDirsCommittedPaths.size()); - Map missingConsumePaths = commitMirroredConsumerPaths( - missingDirsCommittedPaths, tmp); - doLocalCommit(missingConsumePaths); + commitMirroredConsumerPaths(missingDirsCommittedPaths, tmp); } return; } @@ -123,13 +121,10 @@ public void execute() throws Exception { tobeCommittedPaths.put(entry.getKey(), entry.getValue()); } // Prepare paths for MirrorStreamConsumerService - Map consumepaths = commitMirroredConsumerPaths(tobeCommittedPaths, tmp); - doLocalCommit(consumepaths); + commitMirroredConsumerPaths(tobeCommittedPaths, tmp); // category, Set of Paths to commit doLocalCommit(commitPaths); - missingDirsCommittedPaths.clear(); } - // Cleanup happens in parallel without sync // no race is there in consumePaths, tmpOut doFinalCommit(consumePaths); @@ -236,6 +231,8 @@ private Map commitMirroredConsumerPaths( consumerCommitPaths.put(tmpConsumerPath, finalMirrorPath); } // for each consumer } // for each stream + doLocalCommit(consumerCommitPaths); + missingDirsCommittedPaths.clear(); return consumerCommitPaths; } @@ -277,7 +274,6 @@ private Map> prepareForCommit(Path tmpOut) } } } - return categoriesToCommit; } From 614e19bce094e08b4d69dde857014fd94251a569 Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Tue, 6 Nov 2012 10:12:55 +0530 Subject: [PATCH 6/8] commitMirroredConsumePaths method does not need to return --- .../java/com/inmobi/databus/distcp/MergedStreamService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index b4d1141..2500770 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -176,7 +176,7 @@ private void addPublishMissingPaths( * @param Map> commitedPaths - Stream Name, It's committed * Path. */ - private Map commitMirroredConsumerPaths( + private void commitMirroredConsumerPaths( Map> committedPaths, Path tmp) throws Exception { // Map of Stream and clusters where it's mirrored Map> mirrorStreamConsumers = new HashMap>(); @@ -233,7 +233,6 @@ private Map commitMirroredConsumerPaths( } // for each stream doLocalCommit(consumerCommitPaths); missingDirsCommittedPaths.clear(); - return consumerCommitPaths; } private Map> prepareForCommit(Path tmpOut) From f3ec10f1052026d9cdee9e51348e3c6878a5e09c Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Tue, 6 Nov 2012 13:24:57 +0530 Subject: [PATCH 7/8] added comments to the createLocalComitPaths method --- .../com/inmobi/databus/distcp/MergedStreamService.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index 2500770..d468313 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -275,7 +275,11 @@ private Map> prepareForCommit(Path tmpOut) } return categoriesToCommit; } - + + /* + * @returns Map - Map of filePath, destinationPath committed + * for stream + */ public Map createLocalCommitPaths(Path tmpOut, long commitTime, Map> categoriesToCommit, Map> tobeCommittedPaths) throws Exception { @@ -307,10 +311,6 @@ public Map createLocalCommitPaths(Path tmpOut, long commitTime, return mvPaths; } - /* - * @returns Map> - Map of StreamName, Set of paths committed - * for stream - */ private void doLocalCommit(Map commitPaths) throws Exception { LOG.info("Committing " + commitPaths.size() + " paths."); FileSystem fs = FileSystem.get(getDestCluster().getHadoopConf()); From 79fadaf0ef8d8d5bf730d247282b7844c8d7be8b Mon Sep 17 00:00:00 2001 From: "raju.b41" Date: Mon, 19 Nov 2012 15:42:04 +0530 Subject: [PATCH 8/8] commitMirroredConsumePaths method invocation shifted to outside of synchronized block --- .../databus/distcp/MergedStreamService.java | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java index d468313..1d1b828 100644 --- a/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java +++ b/databus-worker/src/main/java/com/inmobi/databus/distcp/MergedStreamService.java @@ -101,7 +101,7 @@ public void execute() throws Exception { skipCommit = true; } Map> tobeCommittedPaths = null; - + Map commitPaths = new HashMap(); // if success if (!skipCommit) { Map> categoriesToCommit = prepareForCommit(tmpOut); @@ -110,7 +110,7 @@ public void execute() throws Exception { long commitTime = getDestCluster().getCommitTime(); addPublishMissingPaths(missingDirsCommittedPaths, commitTime, categoriesToCommit.keySet()); - Map commitPaths = createLocalCommitPaths(tmpOut, commitTime, + commitPaths = createLocalCommitPaths(tmpOut, commitTime, categoriesToCommit, tobeCommittedPaths); for (Map.Entry> entry : missingDirsCommittedPaths .entrySet()) { @@ -120,11 +120,12 @@ public void execute() throws Exception { else tobeCommittedPaths.put(entry.getKey(), entry.getValue()); } - // Prepare paths for MirrorStreamConsumerService - commitMirroredConsumerPaths(tobeCommittedPaths, tmp); - // category, Set of Paths to commit - doLocalCommit(commitPaths); } + // Prepare paths for MirrorStreamConsumerService + commitMirroredConsumerPaths(tobeCommittedPaths, tmp); + // category, Set of Paths to commit + doLocalCommit(commitPaths); + // Cleanup happens in parallel without sync // no race is there in consumePaths, tmpOut doFinalCommit(consumePaths); @@ -175,6 +176,20 @@ private void addPublishMissingPaths( /* * @param Map> commitedPaths - Stream Name, It's committed * Path. + * tmpConsumePath: hdfsUrl/rootDir/system/tmp/ + * distcp_mergedStream_databusdev1_databusdev2/tmp/src_sourceCluster_via_ + * destinationCluster_mirrorto_consumername_streamname + * final Mirror Path: hdfsUrl/rootDir/system/mirrors/ + * databusdev1/src_sourceCluster_via_destinationCluster_mirrorto_consumerName_ + * benchmark_merge_filestatus + * Example paths: + * tmpConsumePath:hdfs://databusdev2.mkhoj.com:9000/databus/system/tmp/ + * distcp_mergedStream_databusdev1_databusdev2/tmp/ + * src_databusdev1_via_databusdev2_mirrorto_databusdev1_benchmark_merge + * + * finalMirrorPath: hdfs://databusdev2.mkhoj.com:9000/databus/system/mirrors/ + * databusdev1/src_databusdev1_via_databusdev2_mirrorto_databusdev1_ + * benchmark_merge_1352687040907 */ private void commitMirroredConsumerPaths( Map> committedPaths, Path tmp) throws Exception { @@ -279,6 +294,10 @@ private Map> prepareForCommit(Path tmpOut) /* * @returns Map - Map of filePath, destinationPath committed * for stream + * destinationPath : hdfsUrl/rootdir/streams/category/YYYY/MM/HH/MN/filename + * Example path: + * hdfs://databusdev2.mkhoj.com:9000/databus/streams/test-topic/ + * 2012/10/00/00/filename */ public Map createLocalCommitPaths(Path tmpOut, long commitTime, Map> categoriesToCommit, Map>