diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 42621ef..8e42a2b 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -16,7 +16,7 @@ jobs:
     strategy:
       matrix:
         mongodb-version: [4.0.18]
-        python-version: [3.6, 3.8]
+        python-version: [3.8, 3.9]
         java-version: [1.8]
 
     steps:
diff --git a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py
index e2cd45d..09735bf 100644
--- a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py
+++ b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py
@@ -100,23 +100,30 @@ def run_pipeline(self, workflow_file_path: str, nextflow_binary_path: str = 'nex
         run_command_with_output(f"Running pipeline {workflow_file_path}...", workflow_command)
 
     @staticmethod
-    def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline') -> 'NextFlowPipeline':
+    def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline',
+                       with_dependencies: bool = True) -> 'NextFlowPipeline':
         """
-        Join two pipelines.
+        Join two pipelines, with or without dependencies between them.
 
-        Returns a new pipeline where:
+        With dependencies, it returns a new pipeline where:
         1) root processes are those of the main pipeline.
         2) final processes are those of the dependent pipeline and
         3) every root process of the dependent pipeline depends on the final processes of the main pipeline.
+        Without dependencies, it returns a new pipeline where:
+        1) the two pipelines are left independent of each other,
+        2) only dependencies already present in either pipeline are kept (processes shared by both pipelines are merged) and
+        3) no dependency is added between the final processes of the main pipeline and the root processes of the dependent pipeline.
+
         """
         joined_pipeline = NextFlowPipeline()
         # Aggregate dependency maps of both pipelines
         joined_pipeline.process_dependency_map = nx.compose(main_pipeline.process_dependency_map,
                                                             dependent_pipeline.process_dependency_map)
-        for final_process_in_main_pipeline in main_pipeline._get_final_processes():
-            for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes():
-                joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline,
-                                                       final_process_in_main_pipeline)
+        if with_dependencies:
+            for final_process_in_main_pipeline in main_pipeline._get_final_processes():
+                for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes():
+                    joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline,
+                                                           final_process_in_main_pipeline)
         return joined_pipeline
 
     def _get_root_processes(self) -> List[NextFlowProcess]:
diff --git a/tests/test_nextflow.py b/tests/test_nextflow.py
index 3a8cfda..58f1e21 100644
--- a/tests/test_nextflow.py
+++ b/tests/test_nextflow.py
@@ -143,14 +143,14 @@ def test_non_linear_pipeline_join(self):
         p6 = NextFlowProcess(process_name="sixth_process",
                              command_to_run=f"echo sixth_process >> {pipeline_output_file}")
         # Dependency graphs
-        #  p1   p4 p5        p1
+        #  p1   p4 p5         p1
         #  / \ + \ /    =    /  \
         #  p2 p3  p6        p2    p3
-        #                     / \/\
-        #                    / /\ \
-        #                    p4  p5
-        #                     \  /
-        #                      p6
+        #                    /\  /\
+        #                    | \/ |
+        #                    p4    p5
+        #                     \    /
+        #                       p6
         pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]})
         pipe2 = NextFlowPipeline(process_dependency_map={p6: [p4, p5]})
         pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2)
@@ -165,3 +165,36 @@ def test_non_linear_pipeline_join(self):
         # Due to parallelism, order in which p4 and p5 are executed cannot be guaranteed
         self.assertTrue("fourth_process" in lines[3:5] and "fifth_process" in lines[3:5])
         self.assertEqual("sixth_process", lines[5])
+
+    def test_pipeline_join_no_dependencies(self):
+        pipeline_output_file = os.path.join(self.nextflow_test_dir, "non_linear_pipeline_output.txt")
+        if os.path.isfile(pipeline_output_file):
+            os.remove(pipeline_output_file)
+
+        p1 = NextFlowProcess(process_name="first_process",
+                             command_to_run=f"echo first_process >> {pipeline_output_file}")
+        p2 = NextFlowProcess(process_name="second_process",
+                             command_to_run=f"echo second_process >> {pipeline_output_file}")
+        p3 = NextFlowProcess(process_name="third_process",
+                             command_to_run=f"echo third_process >> {pipeline_output_file}")
+        p4 = NextFlowProcess(process_name="fourth_process",
+                             command_to_run=f"echo fourth_process >> {pipeline_output_file}")
+        p5 = NextFlowProcess(process_name="fifth_process",
+                             command_to_run=f"echo fifth_process >> {pipeline_output_file}")
+        # Dependency graphs
+        # p1      p1  p4      p1-----   p4
+        # / \  +   \  /   =   / \    \  /
+        # p2 p3     p5       p2  p3   p5
+        #
+        pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]})
+        pipe2 = NextFlowPipeline(process_dependency_map={p5: [p1, p4]})
+        pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2, with_dependencies=False)
+        pipe3.run_pipeline(workflow_file_path=self.non_linear_workflow_file_path, working_dir=self.nextflow_test_dir,
+                           nextflow_config_path=self.nextflow_config_file)
+        lines = [line.strip() for line in open(pipeline_output_file).readlines()]
+        self.assertEqual(5, len(lines))
+        # Due to parallelism, order in which p1 and p4 are executed cannot be guaranteed
+        self.assertTrue("first_process" in lines[0:2] and "fourth_process" in lines[0:2])
+        # Due to parallelism, order in which p2, p3 and p5 are executed cannot be guaranteed
+        self.assertTrue("second_process" in lines[2:5] and "third_process" in lines[2:5] and
+                        "fifth_process" in lines[2:5])
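
For context, a minimal usage sketch of the new with_dependencies flag follows. It is not part of the diff: the process names and echo commands are illustrative, and the import path (assumed to be ebi_eva_common_pyutils.nextflow) follows the classes exercised by the tests above.

# Usage sketch (assumption: NextFlowPipeline and NextFlowProcess are importable
# from the ebi_eva_common_pyutils.nextflow package, as in the tests above).
from ebi_eva_common_pyutils.nextflow import NextFlowPipeline, NextFlowProcess

# Hypothetical processes; the echo commands are placeholders.
prepare = NextFlowProcess(process_name="prepare", command_to_run="echo prepare")
load = NextFlowProcess(process_name="load", command_to_run="echo load")
validate = NextFlowProcess(process_name="validate", command_to_run="echo validate")
report = NextFlowProcess(process_name="report", command_to_run="echo report")

main_pipeline = NextFlowPipeline(process_dependency_map={load: [prepare]})
dependent_pipeline = NextFlowPipeline(process_dependency_map={report: [validate]})

# Default behaviour (with_dependencies=True): every root process of the dependent
# pipeline ("validate") waits for the final processes of the main pipeline ("load").
chained = NextFlowPipeline.join_pipelines(main_pipeline, dependent_pipeline)

# New behaviour: the two dependency graphs are only composed, so "validate" is free
# to start alongside "prepare"; no cross-pipeline dependency is added.
merged = NextFlowPipeline.join_pipelines(main_pipeline, dependent_pipeline,
                                         with_dependencies=False)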