ensembl-hive  2.5
AnalysisJob.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5 Bio::EnsEMBL::Hive::AnalysisJob
6 
7 =head1 DESCRIPTION
8 
9  An AnalysisJob is the link between the input_id control data, the analysis and
10  the rule system. It also tracks the state of the job as it is processed
11 
12 =head1 LICENSE
13 
14  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
15  Copyright [2016-2022] EMBL-European Bioinformatics Institute
16 
17  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
18  You may obtain a copy of the License at
19 
20  http://www.apache.org/licenses/LICENSE-2.0
21 
22  Unless required by applicable law or agreed to in writing, software distributed under the License
23  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24  See the License for the specific language governing permissions and limitations under the License.
25 
26 =head1 CONTACT
27 
28  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
29 
30 =head1 APPENDIX
31 
32  The rest of the documentation details each of the object methods.
33  Internal methods are usually preceded with a _
34 
35 =cut
36 
37 
38 package Bio::EnsEMBL::Hive::AnalysisJob;
39 
40 use strict;
41 use warnings;
42 
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify', 'throw');
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
47 use base ( 'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and new() methods, but also hive_pipeline()
48  'Bio::EnsEMBL::Hive::Params', # inherit param management functionality
49  );
50 
51 
52 =head1 AUTOLOADED
53 
54  prev_job_id / prev_job
55 
56  analysis_id / analysis
57 
58  controlled_semaphore_id / controlled_semaphore
59 
60 =cut
61 
62 
sub input_id {              # getter/setter for the job's input_id
    my $self = shift @_;

    if (@_) {
        my $new_value = shift @_;
        # References are flattened to their string form before being stored:
        $self->{'_input_id'} = ref($new_value) ? stringify($new_value) : $new_value;
    }

    return $self->{'_input_id'};
}
72 
sub param_id_stack {        # getter/setter; an unset value reads back as the empty string
    my $self = shift @_;

    $self->{'_param_id_stack'} = shift @_ if @_;
    $self->{'_param_id_stack'} //= '';

    return $self->{'_param_id_stack'};
}
79 
sub accu_id_stack {         # getter/setter; an unset value reads back as the empty string
    my $self = shift @_;

    $self->{'_accu_id_stack'} = shift @_ if @_;
    $self->{'_accu_id_stack'} //= '';

    return $self->{'_accu_id_stack'};
}
86 
sub role_id {               # getter/setter for the id of the Role that ran/runs this job
    my $self = shift @_;

    if (@_) {
        $self->{'_role_id'} = shift @_;
    }

    return $self->{'_role_id'};
}
92 
sub status {                # getter/setter for the job's state
    my $self = shift @_;

    if (@_) {
        $self->{'_status'} = shift @_;
    }

    # A job with no (or any false) recorded status is considered READY:
    return $self->{'_status'} || 'READY';
}
98 
sub retry_count {           # getter/setter; an unset value reads back as 0
    my $self = shift @_;

    $self->{'_retry_count'} = shift @_ if @_;
    $self->{'_retry_count'} //= 0;

    return $self->{'_retry_count'};
}
105 
sub when_completed {        # getter/setter for the completion timestamp
    my $self = shift @_;

    if (@_) {
        $self->{'_when_completed'} = shift @_;
    }

    return $self->{'_when_completed'};
}
111 
sub runtime_msec {          # getter/setter; an unset value reads back as 0
    my $self = shift @_;

    $self->{'_runtime_msec'} = shift @_ if @_;
    $self->{'_runtime_msec'} //= 0;

    return $self->{'_runtime_msec'};
}
118 
sub query_count {           # getter/setter; an unset value reads back as 0
    my $self = shift @_;

    $self->{'_query_count'} = shift @_ if @_;
    $self->{'_query_count'} //= 0;

    return $self->{'_query_count'};
}
125 
126 
sub set_and_update_status {
    my ($self, $new_status) = @_;

    $self->status($new_status);

    # Persist the change immediately if this job is attached to a database:
    if (my $adaptor = $self->adaptor) {
        $adaptor->check_in_job($self);
    }
}
136 
sub stdout_file {           # getter/setter for the path capturing the job's STDOUT
    my $self = shift @_;

    if (@_) {
        $self->{'_stdout_file'} = shift @_;
    }

    return $self->{'_stdout_file'};
}
142 
sub stderr_file {           # getter/setter for the path capturing the job's STDERR
    my $self = shift @_;

    if (@_) {
        $self->{'_stderr_file'} = shift @_;
    }

    return $self->{'_stderr_file'};
}
148 
sub accu_hash {             # getter/setter; an unset value reads back as an empty hashref
    my $self = shift @_;

    $self->{'_accu_hash'} = shift @_ if @_;
    $self->{'_accu_hash'} //= {};

    return $self->{'_accu_hash'};
}
155 
156 
157 =head2 autoflow
158 
159  Title : autoflow
160  Function: Gets/sets flag for whether the job should
161  be automatically dataflowed on branch 1 when the job completes.
162  If the subclass manually sends a job along branch 1 with dataflow_output_id,
163  the autoflow will turn itself off.
164  Returns : boolean (1=default|0)
165 
166 =cut
167 
sub autoflow {
    my $self = shift @_;

    if (@_) {
        $self->{'_autoflow'} = shift @_;
    }
    $self->{'_autoflow'} //= 1;     # automatic dataflow on branch 1 is on by default

    return $self->{'_autoflow'};
}
176 
177 
178 ##-----------------[indicators to the Worker]--------------------------------
179 
180 
sub lethal_for_worker {
    # A job sets this to 1 before dying (or before running code that might cause
    # death, such as RunnableDB compilation) when it believes the Worker should
    # not continue normally; the Worker checks the flag and exits if it is true.
    my $self = shift @_;

    $self->{'_lethal_for_worker'} = shift @_ if @_;

    return $self->{'_lethal_for_worker'};
}
188 
sub transient_error {
    # A job sets this to 1 before dying (or before running code that might cause
    # death) when retrying the same job unchanged makes sense, or to 0 when
    # retrying is pointless (e.g. bad parameters); the Worker adjusts the
    # database state accordingly.
    my $self = shift @_;

    $self->{'_transient_error'} = shift @_ if @_;

    return $self->{'_transient_error'} // 1;    # errors are transient unless stated otherwise
}
199 
sub incomplete {
    # A job sets this to 0 before throwing when the job is actually done but the
    # thrown message should still be recorded with is_error=0.
    my $self = shift @_;

    if (@_) {
        $self->{'_incomplete'} = shift @_;
    }

    return $self->{'_incomplete'};
}
206 
207 
sub died_somewhere {
    my $self = shift @_;

    if (@_) {
        # Sticky assignment: once the flag is raised it can never be lowered
        # (this mirrors the original '||=' semantics - do not copy around!)
        my $flag = shift @_;
        $self->{'_died_somewhere'} = $flag unless $self->{'_died_somewhere'};
    }
    $self->{'_died_somewhere'} ||= 0;

    return $self->{'_died_somewhere'};
}
214 
215 ##-----------------[/indicators to the Worker]-------------------------------
216 
217 
sub load_stack_and_accu {
    my ($self) = @_;

    my $job_adaptor = $self->adaptor or return;     # a detached job has nothing to load

    my $job_id       = $self->dbID;
    my $accu_adaptor = $job_adaptor->db->get_AccumulatorAdaptor;

    if ($self->param_id_stack or $self->accu_id_stack) {
        # Fetch both stacks keyed by job_id; where both define an entry for the
        # same id, the accu entry wins the hash merge (accus have higher
        # precedence than input_ids, FOR EACH ID):
        my $input_ids_hash = $job_adaptor->fetch_input_ids_for_job_ids( $self->param_id_stack, 2, 0 );
        my $accu_hash      = $accu_adaptor->fetch_structures_for_job_ids( $self->accu_id_stack, 2, 1 );
        my %merged_by_job_id = ( %$input_ids_hash, %$accu_hash );

        # Store the stack items in ascending job_id order (a hash slice):
        $self->{'_unsubstituted_stack_items'} =
            [ @merged_by_job_id{ sort { $a <=> $b } keys %merged_by_job_id } ];
    }

    # This job's own accumulated structures:
    $self->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );
}
235 
236 
sub load_parameters {
    my ($self, $runnable_object) = @_;

    $self->load_stack_and_accu();

    # Sources are listed from lowest to highest precedence:
    my @precedence_list = (
        $runnable_object ? $runnable_object->param_defaults : (),
        $self->hive_pipeline->params_as_hash,
        $self->analysis ? $self->analysis->parameters : (),
        $self->{'_unsubstituted_stack_items'} ? @{ $self->{'_unsubstituted_stack_items'} } : (),
        $self->input_id,
        $self->accu_hash,
    );

    # Failures during parameter initialisation should not be considered
    # retryable; restore the previous transience status afterwards:
    my $saved_transience = $self->transient_error();
    $self->transient_error(0);
    $self->param_init( @precedence_list );
    $self->transient_error($saved_transience);
}
256 
257 
sub flattened_stack_and_accu {
    # NOTE: assumes $self->load_stack_and_accu() has already been called
    # (normally via $self->load_parameters()).
    my ($self, $overriding_hash, $extend_param_stack) = @_;

    my @stack_part = $extend_param_stack
        ? ( @{ $self->{'_unsubstituted_stack_items'} }, $self->input_id )
        : ();

    return $self->fuse_param_hashes( @stack_part, $self->accu_hash, $overriding_hash );
}
265 
266 
sub fan_cache {
    # Self-initializing getter (there is no setter).
    # Returns a hash-of-lists: { 2 => [jobs waiting to be funneled into 2],
    #                            3 => [jobs waiting to be funneled into 3], ... }
    my $self = shift @_;

    return $self->{'_fan_cache'} ||= {};
}
273 
274 =head2 dataflow_output_id
275 
276  Title : dataflow_output_id
277  Arg[1](req) : <string> $output_id
278  Arg[2](opt) : <int> $branch_name_or_code (optional, defaults to 1)
279  Usage : $self->dataflow_output_id($output_id, $branch_name_or_code);
280  Function:
281  If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
282  created and flowed through the dataflow rules of the workflow graph.
283  This 'output_id' becomes the 'input_id' of the newly created job at
284  the ends of the dataflow pipes. The optional 'branch_name_or_code' determines
285  which dataflow pipe(s) to flow the job through.
286 
287 =cut
288 
sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code) = @_;

    my $input_id             = $self->input_id();
    my $hive_use_param_stack = $self->hive_pipeline->hive_use_param_stack;

    # Normalise the payload: destringify strings, then make sure we hold an arrayref:
    $output_ids = destringify($output_ids) unless ref($output_ids);
    $output_ids = [ $output_ids ] unless ref($output_ids) eq 'ARRAY';

    my @destringified_output_ids;
    foreach my $output_id (@$output_ids) {
        $output_id = destringify($output_id) unless ref($output_id);
        # Only undefs and hashrefs make valid input_ids:
        if ((defined $output_id) and (ref($output_id) ne 'HASH')) {
            die stringify($output_id)." is not a hashref ! Cannot dataflow";
        }
        push @destringified_output_ids, $output_id;
    }

    # Map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

    # An explicit (or implicit) flow on branch 1 cancels the automatic one:
    $self->autoflow(0) if $branch_code == 1;

    my @output_job_ids = ();

    # Fan rules come sorted before funnel rules for the same branch_code:
    foreach my $df_rule ( @{ $self->analysis->dataflow_rules_by_branch->{$branch_code} || [] } ) {

        # The pairs are deliberately ordered to put the DEFAULT branch last:
        my $targets_grouped_by_condition = $df_rule->get_my_targets_grouped_by_condition;
        my @conditions = map { $_->[0] } @$targets_grouped_by_condition;

        my $total_output_ids_for_the_rule = 0;

        # Filter the output_ids into the [2] slot of each condition group:
        foreach my $output_id (@destringified_output_ids) {

            my $condition_match_count = 0;

            foreach my $condition_idx (0 .. @conditions - 1) {
                my $unsubstituted_condition = $conditions[$condition_idx];

                if (defined $unsubstituted_condition) {
                    if (my $substituted_condition = $self->param_substitute('#expr('.$unsubstituted_condition.')expr#', $output_id)) {
                        $condition_match_count++;
                    } else {
                        next;   # non-DEFAULT condition branch failed
                    }
                } elsif ($condition_match_count) {
                    next;       # DEFAULT condition branch failed, because one of the conditions fired
                }               # otherwise the DEFAULT branch succeeded => fall through to the push

                push @{ $targets_grouped_by_condition->[$condition_idx][2] }, $output_id;
                $total_output_ids_for_the_rule += scalar( @{ $targets_grouped_by_condition->[$condition_idx][1] } );
            }
        }

        # A funnel that already has fanned-out jobs waiting must receive exactly one output_id:
        my $fan_cache_for_this_rule = exists($self->fan_cache->{"$df_rule"}) && $self->fan_cache->{"$df_rule"};
        if ($fan_cache_for_this_rule && @$fan_cache_for_this_rule && $total_output_ids_for_the_rule != 1) {
            die "The total number of funnel output_ids (considering ".scalar(@conditions)." conditions) was $total_output_ids_for_the_rule, but expected to be 1. Please investigate";
        }

        foreach my $condition_group (@$targets_grouped_by_condition) {
            my (undef, $df_targets, $filtered_output_ids) = @$condition_group;

            next unless $filtered_output_ids && @$filtered_output_ids;

            foreach my $df_target (@$df_targets) {

                # Whether to extend the parameter stack is decided per df_target:
                my $extend_param_stack = $hive_use_param_stack || $df_target->extend_param_stack;
                my $default_param_hash = $extend_param_stack ? {} : $input_id;  # what undefs will turn into

                my @pre_substituted_output_ids = map { $_ // $default_param_hash } @$filtered_output_ids;

                # Parameter substitution into input_id_template is also df_target-specific:
                my $output_ids_for_this_rule;
                if (my $template_string = $df_target->input_id_template()) {
                    my $template_hash = destringify($template_string);
                    $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @pre_substituted_output_ids ];
                } else {
                    $output_ids_for_this_rule = \@pre_substituted_output_ids;
                }

                my $target_object    = $df_target->to_analysis;
                my $same_db_dataflow = $self->analysis->hive_pipeline == $target_object->hive_pipeline;

                unless ($same_db_dataflow) {
                    # Cross-database dataflow cannot see this database's stack,
                    # so flatten the stack+accu into each output_id; substitution
                    # failures here should not mark the job as retryable:
                    my $prev_transient_error = $self->transient_error();
                    $self->transient_error(0);
                    @$output_ids_for_this_rule = map { $self->flattened_stack_and_accu( $_, $extend_param_stack ) } @$output_ids_for_this_rule;
                    $self->transient_error($prev_transient_error);
                }

                my ($stored_listref) = $target_object->dataflow( $output_ids_for_this_rule, $self, $same_db_dataflow, $extend_param_stack, $df_rule );

                push @output_job_ids, @$stored_listref;

            } # /foreach $df_target
        } # /foreach $condition_group
    } # /foreach $df_rule

    return \@output_job_ids;
}
392 
393 
sub url_query_params {
    my ($self) = @_;

    # The dbID is the only piece of information needed to identify a job in a URL:
    return { 'job_id' => $self->dbID };
}
401 
402 
sub toString {
    my $self = shift @_;

    my $analysis_label = $self->analysis
        ? $self->analysis->logic_name.'('.$self->analysis_id.')'
        : '(NULL)';

    # One-line human-readable summary of the job:
    return sprintf("Job dbID=%s analysis=%s, input_id='%s', status=%s, retry_count=%s",
                   $self->dbID || '(NULL)',
                   $analysis_label,
                   $self->input_id,
                   $self->status,
                   $self->retry_count,
    );
}
412 
413 
sub fetch_local_blocking_semaphore {    # ToDo: we may want to perform smart caching in future
    my $self = shift @_;

    my $semaphore_adaptor = $self->adaptor->db->get_SemaphoreAdaptor;

    return $semaphore_adaptor->fetch_by_dependent_job_id( $self->dbID );
}
419 
420 1;
421