ensembl-hive  2.5
Analysis.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Analysis
6 
7 =head1 DESCRIPTION
8 
9  An Analysis object represents a "stage" of the Hive pipeline that groups together
10  all jobs that share the same module and the same common parameters.
11 
12  Individual Jobs are said to "belong" to an Analysis.
13 
14  Control rules unblock when their condition Analyses are done.
15 
16 =head1 LICENSE
17 
18  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
19  Copyright [2016-2022] EMBL-European Bioinformatics Institute
20 
21  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
22  You may obtain a copy of the License at
23 
24  http://www.apache.org/licenses/LICENSE-2.0
25 
26  Unless required by applicable law or agreed to in writing, software distributed under the License
27  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28  See the License for the specific language governing permissions and limitations under the License.
29 
30 =head1 CONTACT
31 
32  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
33 
34 =cut
35 
36 
37 package Bio::EnsEMBL::Hive::Analysis;
38 
39 use sort 'stable';
40 use strict;
41 use warnings;
42 
43 use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
46 
47 
48 use base ( 'Bio::EnsEMBL::Hive::Storable' );
49 
50 
# Overrides the default from the Cacheable parent: returns a listref with the
# names of the fields that uniquely identify an Analysis (just 'logic_name').
sub unikey {
    my @key_fields = ('logic_name');

    return \@key_fields;
}
54 
55 
56 =head1 AUTOLOADED
57 
58  resource_class_id / resource_class
59 
60 =cut
61 
62 
# Getter/setter for the Analysis' logic_name.
# When called with an argument, stores it first; always returns the current value.
sub logic_name {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_logic_name'} = $new_value[0];
    }

    return $self->{'_logic_name'};
}
68 
# A convenience synonym for logic_name(): forwards any arguments to it
# and returns whatever it returns.
sub name {
    my ($self, @args) = @_;

    return $self->logic_name(@args);
}
74 
75 
# Getter/setter for the name of the Runnable module of this Analysis.
# When called with an argument, stores it first; always returns the current value.
sub module {
    my $self = $_[0];

    $self->{'_module'} = $_[1] if scalar(@_) > 1;

    return $self->{'_module'};
}
81 
82 
# Getter/setter for the language of the Runnable.
# get_compiled_module_name() treats a true value as "not a native Perl module".
sub language {
    my ($self, @new_value) = @_;

    $self->{'_language'} = $new_value[0] if @new_value;

    return $self->{'_language'};
}
88 
89 
# Getter/setter for the Analysis-wide parameters.
# A reference passed to the setter is run through stringify() before storing;
# a plain scalar is stored as it is. Always returns the stored value.
sub parameters {
    my ($self, @args) = @_;

    if (@args) {
        my $new_parameters = $args[0];

        if (ref($new_parameters)) {
            $self->{'_parameters'} = stringify($new_parameters);
        } else {
            $self->{'_parameters'} = $new_parameters;
        }
    }

    return $self->{'_parameters'};
}
98 
99 
# Getter/setter for the free-text comment of this Analysis.
# An undefined value (unset, or explicitly set to undef) is replaced by an empty string.
sub comment {
    my ($self, @new_value) = @_;

    $self->{'_comment'} = $new_value[0] if @new_value;

    unless (defined $self->{'_comment'}) {
        $self->{'_comment'} = '';
    }

    return $self->{'_comment'};
}
106 
107 
# Getter/setter for the tags of this Analysis.
# An undefined value (unset, or explicitly set to undef) is replaced by an empty string.
sub tags {
    my $self = $_[0];

    $self->{'_tags'} = $_[1] if scalar(@_) > 1;
    $self->{'_tags'} = ''    unless defined $self->{'_tags'};

    return $self->{'_tags'};
}
114 
115 
# Getter/setter for the failed_job_tolerance of this Analysis.
# An undefined value (unset, or explicitly set to undef) is replaced by 0.
sub failed_job_tolerance {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_failed_job_tolerance'} = $new_value[0];
    }
    if (!defined $self->{'_failed_job_tolerance'}) {
        $self->{'_failed_job_tolerance'} = 0;
    }

    return $self->{'_failed_job_tolerance'};
}
122 
123 
# Getter/setter for the max_retry_count of this Analysis.
# No default is applied here: an unset value is returned as undef.
sub max_retry_count {
    my ($self, @new_value) = @_;

    $self->{'_max_retry_count'} = $new_value[0] if @new_value;

    return $self->{'_max_retry_count'};
}
129 
130 
# Getter/setter for the can_be_empty flag of this Analysis.
# An undefined value (unset, or explicitly set to undef) is replaced by 0.
sub can_be_empty {
    my $self = $_[0];

    $self->{'_can_be_empty'} = $_[1] if scalar(@_) > 1;
    $self->{'_can_be_empty'} = 0     unless defined $self->{'_can_be_empty'};

    return $self->{'_can_be_empty'};
}
137 
138 
# Getter/setter for the priority of this Analysis.
# An undefined value (unset, or explicitly set to undef) is replaced by 0.
sub priority {
    my ($self, @new_value) = @_;

    $self->{'_priority'} = $new_value[0] if @new_value;

    if (!defined $self->{'_priority'}) {
        $self->{'_priority'} = 0;
    }

    return $self->{'_priority'};
}
145 
146 
# Getter/setter for the meadow_type of this Analysis.
# No default is applied here: an unset value is returned as undef.
sub meadow_type {
    my $self = $_[0];

    if (scalar(@_) > 1) {
        $self->{'_meadow_type'} = $_[1];
    }

    return $self->{'_meadow_type'};
}
152 
153 
# Getter/setter for the analysis_capacity of this Analysis.
# No default is applied here: an unset value is returned as undef.
sub analysis_capacity {
    my ($self, @new_value) = @_;

    $self->{'_analysis_capacity'} = $new_value[0] if @new_value;

    return $self->{'_analysis_capacity'};
}
159 
# Getter/setter for the hive_capacity of this Analysis.
# No default is applied here: an unset value is returned as undef.
sub hive_capacity {
    my $self = $_[0];

    $self->{'_hive_capacity'} = $_[1] if scalar(@_) > 1;

    return $self->{'_hive_capacity'};
}
165 
# Getter/setter for the batch_size of this Analysis.
# The default of 1 is only applied when the value is undefined,
# so an explicitly defined 0 stays 0.
sub batch_size {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_batch_size'} = $new_value[0];
    }
    unless (defined $self->{'_batch_size'}) {
        $self->{'_batch_size'} = 1;
    }

    return $self->{'_batch_size'};
}
172 
# Checks that the Runnable of this Analysis can be loaded/compiled and returns
# the name of the Perl class that should be instantiated to run it.
#
#   Returntype : String. 'Bio::EnsEMBL::Hive::GuestProcess' when a language() is set,
#                otherwise the value of module().
#   Exceptions : dies if module() is not defined;
#                dies if the language wrapper's 'compile' command exits with a non-zero status;
#                dies if the Perl module cannot be require'd, does not inherit from
#                Bio::EnsEMBL::Hive::Process, or still implements strict_hash_format().
sub get_compiled_module_name {
    my $self = shift;

    my $runnable_module_name = $self->module
        or die "Analysis '".$self->logic_name."' does not have its 'module' defined";

    if ($self->language) {
        # Non-Perl Runnables are checked by an external, language-specific wrapper.
        # The 'require' is a no-op if GuestProcess has already been loaded by the file.
        require Bio::EnsEMBL::Hive::GuestProcess;
        my $wrapper = Bio::EnsEMBL::Hive::GuestProcess::_get_wrapper_for_language($self->language);

        # List-form system() bypasses the shell; a true return value means a non-zero exit status
        if (system($wrapper, 'compile', $runnable_module_name)) {
            die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n";
        }
        return 'Bio::EnsEMBL::Hive::GuestProcess';
    }

    # String-eval is intentional here: the module name is only known at run time
    eval "require $runnable_module_name";
    die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$@" if($@);
    die "Problem accessing methods in '$runnable_module_name'. Please check that it inherits from Bio::EnsEMBL::Hive::Process and is named correctly.\n"
        unless($runnable_module_name->isa('Bio::EnsEMBL::Hive::Process'));

    die "DEPRECATED: the strict_hash_format() method is no longer supported in Runnables - the input_id() in '$runnable_module_name' has to be a hash now.\n"
        if($runnable_module_name->can('strict_hash_format'));

    return $runnable_module_name;
}
197 
198 
# Returns a hashref of the parameters that identify this Analysis in a URL query:
# currently only its logic_name.
sub url_query_params {
    my $self = shift;

    my %query_params = (
        'logic_name' => $self->logic_name,
    );

    return \%query_params;
}
206 
207 
# Returns the name under which this Analysis is displayed, which is its logic_name.
sub display_name {
    my $self = shift;

    return $self->logic_name;
}
212 
213 
214 =head2 stats
215 
216  Arg [1] : none
217  Example : $stats = $analysis->stats;
218  Description: returns either the previously cached AnalysisStats object, or if it is missing - pulls a fresh one from the DB.
219  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object
220  Exceptions : none
221  Caller : general
222 
223 =cut
224 
# Looks up the AnalysisStats object linked to this Analysis in the pipeline's
# 'AnalysisStats' collection and returns it.
sub stats {
    my ($self) = @_;

    my $stats_collection = $self->hive_pipeline->collection_of( 'AnalysisStats' );

    return $stats_collection->find_one_by('analysis', $self);
}
230 
231 
232 # --------------------------------- dispatch the following calls directly to our Stats: ---------------------------------------
233 
# Dispatched directly to our AnalysisStats object: forwards all arguments
# to its status() method and returns the result.
sub status {
    my ($self, @args) = @_;

    my $stats = $self->stats;

    return $stats->status(@args);
}
239 
240 # ------------------------------------------------------------------------------------------------------------------------------
241 
242 
# Getter/setter for the listref of jobs attached to this Analysis.
# A false stored value (including unset) is replaced by a fresh empty listref,
# which is stored and returned.
sub jobs_collection {
    my ($self, @new_value) = @_;

    $self->{'_jobs_collection'} = $new_value[0] if @new_value;

    unless ($self->{'_jobs_collection'}) {
        $self->{'_jobs_collection'} = [];
    }

    return $self->{'_jobs_collection'};
}
250 
251 
# Returns whatever the pipeline's 'AnalysisCtrlRule' collection finds for the rules
# whose 'ctrled_analysis' is this Analysis.
sub control_rules_collection {
    my ($self) = @_;

    my $ctrl_rules = $self->hive_pipeline->collection_of( 'AnalysisCtrlRule' );

    return $ctrl_rules->find_all_by('ctrled_analysis', $self);
}
257 
258 
# Returns whatever the pipeline's 'DataflowRule' collection finds for the rules
# whose 'from_analysis' is this Analysis.
sub dataflow_rules_collection {
    my ($self) = @_;

    my $dataflow_rules = $self->hive_pipeline->collection_of( 'DataflowRule' );

    return $dataflow_rules->find_all_by('from_analysis', $self);
}
264 
265 
266 =head2 get_grouped_dataflow_rules
267 
268  Args : none
269  Example : $groups = $analysis->get_grouped_dataflow_rules;
270  Description: returns a listref of pairs, where the first element is a separate dfr or a funnel, and the second element is a listref of semaphored fan dfrs
271  Returntype : listref
272 
273 =cut
274 
# Groups the dataflow rules of this Analysis and returns a listref of triples:
#   [ $funnel_or_standalone_dfr, \@semaphored_fan_dfrs, \@targets_of_the_first_element ]
# Groups with fewer fan rules come first; ties are ordered by the branch_code of the
# first element.
sub get_grouped_dataflow_rules {
    my ($self) = @_;

    # The groups are keyed by the stringified reference of their funnel/standalone rule.
    # Such a key cannot be turned back into an object, so the rule itself is also kept
    # as the first element of each group.
    my %group_for;
    my @insertion_order;    # hashes are unordered, so the order of creation is tracked separately

    my $all_rules = $self->dataflow_rules_collection;

    # Rules that have a funnel are processed first, the remaining ones afterwards
    my @rules_with_funnel    = grep {  $_->funnel_dataflow_rule } @$all_rules;
    my @rules_without_funnel = grep { !$_->funnel_dataflow_rule } @$all_rules;

    for my $rule (@rules_with_funnel, @rules_without_funnel) {

        my $rule_targets = $rule->get_my_targets;
        my $funnel_rule  = $rule->funnel_dataflow_rule;

        if (!$funnel_rule) {
            # A standalone rule or a funnel: start a group for it unless one exists already
            if (!$group_for{$rule}) {
                push @insertion_order, $rule;
                $group_for{$rule} = [ $rule, [], $rule_targets ];
            }
            next;
        }

        # A semaphored fan rule: make sure the group of its funnel exists
        # (both the type check and the creation only happen once per funnel)
        if (!$group_for{$funnel_rule}) {
            my $funnel_targets = $funnel_rule->get_my_targets;

            for my $funnel_target (@$funnel_targets) {
                throw("Each conditional branch of a semaphored funnel rule must point at an Analysis")
                    unless $funnel_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis');
            }

            push @insertion_order, $funnel_rule;
            $group_for{$funnel_rule} = [ $funnel_rule, [], $funnel_targets ];
        }
        my $funnel_group = $group_for{$funnel_rule};

        for my $fan_target (@$rule_targets) {
            throw("Each conditional branch of a semaphored fan rule must point at an Analysis")
                unless $fan_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis');
        }

        push @{ $funnel_group->[1] }, $rule;
    }

    my @sorted_keys = sort {
        scalar(@{ $group_for{$a}->[1] }) <=> scalar(@{ $group_for{$b}->[1] })
            ||
        $group_for{$a}->[0]->branch_code <=> $group_for{$b}->[0]->branch_code
    } @insertion_order;

    my @groups = map { $group_for{$_} } @sorted_keys;

    return \@groups;
}
317 
318 
# Returns a hashref that maps each branch_code to the listref of dataflow rules
# of this Analysis on that branch. The hashref is built on the first call and
# kept in the object for the subsequent ones.
sub dataflow_rules_by_branch {
    my ($self) = @_;

    return $self->{'_dataflow_rules_by_branch'} if $self->{'_dataflow_rules_by_branch'};

    my %rules_for_branch;

    for my $rule (@{ $self->dataflow_rules_collection }) {
        my $branch_code = $rule->branch_code;

        $rules_for_branch{$branch_code} ||= [];    # create the per-branch list explicitly
        my $branch_rules = $rules_for_branch{$branch_code};

        # Rules that have a funnel (semaphored fan rules) go to the front of the list,
        # so that they come before the other rules with the same branch_code
        if ($rule->funnel_dataflow_rule) {
            unshift @$branch_rules, $rule;
        } else {
            push @$branch_rules, $rule;
        }
    }

    $self->{'_dataflow_rules_by_branch'} = \%rules_for_branch;

    return $self->{'_dataflow_rules_by_branch'};
}
338 
339 
# Creates the jobs of this Analysis that result from one dataflow event.
#
#   $output_ids_for_this_rule   : listref of input_ids, one per job to be created
#   $emitting_job               : the job that is flowing the data
#   $same_db_dataflow           : if true, the id stacks and the dbID of the emitting job are inherited
#   $push_emitting_job_on_stack : if true (and $same_db_dataflow), the emitting job may be added to the stacks
#   $df_rule                    : the dataflow rule that is being followed
#
# Returns a listref of the ids of the jobs that have been stored. The listref is empty
# when the jobs were only parked in the fan cache of the emitting job.
sub dataflow {
    my ( $self, $output_ids_for_this_rule, $emitting_job, $same_db_dataflow, $push_emitting_job_on_stack, $df_rule ) = @_;

    my ($param_id_stack, $accu_id_stack) = ('', '');
    my $emitting_job_id;

    if ($same_db_dataflow) {
        $param_id_stack  = $emitting_job->param_id_stack;
        $accu_id_stack   = $emitting_job->accu_id_stack;
        $emitting_job_id = $emitting_job->dbID;

        if ($push_emitting_job_on_stack) {
            my $parent_input_id  = $emitting_job->input_id;
            my $parent_accu_hash = $emitting_job->accu_hash;

            # the parent joins the param_id_stack if it had non-trivial extra parameters
            if ($parent_input_id && ($parent_input_id ne '{}')) {
                $param_id_stack = join(',', ($param_id_stack ? ($param_id_stack) : ()), $emitting_job_id);
            }

            # the parent joins the accu_id_stack if it had its "own" accumulator
            my $own_accu_count = keys %$parent_accu_hash;
            if ($own_accu_count) {
                $accu_id_stack = join(',', ($accu_id_stack ? ($accu_id_stack) : ()), $emitting_job_id);
            }
        }
    }

    # Constructor arguments shared by every job created below
    my @shared_job_args = (
        'prev_job'       => $emitting_job,
        'analysis'       => $self,
        'hive_pipeline'  => $self->hive_pipeline,   # even if jobs are not cached, a new Job must "belong" to the same pipeline as its Analysis
        'param_id_stack' => $param_id_stack,
        'accu_id_stack'  => $accu_id_stack,
    );

    my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;

    # --- a member of a semaphored fan: the jobs wait in the cache until the funnel is created ---
    if (my $funnel_dataflow_rule = $df_rule->funnel_dataflow_rule) {

        my $fan_cache       = $emitting_job->fan_cache;
        my $cached_fan_jobs = ( $fan_cache->{"$funnel_dataflow_rule"} ||= [] );

        push @$cached_fan_jobs, map {
            Bio::EnsEMBL::Hive::AnalysisJob->new(
                @shared_job_args,
                'input_id' => $_,
                # controlled_semaphore is to be set once the semaphore itself has been stored
            )
        } @$output_ids_for_this_rule;

        return [];
    }

    # --- either a semaphored funnel or a non-semaphored dataflow ---
    my $fan_jobs = delete $emitting_job->fan_cache->{"$df_rule"};   # fetch and clear the cache in one go

    unless ($fan_jobs && @$fan_jobs) {
        # non-semaphored dataflow (but potentially propagating any existing semaphore)
        my @plain_jobs = map {
            Bio::EnsEMBL::Hive::AnalysisJob->new(
                @shared_job_args,
                'input_id'             => $_,
                'controlled_semaphore' => $emitting_job->controlled_semaphore,   # propagate the parent's semaphore, if any
            )
        } @$output_ids_for_this_rule;

        # NB: $job_adaptor happens to belong to @plain_jobs, but not necessarily to the $emitting_job
        my $stored_job_ids = $job_adaptor->store_jobs_and_adjust_counters( \@plain_jobs, 0, $emitting_job_id );

        return [ @$stored_job_ids ];
    }

    # --- a semaphored funnel: exactly one funnel job is expected ---
    my $funnel_job_count = scalar(@$output_ids_for_this_rule);

    if ($funnel_job_count != 1) {
        $emitting_job->transient_error(0);
        die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
    }

    my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
        @shared_job_args,
        'input_id' => $output_ids_for_this_rule->[0],
        'status'   => 'SEMAPHORED',
    );

    # NB: $job_adaptor happens to belong to the $funnel_job, but not necessarily to $fan_jobs or the $emitting_job
    my (undef, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );

    return [ $funnel_job_id, @fan_job_ids ];
}
422 
423 
# Returns a one-line description of this Analysis in the form
#   Analysis[<dbID>]: <display_name>->(<module> (<language>), <parameters>, <resource_class name>)
# where the missing pieces are replaced by '', 'no_module', '{}' and 'no_rc'.
sub toString {
    my ($self) = @_;

    my $dbid_label   = $self->dbID // '';
    my $name_label   = $self->display_name;

    my $module_label = $self->module // 'no_module';
    if ($self->language) {
        $module_label .= sprintf(' (%s)', $self->language);
    }

    my $params_label = $self->parameters // '{}';
    my $rc_label     = $self->resource_class ? $self->resource_class->name : 'no_rc';

    my $details = join(', ', $module_label, $params_label, $rc_label);

    return 'Analysis[' . $dbid_label . ']: ' . $name_label . '->(' . $details . ')';
}
429 
430 
# Prints to the currently selected filehandle a text diagram of this Analysis followed,
# recursively, by the targets of its dataflow rules.
#
#   $ref_pipeline  : passed through to relative_display_name() to build the labels
#   $prefix        : string printed at the start of every continuation line of this node
#   $seen_analyses : hashref keyed by (stringified) Analysis; an Analysis that has been
#                    seen before is only printed as "(name)" and is not expanded again
#
# Nothing useful is returned; the sub is called for its output.
431 sub print_diagram_node {
432  my ($self, $ref_pipeline, $prefix, $seen_analyses) = @_;
433 
# post-increment: false on the first visit, true on every later one
434  if($seen_analyses->{$self}++) {
435  print "(".$self->relative_display_name($ref_pipeline).")\n"; # NB: the prefix of the label itself is done by the previous level
436  return;
437  }
438 
439  print $self->relative_display_name($ref_pipeline)."\n"; # NB: the prefix of the label itself is done by the previous level
440 
# each group is a triple: [ funnel-or-standalone rule, listref of fan rules, listref of targets ]
441  my $groups = $self->get_grouped_dataflow_rules;
442 
443  foreach my $i (0..scalar(@$groups)-1) {
444 
445  my ($funnel_dfr, $fan_dfrs, $df_targets) = @{ $groups->[$i] };
446 
447  my $this_funnel_offset = '';
448 
449  if(scalar(@$groups)>1 and scalar(@$fan_dfrs)) { # if more than one group (no single backbone) and semaphored, the semaphored group will be offset:
450  print $prefix." │\n";
451  print $prefix." ╘════╤══╗\n"; # " └────┬──┐\n";
452  $this_funnel_offset = ($i < scalar(@$groups)-1) ? '' : ' '; # non-last vs last group
453  }
454 
455  foreach my $j (0..scalar(@$fan_dfrs)-1) { # for each of the dependent fan rules, show them one by one:
456  my $fan_dfr = $fan_dfrs->[$j];
457  my $fan_branch = $fan_dfr->branch_code;
458 
459  print $prefix.$this_funnel_offset." │ ║\n";
460  print $prefix.$this_funnel_offset." │ ║\n";
461  print $prefix.$this_funnel_offset." │ ║#$fan_branch\n";
462 
463  my $fan_df_targets = $fan_dfr->get_my_targets;
464 
465  foreach my $k (0..scalar(@$fan_df_targets)-1) { # for each fan's target
466  my $fan_target = $fan_df_targets->[$k];
467 
468  print $prefix.$this_funnel_offset." │ ║\n";
469 
# a WHEN/ELSE label is only shown if there are several targets or this one is conditional
470  if(my $fan_choice = (scalar(@$fan_df_targets)!=1) || defined($fan_target->on_condition)) {
471  if(defined(my $on_condition = $fan_target->on_condition)) {
472  print $prefix.$this_funnel_offset." │ ║ WHEN $on_condition\n";
473  } else {
474  print $prefix.$this_funnel_offset." │ ║ ELSE\n";
475  }
476  }
477  print $prefix.$this_funnel_offset.' │├─╚═> ';
478 
# the offset differs depending on whether more fan rules or fan targets are still to come
479  my $next_fan_or_condition_offset = ($j<scalar(@$fan_dfrs)-1 or $k<scalar(@$fan_df_targets)-1) ? ' │ ║ ' : ' │ ';
480 
481  if(my $template = $fan_target->input_id_template) {
482  print "$template\n";
483  print $prefix.$this_funnel_offset.$next_fan_or_condition_offset." │\n";
484  print $prefix.$this_funnel_offset.$next_fan_or_condition_offset." V\n";
485  print $prefix.$this_funnel_offset.$next_fan_or_condition_offset;
486  }
487 
# recurse into the fan's target with the prefix extended by the offsets of this level
488  $fan_target->to_analysis->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_fan_or_condition_offset, $seen_analyses );
489  }
490  }
491 
492  my $funnel_branch = $funnel_dfr->branch_code;
493 
494  print $prefix.$this_funnel_offset." │\n";
495  print $prefix.$this_funnel_offset." │\n";
496  print $prefix.$this_funnel_offset." │#$funnel_branch\n";
497 
498  foreach my $k (0..scalar(@$df_targets)-1) { # for each funnel's target
499  my $df_target = $df_targets->[$k];
500 
501  print $prefix.$this_funnel_offset." │\n";
502 
# true if there are several targets or this one is conditional
503  my $funnel_choice = (scalar(@$df_targets)!=1) || defined($df_target->on_condition);
504 
505  if($funnel_choice) {
506  if(defined(my $on_condition = $df_target->on_condition)) {
507  print $prefix.$this_funnel_offset." │ WHEN $on_condition\n";
508  } else {
509  print $prefix.$this_funnel_offset." │ ELSE\n";
510  }
511  }
512 
513  my $next_funnel_or_condition_offset = '';
514 
515  if( (scalar(@$groups)==1 or $this_funnel_offset) and !$funnel_choice ) { # 'the only group' (backbone) or a semaphore funnel ...
516  print $prefix.$this_funnel_offset." V\n"; # ... make a vertical arrow
517  print $prefix.$this_funnel_offset;
518  } else {
519  print $prefix.$this_funnel_offset.' └─▻ '; # otherwise fork to the right
520  $next_funnel_or_condition_offset = ($i<scalar(@$groups)-1 or $k<scalar(@$df_targets)-1) ? '' : ' ';
521  }
522  if(my $template = $df_target->input_id_template) {
523  print "$template\n";
524  print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." │\n";
525  print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." V\n";
526  print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset;
527  }
528 
# only targets that can print themselves are recursed into; NakedTable and Accumulator
# targets are printed as leaf labels, and any other kind of target prints nothing
529  my $target = $df_target->to_analysis;
530  if($target->can('print_diagram_node')) {
531  $target->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_funnel_or_condition_offset, $seen_analyses );
532  } elsif($target->isa('Bio::EnsEMBL::Hive::NakedTable')) {
533  print '[[ '.$target->relative_display_name($ref_pipeline)." ]]\n";
534  } elsif($target->isa('Bio::EnsEMBL::Hive::Accumulator')) {
535  print '<<-- '.$target->relative_display_name($ref_pipeline)."\n";
536  }
537  }
538  }
539 }
540 
541 1;
542 
protected String _get_wrapper_for_language()
public Bio::EnsEMBL::Hive::Storable new()