An Analysis object represents a "stage" of the Hive pipeline that groups together
all jobs that share the same module and the same common parameters.

Individual Jobs are said to "belong" to an Analysis.

Control rules unblock when their condition Analyses are done.
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2022] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
32 Please subscribe to the
Hive mailing list: http:
37 package Bio::EnsEMBL::Hive::Analysis;
48 use base (
'Bio::EnsEMBL::Hive::Storable' );
51 sub unikey { #
override the
default from
Cacheable parent
52 return [
'logic_name' ];
58 resource_class_id / resource_class
65 $self->{
'_logic_name'} = shift
if(@_);
66 return $self->{
'_logic_name'};
69 sub name { # a useful synonym
72 return $self->logic_name(@_);
78 $self->{
'_module'} = shift
if(@_);
79 return $self->{
'_module'};
85 $self->{
'_language'} = shift
if(@_);
86 return $self->{
'_language'};
93 my $parameters = shift @_;
94 $self->{
'_parameters'} = ref($parameters) ? stringify($parameters) : $parameters;
96 return $self->{
'_parameters'};
102 $self->{
'_comment'} = shift
if(@_);
104 return $self->{
'_comment'};
110 $self->{
'_tags'} = shift
if(@_);
112 return $self->{
'_tags'};
116 sub failed_job_tolerance {
118 $self->{
'_failed_job_tolerance'} = shift
if(@_);
119 $self->{
'_failed_job_tolerance'}
120 return $self->{
'_failed_job_tolerance'};
124 sub max_retry_count {
126 $self->{
'_max_retry_count'} = shift
if(@_);
127 return $self->{
'_max_retry_count'};
133 $self->{
'_can_be_empty'} = shift
if(@_);
134 $self->{
'_can_be_empty'}
135 return $self->{
'_can_be_empty'};
141 $self->{
'_priority'} = shift
if(@_);
143 return $self->{
'_priority'};
149 $self->{
'_meadow_type'} = shift
if(@_);
150 return $self->{
'_meadow_type'};
154 sub analysis_capacity {
156 $self->{
'_analysis_capacity'} = shift
if(@_);
157 return $self->{
'_analysis_capacity'};
162 $self->{
'_hive_capacity'} = shift
if(@_);
163 return $self->{
'_hive_capacity'};
168 $self->{
'_batch_size'} = shift
if(@_);
169 $self->{
'_batch_size'}
170 return $self->{
'_batch_size'};
173 sub get_compiled_module_name {
176 my $runnable_module_name = $self->module
177 or die
"Analysis '".$self->logic_name.
"' does not have its 'module' defined";
179 if ($self->language) {
181 if (system($wrapper,
'compile', $runnable_module_name)) {
182 die
"The runnable module '$runnable_module_name' cannot be loaded or compiled:\n";
184 return 'Bio::EnsEMBL::Hive::GuestProcess';
187 eval
"require $runnable_module_name";
188 die
"The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$@" if($@);
189 die
"Problem accessing methods in '$runnable_module_name'. Please check that it inherits from Bio::EnsEMBL::Hive::Process and is named correctly.\n" 190 unless($runnable_module_name->isa(
'Bio::EnsEMBL::Hive::Process'));
192 die
"DEPRECATED: the strict_hash_format() method is no longer supported in Runnables - the input_id() in '$runnable_module_name' has to be a hash now.\n" 193 if($runnable_module_name->can(
'strict_hash_format'));
195 return $runnable_module_name;
199 sub url_query_params {
203 'logic_name' => $self->logic_name,
210 return $self->logic_name;
Example     : $stats = $analysis->stats;
Description : returns either the previously cached AnalysisStats object or, if it is missing, pulls a fresh one from the DB.
228 return $self->
hive_pipeline->collection_of(
'AnalysisStats' )->find_one_by(
'analysis', $self);
232 # --------------------------------- dispatch the following calls directly to our Stats: --------------------------------------- 237 return $self->stats->status(@_);
240 # ------------------------------------------------------------------------------------------------------------------------------ 243 sub jobs_collection {
246 $self->{
'_jobs_collection'} = shift
if(@_);
248 return $self->{
'_jobs_collection'} ||= [];
252 sub control_rules_collection {
255 return $self->hive_pipeline->collection_of(
'AnalysisCtrlRule' )->find_all_by(
'ctrled_analysis', $self);
259 sub dataflow_rules_collection {
262 return $self->hive_pipeline->collection_of(
'DataflowRule' )->find_all_by(
'from_analysis', $self);
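=head2 get_grouped_dataflow_rules

  Example     : $groups = $analysis->get_grouped_dataflow_rules;
  Description : returns a listref of triples [ $rule, $fan_rules, $targets ], where the first element is a separate dfr or a funnel, the second element is a listref of the semaphored fan dfrs that feed that funnel (empty when no fan feeds it), and the third element is the listref returned by get_my_targets() on the first element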
275 sub get_grouped_dataflow_rules {
278 my %set_of_groups = (); # Note that the key (being a stringified reference) is unusable,
279 # so we end up packing it as the first element of the structure, 280 # and only returning the listref of the values. 281 my @ordered_keys = (); # Perl is missing an
"Ordered Hash" structure, so we need to maintain the insertion order ourselves
283 my $all_dataflow_rules = $self->dataflow_rules_collection;
285 foreach my $dfr ((grep {$_->funnel_dataflow_rule} @$all_dataflow_rules), (grep {!$_->funnel_dataflow_rule} @$all_dataflow_rules)) {
287 my $df_targets = $dfr->get_my_targets;
289 if(my $funnel_dfr = $dfr->funnel_dataflow_rule) {
290 unless($set_of_groups{$funnel_dfr}) { # both the type check and the initial push will only be done once per funnel
291 my $funnel_targets = $funnel_dfr->get_my_targets;
292 foreach my $funnel_target (@$funnel_targets) {
293 unless($funnel_target->to_analysis->isa(
'Bio::EnsEMBL::Hive::Analysis')) {
294 throw(
"Each conditional branch of a semaphored funnel rule must point at an Analysis");
297 push @ordered_keys, $funnel_dfr;
298 $set_of_groups{$funnel_dfr} = [$funnel_dfr, [], $funnel_targets];
300 my $this_group = $set_of_groups{$funnel_dfr};
302 foreach my $df_target (@$df_targets) {
303 unless($df_target->to_analysis->isa(
'Bio::EnsEMBL::Hive::Analysis')) {
304 throw(
"Each conditional branch of a semaphored fan rule must point at an Analysis");
307 push @{$this_group->[1]}, $dfr;
309 } elsif (!$set_of_groups{$dfr}) {
310 push @ordered_keys, $dfr;
311 $set_of_groups{$dfr} = [$dfr, [], $df_targets];
314 my @sorted_rules = sort { scalar(@{$set_of_groups{$a}->[1]}) <=> scalar(@{$set_of_groups{$b}->[1]}) or $set_of_groups{$a}->[0]->branch_code <=> $set_of_groups{$b}->[0]->branch_code } @ordered_keys;
315 return [map {$set_of_groups{$_}} @sorted_rules];
319 sub dataflow_rules_by_branch {
322 if (not $self->{
'_dataflow_rules_by_branch'}) {
323 my %dataflow_rules_by_branch = ();
324 foreach my $df_rule (@{$self->dataflow_rules_collection}) {
325 my $dfr_bb = $dataflow_rules_by_branch{ $df_rule->branch_code } ||= []; # no autovivification here, have to
do it manually
327 if($df_rule->funnel_dataflow_rule) { # sort rules so that semaphored fan rules come before other (potentially fan) rules
for the same branch_code
328 unshift @$dfr_bb, $df_rule;
330 push @$dfr_bb, $df_rule;
333 $self->{
'_dataflow_rules_by_branch'} = \%dataflow_rules_by_branch;
336 return $self->{
'_dataflow_rules_by_branch'};
341 my ( $self, $output_ids_for_this_rule, $emitting_job, $same_db_dataflow, $push_emitting_job_on_stack, $df_rule ) = @_;
343 my $param_id_stack =
'';
344 my $accu_id_stack =
'';
345 my $emitting_job_id = undef;
347 if($same_db_dataflow) {
348 $param_id_stack = $emitting_job->param_id_stack;
349 $accu_id_stack = $emitting_job->accu_id_stack;
350 $emitting_job_id = $emitting_job->dbID;
352 if($push_emitting_job_on_stack) {
353 my $input_id = $emitting_job->input_id;
354 my $accu_hash = $emitting_job->accu_hash;
356 if($input_id and ($input_id ne
'{}')) { # add the parent to the param_id_stack
if it had non-trivial extra parameters
357 $param_id_stack = ($param_id_stack ? $param_id_stack.
',' :
'').$emitting_job_id;
359 if(scalar(keys %$accu_hash)) { # add the parent to the accu_id_stack
if it had
"own" accumulator
360 $accu_id_stack = ($accu_id_stack ? $accu_id_stack.
',' :
'').$emitting_job_id;
365 my $common_params = [
366 'prev_job' => $emitting_job,
368 'hive_pipeline' => $self->hive_pipeline, # Although we may not cache jobs, make sure a
new Job
"belongs" to the same pipeline as its
Analysis 369 'param_id_stack' => $param_id_stack,
370 'accu_id_stack' => $accu_id_stack,
373 my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
374 my @output_job_ids = ();
376 if( my $funnel_dataflow_rule = $df_rule->funnel_dataflow_rule ) { # members of a semaphored fan will have to wait in cache until the funnel is created:
378 my $fan_cache_this_branch = $emitting_job->fan_cache->{
"$funnel_dataflow_rule"} ||= [];
382 # controlled_semaphore => to be
set when the $controlled_semaphore has been stored
383 ) } @$output_ids_for_this_rule;
385 }
else { # either a semaphored funnel or a non-semaphored dataflow:
387 my $fan_jobs =
delete $emitting_job->fan_cache->{
"$df_rule"}; # clear the cache at the same time
389 if( $fan_jobs && @$fan_jobs ) { # a semaphored funnel
391 if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
393 $emitting_job->transient_error(0);
394 die
"Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
399 'input_id' => $output_ids_for_this_rule->[0],
400 'status' =>
'SEMAPHORED',
403 # NB: $job_adaptor happens to belong to the $funnel_job, but not necesarily to $fan_jobs or $emitting_job 404 my ($semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
406 push @output_job_ids, $funnel_job_id, @fan_job_ids;
408 }
else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
412 'controlled_semaphore' => $emitting_job->controlled_semaphore, # propagate parent
's semaphore if any 413 ) } @$output_ids_for_this_rule; 415 # NB: $job_adaptor happens to belong to the @non_semaphored_jobs, but not necessarily to the $emitting_job : 416 push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0, $emitting_job_id) }; 420 return \@output_job_ids; 427 return 'Analysis[
'.($self->dbID // '').']:
'.$self->display_name.'->(
'.join(',
', ($self->module // 'no_module
').($self->language ? sprintf(' (%s)
', $self->language) : ''), $self->parameters // '{}
', $self->resource_class ? $self->resource_class->name : 'no_rc
').')
'; 431 sub print_diagram_node { 432 my ($self, $ref_pipeline, $prefix, $seen_analyses) = @_; 434 if($seen_analyses->{$self}++) { 435 print "(".$self->relative_display_name($ref_pipeline).")\n"; # NB: the prefix of the label itself is done by the previous level 439 print $self->relative_display_name($ref_pipeline)."\n"; # NB: the prefix of the label itself is done by the previous level 441 my $groups = $self->get_grouped_dataflow_rules; 443 foreach my $i (0..scalar(@$groups)-1) { 445 my ($funnel_dfr, $fan_dfrs, $df_targets) = @{ $groups->[$i] }; 447 my $this_funnel_offset = ''; 449 if(scalar(@$groups)>1 and scalar(@$fan_dfrs)) { # if more than one group (no single backbone) and semaphored, the semaphored group will be offset: 450 print $prefix." │\n"; 451 print $prefix." ╘════╤══╗\n"; # " └────┬──┐\n"; 452 $this_funnel_offset = ($i < scalar(@$groups)-1) ? ' │
' : ' '; # non-last vs last group 455 foreach my $j (0..scalar(@$fan_dfrs)-1) { # for each of the dependent fan rules, show them one by one: 456 my $fan_dfr = $fan_dfrs->[$j]; 457 my $fan_branch = $fan_dfr->branch_code; 459 print $prefix.$this_funnel_offset." │ ║\n"; 460 print $prefix.$this_funnel_offset." │ ║\n"; 461 print $prefix.$this_funnel_offset." │ ║#$fan_branch\n"; 463 my $fan_df_targets = $fan_dfr->get_my_targets; 465 foreach my $k (0..scalar(@$fan_df_targets)-1) { # for each fan's target
466 my $fan_target = $fan_df_targets->[$k];
468 print $prefix.$this_funnel_offset.
" │ ║\n";
470 if(my $fan_choice = (scalar(@$fan_df_targets)!=1) || defined($fan_target->on_condition)) {
471 if(defined(my $on_condition = $fan_target->on_condition)) {
472 print $prefix.$this_funnel_offset.
" │ ║ WHEN $on_condition\n";
474 print $prefix.$this_funnel_offset.
" │ ║ ELSE\n";
477 print $prefix.$this_funnel_offset.
' │├─╚═> ';
479 my $next_fan_or_condition_offset = ($j<scalar(@$fan_dfrs)-1 or $k<scalar(@$fan_df_targets)-1) ?
' │ ║ ' :
' │ ';
481 if(my $template = $fan_target->input_id_template) {
483 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset.
" │\n";
484 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset.
" V\n";
485 print $prefix.$this_funnel_offset.$next_fan_or_condition_offset;
488 $fan_target->to_analysis->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_fan_or_condition_offset, $seen_analyses );
492 my $funnel_branch = $funnel_dfr->branch_code;
494 print $prefix.$this_funnel_offset.
" │\n";
495 print $prefix.$this_funnel_offset.
" │\n";
496 print $prefix.$this_funnel_offset.
" │#$funnel_branch\n";
498 foreach my $k (0..scalar(@$df_targets)-1) { #
for each funnel
's target 499 my $df_target = $df_targets->[$k]; 501 print $prefix.$this_funnel_offset." │\n"; 503 my $funnel_choice = (scalar(@$df_targets)!=1) || defined($df_target->on_condition); 506 if(defined(my $on_condition = $df_target->on_condition)) { 507 print $prefix.$this_funnel_offset." │ WHEN $on_condition\n"; 509 print $prefix.$this_funnel_offset." │ ELSE\n"; 513 my $next_funnel_or_condition_offset = ''; 515 if( (scalar(@$groups)==1 or $this_funnel_offset) and !$funnel_choice ) { # 'the only group
' (backbone) or a semaphore funnel ... 516 print $prefix.$this_funnel_offset." V\n"; # ... make a vertical arrow 517 print $prefix.$this_funnel_offset; 519 print $prefix.$this_funnel_offset.' └─▻
'; # otherwise fork to the right 520 $next_funnel_or_condition_offset = ($i<scalar(@$groups)-1 or $k<scalar(@$df_targets)-1) ? ' │
' : ' '; 522 if(my $template = $df_target->input_id_template) { 524 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." │\n"; 525 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset." V\n"; 526 print $prefix.$this_funnel_offset.$next_funnel_or_condition_offset; 529 my $target = $df_target->to_analysis; 530 if($target->can('print_diagram_node
')) { 531 $target->print_diagram_node($ref_pipeline, $prefix.$this_funnel_offset.$next_funnel_or_condition_offset, $seen_analyses ); 533 print '[[
'.$target->relative_display_name($ref_pipeline)." ]]\n"; 535 print '<<--
'.$target->relative_display_name($ref_pipeline)."\n";
protected String _get_wrapper_for_language()
public Bio::EnsEMBL::Hive::Storable new()