diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm index 614d0e2b0..273e3e08f 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm @@ -355,6 +355,74 @@ sub store_a_semaphored_group_of_jobs { } +=head2 store_nested_semaphores + + Arg [1] : Hashref $semaphore_hash + Arg [2] : (optional) Bio::EnsEMBL::Hive::AnalysisJob $emitting_job + Arg [3] : (optional) Bio::EnsEMBL::Hive::Semaphore $controlled_semaphore + Example : my $funnel_job = $job_adaptor->store_nested_semaphores($semaphore_hash, $emitting_job); + Description: Creates a nested hierarchy of jobs and semaphores following $semaphore_hash. This + method allows certain combinations of jobs and semaphores that can't be achieved + through the PipeConfig language for dataflows. + $semaphore_hash is expected to have two keys: + - "analysis" (mapped to a valid Bio::EnsEMBL::Hive::Analysis object), + - "input_id" (mapped to the input_id - a string or a hashref - of a job to create). + An optional third key, "required_jobs", should map to an arrayref of hashref of the + same structure that describe the jobs that are required to complete before this job + is allowed to start (a semaphore will be used to ensure this). + The jobs will be created as children of $emitting_job if the latter is provided, and + will inherit its stack parameters and accu. + The job created will be added to $controlled_semaphore if provided. If the latter is + not provided but $emitting_job has a controlled_semaphore, the method will in effect + escape from $emitting_job's semaphore. + NOTE: this method has only been tested with jobs and analyses belonging to the same + database. + Example: + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 2}, + 'required_jobs' => [ + { + 'analysis' => $analysis2, + 'input_id' => {'beta' => 21}, + }, + { + 'analysis' => $analysis1, + 'input_id' => {'beta' => 22}, + }, + ], + } + Returntype : Bio::EnsEMBL::Hive::AnalysisJob $job + +=cut + +sub store_nested_semaphores { + my ($self, $semaphore_hash, $emitting_job, $controlled_semaphore) = @_; + + my $job = Bio::EnsEMBL::Hive::AnalysisJob->new( + 'prev_job' => $emitting_job, + 'analysis' => $semaphore_hash->{'analysis'}, + 'hive_pipeline' => $semaphore_hash->{'analysis'}->hive_pipeline, + 'input_id' => $semaphore_hash->{'input_id'}, + 'controlled_semaphore' => $controlled_semaphore, + ); + + if ($emitting_job) { + $job->param_id_stack($emitting_job->param_id_stack); + $job->accu_id_stack($emitting_job->accu_id_stack); + } + + if (my @required_jobs = @{$semaphore_hash->{'required_jobs'} // []}) { + my ($semaphore_id, $funnel_job_id, @fan_job_ids) = $self->store_a_semaphored_group_of_jobs($job, []); + my $semaphore = $self->db->get_SemaphoreAdaptor->fetch_by_dbID($semaphore_id); + my @fan_jobs = map {$self->store_nested_semaphores($_, $emitting_job, $semaphore)} @required_jobs; + $self->semaphore_job_by_id($funnel_job_id); + } else { + my ($job_id) = $self->store_jobs_and_adjust_counters([$job], 0); + } + return $job; +} + =head2 fetch_all_by_analysis_id_status diff --git a/t/02.api/job_adaptor.t b/t/02.api/job_adaptor.t new file mode 100755 index 000000000..e43974d2c --- /dev/null +++ b/t/02.api/job_adaptor.t @@ -0,0 +1,211 @@ +#!/usr/bin/env perl +# Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute +# Copyright [2016-2020] EMBL-European Bioinformatics Institute +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +use strict; +use warnings; + +use Data::Dumper; + +use Test::More; +use Test::Exception; + +use Bio::EnsEMBL::Hive::AnalysisJob; +use Bio::EnsEMBL::Hive::HivePipeline; +use Bio::EnsEMBL::Hive::Utils::Test qw(get_test_url_or_die make_hive_db run_sql_on_db); + +# eHive needs this to initialize the pipeline (and run db_cmd.pl) +use Cwd (); +use File::Basename (); +$ENV{'EHIVE_ROOT_DIR'} ||= File::Basename::dirname( File::Basename::dirname( File::Basename::dirname( Cwd::realpath($0) ) ) ); + +my $pipeline_url = get_test_url_or_die(); +my $dbc = make_hive_db($pipeline_url); + +# Minimal pipeline on which we can create jobs +my $pipeline = Bio::EnsEMBL::Hive::HivePipeline->new(-url => $pipeline_url, -no_sql_schema_version_check => 1); +my ($rc) = $pipeline->add_new_or_update('ResourceClass', name => 'res'); +my ($analysis1) = $pipeline->add_new_or_update('Analysis', logic_name => 'first', module => 'Mod', resource_class => $rc); +my ($analysis2) = $pipeline->add_new_or_update('Analysis', logic_name => 'second', module => 'Mod', resource_class => $rc); +$pipeline->save_collections; +my $job_adaptor = $pipeline->hive_dba->get_AnalysisJobAdaptor; +my $semaphore_adaptor = $pipeline->hive_dba->get_SemaphoreAdaptor; + +sub _assert_store_nested_semaphores { + my ($title, $semaphore_hash, $expected_jobs, $expected_semaphores, $min_job_id, $min_semaphore_id, $emitting_job, $controlled_semaphore) = @_; + + # Store the jobs and the semaphores + $job_adaptor->store_nested_semaphores($semaphore_hash, $emitting_job, $controlled_semaphore); + + # Fetch the jobs that have been stored + my $jobs = $job_adaptor->fetch_all("job_id >= $min_job_id"); + my @job_fields = qw(dbID analysis_id prev_job_id input_id param_id_stack accu_id_stack controlled_semaphore_id); + my @got_jobs; + foreach my $job (sort {$a->dbID <=> $b->dbID} @$jobs) { + push @got_jobs, [map {$job->$_} @job_fields]; + } + + # Fetch the semaphores that have been stored + my $semaphores = $semaphore_adaptor->fetch_all("semaphore_id >= $min_semaphore_id"); + my @semaphore_fields = qw(dbID local_jobs_counter dependent_job_id); + my @got_semaphores; + foreach my $semaphore (sort {$a->dbID <=> $b->dbID} @$semaphores) { + push @got_semaphores, [map {$semaphore->$_} @semaphore_fields]; + } + + # Check that the jobs and the semaphores are correct + subtest $title => sub { + is_deeply(\@got_jobs, $expected_jobs, 'The jobs were correctly stored and linked') + or diag Dumper(\@got_jobs); + is_deeply(\@got_semaphores, $expected_semaphores, 'The semaphores were correctly stored and linked') + or diag Dumper(\@got_semaphores); + }; +} + +subtest 'store_nested_semaphores' => sub { + + _assert_store_nested_semaphores( + 'Single job with no semaphore', + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 1}, + }, [ + # 1 job created + [1, 1, undef, '{"alpha" => 1}', '', '', undef], + ], [ + # No semaphores + ], + 1, 1, + ); + + _assert_store_nested_semaphores( + 'Depth 1, all jobs belonging to the same analysis', + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 2}, + 'required_jobs' => [ + { + 'analysis' => $analysis1, + 'input_id' => {'beta' => 21}, + }, + { + 'analysis' => $analysis1, + 'input_id' => {'beta' => 22}, + }, + ], + }, [ + # 3 jobs created + [2, 1, undef, '{"alpha" => 2}', '', '', undef], + [3, 1, undef, '{"beta" => 21}', '', '', 1], + [4, 1, undef, '{"beta" => 22}', '', '', 1], + ], [ + # 1 semaphore created + [1, 2, 2], + ], + 2, 1, + ); + + _assert_store_nested_semaphores( + 'Depth 2, mixed analyses', + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 3}, + 'required_jobs' => [ + { + 'analysis' => $analysis2, + 'input_id' => {'beta' => 31}, + 'required_jobs' => [ + { + 'analysis' => $analysis1, + 'input_id' => {'gamma' => 311}, + }, + { + 'analysis' => $analysis1, + 'input_id' => {'gamma' => 312}, + }, + { + 'analysis' => $analysis1, + 'input_id' => {'gamma' => 313}, + }, + ], + }, + { + 'analysis' => $analysis1, + 'input_id' => {'beta' => 32}, + }, + ], + }, [ + # 6 jobs created + [5, 1, undef, '{"alpha" => 3}', '', '', undef], + [6, 2, undef, '{"beta" => 31}', '', '', 2], + [7, 1, undef, '{"gamma" => 311}', '', '', 3], + [8, 1, undef, '{"gamma" => 312}', '', '', 3], + [9, 1, undef, '{"gamma" => 313}', '', '', 3], + [10, 1, undef, '{"beta" => 32}', '', '', 2], + ], [ + # 2 semaphores created + [2, 2, 5], + [3, 3, 6], + ], + 5, 2, + ); + + my $emitting_job = Bio::EnsEMBL::Hive::AnalysisJob->new( + 'analysis' => $analysis2, + 'hive_pipeline' => $pipeline, + 'input_id' => {}, + 'param_id_stack' => '100', + 'accu_id_stack' => '101', + ); + + _assert_store_nested_semaphores( + 'Single job but with an emitting_job', + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 4}, + }, [ + # 1 job created + [11, 1, undef, '{"alpha" => 4}', '100', '101', undef], + ], [ + # No new semaphores + ], + 11, 4, + $emitting_job, + ); + + _assert_store_nested_semaphores( + 'Single job in an existing semaphore and with an emitting_job', + { + 'analysis' => $analysis1, + 'input_id' => {'alpha' => 5}, + }, [ + # 1 job created + [12, 1, undef, '{"alpha" => 5}', '100', '101', 3], + ], [ + # The semaphore should have been topped-up + [3, 4, 6], + ], + 12, 3, + $emitting_job, + $semaphore_adaptor->fetch_by_dbID(3), # Reuse an existing semaphore + ); +}; + +$dbc->disconnect_if_idle(); +run_sql_on_db($pipeline_url, 'DROP DATABASE'); + +done_testing(); +