An Analysis object represents a "stage" of the Hive pipeline that groups together
all jobs that share the same module and the same common parameters.
Individual Jobs are said to "belong" to an Analysis.

Control rules unblock when their condition Analyses are done.
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2024] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
32 Please subscribe to the
Hive mailing list: http:
37 package Bio::EnsEMBL::Hive::Analysis;
48 use base (
'Bio::EnsEMBL::Hive::Storable' );
# Returns the unique-key attribute list for Analyses: just 'logic_name'.
sub unikey {    # override the default from Cacheable parent
    return [ 'logic_name' ];
}
resource_class_id / resource_class
# Getter/setter for the Analysis' logic_name: stores the argument when one is
# passed, and always returns the stored value.
sub logic_name {
    my $self = shift;
    $self->{'_logic_name'} = shift if(@_);
    return $self->{'_logic_name'};
}
# Synonym for logic_name(): forwards all arguments (so it can also be used as a setter).
sub name {      # a useful synonym
    my $self = shift;

    return $self->logic_name(@_);
}
# Getter/setter for the name of the module this Analysis runs.
sub module {
    my $self = shift;
    $self->{'_module'} = shift if(@_);
    return $self->{'_module'};
}
# Getter/setter for the Analysis' language (a true value makes
# get_compiled_module_name() delegate to GuestProcess).
sub language {
    my $self = shift;
    $self->{'_language'} = shift if(@_);
    return $self->{'_language'};
}
# Getter/setter for the Analysis' parameters, which are stored as a string:
# a reference passed to the setter is converted with stringify(), anything
# else is stored as given. Called without arguments it only returns the value.
sub parameters {
    my $self = shift;
    if(@_) {    # guard so that a plain getter call cannot overwrite the stored value with undef
        my $parameters = shift @_;
        $self->{'_parameters'} = ref($parameters) ? stringify($parameters) : $parameters;
    }
    return $self->{'_parameters'};
}
# Getter/setter for the Analysis' free-text comment.
# NOTE(review): one source line between the setter and the return appears to have been
# lost in a corrupted copy of this file (possibly a default for an unset value) - confirm against upstream.
sub comment {
    my $self = shift;
    $self->{'_comment'} = shift if(@_);
    return $self->{'_comment'};
}
# Getter/setter for the Analysis' tags.
# NOTE(review): one source line between the setter and the return appears to have been
# lost in a corrupted copy of this file (possibly a default for an unset value) - confirm against upstream.
sub tags {
    my $self = shift;
    $self->{'_tags'} = shift if(@_);
    return $self->{'_tags'};
}
# failed_job_tolerance: accessor for '_failed_job_tolerance' - stores the argument when one is
# passed, then returns the stored value.
# NOTE(review): this block is corrupted and will not compile: `$self` is never declared, the closing
# brace is missing, and the statement that starts with `$self->{'_failed_job_tolerance'}` just before
# the return is truncated (no operator, no semicolon). Its intended effect (possibly a default) cannot
# be recovered from here - restore this sub from upstream.
116 sub failed_job_tolerance {
118 $self->{
'_failed_job_tolerance'} = shift
if(@_);
119 $self->{
'_failed_job_tolerance'}
120 return $self->{
'_failed_job_tolerance'};
# Getter/setter for the Analysis' max_retry_count.
sub max_retry_count {
    my $self = shift;
    $self->{'_max_retry_count'} = shift if(@_);
    return $self->{'_max_retry_count'};
}
# Accessor for '_can_be_empty' - stores the argument when one is passed, then returns the stored value.
# NOTE(review): this block is corrupted and will not compile: the sub header, the `$self` declaration
# and the closing brace are missing, and the statement that starts with `$self->{'_can_be_empty'}` just
# before the return is truncated (no operator, no semicolon). Its intended effect (possibly a default)
# cannot be recovered from here - restore this sub from upstream.
133 $self->{
'_can_be_empty'} = shift
if(@_);
134 $self->{
'_can_be_empty'}
135 return $self->{
'_can_be_empty'};
# Getter/setter for the Analysis' priority.
# NOTE(review): one source line between the setter and the return appears to have been
# lost in a corrupted copy of this file (possibly a default for an unset value) - confirm against upstream.
sub priority {
    my $self = shift;
    $self->{'_priority'} = shift if(@_);
    return $self->{'_priority'};
}
# Getter/setter for the Analysis' meadow_type.
sub meadow_type {
    my $self = shift;
    $self->{'_meadow_type'} = shift if(@_);
    return $self->{'_meadow_type'};
}
# Getter/setter for the Analysis' analysis_capacity.
sub analysis_capacity {
    my $self = shift;
    $self->{'_analysis_capacity'} = shift if(@_);
    return $self->{'_analysis_capacity'};
}
# Getter/setter for the Analysis' hive_capacity.
sub hive_capacity {
    my $self = shift;
    $self->{'_hive_capacity'} = shift if(@_);
    return $self->{'_hive_capacity'};
}
# Accessor for '_batch_size' - stores the argument when one is passed, then returns the stored value.
# NOTE(review): this block is corrupted and will not compile: the sub header, the `$self` declaration
# and the closing brace are missing, and the statement that starts with `$self->{'_batch_size'}` just
# before the return is truncated (no operator, no semicolon). Its intended effect (possibly a default)
# cannot be recovered from here - restore this sub from upstream.
168 $self->{
'_batch_size'} = shift
if(@_);
169 $self->{
'_batch_size'}
170 return $self->{
'_batch_size'};
# Returns the name of the Perl module to instantiate for this Analysis' Jobs.
# Analyses with a language set are run through Bio::EnsEMBL::Hive::GuestProcess;
# otherwise 'module' itself is loaded and checked to be a Bio::EnsEMBL::Hive::Process.
# Dies if 'module' is not defined, if the module cannot be loaded/compiled, if it does
# not inherit from Bio::EnsEMBL::Hive::Process, or if it still implements the
# deprecated strict_hash_format() method.
sub get_compiled_module_name {
    my $self = shift;

    my $runnable_module_name = $self->module
        or die "Analysis '".$self->logic_name."' does not have its 'module' defined";

    if ($self->language) {
        # NOTE(review): one source line was lost here in a corrupted copy of this file; confirm
        # against upstream whether anything (e.g. a check on the module) preceded this return.
        return 'Bio::EnsEMBL::Hive::GuestProcess';
    }

    # String eval: `require $var` would treat the package name as a file path.
    eval "require $runnable_module_name";
    die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$@" if($@);
    die "Problem accessing methods in '$runnable_module_name'. Please check that it inherits from Bio::EnsEMBL::Hive::Process and is named correctly.\n"
        unless($runnable_module_name->isa('Bio::EnsEMBL::Hive::Process'));

    die "DEPRECATED: the strict_hash_format() method is no longer supported in Runnables - the input_id() in '$runnable_module_name' has to be a hash now.\n"
        if($runnable_module_name->can('strict_hash_format'));

    return $runnable_module_name;
}
# url_query_params: builds the query parameters identifying this Analysis - the surviving code pairs
# 'logic_name' with $self->logic_name.
# NOTE(review): this block is corrupted and will not compile: only the sub header and the
# `'logic_name' => $self->logic_name,` pair survive. The `$self` declaration, the enclosing return
# expression (hashref vs. list cannot be told from here) and the closing brace are missing - restore from upstream.
196 sub url_query_params {
200 'logic_name' => $self->logic_name,
# NOTE(review): orphaned statement - the enclosing sub header, the `$self` declaration and the closing
# brace are missing. Presumably this is the body of display_name() (which the summary-string code below
# calls as $self->display_name) - confirm against upstream and restore.
207 return $self->logic_name;
Example    : $stats = $analysis->stats;
Description: returns either the previously cached AnalysisStats object or, if it is missing, pulls a fresh one from the DB.
# Returns this Analysis' AnalysisStats object, found in the owning pipeline's
# 'AnalysisStats' collection by matching its 'analysis' attribute against $self.
sub stats {
    my $self = shift;

    return $self->hive_pipeline->collection_of( 'AnalysisStats' )->find_one_by( 'analysis', $self );
}
# --------------------------------- dispatch the following calls directly to our Stats: ---------------------------------------

# Forwards to this Analysis' AnalysisStats object: returns $self->stats->status
# called with whatever arguments were passed here.
sub status {
    my $self = shift;

    return $self->stats->status(@_);
}

# ------------------------------------------------------------------------------------------------------------------------------
# Getter/setter for the Jobs attached to this Analysis. The getter lazily
# initialises the collection to an empty arrayref, so it always returns a listref.
sub jobs_collection {
    my $self = shift;

    $self->{'_jobs_collection'} = shift if(@_);

    return $self->{'_jobs_collection'} ||= [];
}
# Returns all AnalysisCtrlRules of the owning pipeline whose 'ctrled_analysis' is this Analysis.
sub control_rules_collection {
    my $self = shift;

    return $self->hive_pipeline->collection_of( 'AnalysisCtrlRule' )->find_all_by( 'ctrled_analysis', $self );
}
# Returns all DataflowRules of the owning pipeline whose 'from_analysis' is this Analysis.
sub dataflow_rules_collection {
    my $self = shift;

    return $self->hive_pipeline->collection_of( 'DataflowRule' )->find_all_by( 'from_analysis', $self );
}
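=head2 get_grouped_dataflow_rules

Example    : $groups = $analysis->get_grouped_dataflow_rules;
Description: returns a listref of triples, one per group: the first element is a separate dfr or a funnel, the second element is a listref of the semaphored fan dfrs that feed into it (empty for a separate dfr), and the third element is a listref of the first element's dataflow targets. Groups are sorted by ascending number of fan dfrs, then by branch_code.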
# Groups this Analysis' outgoing dataflow rules for display:
#   - every funnel collects the semaphored fan rules that point at it;
#   - every other rule forms a group of its own.
# Returns a listref of [ $dfr_or_funnel, [ @fan_dfrs ], $targets_of_dfr_or_funnel ],
# sorted by ascending number of fans, then by branch_code.
# Throws if any target of a funnel or fan rule does not point at an Analysis.
sub get_grouped_dataflow_rules {
    my $self = shift;

    my %set_of_groups = ();     # Note that the key (being a stringified reference) is unusable,
                                # so we end up packing it as the first element of the structure,
                                # and only returning the listref of the values.
    my @ordered_keys = ();      # Perl is missing an "Ordered Hash" structure, so we need to maintain the insertion order ourselves

    my $all_dataflow_rules = $self->dataflow_rules_collection;

    # Fan rules are visited first, so a funnel's group exists before the funnel rule itself is reached.
    foreach my $dfr ((grep {$_->funnel_dataflow_rule} @$all_dataflow_rules), (grep {!$_->funnel_dataflow_rule} @$all_dataflow_rules)) {

        my $df_targets = $dfr->get_my_targets;

        if(my $funnel_dfr = $dfr->funnel_dataflow_rule) {
            unless($set_of_groups{$funnel_dfr}) {   # both the type check and the initial push will only be done once per funnel
                my $funnel_targets = $funnel_dfr->get_my_targets;
                foreach my $funnel_target (@$funnel_targets) {
                    unless($funnel_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis')) {
                        throw("Each conditional branch of a semaphored funnel rule must point at an Analysis");
                    }
                }
                push @ordered_keys, $funnel_dfr;
                $set_of_groups{$funnel_dfr} = [$funnel_dfr, [], $funnel_targets];
            }
            my $this_group = $set_of_groups{$funnel_dfr};

            foreach my $df_target (@$df_targets) {
                unless($df_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis')) {
                    throw("Each conditional branch of a semaphored fan rule must point at an Analysis");
                }
            }
            push @{$this_group->[1]}, $dfr;

        } elsif (!$set_of_groups{$dfr}) {   # a rule already registered as a funnel keeps its group
            push @ordered_keys, $dfr;
            $set_of_groups{$dfr} = [$dfr, [], $df_targets];
        }
    }

    my @sorted_rules = sort { scalar(@{$set_of_groups{$a}->[1]}) <=> scalar(@{$set_of_groups{$b}->[1]}) or $set_of_groups{$a}->[0]->branch_code <=> $set_of_groups{$b}->[0]->branch_code } @ordered_keys;
    return [ map { $set_of_groups{$_} } @sorted_rules ];
}
# Returns a hashref mapping each branch_code to the listref of this Analysis'
# dataflow rules on that branch. Within a branch, semaphored fan rules (those
# with a funnel_dataflow_rule) come first. The map is built once and cached.
sub dataflow_rules_by_branch {
    my $self = shift;

    if (not $self->{'_dataflow_rules_by_branch'}) {
        my %dataflow_rules_by_branch = ();
        foreach my $df_rule (@{$self->dataflow_rules_collection}) {
            my $dfr_bb = $dataflow_rules_by_branch{ $df_rule->branch_code } ||= [];    # no autovivification here, have to do it manually

            if($df_rule->funnel_dataflow_rule) {    # sort rules so that semaphored fan rules come before other (potentially fan) rules for the same branch_code
                unshift @$dfr_bb, $df_rule;
            } else {
                push @$dfr_bb, $df_rule;
            }
        }
        $self->{'_dataflow_rules_by_branch'} = \%dataflow_rules_by_branch;
    }

    return $self->{'_dataflow_rules_by_branch'};
}
# Creates the Jobs that result from dataflowing $output_ids_for_this_rule out of $emitting_job along $df_rule.
#   $output_ids_for_this_rule   - listref of input_ids for the new Jobs (exactly one is required for a semaphored funnel)
#   $emitting_job               - the Job doing the dataflow; its fan_cache holds semaphored fan Jobs until their funnel is dataflowed
#   $same_db_dataflow           - when true, the new Jobs inherit the emitting Job's param_id_stack / accu_id_stack,
#                                 and the emitting Job's dbID is passed on when storing non-semaphored Jobs
#   $push_emitting_job_on_stack - when true (and $same_db_dataflow), the emitting Job's dbID is appended to
#                                 param_id_stack if its input_id is non-trivial (not empty and not '{}'),
#                                 and to accu_id_stack if its accu_hash has any keys
#   $df_rule                    - the dataflow rule being followed
# Behaviour:
#   - $df_rule has a funnel_dataflow_rule (fan of a semaphored group): the Jobs are not stored; they are
#     presumably appended to $emitting_job->fan_cache under the stringified funnel rule, to wait for the funnel
#   - otherwise, if fan Jobs are cached under the stringified $df_rule (a semaphored funnel): the cache entry is
#     deleted, and the single funnel Job is stored together with the cached fan Jobs via
#     store_a_semaphored_group_of_jobs(); any other number of output ids sets transient_error(0) and dies
#   - otherwise (non-semaphored dataflow): the Jobs are stored via store_jobs_and_adjust_counters(),
#     propagating the emitting Job's controlled_semaphore
# Returns a listref of the dbIDs of the Jobs stored by this call (empty when the Jobs were only cached).
# NOTE(review): this block is corrupted and will not compile: the sub header, the constructor calls that open the
# `map { ... }` expressions, the declarations of $funnel_job and @non_semaphored_jobs (both used below), the end of
# the $common_params list and most closing braces are missing, and several comments have been broken onto code
# lines - restore this sub from upstream.
338 my ( $self, $output_ids_for_this_rule, $emitting_job, $same_db_dataflow, $push_emitting_job_on_stack, $df_rule ) = @_;
340 my $param_id_stack =
'';
341 my $accu_id_stack =
'';
342 my $emitting_job_id = undef;
344 if($same_db_dataflow) {
345 $param_id_stack = $emitting_job->param_id_stack;
346 $accu_id_stack = $emitting_job->accu_id_stack;
347 $emitting_job_id = $emitting_job->dbID;
349 if($push_emitting_job_on_stack) {
350 my $input_id = $emitting_job->input_id;
351 my $accu_hash = $emitting_job->accu_hash;
353 if($input_id and ($input_id ne
'{}')) { # add the parent to the param_id_stack
if it had non-trivial extra parameters
354 $param_id_stack = ($param_id_stack ? $param_id_stack.
',' :
'').$emitting_job_id;
356 if(scalar(keys %$accu_hash)) { # add the parent to the accu_id_stack
if it had
"own" accumulator
357 $accu_id_stack = ($accu_id_stack ? $accu_id_stack.
',' :
'').$emitting_job_id;
362 my $common_params = [
363 'prev_job' => $emitting_job,
365 'hive_pipeline' => $self->hive_pipeline, # Although we may not cache jobs, make sure a
new Job
"belongs" to the same pipeline as its Analysis
366 'param_id_stack' => $param_id_stack,
367 'accu_id_stack' => $accu_id_stack,
370 my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
371 my @output_job_ids = ();
373 if( my $funnel_dataflow_rule = $df_rule->funnel_dataflow_rule ) { # members of a semaphored fan will have to wait in cache until the funnel is created:
375 my $fan_cache_this_branch = $emitting_job->fan_cache->{
"$funnel_dataflow_rule"} ||= [];
379 # controlled_semaphore => to be set when the $controlled_semaphore has been stored
380 ) } @$output_ids_for_this_rule;
382 }
else { # either a semaphored funnel or a non-semaphored dataflow:
384 my $fan_jobs =
delete $emitting_job->fan_cache->{
"$df_rule"}; # clear the cache at the same time
386 if( $fan_jobs && @$fan_jobs ) { # a semaphored funnel
388 if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
390 $emitting_job->transient_error(0);
391 die
"Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
396 'input_id' => $output_ids_for_this_rule->[0],
397 'status' =>
'SEMAPHORED',
400 # NB: $job_adaptor happens to belong to the $funnel_job, but not necessarily to $fan_jobs or $emitting_job
401 my ($semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
403 push @output_job_ids, $funnel_job_id, @fan_job_ids;
405 }
else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
409 'controlled_semaphore' => $emitting_job->controlled_semaphore, # propagate parent
's semaphore if any
410 ) } @$output_ids_for_this_rule;
412 # NB: $job_adaptor happens to belong to the @non_semaphored_jobs, but not necessarily to the $emitting_job :
413 push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0, $emitting_job_id) };
417 return \@output_job_ids;
# Builds a one-line summary string of this Analysis. Its parts are:
#   - 'Analysis[<dbID or empty>]:'
#   - the display_name, followed by '->('
#   - a comma-separated list: the module (or 'no_module') plus ' (<language>)' when a language is set,
#     the parameters (or '{}'), and the resource class name (or 'no_rc')
#   - a closing ')'
# NOTE(review): this block is corrupted: the sub header and the `$self` declaration are missing, and line breaks
# now fall inside the quoted literals of the return expression, which changes the produced string. Restore this
# sub from upstream.
424 return 'Analysis[
'.($self->dbID // '').']:
'.$self->display_name.'->(
'.join(',
', ($self->module // 'no_module
').($self->language ? sprintf(' (%s)
', $self->language) : ''), $self->parameters // '{}
', $self->resource_class ? $self->resource_class->name : 'no_rc
').')
';
428 sub print_diagram_node {
429 my ($self, $ref_pipeline, $prefix, $seen_analyses) = @_;
431 if($seen_analyses->{$self}++) {
432 print "(".$self->relative_display_name($ref_pipeline).")\n"; # NB: the prefix of the label itself is done by the previous level
436 print $self->relative_display_name($ref_pipeline)."\n"; # NB: the prefix of the label itself is done by the previous level
438 my $groups = $self->get_grouped_dataflow_rules;
440 foreach my $i (0..scalar(@$groups)-1) {
442 my ($funnel_dfr, $fan_dfrs, $df_targets) = @{ $groups->[$i] };
444 my $this_funnel_offset = '';
446 if(scalar(@$groups)>1 and scalar(@$fan_dfrs)) { # if more than one group (no single backbone) and semaphored, the semaphored group will be offset:
447 print $prefix." │\n";
448 print $prefix." ╘════╤══╗\n"; # " └────┬──┐\n";
449 $this_funnel_offset = ($i < scalar(@$groups)-1) ? ' │
' : ' '; # non-last vs last group
452 foreach my $j (0..scalar(@$fan_dfrs)-1) { # for each of the dependent fan rules, show them one by one:
453 my $fan_dfr = $fan_dfrs->[$j];
454 my $fan_branch = $fan_dfr->branch_code;
456 print $prefix.$this_funnel_offset." │ ║\n";
457 print $prefix.$this_funnel_offset." │ ║\n";
458 print $prefix.$this_funnel_offset." │ ║#$fan_branch\n";
460 my $fan_df_targets = $fan_dfr->get_my_targets;
462 foreach my $k (0..scalar(@$fan_df_targets)-1) { # for each fan's target
463 my $fan_target = $fan_df_targets->[$k];
465 print $prefix.$this_funnel_offset.
" │ ║\n";
467 if(my $fan_choice = (scalar(@$fan_df_targets)!=1) || defined($fan_target->on_condition)) {
468 if(defined(my $on_condition = $fan_target->on_condition)) {
469 print $prefix.$this_funnel_offset.
" │ ║ WHEN $on_condition\n";
471 print $prefix.$this_funnel_offset.
" │ ║ ELSE\n";
474 print $prefix.$this_funnel_offset.
' │├─╚═> ';
476 my $next_fan_or_condition_offset = ($j<scalar(@$fan_dfrs)-1 or $k<scalar(@$fan_df_targets)-1) ?
' │ ║ ' :
' │ ';
478 if(my $template = $fan_target->input_id_template) {
480 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset.
" │\n";
481 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset.
" V\n";
482 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset;
485 $fan_target->to_analysis->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_fan_or_condition_offset, $seen_analyses );
489 my $funnel_branch = $funnel_dfr->branch_code;
491 print $prefix.$this_funnel_offset.
" │\n";
492 print $prefix.$this_funnel_offset.
" │\n";
493 print $prefix.$this_funnel_offset.
" │#$funnel_branch\n";
495 foreach my $k (0..scalar(@$df_targets)-1) { #
for each funnel
's target
496 my $df_target = $df_targets->[$k];
498 print $prefix.$this_funnel_offset." │\n";
500 my $funnel_choice = (scalar(@$df_targets)!=1) || defined($df_target->on_condition);
503 if(defined(my $on_condition = $df_target->on_condition)) {
504 print $prefix.$this_funnel_offset." │ WHEN $on_condition\n";
506 print $prefix.$this_funnel_offset." │ ELSE\n";
510 my $next_funnel_or_condition_offset = '';
512 if( (scalar(@$groups)==1 or $this_funnel_offset) and !$funnel_choice ) { # 'the only group
' (backbone) or a semaphore funnel ...
513 print $prefix.$this_funnel_offset." V\n"; # ... make a vertical arrow
514 print $prefix.$this_funnel_offset;
516 print $prefix.$this_funnel_offset.' └─▻
'; # otherwise fork to the right
517 $next_funnel_or_condition_offset = ($i<scalar(@$groups)-1 or $k<scalar(@$df_targets)-1) ? ' │
' : ' ';
519 if(my $template = $df_target->input_id_template) {
521 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." │\n";
522 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." V\n";
523 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset;
526 my $target = $df_target->to_analysis;
527 if($target->can('print_diagram_node
')) {
528 $target->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_funnel_or_condition_offset, $seen_analyses );
530 print '[[
'.$target->relative_display_name($ref_pipeline)." ]]\n";
532 print '<<--
'.$target->relative_display_name($ref_pipeline)."\n";