ensembl-hive  2.7.0
AnalysisJobAdaptor.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
6 
7 =head1 SYNOPSIS
8 
9  $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
10  $analysisJobAdaptor = $analysisJob->adaptor;
11 
12 =head1 DESCRIPTION
13 
14  Module to encapsulate all db access for persistent class AnalysisJob.
15  There should be just one per application and database connection.
16 
17 =head1 LICENSE
18 
19  See the NOTICE file distributed with this work for additional information
20  regarding copyright ownership.
21 
22  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
23  You may obtain a copy of the License at
24 
25  http://www.apache.org/licenses/LICENSE-2.0
26 
27  Unless required by applicable law or agreed to in writing, software distributed under the License
28  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29  See the License for the specific language governing permissions and limitations under the License.
30 
31 =head1 CONTACT
32 
33  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
34 
35 =head1 APPENDIX
36 
37  The rest of the documentation details each of the object methods.
38  Internal methods are preceded with a _
39 
40 =cut
41 
42 
43 package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
44 
45 use strict;
46 use warnings;
47 
51 use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
52 
53 use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
54 
55 
# NOTE: These lists must be kept in sync with the schema!
# Each one is a ready-made, comma-separated list of quoted status names that
# is interpolated into "status IN (...)" clauses of the queries below.
our $ALL_STATUSES_OF_RUNNING_JOBS  = join(',', map { "'$_'" } qw(PRE_CLEANUP FETCH_INPUT RUN WRITE_OUTPUT POST_HEALTHCHECK POST_CLEANUP));
our $ALL_STATUSES_OF_TAKEN_JOBS    = join(',', "'CLAIMED'", $ALL_STATUSES_OF_RUNNING_JOBS);
our $ALL_STATUSES_OF_COMPLETE_JOBS = join(',', map { "'$_'" } qw(DONE PASSED_ON));
# Statuses that belong to none of the lists: SEMAPHORED, READY, COMPILATION (this one is actually not used), FAILED
62 
# Name of the database table this adaptor works on.
sub default_table_name {
    my $table_name = 'job';

    return $table_name;
}
66 
67 
# SQL verb used by default when objects of this adaptor are stored.
sub default_insertion_method {
    my $insertion_method = 'INSERT';

    return $insertion_method;
}
71 
72 
# Name of the class whose objects this adaptor handles.
sub object_class {
    my $class_name = 'Bio::EnsEMBL::Hive::AnalysisJob';

    return $class_name;
}
76 
77 
# Per-column length limits, returned as a hashref keyed by column name.
# fetch_by_analysis_id_and_input_id() compares the length of an input_id
# against the 'input_id' entry before looking for overflown data.
sub default_overflow_limit {
    my %limit_for = (
        'input_id'       => 255,
        'param_id_stack' => 64,
        'accu_id_stack'  => 64,
    );

    return \%limit_for;
}
85 
86 
=head2 job_status_cast

    Arg [1]     : String $status_string (an SQL expression)
    Example     : $job_adaptor->job_status_cast("CASE WHEN ... THEN 'SEMAPHORED' ELSE 'READY' END");
    Description : Wraps the expression into a CAST to the job_status type when the
                  connection driver is 'pgsql'; returns it unchanged for any other driver.
                  This is needed for CASE expressions in PostgreSQL which
                  otherwise default to returning a string.
    Returntype  : String
    Exceptions  : none

=cut

sub job_status_cast {
    my ($self, $status_string) = @_;

    return ($self->dbc->driver eq 'pgsql')
        ? "CAST($status_string AS job_status)"
        : $status_string;
}
107 
108 
=head2 fetch_by_analysis_id_and_input_id

    Arg [1]     : Integer $analysis_id
    Arg [2]     : String $input_id
    Example     : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
    Description : Looks the job up by its contents first. If nothing is found and the input_id is longer
                  than the 'input_id' overflow limit, the AnalysisDataAdaptor is asked for the id of the
                  stored data and the lookup is repeated with "_extended_data_id <id>" as the input_id.
    Returntype  : AnalysisJob object (or whatever false value the lookups returned)

=cut

sub fetch_by_analysis_id_and_input_id {     # It is a special case not covered by AUTOLOAD; note the lowercase _and_
    my ($self, $analysis_id, $input_id) = @_;

    my $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id );
    return $job if $job;

    # only an input_id above the limit can have overflown into analysis_data
    my $input_id_limit = $self->default_overflow_limit->{input_id};
    return $job unless length($input_id) > $input_id_limit;

    my $ext_data_id = $self->db->get_AnalysisDataAdaptor->fetch_by_data_to_analysis_data_id( $input_id );
    return $job unless $ext_data_id;

    $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, "_extended_data_id $ext_data_id" );

    return $job;
}
131 
132 
# Executes the prepared statement through the parent class, implementing
# 'INSERT IGNORE' of Jobs on the API side: when the database reports a
# unique-key collision, the failure is recorded as an INFO job message and
# '0E0' is returned instead of dying.
#
#   Arg [1]    : the object being stored (prev_job_id, analysis_id and input_id are read from it)
#   Arg [2]    : the statement handle, passed on to the parent implementation
#   Arg [3]    : the values, passed on to the parent implementation
#   Returntype : the parent's return code, or '0E0' for an ignored duplicate
#   Exceptions : rethrows every error that is not recognised as a duplicate
sub class_specific_execute {
    my ($self, $object, $sth, $values) = @_;

    my $return_code;

    eval {
        $return_code = $self->SUPER::class_specific_execute($object, $sth, $values);
        1;
    } or do {
        # Copy the error first: $@ is a global that any call made below may overwrite.
        my $error = $@;

        my %duplicate_regex_for = (
            'mysql'  => qr/Duplicate entry.+?for key/s,
            'sqlite' => qr/columns.+?are not unique|UNIQUE constraint failed/s,   # versions around 3.8 spit the first msg, versions around 3.15 - the second
            'pgsql'  => qr/duplicate key value violates unique constraint/s,
        );
        my $duplicate_regex = $duplicate_regex_for{ $self->db->dbc->driver // '' };

        # A driver without a known pattern must not be matched against an undefined
        # regex (it would behave as an empty pattern and could let arbitrary errors
        # pass as duplicates), so such errors are rethrown untouched.
        die $error unless defined($duplicate_regex) && $error =~ $duplicate_regex;

        my $emitting_job_id = $object->prev_job_id;
        my $analysis_id     = $object->analysis_id;
        my $input_id        = $object->input_id;
        my $msg             = "Attempt to insert a duplicate job (analysis_id=$analysis_id, input_id=$input_id) intercepted and ignored";

        $self->db->get_LogMessageAdaptor->store_job_message( $emitting_job_id, $msg, 'INFO' );

        $return_code = '0E0';
    };

    return $return_code;
}
164 
165 
=head2 store_jobs_and_adjust_counters

    Arg [1]     : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
    Arg [2]     : (optional) boolean $push_new_semaphore
    Arg [3]     : (optional) Int $emitting_job_id
    Example     : my @output_job_ids = @{ $job_adaptor->store_jobs_and_adjust_counters( \@jobs_to_store ) };
    Description : Attempts to store every job of the batch through the job adaptor of its own analysis
                  (or through $self when the job has no analysis object). For each job stored by this call
                  the analysis_stats counters are adjusted, unless the pipeline uses triggers.
                  Jobs that could not be stored are logged and the controlled semaphore is decreased by them.
    Returntype  : Reference to list of job_dbIDs of the successfully stored jobs

=cut

sub store_jobs_and_adjust_counters {
    my ($self, $jobs, $push_new_semaphore, $emitting_job_id) = @_;

    my @stored_job_ids = ();
    my @rejected_jobs  = ();

        # NB: our use patterns assume all jobs from the same storing batch share the same controlled_semaphore:
    my $controlled_semaphore = scalar(@$jobs) && $jobs->[0]->controlled_semaphore;

    if( $controlled_semaphore && !$push_new_semaphore ) {   # only if it has not been done yet
        $controlled_semaphore->increase_by( $jobs );        # "pre-increase" the semaphore counts before creating the controlling jobs
    }

    foreach my $candidate_job (@$jobs) {

        my $analysis     = $candidate_job->analysis;
        my $job_adaptor  = $analysis ? $analysis->adaptor->db->get_AnalysisJobAdaptor : $self;  # if analysis object is undefined, consider the job local
        my $prev_adaptor = ($candidate_job->prev_job && $candidate_job->prev_job->adaptor) || '';
        my $job_is_local_to_parent = $prev_adaptor eq $job_adaptor;

        if( $controlled_semaphore ) {
            my $job_hive_pipeline = $candidate_job->hive_pipeline;

            if( $controlled_semaphore->hive_pipeline ne $job_hive_pipeline ) {
                    # the job is remote to $controlled_semaphore, so another, job-local,
                    # semaphore is introduced between the job and $controlled_semaphore:
                my $job_local_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
                    'hive_pipeline'             => $job_hive_pipeline,
                    'dependent_semaphore_url'   => $controlled_semaphore->relative_url( $job_hive_pipeline ),
                    'local_jobs_counter'        => 1,
                    'remote_jobs_counter'       => 0,
                );
                $job_adaptor->db->get_SemaphoreAdaptor->store( $job_local_semaphore );

                $candidate_job->controlled_semaphore( $job_local_semaphore );
            }
        }

        if( $job_adaptor ne $prev_adaptor ) {
            $candidate_job->prev_job_id( undef );   # job_ids are local, so for remote jobs they have to be cleaned up before storing
        }

        my ($job, $stored_this_time) = $job_adaptor->store( $candidate_job );

        unless( $stored_this_time ) {
            push @rejected_jobs, $job;

            my $locality = $job_is_local_to_parent ? 'local' : 'remote';
            my $msg      = "JobAdaptor failed to store the "
                         . $locality
                         . " Job( analysis_id=".$job->analysis_id.', '.$job->input_id." ), possibly due to a collision";

            my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
            if ($job_is_local_to_parent && $emitting_job_id) {
                $log_message_adaptor->store_job_message($emitting_job_id, $msg, 'PIPELINE_CAUTION');
            } else {
                $log_message_adaptor->store_hive_message($msg, 'PIPELINE_CAUTION');
            }

            next;
        }

        unless($job_adaptor->db->hive_pipeline->hive_use_triggers()) {
            my $counter_sql = ($job->status eq 'READY')
                ? " ,ready_job_count=ready_job_count+1 "
                : " ,semaphored_job_count=semaphored_job_count+1 ";
            my $status_sql  = ($job_adaptor->dbc->driver eq 'pgsql')
                ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
                : " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END ";

            $job_adaptor->dbc->do( join('',
                qq{
                    UPDATE analysis_stats
                    SET total_job_count=total_job_count+1
                },
                $counter_sql,
                $status_sql,
                " WHERE analysis_id=".$job->analysis_id,
            ) );
        }

        push @stored_job_ids, $job->dbID();     # FIXME: this ID may not make much cross-db sense
    }

    if( $controlled_semaphore && scalar(@rejected_jobs) ) {
        $controlled_semaphore->decrease_by( \@rejected_jobs );
    }

    return \@stored_job_ids;
}
260 
261 
=head2 store_a_semaphored_group_of_jobs

    Arg [1]     : Bio::EnsEMBL::Hive::AnalysisJob $funnel_job
    Arg [2]     : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $fan_jobs
    Arg [3]     : (optional) Bio::EnsEMBL::Hive::AnalysisJob $emitting_job
    Arg [4]     : (optional) boolean $no_leeching
    Example     : my ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
    Description : Stores the funnel job (if given), creates or re-uses the semaphore that blocks it,
                  then stores the fan jobs under the control of that semaphore.
                  If the funnel job cannot be stored, an existing one with the same analysis and input_id
                  is looked up and "leeched" onto, unless $no_leeching is set.
    Returntype  : ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids)
    Exceptions  : dies if the funnel job can neither be stored nor leeched onto

=cut

sub store_a_semaphored_group_of_jobs {
    my ($self, $funnel_job, $fan_jobs, $emitting_job, $no_leeching) = @_;

    my $emitting_job_id;

    if($emitting_job) {
        if($funnel_job) {
            $funnel_job->prev_job( $emitting_job );
            $funnel_job->controlled_semaphore( $emitting_job->controlled_semaphore );   # propagate parent's semaphore if any
        }
        $emitting_job_id = $emitting_job->dbID;
    }

    my $funnel_semaphore;
    my $funnel_semaphore_adaptor = $self->db->get_SemaphoreAdaptor;    # assuming $self was $funnel_job_adaptor

    my ($funnel_job_id) = $funnel_job ? @{ $self->store_jobs_and_adjust_counters( [ $funnel_job ], 0, $emitting_job_id) } : ();

    if($funnel_job && !$funnel_job_id) {    # apparently the funnel_job has been created previously, trying to leech to it:

        die "The funnel job could not be stored, but leeching was not allowed, so bailing out" if $no_leeching;

        $funnel_job = $self->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id)
            or die "The funnel job could neither be stored nor fetched";

        $funnel_job_id = $funnel_job->dbID;

            # If the job hasn't run yet, we can still block it
        if ($funnel_job->status eq 'READY') {
                # Mark the job as SEMAPHORED to make sure it's not taken by any worker
            $self->semaphore_job_by_id($funnel_job_id);
            $self->refresh($funnel_job);
        }

        die "The funnel job (id=$funnel_job_id) fetched from the database was not in SEMAPHORED status"
            unless $funnel_job->status eq 'SEMAPHORED';

        $funnel_semaphore = $funnel_job->fetch_local_blocking_semaphore();

            # Create if it was missing
        unless ($funnel_semaphore) {
            $funnel_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
                'hive_pipeline'         => $funnel_job->hive_pipeline,
                'dependent_job_id'      => $funnel_job_id,
                'local_jobs_counter'    => 0,   # Will be updated below
                'remote_jobs_counter'   => 0,   # Will be updated below
            );
            $funnel_semaphore_adaptor->store( $funnel_semaphore );
        }

        $funnel_semaphore->increase_by( $fan_jobs );    # "pre-increase" the semaphore counts before creating the controlling jobs

        $self->db->get_LogMessageAdaptor->store_job_message($emitting_job_id, "Discovered and using an existing funnel ".$funnel_job->toString, 'INFO');

    } else {    # Either the $funnel_job was successfully stored, or there wasn't any $funnel_job to start with:

        my $whose_hive_pipeline = $funnel_job || $self->db;

        my ($local_count, $remote_count) = Bio::EnsEMBL::Hive::Cacheable::count_local_and_remote_objects( $whose_hive_pipeline, $fan_jobs );

        $funnel_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
            'hive_pipeline'         => $whose_hive_pipeline->hive_pipeline,
            'dependent_job_id'      => $funnel_job_id,
            'local_jobs_counter'    => $local_count,
            'remote_jobs_counter'   => $remote_count,
        );
        $funnel_semaphore_adaptor->store( $funnel_semaphore );

        $funnel_semaphore->release_if_ripe();
    }

        # set the funnel in every fan's job:
    $_->controlled_semaphore( $funnel_semaphore ) for @$fan_jobs;

    my @fan_job_ids = @{ $self->store_jobs_and_adjust_counters( $fan_jobs, 1, $emitting_job_id) };

    return ($funnel_semaphore->dbID, $funnel_job_id, @fan_job_ids);
}
356 
357 
358 
=head2 fetch_all_by_analysis_id_status

    Arg [1]     : (optional) listref of analysis objects $list_of_analyses
                  (a plain analysis_id is also accepted, for compatibility with the old interface)
    Arg [2]     : (optional) string $status
    Arg [3]     : (optional) int $retry_count_at_least
    Example     : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
                  $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses, 'DONE');
    Description : Returns a list of all jobs filtered by the given analyses (if specified), the given
                  status (if specified) and a minimum retry_count (if specified).
                  An empty listref of analyses yields an empty result without querying the database.
    Returntype  : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

sub fetch_all_by_analysis_id_status {
    my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;

    my @constraints = ();

    if($list_of_analyses) {
        if(ref($list_of_analyses) eq 'ARRAY') {
                # No analysis can match an empty list, and "analysis_id IN ()" is not
                # valid SQL on every supported backend, so answer without a query.
            return [] unless @$list_of_analyses;

            push @constraints, "analysis_id IN (".join(',', map { $_->dbID } @$list_of_analyses).")";
        } else {
            push @constraints, "analysis_id=$list_of_analyses";     # for compatibility with old interface
        }
    }

    push @constraints, "status='$status'"                       if ($status);
    push @constraints, "retry_count >= $retry_count_at_least"   if ($retry_count_at_least);

    return $self->fetch_all( join(" AND ", @constraints) );
}
389 
390 
# Fetches at most $limit jobs that belong to the given analysis_id.
sub fetch_some_by_analysis_id_limit {
    my ($self, $analysis_id, $limit) = @_;

    my $constraint = "analysis_id = '" . $analysis_id . "' LIMIT " . $limit;

    return $self->fetch_all( $constraint );
}
396 
397 
# Fetches all jobs attached to the given role_id whose status is one of
# $ALL_STATUSES_OF_TAKEN_JOBS (CLAIMED or any of the running statuses).
sub fetch_all_incomplete_jobs_by_role_id {
    my ($self, $role_id) = @_;

    return $self->fetch_all( 'status IN (' . $ALL_STATUSES_OF_TAKEN_JOBS . ") AND role_id='" . $role_id . "'" );
}
404 
405 
# Fetches all jobs that are in one of the $ALL_STATUSES_OF_TAKEN_JOBS
# statuses although no role is attached to them.
sub fetch_all_unfinished_jobs_with_no_roles {
    my ($self) = @_;

    my $constraint = "role_id IS NULL AND status IN ($ALL_STATUSES_OF_TAKEN_JOBS)";

    return $self->fetch_all( $constraint );
}
411 
412 
# Resolves a (field_name, field_value) pair into a job.
# Only a 'dbID' field with a true value is supported; anything else
# yields an empty return.
sub fetch_by_url_query {
    my ($self, $field_name, $field_value) = @_;

    return unless ($field_name eq 'dbID' and $field_value);

    return $self->fetch_by_dbID($field_value);
}
426 
427 
# Counts the jobs of one analysis per status.
# Returns a hashref { status => number_of_jobs }.
sub fetch_job_counts_hashed_by_status {
    my ($self, $requested_analysis_id) = @_;

        # Note: selecting and grouping by analysis_id looks redundant, but it is here
        # to force MySQL use existing index on (analysis_id, status)
    my $sql = "SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY analysis_id, status";
    my $sth = $self->prepare($sql);
    $sth->execute( $requested_analysis_id );

    my %count_for_status = ();
    while (my @row = $sth->fetchrow_array()) {
        my (undef, $status, $job_count) = @row;     # the first column only exists for the index's sake
        $count_for_status{ $status } = $job_count;
    }

    $sth->finish;

    return \%count_for_status;
}
446 
447 
448 ########################
449 #
450 # STORE / UPDATE METHODS
451 #
452 ########################
453 
# Sets the status of a job to SEMAPHORED, unless the job is in COMPILATION
# or in one of the $ALL_STATUSES_OF_TAKEN_JOBS statuses.
# Used in the end of reblocking a semaphore chain. Does nothing without a job_id.
sub semaphore_job_by_id {
    my ($self, $job_id) = @_;

    return unless $job_id;

    my $sql = "UPDATE job SET status = 'SEMAPHORED' WHERE job_id=? AND status NOT IN ('COMPILATION', $ALL_STATUSES_OF_TAKEN_JOBS)";

    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'semaphoring a job'.$after, 'INFO' );
    };

    return $self->dbc->protected_prepare_execute( [ $sql, $job_id ], $log_callback );
}
464 
# Sets the status of a SEMAPHORED job back to READY.
# Used in semaphore annihilation or unsuccessful creation. Does nothing without a job_id.
sub unsemaphore_job_by_id {
    my ($self, $job_id) = @_;

    return unless $job_id;

    my $sql = "UPDATE job SET status = 'READY' WHERE job_id=? AND status='SEMAPHORED'";

    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'unsemaphoring a job'.$after, 'INFO' );
    };

    return $self->dbc->protected_prepare_execute( [ $sql, $job_id ], $log_callback );
}
475 
476 
# Takes a row lock on the given job with "SELECT ... FOR UPDATE".
# Currently defunct, but may be needed to resolve situations of heavy load on semaphore/job tables.
#
# Nothing is done when no job_id is given, when there is no connection,
# or when the driver is 'sqlite'. In those cases the sub returns nothing;
# otherwise it returns the result of protected_prepare_execute().
sub prelock_semaphore_for_update {
    my ($self, $job_id) = @_;

    return unless $job_id;

    my $dbc = $self->dbc
        or return;

    return if $dbc->driver eq 'sqlite';

    return $dbc->protected_prepare_execute( [ "SELECT 1 FROM job WHERE job_id=? FOR UPDATE", $job_id ],
            # Logged with the 'INFO' message class, like every other store_hive_message()
            # call in this module (this one used to pass a bare 0 instead).
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( "prelocking semaphore job_id=$job_id".$after, 'INFO' ); }
    );
}
489 
490 
=head2 check_in_job

    Arg [1]     : Bio::EnsEMBL::Hive::AnalysisJob $job
    Example     : $job_adaptor->check_in_job( $job );
    Description : Writes the job's current status into the database.
                  For a 'DONE' job when_completed, runtime_msec and query_count are recorded as well;
                  for a 'PASSED_ON' job only when_completed is.
                  All values are passed as bind parameters, so an undefined runtime_msec or
                  query_count is stored as NULL.
    Returntype  : whatever protected_prepare_execute() returns
    Exceptions  : none raised here
    Caller      : general

=cut

sub check_in_job {
    my ($self, $job) = @_;

    my $job_id = $job->dbID;
    my $status = $job->status;

    my @assignments = ('status=?');
    my @bind_values = ($status);

    if($status eq 'DONE') {
        push @assignments, 'when_completed=CURRENT_TIMESTAMP', 'runtime_msec=?', 'query_count=?';
            # scalar() guarantees exactly one bind value per placeholder
        push @bind_values, scalar($job->runtime_msec), scalar($job->query_count);
    } elsif($status eq 'PASSED_ON') {
        push @assignments, 'when_completed=CURRENT_TIMESTAMP';
    }

    my $sql = 'UPDATE job SET ' . join(', ', @assignments) . ' WHERE job_id=?';
    push @bind_values, $job_id;

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    return $self->dbc->protected_prepare_execute( [ $sql, @bind_values ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_job_message( $job_id, "checking the job in".$after, 'INFO' ); }
    );
}
525 
526 
=head2 store_out_files

    Arg [1]     : Bio::EnsEMBL::Hive::AnalysisJob $job
    Example     : $job_adaptor->store_out_files( $job );
    Description : Updates the locations of the job's log files: the job_file row of the current
                  (job_id, retry) pair is deleted, and a new one is inserted if the job has
                  a stdout_file or a stderr_file.
    Returntype  : not meaningful
    Exceptions  : none raised here

=cut

sub store_out_files {
    my ($self, $job) = @_;

        # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.

    $self->dbc->do( join('', 'DELETE from job_file WHERE job_id=', $job->dbID, ' AND retry=', $job->retry_count) );

        # nothing to record if the job has no log files
    return unless ($job->stdout_file or $job->stderr_file);

    my $insert_sth = $self->dbc->prepare( 'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)' );
    my @row        = ( $job->dbID, $job->retry_count, $job->role_id, $job->stdout_file, $job->stderr_file );
    $insert_sth->execute( @row );
    $insert_sth->finish();
}
553 
554 
=head2 reset_or_grab_job_by_dbID

    Arg [1]     : int $job_id
    Arg [2]     : int $role_id (optional)
    Description : resets a job to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
                  NB: Will also reset a previously 'SEMAPHORED' job to READY.
                  The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
                  but will not change retry_count if a job has never *really* started.
    Returntype  : Bio::EnsEMBL::Hive::AnalysisJob or undef
    Exceptions  : dies if the UPDATE statement could not be executed

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $role_id) = @_;

    my $new_status = 'READY';
    $new_status    = 'CLAIMED' if $role_id;

        # Note: the order of the fields being updated is critical!
    my $sql = qq{
        UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
             , status=?
             , role_id=?
         WHERE job_id=?
    };
    my @bind_values = ($new_status, $role_id, $job_id);

    my $sth = $self->prepare( $sql );
    $sth->execute( @bind_values )
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @bind_values).')';
    $sth->finish;

        # only a job that really ended up in the requested status is handed back
    my $grabbed_job = $self->fetch_by_job_id_AND_status($job_id, $new_status);

    return $grabbed_job;
}
591 
592 
=head2 grab_jobs_for_role

    Arg [1]     : Bio::EnsEMBL::Hive::Role object $role
    Arg [2]     : int $how_many_this_batch
    Example     : my $jobs = $job_adaptor->grab_jobs_for_role( $role, $how_many );
    Description : For the specified Role, claims up to $how_many_this_batch of the READY jobs
                  of the Role's analysis and returns the jobs claimed by the Role.
                  Up to three attempts are made: never-retried jobs at the Role's own offset,
                  then any jobs at that offset, then any jobs without an offset.
    Returntype  : reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
    Caller      : Bio::EnsEMBL::Hive::Worker::run

=cut

sub grab_jobs_for_role {
    my ($self, $role, $how_many_this_batch) = @_;

    return [] unless( $how_many_this_batch );

    my $analysis_id = $role->analysis_id;
    my $role_id     = $role->dbID;
    my $role_rank   = $self->db->get_RoleAdaptor->get_role_rank( $role );
    my $offset      = $how_many_this_batch * $role_rank;

    my $prefix_sql;
    if ($self->dbc->driver eq 'mysql') {
        $prefix_sql = qq{
            UPDATE job j
            JOIN (
                SELECT job_id
                FROM job
                WHERE analysis_id='$analysis_id'
                AND status='READY'
        };
    } else {
        $prefix_sql = qq{
            UPDATE job
            SET role_id='$role_id', status='CLAIMED'
            WHERE job_id in (
                SELECT job_id
                FROM job
                WHERE analysis_id='$analysis_id'
                AND status='READY'
        };
    }

    my $virgin_sql = qq{ AND retry_count=0 };
    my $limit_sql  = qq{ LIMIT $how_many_this_batch };
    my $offset_sql = qq{ OFFSET $offset };

    my $suffix_sql;
    if ($self->dbc->driver eq 'mysql') {
        $suffix_sql = qq{
            ) as x
            USING (job_id)
            SET j.role_id='$role_id', j.status='CLAIMED'
            WHERE j.status='READY'
        };
    } else {
        $suffix_sql = qq{
            )
            AND status='READY'
        };
    }

        # the attempts, in the order they are tried, each with the activity name used for logging:
    my @attempts = (
        [ $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql, "grabbing a virgin batch of offset jobs" ],
        [ $prefix_sql .               $limit_sql . $offset_sql . $suffix_sql, "grabbing a non-virgin batch of offset jobs" ],
        [ $prefix_sql .               $limit_sql .               $suffix_sql, "grabbing a non-virgin batch of non-offset jobs" ],
    );

    my $claim_count;

    while (my $attempt = shift @attempts) {
        my ($sql, $activity) = @$attempt;

        $claim_count = $self->dbc->protected_prepare_execute( [ $sql ],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, $activity.$after, 'INFO' ); }
        );

            # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
        last if @attempts and 0 != $claim_count;
    }

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', -$claim_count, $analysis_id );

    return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') : [];
}
667 
668 
# Turns all jobs CLAIMED by the given Role back into READY jobs without a role,
# and adds the number of released jobs to the analysis' ready_job_count.
sub release_claimed_jobs_from_role {
    my ($self, $role) = @_;

    my $release_sql = "UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'";

    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "releasing claimed jobs from role".$after, 'INFO' );
    };

        # previous value of role_id is not important, because that Role never had a chance to run the jobs
        # TODO: If worker died during compilation, do not reset job here.
    my $num_released_jobs = $self->dbc->protected_prepare_execute( [ $release_sql, $role->dbID ], $log_callback );

    my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

    return $analysis_stats_adaptor->increment_a_counter( 'ready_job_count', $num_released_jobs, $role->analysis_id );

#    $analysis_stats_adaptor->update_status( $role->analysis_id, 'LOADING' );
}
685 
686 
=head2 release_undone_jobs_from_role

    Arg [1]     : Bio::EnsEMBL::Hive::Role object
    Arg [2]     : optional message to be recorded in 'job_message' table
    Example     : $job_adaptor->release_undone_jobs_from_role( $role, $msg );
    Description : If a Worker has died some of its jobs need to be reset back to 'READY'
                  so they can be rerun.
                  Jobs in state CLAIMED are simply reset back to READY.
                  If a job was 'in progress' (see the $ALL_STATUSES_OF_RUNNING_JOBS variable),
                  a gc_dataflow is attempted first; if none was performed, the job is handed to
                  release_and_age_job(), which increases the retry_count and sets the status back
                  to READY, or to FAILED if no retry is allowed or the retry_count has reached
                  $max_retry_count.
                  Each job gets its own copy of the message, extended only by what happened to that job.
    Exceptions  : $role must be defined

=cut

sub release_undone_jobs_from_role {
    my ($self, $role, $msg) = @_;

    my $role_id         = $role->dbID;
    my $analysis        = $role->analysis;
    my $max_retry_count = $analysis->max_retry_count;
    my $worker          = $role->worker;

        #first just reset the claimed jobs, these don't need a retry_count index increment:
    $self->release_claimed_jobs_from_role( $role );

    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE role_id='$role_id'
           AND status in ($ALL_STATUSES_OF_TAKEN_JOBS)
    } );
    $sth->execute();

    my $cod = $worker->cause_of_death() || 'UNKNOWN';
    $msg ||= "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

        # We can not retry a job if it failed during compilation or due
        # to a resource constraint. If a gc_dataflow is present, it will
        # take care of that and create another job.
    my $may_retry = !( ($cod eq 'COMPILATION') || $resource_overusage );

    while(my ($job_id) = $sth->fetchrow_array()) {

            # Start from the common message for every job: appending to $msg itself
            # would carry the gc_dataflow remarks of one job over to all later ones.
        my $job_msg   = $msg;
        my $passed_on = 0;  # the flag indicating that the garbage_collection was attempted and was successful

        if( $resource_overusage ) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $job_msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $job_msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $job_msg, $passed_on ? 'INFO' : 'WORKER_ERROR');

        unless($passed_on) {
            $self->release_and_age_job( $job_id, $max_retry_count, $may_retry );
        }

        $role->register_attempt( 0 );
    }
    $sth->finish();
}
758 
759 
# Releases one job that is in one of the $ALL_STATUSES_OF_TAKEN_JOBS statuses:
# its retry_count is increased and its status becomes READY (if $may_retry is
# true and the retry_count was below $max_retry_count) or FAILED otherwise.
# The matching counter of the job's analysis is then increased by one.
#
#   Arg [1] : int $job_id
#   Arg [2] : (optional) int $max_retry_count, defaults to the pipeline-wide default
#   Arg [3] : (optional) boolean $may_retry, defaults to false
#   Arg [4] : (optional) int $runtime_msec, stored as NULL if not given
sub release_and_age_job {
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;

        # Default values
    $max_retry_count    //= $self->db->hive_pipeline->hive_default_max_retry_count;
    $may_retry          ||= 0;
    $runtime_msec       //= "NULL";

    my $set_status_sql = ($self->dbc->driver eq 'pgsql')
        ? "SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS job_status), "
        : "SET status = CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, ";

        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
        #
        # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
    $self->dbc->do( join('',
        "UPDATE job ",
        $set_status_sql,
        qq{
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
         WHERE job_id=$job_id
           AND status in ($ALL_STATUSES_OF_TAKEN_JOBS)
        },
    ) );

        # FIXME: move the decision making completely to the API side and so avoid the potential race condition.
    my $job          = $self->fetch_by_dbID( $job_id );
    my $counter_name = ($job->status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count';

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( $counter_name, 1, $job->analysis_id );
}
789 
790 
791 =head2 gc_dataflow
792 
793  Description: perform automatic dataflow from a dead job that overused resources if a corresponding dataflow rule was provided
794  Should only be called once during garbage collection phase, when the job is definitely 'abandoned' and not being worked on.
795 
796 =cut
797 
# Performs the dataflow of an abandoned job into the given branch, provided the
# analysis has dataflow rules for that branch.
#
#   Arg [1]    : the analysis of the job (provides dataflow_rules_by_branch and dbID)
#   Arg [2]    : int $job_id
#   Arg [3]    : string $branch_name, as passed on to dataflow_output_id()
#   Returntype : 0 if there was no rule for the branch (nothing is touched),
#                1 after the dataflow has been performed
798 sub gc_dataflow {
799  my ($self, $analysis, $job_id, $branch_name) = @_;
800 
# NOTE(review): $branch_code is used below, but its declaration is not part of
# the visible text (a line appears to be missing right here). Presumably it is
# derived from $branch_name -- TODO confirm against the original file.
802 
803  unless( $analysis->dataflow_rules_by_branch->{$branch_code} ) {
804  return 0; # just return if no corresponding gc_dataflow rule has been defined
805  }
806 
# The job is fetched afresh by its dbID and given the analysis object it needs for the dataflow.
807  my $job = $self->fetch_by_dbID($job_id);
808  $job->analysis( $analysis );
809 
810  $job->load_parameters(); # input_id_templates still supported, however to a limited extent
811 
# No explicit output ids are given (undef); only the branch is specified.
812  $job->dataflow_output_id( undef, $branch_name );
813 
814  $job->set_and_update_status('PASSED_ON');
815 
816  # PASSED_ON jobs are included in done_job_count
817  $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'done_job_count', 1, $analysis->dbID );
818 
# If the job was controlling a semaphore, that semaphore is decreased by this job.
819  if( my $controlled_semaphore = $job->controlled_semaphore ) {
820  $controlled_semaphore->decrease_by( [ $job ] );
821  }
822 
823  return 1;
824 }
825 
826 
=head2 reset_jobs_for_analysis_id

    Arg [1]     : arrayref of Analyses
    Arg [2]     : arrayref of job statuses $input_statuses
    Description : Resets all the jobs of the selected analyses that have one of the
                  required statuses: a job blocked by a semaphore with a positive counter
                  becomes 'SEMAPHORED', any other one becomes 'READY'. The retry_count is set
                  to 0 for jobs that were already 'READY' and to 1 for all the others.
                  Semaphores are updated accordingly, and the selected analyses are set to 'LOADING'.
                  Nothing is done if either of the two lists is empty.
    Caller      : beekeeper.pl and guiHive

=cut

sub reset_jobs_for_analysis_id {
    my ($self, $list_of_analyses, $input_statuses) = @_;

    return if !scalar(@$input_statuses);        # No statuses to reset
    return if !scalar(@$list_of_analyses);      # No analyses to reset either; the filter below would become an empty "IN ()"

    my $analyses_filter = 'j.analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')';
    my $statuses_filter = 'AND j.status IN ('.join(', ', map { "'$_'" } @$input_statuses).')';

        # Get the list of semaphores, and by how much their local_jobs_counter should be increased.
        # Only DONE and PASSED_ON jobs of the matching analyses and statuses should be counted
        #
    my $sql1 = qq{
        SELECT COUNT(*) AS local_delta, controlled_semaphore_id
          FROM job j
         WHERE controlled_semaphore_id IS NOT NULL
           AND $analyses_filter $statuses_filter AND status IN ($ALL_STATUSES_OF_COMPLETE_JOBS)
      GROUP BY controlled_semaphore_id
    };

        # Run in a transaction to ensure we see a consistent state of the job
        # statuses and semaphore counts.
    $self->dbc->run_in_transaction( sub {

        my $semaphore_adaptor = $self->db->get_SemaphoreAdaptor;

            # Reblock the affected semaphores one by one
        my $sth1 = $self->prepare($sql1);
        $sth1->execute();
        while (my ($local_delta, $semaphore_id) = $sth1->fetchrow_array()) {

            my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
            $semaphore->reblock_by( $local_delta );     # increase the local_jobs_counter, reblock recursively if needed
        }
        $sth1->finish;

            # change fan jobs' statuses to 'READY', if they are themselves not SEMAPHORED
            # (one flavour of the statement per driver: mysql, pgsql, and anything else)
        my $sql3 = ($self->dbc->driver eq 'mysql') ? qq{
                    UPDATE job j
                 LEFT JOIN semaphore s
                        ON (j.job_id=s.dependent_job_id)
                       SET j.retry_count = CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
                           j.status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END").qq{
                     WHERE $analyses_filter $statuses_filter
        } : ($self->dbc->driver eq 'pgsql') ? qq{
                    UPDATE job
                       SET retry_count = CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
                           status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END").qq{
                      FROM job j
                 LEFT JOIN semaphore s
                        ON (j.job_id=s.dependent_job_id)
                     WHERE job.job_id=j.job_id AND $analyses_filter $statuses_filter
        } : qq{

            REPLACE INTO job (job_id, prev_job_id, analysis_id, input_id, param_id_stack, accu_id_stack, role_id, status, retry_count, when_completed, runtime_msec, query_count, controlled_semaphore_id)
                  SELECT j.job_id,
                         j.prev_job_id,
                         j.analysis_id,
                         j.input_id,
                         j.param_id_stack,
                         j.accu_id_stack,
                         j.role_id,
                         CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END,
                         CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
                         j.when_completed,
                         j.runtime_msec,
                         j.query_count,
                         j.controlled_semaphore_id
                    FROM job j
               LEFT JOIN semaphore s
                      ON (j.job_id=s.dependent_job_id)
                   WHERE $analyses_filter $statuses_filter
        };

        $self->dbc->protected_prepare_execute( [$sql3],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'resetting jobs'.$after, 'INFO' ); }
        );

        foreach my $analysis ( @$list_of_analyses ) {
            $self->db->get_AnalysisStatsAdaptor->update_status($analysis->dbID, 'LOADING');
        }

    } ); # end of transaction
}
921 
922 
=head2 unblock_jobs_for_analysis_id

  Arg [1]     : arrayref of Analysis objects $list_of_analyses (jobs are selected by the analyses' dbIDs)
  Description : Sets all the SEMAPHORED jobs of the given analyses to READY and zeroes both counters
                (local and remote) of the semaphores these jobs depend on.
                The given analyses are then switched to 'LOADING'.
                Does nothing if the list is empty.
  Caller      : beekeeper.pl and guiHive

=cut

sub unblock_jobs_for_analysis_id {
    my ($self, $list_of_analyses) = @_;

    # Nothing selected: an empty "IN ()" list would be invalid SQL, so stop here
    return unless $list_of_analyses && @$list_of_analyses;

    my $analyses_filter = 'analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')';
    my $driver          = $self->dbc->driver;

    # Get the list of semaphored jobs together with their semaphores, and unblock both (previously semaphored jobs become 'READY')

    if ($driver eq 'mysql') {   # MySQL supports updating multiple tables at once

        my $sql = qq{
            UPDATE job j
            JOIN semaphore s
                ON (j.job_id=s.dependent_job_id)
            SET s.local_jobs_counter=0, s.remote_jobs_counter=0, j.status = 'READY'
            WHERE $analyses_filter AND j.status = 'SEMAPHORED'
        };
        $self->dbc->protected_prepare_execute( [$sql],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'unblocking jobs'.$after, 'INFO' ); }
        );

    }
    elsif ($driver eq 'pgsql') {

        # The semaphores have to be zeroed first: the join relies on the jobs still being 'SEMAPHORED'
        my $sql1 = qq{
            UPDATE semaphore s
            SET local_jobs_counter=0, remote_jobs_counter=0
            FROM job j
            WHERE $analyses_filter AND j.job_id = s.dependent_job_id AND j.status = 'SEMAPHORED'
        };
        $self->dbc->do($sql1);

        my $sql2 = qq{
            UPDATE job j
            SET status='READY'
            WHERE $analyses_filter AND j.status = 'SEMAPHORED'
        };
        $self->dbc->do($sql2);

    }
    else {

        # Any other driver: rewrite the matching semaphore rows with zeroed counters,
        # again before the jobs lose their 'SEMAPHORED' status
        my $sql1 = qq{
            REPLACE INTO semaphore (semaphore_id, local_jobs_counter, remote_jobs_counter, dependent_job_id, dependent_semaphore_url)
            SELECT  s.semaphore_id,
                    0,
                    0,
                    s.dependent_job_id,
                    s.dependent_semaphore_url
            FROM semaphore s
            JOIN job j
                ON (j.job_id = s.dependent_job_id)
            WHERE $analyses_filter AND j.status = 'SEMAPHORED'
        };
        $self->dbc->do($sql1);

        my $sql2 = qq{
            UPDATE job
            SET status='READY'
            WHERE $analyses_filter AND status = 'SEMAPHORED'
        };
        $self->dbc->do($sql2);
    }

    my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

    foreach my $analysis ( @$list_of_analyses ) {
        $analysis_stats_adaptor->update_status($analysis->dbID, 'LOADING');
    }
}
996 
997 
=head2 discard_jobs_for_analysis_id

  Arg [1]     : arrayref of Analysis objects $list_of_analyses (jobs are selected by the analyses' dbIDs)
  Arg [2]     : (optional) string $input_status - only jobs in this status are discarded;
                when false, all the jobs of the analyses are
  Description : Sets the status of the matching jobs to 'DONE'.
                Semaphores are first re-balanced for the given analyses, then the semaphores controlled
                by the discarded jobs are decreased by the number of these jobs that were not complete yet.
                The given analyses are then switched to 'LOADING'.
                Does nothing if the list of analyses is empty.
  Caller      : beekeeper.pl and guiHive

=cut

sub discard_jobs_for_analysis_id {
    my ($self, $list_of_analyses, $input_status) = @_;

    # Nothing selected: an empty "IN ()" list would be invalid SQL (here and in balance_semaphores)
    return unless $list_of_analyses && @$list_of_analyses;

    $self->balance_semaphores( $list_of_analyses );

    my $analyses_filter = 'analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')';
    my $status_filter   = $input_status ? " AND status = '$input_status'" : "";

    # Get the list of semaphores, and by how much their local_jobs_counter should be decreased.
    # Only the jobs that are not complete yet contribute to the delta: balance_semaphores()
    # defines the expected counter as the number of incomplete fan jobs, so a job that is
    # already complete must not be subtracted a second time.
    my $sql1 = qq{
        SELECT controlled_semaphore_id,
               COUNT(CASE WHEN status NOT IN ($ALL_STATUSES_OF_COMPLETE_JOBS) THEN 1 ELSE NULL END) AS local_delta
        FROM job
        WHERE controlled_semaphore_id IS NOT NULL
              AND $analyses_filter $status_filter
        GROUP BY controlled_semaphore_id
    };

    my $sql2 = qq{
        UPDATE job
        SET status = 'DONE'
        WHERE controlled_semaphore_id = ?
              AND $analyses_filter $status_filter
    };

    my $sql3 = qq{
        UPDATE job
        SET status = 'DONE'
        WHERE controlled_semaphore_id IS NULL
              AND $analyses_filter $status_filter
    };

    # Run in a transaction to ensure we see a consistent state of the job
    # statuses and semaphore counts.
    $self->dbc->run_in_transaction( sub {

        # let's start with the jobs that don't have a controlled_semaphore_id
        $self->dbc->protected_prepare_execute( [$sql3],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'discarding jobs'.$after, 'INFO' ); }
        );

        my $semaphore_adaptor = $self->db->get_SemaphoreAdaptor;

        # Update all the semaphored jobs one-by-one
        my $sth1 = $self->prepare($sql1);
        my $sth2 = $self->prepare($sql2);
        $sth1->execute();
        while (my ($semaphore_id, $local_delta) = $sth1->fetchrow_array()) {

            $sth2->execute( $semaphore_id );    # First mark the jobs as DONE

            next unless $local_delta;           # all the matching jobs were complete already: the counter is untouched

            my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
            $semaphore->decrease_by( $local_delta );    # then decrease the local_jobs_counters, recursively releasing if ripe
        }
        $sth1->finish;
        $sth2->finish;

        my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

        foreach my $analysis ( @$list_of_analyses ) {
            $analysis_stats_adaptor->update_status($analysis->dbID, 'LOADING');
        }

    } ); # end of transaction
}
1072 
1073 
=head2 balance_semaphores

  Arg [1]     : (optional) arrayref of Analysis objects $list_of_analyses - when given, only the semaphores
                whose dependent (funnel) job belongs to one of these analyses are examined;
                when undefined, there is no restriction by analysis
  Description : For the semaphores whose dependent (funnel) job is 'SEMAPHORED' or 'READY', resets
                local_jobs_counter to the number of controlling (fan) jobs that are not complete yet,
                releasing or re-blocking as needed. A semaphore with no incomplete fan jobs and
                a non-positive total counter is released.
  Returntype  : int - the number of semaphores whose local_jobs_counter had to be changed
                (0 if an empty list of analyses is given)

=cut

sub balance_semaphores {
    my ($self, $list_of_analyses) = @_;

    # An explicitly empty selection cannot match any funnel job; return straight away rather than
    # building an invalid "IN ()" clause. (An undefined list still means "no restriction".)
    return 0 if $list_of_analyses && !@$list_of_analyses;

    my $analysis_filter = $list_of_analyses
        ? "funnel.analysis_id IN (".join(',', map { $_->dbID } @$list_of_analyses).") AND"
        : '';

    # "was" is the stored counter, "should" is the number of incomplete fan jobs.
    # Rows with should=0 are returned even if was=0 too, so that stuck semaphores can be released.
    my $find_sql = qq{
        SELECT * FROM (
            SELECT s.semaphore_id, s.local_jobs_counter AS was, COALESCE(COUNT(CASE WHEN fan.status NOT IN ($ALL_STATUSES_OF_COMPLETE_JOBS) THEN 1 ELSE NULL END),0) AS should
            FROM semaphore s
            LEFT JOIN job fan ON (s.semaphore_id=fan.controlled_semaphore_id)
            LEFT JOIN job funnel ON (s.dependent_job_id=funnel.job_id)
            WHERE $analysis_filter
            funnel.status in ('SEMAPHORED', 'READY')
            GROUP BY s.semaphore_id
        ) AS internal WHERE was<>should OR should=0
    };

    my $rebalanced_jobs_counter = 0;

    # Run in a transaction to ensure we see a consistent state of the job
    # statuses and semaphore counts.
    $self->dbc->run_in_transaction( sub {

        my $find_sth            = $self->prepare($find_sql);
        my $semaphore_adaptor   = $self->db->get_SemaphoreAdaptor;
        my $log_message_adaptor = $self->db->get_LogMessageAdaptor;

        $find_sth->execute();
        while (my ($semaphore_id, $was, $should) = $find_sth->fetchrow_array()) {
            my $msg;
            if ($should < $was) {

                $msg = "Semaphore $semaphore_id local_jobs_counter has to be decreased $was -> $should, performing it now with a potential release";
                $log_message_adaptor->store_hive_message( $msg, 'PIPELINE_CAUTION' );

                my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
                $semaphore->decrease_by( $was-$should );    # decrease the local_jobs_counter, recursively releasing if ripe

                $rebalanced_jobs_counter++;
            }
            elsif ($was < $should) {

                $msg = "Semaphore $semaphore_id local_jobs_counter has to be increased $was -> $should, performing it now with a potential reblock";
                $log_message_adaptor->store_hive_message( $msg, 'PIPELINE_CAUTION' );

                my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
                $semaphore->reblock_by( $should-$was );     # increase the local_jobs_counter, reblock recursively if needed

                $rebalanced_jobs_counter++;
            }
            else {
                # was == should == 0: the local counter is right, the semaphore may still need releasing.
                # This case is deliberately not counted as a rebalancing.
                my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
                # check_if_ripe does the same but with an extra call to the database
                if ( $semaphore->local_jobs_counter + $semaphore->remote_jobs_counter <= 0 ) {
                    $msg = "Semaphore $semaphore_id is marked as blocked despite nothing blocking it, releasing it now";
                    $log_message_adaptor->store_hive_message( $msg, 'PIPELINE_CAUTION' );

                    $semaphore->release_if_ripe();
                }
            }
            warn "[Semaphore $semaphore_id] $msg\n" if $msg;    # TODO: integrate the STDERR diagnostic output with LogMessageAdaptor calls in general
        }
        $find_sth->finish;

    } ); # end of transaction

    return $rebalanced_jobs_counter;
}
1147 
1148 
=head2 fetch_input_ids_for_job_ids

  Arg [1]     : string $job_ids_csv - comma-separated job_ids, interpolated as is into the "IN (...)" list of the query
  Arg [2]     : (optional) number $id_scale  - factor each job_id is multiplied by to build its key (1 if false)
  Arg [3]     : (optional) number $id_offset - value added to the scaled job_id (0 if false)
  Description : Fetches the input_ids of the given jobs, keyed by job_id*$id_scale+$id_offset.
                An input_id of the form "_ext<word>_data_id <number>" is replaced by what
                AnalysisDataAdaptor::fetch_by_analysis_data_id_TO_data returns for that number.
  Returntype  : hashref (empty if $job_ids_csv is false)

=cut

sub fetch_input_ids_for_job_ids {
    my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;

    $id_scale  = 1 unless $id_scale;
    $id_offset = 0 unless $id_offset;

    my %input_id_for;

    # No job_ids requested: no query is run at all
    return \%input_id_for unless $job_ids_csv;

    my $sth = $self->prepare( "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)" );
    $sth->execute();

    while (my @row = $sth->fetchrow_array()) {
        my ($job_id, $stored_input_id) = @row;

        # The stored value may only be a pointer to a row of analysis_data; dereference it if so
        my ($analysis_data_id) = $stored_input_id =~ /^_ext(?:\w+)_data_id (\d+)$/;
        if (defined $analysis_data_id) {
            $stored_input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_analysis_data_id_TO_data($analysis_data_id);
        }

        my $key = $job_id * $id_scale + $id_offset;
        $input_id_for{$key} = $stored_input_id;
    }

    return \%input_id_for;
}
1171 
1172 
1173 1;
1174 
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code
public branch_name_2_code()
Bio::EnsEMBL::Hive::Storable::adaptor
public Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor adaptor()
Bio::EnsEMBL::Hive::Storable::dbID
public Int dbID()
EnsEMBL
Definition: Filter.pm:1
Bio::EnsEMBL::Hive::AnalysisJob::input_id
public input_id()
map
public map()
Bio::EnsEMBL::Hive::Role
Definition: Role.pm:9
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Semaphore
Definition: Semaphore.pm:13
Bio::EnsEMBL::Hive::Storable::new
public Bio::EnsEMBL::Hive::Storable new()
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
Bio::EnsEMBL::Hive::Queen
Definition: Queen.pm:47
Bio::EnsEMBL::Hive::Cacheable
Definition: Cacheable.pm:6
run
public run()
Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor
Definition: DataflowRuleAdaptor.pm:19
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
Definition: AnalysisJobAdaptor.pm:22
Bio::EnsEMBL::Hive::AnalysisJob
Definition: AnalysisJob.pm:13
Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor::db
public db()
Bio
Definition: AltAlleleGroup.pm:4
Bio::EnsEMBL::Hive::AnalysisJob::status
public status()
Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor::prepare
public prepare()