From 6eb145368ddb6ea24b298c1974b689e7ac334686 Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Tue, 26 Jul 2022 11:46:25 +0100 Subject: [PATCH 1/5] Allow nextflow pipeline to be joined without dependencies --- .../nextflow/nextflow_pipeline.py | 11 ++++--- tests/test_nextflow.py | 33 +++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py index e2cd45d..08e48c2 100644 --- a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py +++ b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py @@ -100,7 +100,7 @@ def run_pipeline(self, workflow_file_path: str, nextflow_binary_path: str = 'nex run_command_with_output(f"Running pipeline {workflow_file_path}...", workflow_command) @staticmethod - def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline') -> 'NextFlowPipeline': + def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline', with_dependencies=True) -> 'NextFlowPipeline': """ Join two pipelines. @@ -113,10 +113,11 @@ def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextF # Aggregate dependency maps of both pipelines joined_pipeline.process_dependency_map = nx.compose(main_pipeline.process_dependency_map, dependent_pipeline.process_dependency_map) - for final_process_in_main_pipeline in main_pipeline._get_final_processes(): - for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes(): - joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline, - final_process_in_main_pipeline) + if with_dependencies: + for final_process_in_main_pipeline in main_pipeline._get_final_processes(): + for root_process_in_dependent_pipeline in dependent_pipeline._get_root_processes(): + joined_pipeline.add_process_dependency(root_process_in_dependent_pipeline, + final_process_in_main_pipeline) return joined_pipeline def _get_root_processes(self) -> List[NextFlowProcess]: diff --git a/tests/test_nextflow.py b/tests/test_nextflow.py index 3a8cfda..5bce879 100644 --- a/tests/test_nextflow.py +++ b/tests/test_nextflow.py @@ -165,3 +165,36 @@ def test_non_linear_pipeline_join(self): # Due to parallelism, order in which p4 and p5 are executed cannot be guaranteed self.assertTrue("fourth_process" in lines[3:5] and "fifth_process" in lines[3:5]) self.assertEqual("sixth_process", lines[5]) + + def test_pipeline_join_no_dependencies(self): + pipeline_output_file = os.path.join(self.nextflow_test_dir, "non_linear_pipeline_output.txt") + if os.path.isfile(pipeline_output_file): + os.remove(pipeline_output_file) + + p1 = NextFlowProcess(process_name="first_process", + command_to_run=f"echo first_process >> {pipeline_output_file}") + p2 = NextFlowProcess(process_name="second_process", + command_to_run=f"echo second_process >> {pipeline_output_file}") + p3 = NextFlowProcess(process_name="third_process", + command_to_run=f"echo third_process >> {pipeline_output_file}") + p4 = NextFlowProcess(process_name="fourth_process", + command_to_run=f"echo fourth_process >> {pipeline_output_file}") + p5 = NextFlowProcess(process_name="fifth_process", + command_to_run=f"echo fifth_process >> {pipeline_output_file}") + # Dependency graphs + # p1 p1 p4 p1--- p4 + # / \ + \ / = / \ \ / + # p2 p3 p5 p2 p3 p5 + # + pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]}) + pipe2 = NextFlowPipeline(process_dependency_map={p5: [p1, p4]}) + pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2, with_dependencies=False) + pipe3.run_pipeline(workflow_file_path=self.non_linear_workflow_file_path, working_dir=self.nextflow_test_dir, + nextflow_config_path=self.nextflow_config_file) + lines = [line.strip() for line in open(pipeline_output_file).readlines()] + self.assertEqual(5, len(lines)) + # Due to parallelism, order in which p1 and p4 are executed cannot be guaranteed + self.assertTrue("first_process" in lines[0:2] and "fourth_process" in lines[0:2]) + # Due to parallelism, order in which p2, p3 and p5 are executed cannot be guaranteed + self.assertTrue("second_process" in lines[2:5] and "third_process" in lines[2:5] and + "fifth_process" in lines[2:5]) From c41a008e95148dfb7c29d6eea058b37c741d98ec Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Tue, 26 Jul 2022 12:27:31 +0100 Subject: [PATCH 2/5] Better desciption in docstring --- ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py index 08e48c2..09735bf 100644 --- a/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py +++ b/ebi_eva_common_pyutils/nextflow/nextflow_pipeline.py @@ -100,14 +100,20 @@ def run_pipeline(self, workflow_file_path: str, nextflow_binary_path: str = 'nex run_command_with_output(f"Running pipeline {workflow_file_path}...", workflow_command) @staticmethod - def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline', with_dependencies=True) -> 'NextFlowPipeline': + def join_pipelines(main_pipeline: 'NextFlowPipeline', dependent_pipeline: 'NextFlowPipeline', + with_dependencies: bool = True) -> 'NextFlowPipeline': """ - Join two pipelines. + Join two pipelines with or without dependencies - Returns a new pipeline where: + With Dependencies it returns a new pipeline where: 1) root processes are those of the main pipeline. 2) final processes are those of the dependent pipeline and 3) every root process of the dependent pipeline depends on the final processes of the main pipeline. + Without Dependencies it returns a new pipeline where: + 1) the two pipeline are left independent + 2) Only shared dependencies + 3) every root process of the dependent pipeline depends on the final processes of the main pipeline. + """ joined_pipeline = NextFlowPipeline() # Aggregate dependency maps of both pipelines From 099db1f1a36f5dcd4207e5087552922c1453d50f Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Tue, 26 Jul 2022 12:30:53 +0100 Subject: [PATCH 3/5] move to CI under 3.9 and deprecate 3.6 (pymongo requires 3.7) --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 42621ef..8e42a2b 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: mongodb-version: [4.0.18] - python-version: [3.6, 3.8] + python-version: [3.8, 3.9] java-version: [1.8] steps: From f3f0575e2046f7a82e510f4df895b9502828c463 Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Tue, 26 Jul 2022 13:04:44 +0100 Subject: [PATCH 4/5] fix dependency graphs descriptions --- tests/test_nextflow.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_nextflow.py b/tests/test_nextflow.py index 5bce879..18d0f35 100644 --- a/tests/test_nextflow.py +++ b/tests/test_nextflow.py @@ -143,14 +143,14 @@ def test_non_linear_pipeline_join(self): p6 = NextFlowProcess(process_name="sixth_process", command_to_run=f"echo sixth_process >> {pipeline_output_file}") # Dependency graphs - # p1 p4 p5 p1 + # p1 p4 p5 p1 # / \ + \ / = / \ # p2 p3 p6 p2 p3 - # / \/\ - # / /\ \ - # p4 p5 - # \ / - # p6 + # /\ /\ + # | \/ | + # p4 p5 + # \ / + # p6 pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]}) pipe2 = NextFlowPipeline(process_dependency_map={p6: [p4, p5]}) pipe3 = NextFlowPipeline.join_pipelines(pipe1, pipe2) @@ -182,9 +182,9 @@ def test_pipeline_join_no_dependencies(self): p5 = NextFlowProcess(process_name="fifth_process", command_to_run=f"echo fifth_process >> {pipeline_output_file}") # Dependency graphs - # p1 p1 p4 p1--- p4 + # p1 p1 p4 p1--- p4 # / \ + \ / = / \ \ / - # p2 p3 p5 p2 p3 p5 + # p2 p3 p5 p2 p3 p5 # pipe1 = NextFlowPipeline(process_dependency_map={p2: [p1], p3: [p1]}) pipe2 = NextFlowPipeline(process_dependency_map={p5: [p1, p4]}) From 991c801d8fcb8ee7ad1de7b3849582e014b30974 Mon Sep 17 00:00:00 2001 From: Timothee Cezard Date: Tue, 26 Jul 2022 13:25:59 +0100 Subject: [PATCH 5/5] fix dependency graphs descriptions --- tests/test_nextflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_nextflow.py b/tests/test_nextflow.py index 18d0f35..58f1e21 100644 --- a/tests/test_nextflow.py +++ b/tests/test_nextflow.py @@ -182,7 +182,7 @@ def test_pipeline_join_no_dependencies(self): p5 = NextFlowProcess(process_name="fifth_process", command_to_run=f"echo fifth_process >> {pipeline_output_file}") # Dependency graphs - # p1 p1 p4 p1--- p4 + # p1 p1 p4 -p1-- p4 # / \ + \ / = / \ \ / # p2 p3 p5 p2 p3 p5 #