An AnalysisJob is the link between the input_id control data, the analysis and
the rule system.  It also tracks the state of the job as it is processed.
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2024] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with an underscore (_).
38 package Bio::EnsEMBL::Hive::AnalysisJob;
47 use base (
'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and
new() methods, but also hive_pipeline()
48 'Bio::EnsEMBL::Hive::Params', # inherit param management functionality
54 prev_job_id / prev_job
56 analysis_id / analysis
58 controlled_semaphore_id / controlled_semaphore
66 my $input_id = shift @_;
67 $self->{
'_input_id'} = ref($input_id) ? stringify($input_id) : $input_id;
70 return $self->{
'_input_id'};
75 $self->{
'_param_id_stack'} = shift
if(@_);
76 $self->{
'_param_id_stack'} =
'' unless(defined($self->{
'_param_id_stack'}));
77 return $self->{
'_param_id_stack'};
82 $self->{
'_accu_id_stack'} = shift
if(@_);
83 $self->{
'_accu_id_stack'} =
'' unless(defined($self->{
'_accu_id_stack'}));
84 return $self->{
'_accu_id_stack'};
89 $self->{
'_role_id'} = shift
if(@_);
90 return $self->{
'_role_id'};
95 $self->{
'_status'} = shift
if(@_);
96 return $self->{
'_status'} ||
'READY';
101 $self->{
'_retry_count'} = shift
if(@_);
102 $self->{
'_retry_count'} = 0 unless(defined($self->{
'_retry_count'}));
103 return $self->{
'_retry_count'};
108 $self->{
'_when_completed'} = shift
if(@_);
109 return $self->{
'_when_completed'};
114 $self->{
'_runtime_msec'} = shift
if(@_);
115 $self->{
'_runtime_msec'} = 0 unless(defined($self->{
'_runtime_msec'}));
116 return $self->{
'_runtime_msec'};
121 $self->{
'_query_count'} = shift
if(@_);
122 $self->{
'_query_count'} = 0 unless(defined($self->{
'_query_count'}));
123 return $self->{
'_query_count'};
# Sets the job's status and, when the job is attached to an adaptor, passes the
# job to the adaptor's check_in_job() (presumably so the new status is stored).
#
# Arg[1] : $status - the new status value, stored via status()
sub set_and_update_status {
    my ($self, $status) = @_;

    $self->status($status);

    if (my $adaptor = $self->adaptor) {
        $adaptor->check_in_job($self);
    }
}
139 $self->{
'_stdout_file'} = shift
if(@_);
140 return $self->{
'_stdout_file'};
145 $self->{
'_stderr_file'} = shift
if(@_);
146 return $self->{
'_stderr_file'};
151 $self->{
'_accu_hash'} = shift
if(@_);
152 $self->{
'_accu_hash'} = {} unless(defined($self->{
'_accu_hash'}));
153 return $self->{
'_accu_hash'};
    Function: Gets/sets the flag for whether the job should be automatically
              dataflowed on branch 1 when the job completes.
              If the subclass manually sends a job along branch 1 with dataflow_output_id,
              the autoflow will turn itself off.
    Returns : boolean (1=default|0)
171 $self->{
'_autoflow'} = shift
if(@_);
172 $self->{
'_autoflow'} = 1 unless(defined($self->{
'_autoflow'}));
174 return $self->{
'_autoflow'};
##-----------------[indicators to the Worker]--------------------------------
# Getter/setter for the "lethal for worker" flag.
# A job should set this to 1 prior to dying (or before running code that might
# cause death - such as RunnableDB's compilation) if it believes that the state
# of things will not allow the Worker to continue normally.
# The Worker will check the flag and commit suicide if it is set to true.
#
# Arg[1] (optional) : new flag value
# Returns           : the current flag value (undef until it is first set)
sub lethal_for_worker {
    my $self = shift;

    $self->{'_lethal_for_worker'} = shift if @_;
    return $self->{'_lethal_for_worker'};
}
# Getter/setter for the "transient error" flag.
# A job should set this to 1 prior to dying (or before running code that might
# cause death) if it believes that it makes sense to retry the same job without
# any changes. It may also set it to 0 if it believes that there is no point in
# re-trying (say, if the parameters are wrong).
# The Worker will check the flag and make necessary adjustments to the database state.
#
# Arg[1] (optional) : new flag value
# Returns           : the current flag value; errors are considered transient by
#                     default, so an unset (or undef) flag reads as 1
sub transient_error {
    my $self = shift;

    $self->{'_transient_error'} = shift if @_;
    return $self->{'_transient_error'} // 1;
}
# Getter/setter for the "incomplete" flag.
# A job should set this to 0 prior to throwing if the job is done, but it wants
# the thrown message to be recorded with is_error=0.
#
# Arg[1] (optional) : new flag value
# Returns           : the current flag value (undef until it is first set)
sub incomplete {
    my $self = shift;

    $self->{'_incomplete'} = shift if @_;
    return $self->{'_incomplete'};
}
211 $self->{'_died_somewhere
'} ||= shift if(@_); # NB: the '||=
' only applies in this case - do not copy around!
212 return $self->{'_died_somewhere
'} ||=0;
##-----------------[/indicators to the Worker]-------------------------------
# Loads the data this job inherits through its param/accu stacks, and its own
# accumulated data. Does nothing when the job has no adaptor.
#
#  - If param_id_stack or accu_id_stack is non-empty, the input_ids of the stacked
#    jobs and their accumulated structures are fetched, merged into one hash and
#    stored, ordered by ascending numeric key, as an arrayref in
#    $self->{'_unsubstituted_stack_items'}.
#  - The accumulated structures stored under this job's own dbID become accu_hash().
sub load_stack_and_accu {
    my $self = shift;

    if (my $job_adaptor = $self->adaptor) {
        my $job_id       = $self->dbID;
        my $accu_adaptor = $job_adaptor->db->get_AccumulatorAdaptor;

        if ($self->param_id_stack or $self->accu_id_stack) {
            # NOTE(review): the (2, 0) / (2, 1) arguments appear to encode keys as
            # id*2+offset, so that after the numeric sort each stacked job contributes
            # its input_id before its accu structures - confirm in the adaptors.
            my $input_ids_hash = $job_adaptor->fetch_input_ids_for_job_ids( $self->param_id_stack, 2, 0 );   # input_ids have lower precedence (FOR EACH ID)
            my $accu_hash      = $accu_adaptor->fetch_structures_for_job_ids( $self->accu_id_stack, 2, 1 );  # accus have higher precedence (FOR EACH ID)
            my %input_id_accu_hash = ( %$input_ids_hash, %$accu_hash );

            # Hash slice in ascending numeric key order: later items take precedence.
            $self->{'_unsubstituted_stack_items'} = [ @input_id_accu_hash{ sort { $a <=> $b } keys %input_id_accu_hash } ];
        }

        $self->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );
    }
}
# Initialises this job's parameters: first calls load_stack_and_accu(), then
# passes a list of parameter sources to param_init().
# Visible sources, in order: the runnable's param_defaults (only when a runnable
# object is given), the pipeline-wide parameters, the analysis parameters (when
# there is an analysis), and the unsubstituted stack items (when loaded).
# While param_init() runs, transient_error is forced to 0 and restored afterwards.
237 sub load_parameters {
238 my ($self, $runnable_object) = @_;
240 $self->load_stack_and_accu();
242 my @params_precedence = (
# presumably ordered from lowest to highest precedence (later sources override
# earlier ones) - TODO confirm in Params::param_init
243 $runnable_object ? $runnable_object->param_defaults : (),
244 $self->hive_pipeline->params_as_hash,
245 $self->analysis ? $self->analysis->parameters : (),
246 $self->{'_unsubstituted_stack_items
'} ? @{ $self->{'_unsubstituted_stack_items
'}} : (),
# NOTE(review): the remaining entries of this list and its closing ');' are
# missing from this copy of the file - compare with upstream.
251 my $prev_transient_error = $self->transient_error(); # make a note of previously set transience status
# Errors raised while initialising parameters are marked non-transient (not worth retrying).
252 $self->transient_error(0);
253 $self->param_init( @params_precedence );
254 $self->transient_error($prev_transient_error);
# Returns the result of fuse_param_hashes() over this job's parameter layers.
# When $extend_param_stack is true, the unsubstituted stack items followed by this
# job's input_id are passed; $overriding_hash is presumably applied on top of
# them - TODO confirm (the rest of the argument list is not visible here).
258 sub flattened_stack_and_accu { # here we assume $self->load_stack_and_accu() has already been called by $self->load_parameters()
259 my ( $self, $overriding_hash, $extend_param_stack ) = @_;
261 return $self->fuse_param_hashes( $extend_param_stack ? (@{$self->{'_unsubstituted_stack_items
'}}, $self->input_id) : (),
# NOTE(review): the remaining arguments and the closing lines of this sub are
# missing from this copy of the file.
# Self-initialising getter (there is no setter) for this job's fan cache,
# a hash-of-lists that is created empty on first access.
# Originally documented layout:
#   { 2 => [list of jobs waiting to be funneled into 2],
#     3 => [list of jobs waiting to be funneled into 3], etc }
# NOTE(review): dataflow_output_id() looks entries up with "$df_rule" (the
# stringified dataflow rule) rather than a branch number - confirm which keying
# is current.
sub fan_cache {
    my $self = shift;

    return $self->{'_fan_cache'} ||= {};
}
=head2 dataflow_output_id

    Title        : dataflow_output_id
    Arg[1](req)  : <hashref|arrayref|string> $output_ids (a single output_id, a list of them, or a stringified form)
    Arg[2](opt)  : <string|int> $branch_name_or_code (optional, defaults to 1)
    Usage        : $self->dataflow_output_id($output_ids, $branch_name_or_code);
    Description  : If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
                   created and flowed through the dataflow rules of the workflow graph.
                   Each 'output_id' becomes the 'input_id' of a newly created job at
                   the ends of the dataflow pipes. The optional 'branch_name_or_code' determines
                   which dataflow pipe(s) to flow the job through.
                   Flowing on branch 1 turns off the job's automatic dataflow (autoflow).
    Returns      : arrayref collecting the lists returned by each target's dataflow() call

=cut
# Flows output_ids along the dataflow rules of this job's analysis for the given
# branch, and returns \@output_job_ids, which collects the contents of the
# listrefs returned by each target's dataflow() call.
#  - Only undef and hashref output_ids are accepted; anything else dies.
#  - Flowing on branch 1 turns autoflow off.
#  - An output_id is sent to every target group whose condition evaluates true;
#    it is sent to the DEFAULT group (undef condition, ordered last) only if no
#    condition matched.
#  - undef output_ids become {} when the param stack is extended, otherwise this
#    job's own input_id. If a target has an input_id_template, it is
#    substituted for each output_id.
# NOTE(review): some closing braces and an 'else' line of this sub are absent from
# this copy of the file; the description above follows the visible code and its
# comments - confirm against upstream.
289 sub dataflow_output_id {
290 my ($self, $output_ids, $branch_name_or_code) = @_;
292 my $input_id = $self->input_id();
293 my $hive_use_param_stack = $self->hive_pipeline->hive_use_param_stack;
# Normalise $output_ids into a list: destringify a string, then wrap a single value into an arrayref.
295 $output_ids = destringify($output_ids) unless ref($output_ids); # destringify the string
296 $output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY
'); # force previously used single values into an arrayref
298 my @destringified_output_ids;
299 foreach my $output_id (@$output_ids) {
300 $output_id = destringify($output_id) unless ref($output_id); # destringify the string
301 if ((defined $output_id) and (ref($output_id) ne 'HASH
')) { # Only undefs and hashrefs work as input_ids
302 die stringify($output_id)." is not a hashref ! Cannot dataflow";
304 push @destringified_output_ids, $output_id;
307 # map branch names to numbers:
308 my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);
310 # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
311 $self->autoflow(0) if($branch_code == 1);
313 my @output_job_ids = ();
315 # fan rules come sorted before funnel rules for the same branch_code:
316 foreach my $df_rule ( @{ $self->analysis->dataflow_rules_by_branch->{$branch_code} || [] } ) {
318 my $targets_grouped_by_condition = $df_rule->get_my_targets_grouped_by_condition; # the pairs are deliberately ordered to put the DEFAULT branch last
319 my @conditions = map { $_->[0] } @$targets_grouped_by_condition;
321 my $total_output_ids_for_the_rule = 0;
323 foreach my $output_id (@destringified_output_ids) { # filter the output_ids and place them into the [2] part of $targets_grouped_by_condition
324 my $condition_match_count = 0;
325 foreach my $condition_idx (0..@conditions-1) {
326 my $unsubstituted_condition = $conditions[$condition_idx];
328 if(defined($unsubstituted_condition)) {
329 if(my $substituted_condition = $self->param_substitute('#expr(
'.$unsubstituted_condition.')expr#
', $output_id)) {
330 $condition_match_count++;
332 next; # non-DEFAULT condition branch failed
334 } elsif($condition_match_count) {
335 next; # DEFAULT condition branch failed, because one of the conditions fired
337 # DEFAULT condition branch succeeded => follow to the push
340 push @{$targets_grouped_by_condition->[$condition_idx][2]}, $output_id;
341 $total_output_ids_for_the_rule += scalar( @{ $targets_grouped_by_condition->[$condition_idx][1] } );
# When fan_cache holds pending (fan) jobs for this rule, exactly one job must be
# produced by this (funnel) rule; any other total is fatal.
345 my $fan_cache_for_this_rule = exists($self->fan_cache->{"$df_rule"}) && $self->fan_cache->{"$df_rule"};
346 if($fan_cache_for_this_rule && @$fan_cache_for_this_rule && $total_output_ids_for_the_rule!=1) {
347 die "The total number of funnel output_ids (considering ".scalar(@conditions)." conditions) was $total_output_ids_for_the_rule, but expected to be 1. Please investigate";
350 foreach my $triple (@$targets_grouped_by_condition) {
351 my ($unsubstituted_condition, $df_targets, $filtered_output_ids) = @$triple;
353 if($filtered_output_ids && @$filtered_output_ids) {
355 foreach my $df_target (@$df_targets) {
357 my $extend_param_stack = $hive_use_param_stack || $df_target->extend_param_stack; # this boolean is df_target-specific
358 my $default_param_hash = $extend_param_stack ? {} : $input_id; # this is what undefs will turn into
360 my @pre_substituted_output_ids = map { $_ // $default_param_hash } @$filtered_output_ids;
362 # parameter substitution into input_id_template is also df_target-specific:
363 my $output_ids_for_this_rule;
364 if(my $template_string = $df_target->input_id_template()) {
365 my $template_hash = destringify($template_string);
366 $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @pre_substituted_output_ids ];
368 $output_ids_for_this_rule = \@pre_substituted_output_ids;
371 my $target_object = $df_target->to_analysis;
372 my $same_db_dataflow = $self->analysis->hive_pipeline == $target_object->hive_pipeline;
# Dataflow into a different pipeline: each output_id is replaced by
# flattened_stack_and_accu(); errors raised there are marked non-transient meanwhile.
374 unless($same_db_dataflow) {
375 my $prev_transient_error = $self->transient_error(); # make a note of previously set transience status
376 $self->transient_error(0);
377 @$output_ids_for_this_rule = map { $self->flattened_stack_and_accu( $_, $extend_param_stack ); } @$output_ids_for_this_rule;
378 $self->transient_error($prev_transient_error);
381 my ($stored_listref) = $target_object->dataflow( $output_ids_for_this_rule, $self, $same_db_dataflow, $extend_param_stack, $df_rule );
383 push @output_job_ids, @$stored_listref;
385 } # /foreach my $df_target
386 } # /if(filtered_output_ids are workable)
387 } # /foreach my $unsubstituted_condition
388 } # /foreach my $df_rule
390 return \@output_job_ids;
# Builds query parameters identifying this job (presumably for use in URLs);
# the visible part contributes a job_id entry taken from dbID.
# NOTE(review): the rest of this sub is missing from this copy of the file.
394 sub url_query_params {
398 'job_id
' => $self->dbID,
406 my $analysis_label = $self->analysis
407 ? ( $self->analysis->logic_name.'(
'.$self->analysis_id.')
' )
410 return 'Job dbID=
'.($self->dbID || '(NULL)
')." analysis=$analysis_label, input_id='".$self->input_id."', status=".$self->status.", retry_count=".$self->retry_count;
# Fetches, through the database's SemaphoreAdaptor, the semaphore whose dependent
# job is this job (looked up by this job's dbID). No caching is done yet.
# NOTE(review): no 'my $self = shift;' line is visible before the return in this
# copy of the file, and the sub's closing lines are beyond this view.
414 sub fetch_local_blocking_semaphore { # ToDo: we may want to perform smart caching in future
417 return $self->adaptor->db->get_SemaphoreAdaptor->fetch_by_dependent_job_id( $self->dbID );