Skip to content

Commit

Permalink
Merge pull request #29 from tcezard/nextflow_join_no_dep
Browse files Browse the repository at this point in the history
Allow nextflow pipeline to be joined without dependencies
  • Loading branch information
tcezard committed Jul 27, 2022
2 parents 97cc5c2 + 991c801 commit ac5e720
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
mongodb-version: [4.0.18]
python-version: [3.6, 3.8]
python-version: [3.8, 3.9]
java-version: [1.8]

steps:
Expand Down
21 changes: 14 additions & 7 deletions ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,30 @@ def run_pipeline(self, workflow_file_path: str, nextflow_binary_path: str = 'nex
run_command_with_output(f"Running pipeline {workflow_file_path}...", workflow_command)

@staticmethod
def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline') -> 'NextFlowPipeline':
def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline',
with_dependencies: bool = True) -> 'NextFlowPipeline':
"""
Join two pipelines.
Join two pipelines with or without dependencies
Returns a new pipeline where:
With Dependencies it returns a new pipeline where:
1) root processes are those of the main pipeline.
2) final processes are those of the dependent pipeline and
3) every root process of the dependent pipeline depends on the final processes of the main pipeline.
Without Dependencies it returns a new pipeline where:
1) the two pipeline are left independent
2) Only shared dependencies
3) every root process of the dependent pipeline depends on the final processes of the main pipeline.
"""
joined_pipeline = NextFlowPipeline()
# Aggregate dependency maps of both pipelines
joined_pipeline.process_dependency_map = nx.compose(main_pipeline.process_dependency_map,
dependent_pipeline.process_dependency_map)
for final_process_in_main_pipeline in main_pipeline._get_final_processes():
for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes():
joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline,
final_process_in_main_pipeline)
if with_dependencies:
for final_process_in_main_pipeline in main_pipeline._get_final_processes():
for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes():
joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline,
final_process_in_main_pipeline)
return joined_pipeline

def _get_root_processes(self) -> List[NextFlowProcess]:
Expand Down
45 changes: 39 additions & 6 deletions tests/test_nextflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ def test_non_linear_pipeline_join(self):
p6 = NextFlowProcess(process_name="sixth_process",
command_to_run=f"echo sixth_process >> {pipeline_output_file}")
# Dependency graphs
# p1 p4 p5 p1
# p1 p4 p5 p1
# / \ + \ / = / \
# p2 p3 p6 p2 p3
# / \/\
# / /\ \
# p4 p5
# \ /
# p6
# /\ /\
# | \/ |
# p4 p5
# \ /
# p6
pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]})
pipe2 = NextFlowPipeline(process_dependency_map={p6: [p4, p5]})
pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2)
Expand All @@ -165,3 +165,36 @@ def test_non_linear_pipeline_join(self):
# Due to parallelism, order in which p4 and p5 are executed cannot be guaranteed
self.assertTrue("fourth_process" in lines[3:5] and "fifth_process" in lines[3:5])
self.assertEqual("sixth_process", lines[5])

def test_pipeline_join_no_dependencies(self):
pipeline_output_file = os.path.join(self.nextflow_test_dir, "non_linear_pipeline_output.txt")
if os.path.isfile(pipeline_output_file):
os.remove(pipeline_output_file)

p1 = NextFlowProcess(process_name="first_process",
command_to_run=f"echo first_process >> {pipeline_output_file}")
p2 = NextFlowProcess(process_name="second_process",
command_to_run=f"echo second_process >> {pipeline_output_file}")
p3 = NextFlowProcess(process_name="third_process",
command_to_run=f"echo third_process >> {pipeline_output_file}")
p4 = NextFlowProcess(process_name="fourth_process",
command_to_run=f"echo fourth_process >> {pipeline_output_file}")
p5 = NextFlowProcess(process_name="fifth_process",
command_to_run=f"echo fifth_process >> {pipeline_output_file}")
# Dependency graphs
# p1 p1 p4 -p1-- p4
# / \ + \ / = / \ \ /
# p2 p3 p5 p2 p3 p5
#
pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]})
pipe2 = NextFlowPipeline(process_dependency_map={p5: [p1, p4]})
pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2, with_dependencies=False)
pipe3.run_pipeline(workflow_file_path=self.non_linear_workflow_file_path, working_dir=self.nextflow_test_dir,
nextflow_config_path=self.nextflow_config_file)
lines = [line.strip() for line in open(pipeline_output_file).readlines()]
self.assertEqual(5, len(lines))
# Due to parallelism, order in which p1 and p4 are executed cannot be guaranteed
self.assertTrue("first_process" in lines[0:2] and "fourth_process" in lines[0:2])
# Due to parallelism, order in which p2, p3 and p5 are executed cannot be guaranteed
self.assertTrue("second_process" in lines[2:5] and "third_process" in lines[2:5] and
"fifth_process" in lines[2:5])

0 comments on commit ac5e720

Please sign in to comment.