ensembl-hive  2.6
Analysis.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Analysis
6 
7 =head1 DESCRIPTION
8 
9  An Analysis object represents a "stage" of the Hive pipeline that groups together
10  all jobs that share the same module and the same common parameters.
11 
12  Individual Jobs are said to "belong" to an Analysis.
13 
14  Control rules unblock when their condition Analyses are done.
15 
16 =head1 LICENSE
17 
18  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
19  Copyright [2016-2024] EMBL-European Bioinformatics Institute
20 
21  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
22  You may obtain a copy of the License at
23 
24  http://www.apache.org/licenses/LICENSE-2.0
25 
26  Unless required by applicable law or agreed to in writing, software distributed under the License
27  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28  See the License for the specific language governing permissions and limitations under the License.
29 
30 =head1 CONTACT
31 
32  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
33 
34 =cut
35 
36 
37 package Bio::EnsEMBL::Hive::Analysis;
38 
39 use sort 'stable';
40 use strict;
41 use warnings;
42 
43 use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
46 
47 
48 use base ( 'Bio::EnsEMBL::Hive::Storable' );
49 
50 
# Overrides the default from the Cacheable parent.
# Returns a listref holding the single key field name, 'logic_name'.
sub unikey {
    my @key_fields = ('logic_name');

    return \@key_fields;
}
54 
55 
56 =head1 AUTOLOADED
57 
58  resource_class_id / resource_class
59 
60 =cut
61 
62 
# Getter/setter for the '_logic_name' field.
# When called with an argument, stores it first; always returns the current value.
sub logic_name {
    my ($self, @new_value) = @_;

    if (scalar(@new_value)) {
        $self->{'_logic_name'} = $new_value[0];
    }

    return $self->{'_logic_name'};
}
68 
# A convenience synonym: forwards all arguments to logic_name() and returns its result.
sub name {
    my ($self, @args) = @_;

    return $self->logic_name(@args);
}
74 
75 
# Getter/setter for the '_module' field (the name of the Runnable module).
# An optional argument replaces the stored value; the current value is returned.
sub module {
    my ($self, @new_value) = @_;

    $self->{'_module'} = $new_value[0] if (scalar(@new_value));

    return $self->{'_module'};
}
81 
82 
# Getter/setter for the '_language' field.
# An optional argument replaces the stored value; the current value is returned.
sub language {
    my ($self, @new_value) = @_;

    if (@new_value) {
        ($self->{'_language'}) = @new_value;
    }

    return $self->{'_language'};
}
88 
89 
# Getter/setter for the '_parameters' field.
# A reference passed in is converted with stringify() before being stored;
# a plain scalar is stored as given. Always returns the stored value.
sub parameters {
    my ($self, @args) = @_;

    if (scalar(@args)) {
        my $new_parameters = $args[0];

        if (ref($new_parameters)) {
            $new_parameters = stringify($new_parameters);
        }
        $self->{'_parameters'} = $new_parameters;
    }

    return $self->{'_parameters'};
}
98 
99 
# Getter/setter for the '_comment' field.
# An undefined stored value is replaced by the empty string before returning.
sub comment {
    my ($self, @new_value) = @_;

    $self->{'_comment'} = $new_value[0] if (scalar(@new_value));

    unless (defined($self->{'_comment'})) {
        $self->{'_comment'} = '';
    }

    return $self->{'_comment'};
}
106 
107 
# Getter/setter for the '_tags' field.
# An undefined stored value is replaced by the empty string before returning.
sub tags {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_tags'} = $new_value[0];
    }
    if (!defined($self->{'_tags'})) {
        $self->{'_tags'} = '';
    }

    return $self->{'_tags'};
}
114 
115 
# Getter/setter for the '_failed_job_tolerance' field.
# An undefined stored value is replaced by 0 before returning.
sub failed_job_tolerance {
    my ($self, @new_value) = @_;

    $self->{'_failed_job_tolerance'} = $new_value[0] if (scalar(@new_value));

    if (!defined($self->{'_failed_job_tolerance'})) {
        $self->{'_failed_job_tolerance'} = 0;
    }

    return $self->{'_failed_job_tolerance'};
}
122 
123 
# Getter/setter for the '_max_retry_count' field; no default is applied here.
sub max_retry_count {
    my ($self, @new_value) = @_;

    if (scalar(@new_value)) {
        $self->{'_max_retry_count'} = $new_value[0];
    }

    return $self->{'_max_retry_count'};
}
129 
130 
# Getter/setter for the '_can_be_empty' flag.
# An undefined stored value is replaced by 0 before returning.
sub can_be_empty {
    my ($self, @new_value) = @_;

    if (@new_value) {
        ($self->{'_can_be_empty'}) = @new_value;
    }
    unless (defined($self->{'_can_be_empty'})) {
        $self->{'_can_be_empty'} = 0;
    }

    return $self->{'_can_be_empty'};
}
137 
138 
# Getter/setter for the '_priority' field.
# An undefined stored value is replaced by 0 before returning.
sub priority {
    my ($self, @new_value) = @_;

    $self->{'_priority'} = $new_value[0] if (@new_value);
    $self->{'_priority'} = 0 unless (defined($self->{'_priority'}));

    return $self->{'_priority'};
}
145 
146 
# Getter/setter for the '_meadow_type' field; no default is applied here.
sub meadow_type {
    my ($self, @new_value) = @_;

    if (scalar(@new_value)) {
        $self->{'_meadow_type'} = $new_value[0];
    }

    return $self->{'_meadow_type'};
}
152 
153 
# Getter/setter for the '_analysis_capacity' field; no default is applied here.
sub analysis_capacity {
    my ($self, @new_value) = @_;

    $self->{'_analysis_capacity'} = $new_value[0] if (scalar(@new_value));

    return $self->{'_analysis_capacity'};
}
159 
# Getter/setter for the '_hive_capacity' field; no default is applied here.
sub hive_capacity {
    my ($self, @new_value) = @_;

    if (@new_value) {
        ($self->{'_hive_capacity'}) = @new_value;
    }

    return $self->{'_hive_capacity'};
}
165 
# Getter/setter for the '_batch_size' field.
# The default of 1 is applied only when the stored value is undefined,
# so a value explicitly set to 0 stays 0.
sub batch_size {
    my ($self, @new_value) = @_;

    if (scalar(@new_value)) {
        $self->{'_batch_size'} = $new_value[0];
    }
    if (!defined($self->{'_batch_size'})) {
        $self->{'_batch_size'} = 1;
    }

    return $self->{'_batch_size'};
}
172 
# Resolves the name of the Perl class that should run this Analysis.
#
# Dies if no module is set. When a language is set, asks GuestProcess to
# assert that the runnable exists and returns 'Bio::EnsEMBL::Hive::GuestProcess'.
# Otherwise loads the module, checks that it inherits from
# Bio::EnsEMBL::Hive::Process and does not define the deprecated
# strict_hash_format() method, and returns the module's own name.
sub get_compiled_module_name {
    my ($self) = @_;

    my $runnable_module_name = $self->module;
    unless ($runnable_module_name) {
        die "Analysis '".$self->logic_name."' does not have its 'module' defined";
    }

    my $guest_language = $self->language;
    if ($guest_language) {
        Bio::EnsEMBL::Hive::GuestProcess::assert_runnable_exists($guest_language, $runnable_module_name);
        return 'Bio::EnsEMBL::Hive::GuestProcess';
    }

    # The module name is only known at run time, hence the string form of eval:
    eval "require $runnable_module_name";
    if ($@) {
        die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$@";
    }

    unless ($runnable_module_name->isa('Bio::EnsEMBL::Hive::Process')) {
        die "Problem accessing methods in '$runnable_module_name'. Please check that it inherits from Bio::EnsEMBL::Hive::Process and is named correctly.\n";
    }

    if ($runnable_module_name->can('strict_hash_format')) {
        die "DEPRECATED: the strict_hash_format() method is no longer supported in Runnables - the input_id() in '$runnable_module_name' has to be a hash now.\n";
    }

    return $runnable_module_name;
}
194 
195 
# Returns a hashref with the single key 'logic_name' mapped to this object's logic_name.
sub url_query_params {
    my $self = shift @_;

    my %query_params = (
        'logic_name' => $self->logic_name,
    );

    return \%query_params;
}
203 
204 
# Returns the logic_name, which serves as this object's display name.
sub display_name {
    my $self = shift @_;

    return $self->logic_name;
}
209 
210 
211 =head2 stats
212 
213  Arg [1] : none
214  Example : $stats = $analysis->stats;
215  Description: returns either the previously cached AnalysisStats object, or if it is missing - pulls a fresh one from the DB.
216  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object
217  Exceptions : none
218  Caller : general
219 
220 =cut
221 
# Looks up, in the pipeline's 'AnalysisStats' collection, the one entry whose 'analysis' is this object.
sub stats {
    my ($self) = @_;

    my $stats_collection = $self->hive_pipeline->collection_of( 'AnalysisStats' );

    return $stats_collection->find_one_by('analysis', $self);
}
227 
228 
229 # --------------------------------- dispatch the following calls directly to our Stats: ---------------------------------------
230 
# Dispatches directly to the stats object: forwards all arguments to its status() method.
sub status {
    my ($self, @args) = @_;

    my $stats = $self->stats;

    return $stats->status(@args);
}
236 
237 # ------------------------------------------------------------------------------------------------------------------------------
238 
239 
# Getter/setter for the '_jobs_collection' field.
# If the stored value is false (including unset), it is replaced by a new empty listref.
sub jobs_collection {
    my ($self, @new_collection) = @_;

    if (scalar(@new_collection)) {
        $self->{'_jobs_collection'} = $new_collection[0];
    }
    unless ($self->{'_jobs_collection'}) {
        $self->{'_jobs_collection'} = [];
    }

    return $self->{'_jobs_collection'};
}
247 
248 
# Finds, in the pipeline's 'AnalysisCtrlRule' collection, all entries whose 'ctrled_analysis' is this object.
sub control_rules_collection {
    my ($self) = @_;

    my $ctrl_rules = $self->hive_pipeline->collection_of( 'AnalysisCtrlRule' );

    return $ctrl_rules->find_all_by('ctrled_analysis', $self);
}
254 
255 
# Finds, in the pipeline's 'DataflowRule' collection, all entries whose 'from_analysis' is this object.
sub dataflow_rules_collection {
    my ($self) = @_;

    my $dataflow_rules = $self->hive_pipeline->collection_of( 'DataflowRule' );

    return $dataflow_rules->find_all_by('from_analysis', $self);
}
261 
262 
263 =head2 get_grouped_dataflow_rules
264 
265  Args : none
266  Example : $groups = $analysis->get_grouped_dataflow_rules;
267  Description: returns a listref of pairs, where the first element is a separate dfr or a funnel, and the second element is a listref of semaphored fan dfrs
268  Returntype : listref
269 
270 =cut
271 
# Groups this object's dataflow rules and returns a listref of triples:
#   [ $rule_or_funnel, \@semaphored_fan_rules, $targets_of_the_first_element ].
# Rules that have a funnel are processed first, so that every funnel group exists
# before the remaining rules are considered. Groups are ordered by the number of
# fan rules and then by the branch_code of the group's first element.
sub get_grouped_dataflow_rules {
    my ($self) = @_;

    # The hash key is a stringified reference and cannot be turned back into an object,
    # so the object itself is kept as the first element of each group.
    my %group_for       = ();
    my @insertion_order = ();   # the keys of %group_for in the order they were added

    my $all_rules   = $self->dataflow_rules_collection;
    my @fan_rules   = grep {  $_->funnel_dataflow_rule } @$all_rules;
    my @other_rules = grep { !$_->funnel_dataflow_rule } @$all_rules;

    foreach my $rule (@fan_rules, @other_rules) {

        my $rule_targets = $rule->get_my_targets;
        my $funnel_rule  = $rule->funnel_dataflow_rule;

        if ($funnel_rule) {

            if (!$group_for{$funnel_rule}) {    # the type check and the registration happen once per funnel
                my $funnel_targets = $funnel_rule->get_my_targets;

                foreach my $funnel_target (@$funnel_targets) {
                    if (!$funnel_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis')) {
                        throw("Each conditional branch of a semaphored funnel rule must point at an Analysis");
                    }
                }
                push @insertion_order, $funnel_rule;
                $group_for{$funnel_rule} = [$funnel_rule, [], $funnel_targets];
            }

            foreach my $fan_target (@$rule_targets) {
                if (!$fan_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis')) {
                    throw("Each conditional branch of a semaphored fan rule must point at an Analysis");
                }
            }
            push @{ $group_for{$funnel_rule}->[1] }, $rule;

        }
        elsif (!$group_for{$rule}) {
            push @insertion_order, $rule;
            $group_for{$rule} = [$rule, [], $rule_targets];
        }
    }

    my @sorted_keys = sort {
           scalar(@{ $group_for{$a}->[1] }) <=> scalar(@{ $group_for{$b}->[1] })
        or $group_for{$a}->[0]->branch_code <=> $group_for{$b}->[0]->branch_code
    } @insertion_order;

    my @groups = map { $group_for{$_} } @sorted_keys;

    return \@groups;
}
314 
315 
# Returns a hashref mapping each branch_code to a listref of this object's dataflow rules.
# The hash is built on the first call and cached in '_dataflow_rules_by_branch'.
# Within a branch, rules that have a funnel (semaphored fan rules) are placed
# in front of the other rules.
sub dataflow_rules_by_branch {
    my ($self) = @_;

    unless ($self->{'_dataflow_rules_by_branch'}) {
        my %rules_for_branch = ();

        foreach my $rule (@{ $self->dataflow_rules_collection }) {
            my $branch_code = $rule->branch_code;

            if (!$rules_for_branch{$branch_code}) {    # create the per-branch list explicitly
                $rules_for_branch{$branch_code} = [];
            }
            my $branch_rules = $rules_for_branch{$branch_code};

            if ($rule->funnel_dataflow_rule) {
                unshift @$branch_rules, $rule;
            }
            else {
                push @$branch_rules, $rule;
            }
        }

        $self->{'_dataflow_rules_by_branch'} = \%rules_for_branch;
    }

    return $self->{'_dataflow_rules_by_branch'};
}
335 
336 
# Creates jobs of this Analysis from the given input_ids, following one dataflow rule.
#
#   $output_ids_for_this_rule   : listref of input_ids for the jobs to be created
#   $emitting_job               : the job that emits the dataflow event
#   $same_db_dataflow           : if true, the emitting job's stacks and dbID are inherited
#   $push_emitting_job_on_stack : if true (and $same_db_dataflow), the emitting job's dbID
#                                 may be appended to the inherited stacks
#   $df_rule                    : the dataflow rule being followed
#
# Three cases are distinguished:
#   - the rule has a funnel: the new jobs are only cached in the emitting job's fan_cache;
#   - fan jobs are cached under this rule: a single SEMAPHORED funnel job is stored
#     together with them (dies if the number of input_ids is not exactly 1);
#   - otherwise: the jobs are stored directly, inheriting the emitting job's semaphore.
#
# Returns a listref of the dbIDs of the jobs stored by this call (empty in the first case).
sub dataflow {
    my ( $self, $output_ids_for_this_rule, $emitting_job, $same_db_dataflow, $push_emitting_job_on_stack, $df_rule ) = @_;

    my ($param_id_stack, $accu_id_stack, $emitting_job_id) = ('', '', undef);

    if ($same_db_dataflow) {
        $param_id_stack  = $emitting_job->param_id_stack;
        $accu_id_stack   = $emitting_job->accu_id_stack;
        $emitting_job_id = $emitting_job->dbID;

        if ($push_emitting_job_on_stack) {
            my $input_id  = $emitting_job->input_id;
            my $accu_hash = $emitting_job->accu_hash;

            if ($input_id and ($input_id ne '{}')) {    # add the parent to the param_id_stack if it had non-trivial extra parameters
                my $param_stack_head = $param_id_stack ? $param_id_stack.',' : '';
                $param_id_stack = $param_stack_head.$emitting_job_id;
            }
            if (scalar(keys %$accu_hash)) {             # add the parent to the accu_id_stack if it had "own" accumulator
                my $accu_stack_head = $accu_id_stack ? $accu_id_stack.',' : '';
                $accu_id_stack = $accu_stack_head.$emitting_job_id;
            }
        }
    }

    my @common_params = (
        'prev_job'       => $emitting_job,
        'analysis'       => $self,
        'hive_pipeline'  => $self->hive_pipeline,   # Although we may not cache jobs, make sure a new Job "belongs" to the same pipeline as its Analysis
        'param_id_stack' => $param_id_stack,
        'accu_id_stack'  => $accu_id_stack,
    );

    my $job_adaptor    = $self->adaptor->db->get_AnalysisJobAdaptor;
    my @output_job_ids = ();

    # Case 1: members of a semaphored fan have to wait in cache until the funnel is created.
    if (my $funnel_dataflow_rule = $df_rule->funnel_dataflow_rule) {

        my $fan_cache_this_branch = $emitting_job->fan_cache->{"$funnel_dataflow_rule"} ||= [];

        my @fan_members = map {
            Bio::EnsEMBL::Hive::AnalysisJob->new(
                @common_params,
                'input_id' => $_,
                # controlled_semaphore => to be set when the $controlled_semaphore has been stored
            )
        } @$output_ids_for_this_rule;

        push @$fan_cache_this_branch, @fan_members;

        return \@output_job_ids;
    }

    # Either a semaphored funnel or a non-semaphored dataflow; the cache is cleared at the same time:
    my $fan_jobs = delete $emitting_job->fan_cache->{"$df_rule"};

    # Case 3: non-semaphored dataflow (but potentially propagating any existing semaphores).
    unless ($fan_jobs && @$fan_jobs) {
        my @non_semaphored_jobs = map {
            Bio::EnsEMBL::Hive::AnalysisJob->new(
                @common_params,
                'input_id'             => $_,
                'controlled_semaphore' => $emitting_job->controlled_semaphore,  # propagate parent's semaphore if any
            )
        } @$output_ids_for_this_rule;

        # NB: $job_adaptor happens to belong to the @non_semaphored_jobs, but not necessarily to the $emitting_job :
        my $stored_job_ids = $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0, $emitting_job_id);
        push @output_job_ids, @$stored_job_ids;

        return \@output_job_ids;
    }

    # Case 2: a semaphored funnel, which must be exactly one job.
    my $funnel_job_count = scalar(@$output_ids_for_this_rule);
    if ($funnel_job_count != 1) {
        $emitting_job->transient_error(0);
        die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
    }

    my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
        @common_params,
        'input_id' => $output_ids_for_this_rule->[0],
        'status'   => 'SEMAPHORED',
    );

    # NB: $job_adaptor happens to belong to the $funnel_job, but not necessarily to $fan_jobs or $emitting_job
    my ($semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );

    push @output_job_ids, $funnel_job_id, @fan_job_ids;

    return \@output_job_ids;
}
419 
420 
# Builds a one-line description of the form
#   Analysis[<dbID>]: <display_name>->(<module>[ (<language>)], <parameters>, <resource class name>)
# substituting '' for a missing dbID, 'no_module' for a missing module,
# '{}' for missing parameters and 'no_rc' for a missing resource class.
sub toString {
    my ($self) = @_;

    my $description = 'Analysis['.($self->dbID // '').']: '.$self->display_name.'->(';

    my $module_label = ($self->module // 'no_module');
    if ($self->language) {
        $module_label .= sprintf(' (%s)', $self->language);
    }

    my $resource_class = $self->resource_class;
    my @details = (
        $module_label,
        $self->parameters // '{}',
        $resource_class ? $resource_class->name : 'no_rc',
    );

    return $description.join(', ', @details).')';
}
426 
427 
# Prints a text diagram of this node and, recursively, of everything it flows into.
#
#   $ref_pipeline  : passed through to relative_display_name() of every printed node
#   $prefix        : the string printed at the start of every continuation line of this node
#   $seen_analyses : hashref of nodes already printed; a node met again is only
#                    printed as a parenthesised name and is not expanded
#
# The node's own label is printed without a prefix, because the caller has
# already printed the prefix for that line.
sub print_diagram_node {
    my ($self, $ref_pipeline, $prefix, $seen_analyses) = @_;

    my $already_seen = $seen_analyses->{$self}++;
    if ($already_seen) {
        print "(".$self->relative_display_name($ref_pipeline).")\n";   # NB: the prefix of the label itself is done by the previous level
        return;
    }

    print $self->relative_display_name($ref_pipeline)."\n";            # NB: the prefix of the label itself is done by the previous level

    my $groups          = $self->get_grouped_dataflow_rules;
    my $group_count     = scalar(@$groups);
    my $last_group_idx  = $group_count - 1;

    foreach my $i (0 .. $last_group_idx) {

        my ($funnel_dfr, $fan_dfrs, $df_targets) = @{ $groups->[$i] };

        my $last_fan_idx    = scalar(@$fan_dfrs) - 1;
        my $last_target_idx = scalar(@$df_targets) - 1;

        my $this_funnel_offset = '';

        if ($group_count > 1 and scalar(@$fan_dfrs)) {  # if more than one group (no single backbone) and semaphored, the semaphored group will be offset:
            print $prefix." │\n";
            print $prefix." ╘════╤══╗\n";         # " └────┬──┐\n";
            $this_funnel_offset = ($i < $last_group_idx) ? '' : ' ';   # non-last vs last group
        }

        # Every remaining line of this group starts with the same lead:
        my $lead = $prefix.$this_funnel_offset;

        foreach my $j (0 .. $last_fan_idx) {    # for each of the dependent fan rules, show them one by one:
            my $fan_dfr    = $fan_dfrs->[$j];
            my $fan_branch = $fan_dfr->branch_code;

            print $lead." │ ║\n";
            print $lead." │ ║\n";
            print $lead." │ ║#$fan_branch\n";

            my $fan_df_targets      = $fan_dfr->get_my_targets;
            my $last_fan_target_idx = scalar(@$fan_df_targets) - 1;

            foreach my $k (0 .. $last_fan_target_idx) {     # for each fan's target
                my $fan_target = $fan_df_targets->[$k];

                print $lead." │ ║\n";

                # A condition label is shown when there is a choice of targets or an explicit condition:
                my $fan_condition = $fan_target->on_condition;
                if (defined($fan_condition)) {
                    print $lead." │ ║ WHEN $fan_condition\n";
                }
                elsif (scalar(@$fan_df_targets) != 1) {
                    print $lead." │ ║ ELSE\n";
                }
                print $lead.' │├─╚═> ';

                my $more_fan_lines = ($j < $last_fan_idx or $k < $last_fan_target_idx);
                my $next_fan_or_condition_offset = $more_fan_lines ? ' │ ║ ' : ' │ ';
                my $fan_child_prefix = $lead.$next_fan_or_condition_offset;

                if (my $template = $fan_target->input_id_template) {
                    print "$template\n";
                    print $fan_child_prefix." │\n";
                    print $fan_child_prefix." V\n";
                    print $fan_child_prefix;
                }

                $fan_target->to_analysis->print_diagram_node($ref_pipeline, $fan_child_prefix, $seen_analyses );
            }
        }

        my $funnel_branch = $funnel_dfr->branch_code;

        print $lead." │\n";
        print $lead." │\n";
        print $lead." │#$funnel_branch\n";

        foreach my $k (0 .. $last_target_idx) {     # for each funnel's target
            my $df_target = $df_targets->[$k];

            print $lead." │\n";

            my $funnel_condition = $df_target->on_condition;
            my $funnel_choice    = (scalar(@$df_targets) != 1) || defined($funnel_condition);

            if ($funnel_choice) {
                if (defined($funnel_condition)) {
                    print $lead." │ WHEN $funnel_condition\n";
                }
                else {
                    print $lead." │ ELSE\n";
                }
            }

            my $next_funnel_or_condition_offset = '';

            if ( ($group_count == 1 or $this_funnel_offset) and !$funnel_choice ) { # 'the only group' (backbone) or a semaphore funnel ...
                print $lead." V\n";     # ... make a vertical arrow
                print $lead;
            }
            else {
                print $lead.' └─▻ ';    # otherwise fork to the right
                $next_funnel_or_condition_offset = ($i < $last_group_idx or $k < $last_target_idx) ? '' : ' ';
            }

            my $child_prefix = $lead.$next_funnel_or_condition_offset;

            if (my $template = $df_target->input_id_template) {
                print "$template\n";
                print $child_prefix." │\n";
                print $child_prefix." V\n";
                print $child_prefix;
            }

            # Targets that can draw themselves recurse; tables and accumulators are printed as leaves:
            my $target = $df_target->to_analysis;
            if ($target->can('print_diagram_node')) {
                $target->print_diagram_node($ref_pipeline, $child_prefix, $seen_analyses );
            }
            elsif ($target->isa('Bio::EnsEMBL::Hive::NakedTable')) {
                print '[[ '.$target->relative_display_name($ref_pipeline)." ]]\n";
            }
            elsif ($target->isa('Bio::EnsEMBL::Hive::Accumulator')) {
                print '<<-- '.$target->relative_display_name($ref_pipeline)."\n";
            }
        }
    }
}
537 
538 1;
539 
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
Bio::EnsEMBL::Hive::NakedTable
Definition: NakedTable.pm:10
Bio::EnsEMBL::Hive::GuestProcess
Definition: GuestProcess.pm:105
map
public map()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Storable::new
public Bio::EnsEMBL::Hive::Storable new()
Bio::EnsEMBL::Hive::AnalysisStats
Definition: AnalysisStats.pm:12
Bio::EnsEMBL::Hive::Cacheable::hive_pipeline
public hive_pipeline()
Bio::EnsEMBL::Hive::GuestProcess::assert_runnable_exists
public void assert_runnable_exists()
Bio::EnsEMBL::Hive::Cacheable
Definition: Cacheable.pm:6
Bio::EnsEMBL::Hive::Accumulator
Definition: Accumulator.pm:11
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::AnalysisJob
Definition: AnalysisJob.pm:13
Bio::EnsEMBL::Hive::Analysis
Definition: Analysis.pm:18