9 $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
10 $analysisJobAdaptor = $analysisJob->adaptor;
14 Module to encapsulate all db access
for persistent
class AnalysisJob.
15 There should be just one per application and database connection.
19 See the NOTICE file distributed with
this work
for additional information
20 regarding copyright ownership.
22 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
23 You may obtain a copy of the License at
27 Unless required by applicable law or agreed to in writing, software distributed under the License
28 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29 See the License
for the specific language governing permissions and limitations under the License.
33 Please subscribe to the
Hive mailing list: http:
37 The rest of the documentation details each of the
object methods.
38 Internal methods are preceded with a _
43 package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
53 use base (
'Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
56 # NOTE: These lists must be kept in sync with the schema !
57 # They are used in a number of queries.
58 our $ALL_STATUSES_OF_RUNNING_JOBS = q{
'PRE_CLEANUP',
'FETCH_INPUT',
'RUN',
'WRITE_OUTPUT',
'POST_HEALTHCHECK',
'POST_CLEANUP'};
59 our $ALL_STATUSES_OF_TAKEN_JOBS = qq{
'CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS};
60 our $ALL_STATUSES_OF_COMPLETE_JOBS = q{
'DONE',
'PASSED_ON'};
61 # Not in any list: SEMAPHORED, READY, COMPILATION (this one is actually not used), FAILED
63 sub default_table_name {
68 sub default_insertion_method {
74 return 'Bio::EnsEMBL::Hive::AnalysisJob';
78 sub default_overflow_limit {
81 'param_id_stack' => 64,
82 'accu_id_stack' => 64,
87 =head2 job_status_cast
89 Example : $job_adaptor->job_status_cast();
90 Description : Changes the type of the expression to the type of the
92 This is needed
for CASE expressions in PostgreSQL which
93 otherwise
default to returning a
string.
100 my ($self, $status_string) = @_;
101 if ($self->dbc->driver eq
'pgsql') {
102 return "CAST($status_string AS job_status)";
104 return $status_string;
109 =head2 fetch_by_analysis_id_and_input_id
111 Arg [1] : Integer $analysis_id
112 Arg [2] : String $input_id
113 Example : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
114 Description: Attempts to find the job by contents, then makes another attempt
if the input_id is expected to have overflown into analysis_data
119 sub fetch_by_analysis_id_and_input_id { # It is a special
case not covered by AUTOLOAD; note the lowercase _and_
120 my ($self, $analysis_id, $input_id) = @_;
122 my $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id);
124 if(!$job and length($input_id)>$self->default_overflow_limit->{input_id}) {
125 if(my $ext_data_id = $self->db->get_AnalysisDataAdaptor->fetch_by_data_to_analysis_data_id( $input_id )) {
126 $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id,
"_extended_data_id $ext_data_id");
133 sub class_specific_execute {
134 my ($self, $object, $sth, $values) = @_;
139 $return_code = $self->SUPER::class_specific_execute($object, $sth, $values);
142 my $duplicate_regex = {
143 'mysql' => qr/Duplicate entry.+?
for key/s,
144 'sqlite' => qr/columns.+?are not unique|UNIQUE constraint failed/s, # versions around 3.8 spit the first msg, versions around 3.15 - the second
145 'pgsql' => qr/duplicate key value violates unique constraint/s,
146 }->{$self->db->dbc->driver};
148 if( $@ =~ $duplicate_regex ) { # implementing
'INSERT IGNORE' of Jobs on the API side
149 my $emitting_job_id = $object->prev_job_id;
150 my $analysis_id = $object->analysis_id;
152 my $msg =
"Attempt to insert a duplicate job (analysis_id=$analysis_id, input_id=$input_id) intercepted and ignored";
154 $self->db->get_LogMessageAdaptor->store_job_message( $emitting_job_id, $msg,
'INFO' );
156 $return_code =
'0E0';
166 =head2 store_jobs_and_adjust_counters
169 Arg [2] : (optional)
boolean $push_new_semaphore
170 Arg [3] : (optional) Int $emitting_job_id
171 Example : my @output_job_ids = @{ $job_adaptor->store_jobs_and_adjust_counters( \@jobs_to_store ) };
172 Description: Attempts to store a list of jobs, returns an arrayref of successfully stored job_ids
173 Returntype : Reference to list of job_dbIDs
177 sub store_jobs_and_adjust_counters {
178 my ($self, $jobs, $push_new_semaphore, $emitting_job_id) = @_;
180 my @output_job_ids = ();
182 # NB: our use patterns assume all jobs from the same storing batch share the same controlled_semaphore:
183 my $controlled_semaphore = scalar(@$jobs) && $jobs->[0]->controlled_semaphore;
184 my @jobs_that_failed_to_store = ();
186 if( $controlled_semaphore && !$push_new_semaphore ) { # only
if it has not been done yet
187 $controlled_semaphore->increase_by( $jobs ); #
"pre-increase" the semaphore counts before creating the controlling jobs
190 foreach my $job (@$jobs) {
192 my $analysis = $job->analysis;
193 my $job_adaptor = $analysis ? $analysis->
adaptor->
db->get_AnalysisJobAdaptor : $self; #
if analysis
object is undefined, consider the job local
194 my $prev_adaptor= ($job->prev_job && $job->prev_job->adaptor) ||
'';
195 my $job_is_local_to_parent = $prev_adaptor eq $job_adaptor;
197 if( $controlled_semaphore ) {
198 my $job_hive_pipeline = $job->hive_pipeline;
200 if( $controlled_semaphore->hive_pipeline ne $job_hive_pipeline ) { #
if $job happens to be remote to $controlled_semaphore,
201 # introduce another job-local semaphore between $job and $controlled_semaphore:
203 'hive_pipeline' => $job_hive_pipeline,
204 'dependent_semaphore_url' => $controlled_semaphore->relative_url( $job_hive_pipeline ),
205 'local_jobs_counter' => 1,
206 'remote_jobs_counter' => 0,
208 $job_adaptor->db->get_SemaphoreAdaptor->store( $job_local_semaphore );
210 $job->controlled_semaphore( $job_local_semaphore );
214 if( $job_adaptor ne $prev_adaptor ) {
215 $job->prev_job_id( undef ); # job_ids are local, so
for remote jobs they have to be cleaned up before storing
218 my ($job, $stored_this_time) = $job_adaptor->store( $job );
220 if($stored_this_time) {
222 unless($job_adaptor->db->hive_pipeline->hive_use_triggers()) {
223 $job_adaptor->dbc->do(qq{
224 UPDATE analysis_stats
225 SET total_job_count=total_job_count+1
227 .(($job->status eq
'READY')
228 ?
" ,ready_job_count=ready_job_count+1 "
229 :
" ,semaphored_job_count=semaphored_job_count+1 "
230 ).(($job_adaptor->dbc->driver eq
'pgsql')
231 ?
" ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
232 :
" ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
233 ).
" WHERE analysis_id=".$job->analysis_id
237 push @output_job_ids, $job->
dbID(); # FIXME:
this ID may not make much cross-db sense
240 push @jobs_that_failed_to_store, $job;
242 my $msg =
"JobAdaptor failed to store the "
243 . ($job_is_local_to_parent ?
'local' :
'remote')
244 .
" Job( analysis_id=".$job->analysis_id.
', '.$job->input_id.
" ), possibly due to a collision";
245 if ($job_is_local_to_parent && $emitting_job_id) {
246 $self->db->get_LogMessageAdaptor->store_job_message($emitting_job_id, $msg,
'PIPELINE_CAUTION');
248 $self->db->get_LogMessageAdaptor->store_hive_message($msg,
'PIPELINE_CAUTION');
254 if( $controlled_semaphore && scalar(@jobs_that_failed_to_store) ) {
255 $controlled_semaphore->decrease_by( \@jobs_that_failed_to_store );
258 return \@output_job_ids;
262 =head2 store_a_semaphored_group_of_jobs
267 Arg [4] : (optional)
boolean $no_leeching
268 Example : my ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
269 Description: Attempts to store a semaphored group of jobs, returns a list of successfully stored job_ids
270 Returntype : ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids)
274 sub store_a_semaphored_group_of_jobs {
275 my ($self, $funnel_job, $fan_jobs, $emitting_job, $no_leeching) = @_;
281 $funnel_job->prev_job( $emitting_job );
282 $funnel_job->controlled_semaphore( $emitting_job->controlled_semaphore ); # propagate parent
's semaphore if any
284 $emitting_job_id = $emitting_job->dbID;
287 my $funnel_semaphore;
288 my $funnel_semaphore_adaptor = $self->db->get_SemaphoreAdaptor; # assuming $self was $funnel_job_adaptor
290 my ($funnel_job_id) = $funnel_job ? @{ $self->store_jobs_and_adjust_counters( [ $funnel_job ], 0, $emitting_job_id) } : ();
292 if($funnel_job && !$funnel_job_id) { # apparently the funnel_job has been created previously, trying to leech to it:
295 die "The funnel job could not be stored, but leeching was not allowed, so bailing out";
297 } elsif( $funnel_job = $self->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id) ) {
298 $funnel_job_id = $funnel_job->dbID;
300 # If the job hasn't
run yet, we can still block it
301 if ($funnel_job->status eq
'READY') {
302 # Mark the job as SEMAPHORED to make sure it's not taken by any worker
303 $self->semaphore_job_by_id($funnel_job_id);
304 $self->refresh($funnel_job);
307 if( $funnel_job->status eq
'SEMAPHORED' ) {
309 $funnel_semaphore = $funnel_job->fetch_local_blocking_semaphore();
311 # Create if it was missing
312 unless ($funnel_semaphore) {
314 'hive_pipeline' => $funnel_job->hive_pipeline,
315 'dependent_job_id' => $funnel_job_id,
316 'local_jobs_counter' => 0, # Will be updated below
317 'remote_jobs_counter' => 0, # Will be updated below
319 $funnel_semaphore_adaptor->store( $funnel_semaphore );
322 $funnel_semaphore->increase_by( $fan_jobs ); #
"pre-increase" the semaphore counts before creating the controlling jobs
324 $self->db->get_LogMessageAdaptor->store_job_message($emitting_job_id,
"Discovered and using an existing funnel ".$funnel_job->toString,
'INFO');
326 die
"The funnel job (id=$funnel_job_id) fetched from the database was not in SEMAPHORED status";
329 die
"The funnel job could neither be stored nor fetched";
331 }
else { # Either the $funnel_job was successfully stored, or there wasn
't any $funnel_job to start with:
333 my $whose_hive_pipeline = $funnel_job || $self->db;
335 my ($local_count, $remote_count) = Bio::EnsEMBL::Hive::Cacheable::count_local_and_remote_objects( $whose_hive_pipeline, $fan_jobs );
337 $funnel_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
338 'hive_pipeline
' => $whose_hive_pipeline->hive_pipeline,
339 'dependent_job_id
' => $funnel_job_id,
340 'local_jobs_counter
' => $local_count,
341 'remote_jobs_counter
' => $remote_count,
343 $funnel_semaphore_adaptor->store( $funnel_semaphore );
345 $funnel_semaphore->release_if_ripe();
348 foreach my $fan_job (@$fan_jobs) { # set the funnel in every fan's job:
349 $fan_job->controlled_semaphore( $funnel_semaphore );
352 my (@fan_job_ids) = @{ $self->store_jobs_and_adjust_counters( $fan_jobs, 1, $emitting_job_id) };
354 return ($funnel_semaphore->dbID, $funnel_job_id, @fan_job_ids);
359 =head2 fetch_all_by_analysis_id_status
361 Arg [1] : (optional) listref $list_of_analyses
362 Arg [2] : (optional)
string $status
363 Arg [3] : (optional)
int $retry_at_least
364 Example : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef,
'FAILED');
365 $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses,
'DONE');
366 Description: Returns a list of all jobs filtered by given analysis_id (
if specified) and given status (
if specified).
371 sub fetch_all_by_analysis_id_status {
372 my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;
374 my @constraints = ();
376 if($list_of_analyses) {
377 if(ref($list_of_analyses) eq
'ARRAY') {
378 push @constraints,
"analysis_id IN (".(join(
',',
map {$_->dbID} @$list_of_analyses)).
")";
380 push @constraints,
"analysis_id=$list_of_analyses"; #
for compatibility with old interface
384 push @constraints,
"status='$status'" if ($status);
385 push @constraints,
"retry_count >= $retry_count_at_least" if ($retry_count_at_least);
387 return $self->fetch_all( join(
" AND ", @constraints) );
391 sub fetch_some_by_analysis_id_limit {
392 my ($self, $analysis_id, $limit) = @_;
394 return $self->fetch_all(
"analysis_id = '$analysis_id' LIMIT $limit" );
398 sub fetch_all_incomplete_jobs_by_role_id {
399 my ($self, $role_id) = @_;
401 my $constraint =
"status IN ($ALL_STATUSES_OF_TAKEN_JOBS) AND role_id='$role_id'";
402 return $self->fetch_all($constraint);
406 sub fetch_all_unfinished_jobs_with_no_roles {
409 return $self->fetch_all(
"role_id IS NULL AND status IN ($ALL_STATUSES_OF_TAKEN_JOBS)" );
413 sub fetch_by_url_query {
414 my ($self, $field_name, $field_value) = @_;
416 if($field_name eq
'dbID' and $field_value) {
418 return $self->fetch_by_dbID($field_value);
428 sub fetch_job_counts_hashed_by_status {
429 my ($self, $requested_analysis_id) = @_;
433 # Note: this seemingly useless dummy_analysis_id is here to force MySQL use existing index on (analysis_id, status)
434 my $sql =
"SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY analysis_id, status";
435 my $sth = $self->prepare($sql);
436 $sth->execute( $requested_analysis_id );
438 while (my ($dummy_analysis_id, $status, $job_count)=$sth->fetchrow_array()) {
439 $job_counts{ $status } = $job_count;
448 ########################
450 # STORE / UPDATE METHODS
452 ########################
454 sub semaphore_job_by_id { # used in the end of reblocking a semaphore chain
456 my $job_id = shift @_ or
return;
458 my $sql =
"UPDATE job SET status = 'SEMAPHORED' WHERE job_id=? AND status NOT IN ('COMPILATION', $ALL_STATUSES_OF_TAKEN_JOBS)";
460 $self->dbc->protected_prepare_execute( [ $sql, $job_id ],
461 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message(
'semaphoring a job'.$after,
'INFO' ); }
465 sub unsemaphore_job_by_id { # used in semaphore annihilation or unsuccessful creation
467 my $job_id = shift @_ or
return;
469 my $sql =
"UPDATE job SET status = 'READY' WHERE job_id=? AND status='SEMAPHORED'";
471 $self->dbc->protected_prepare_execute( [ $sql, $job_id ],
472 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message(
'unsemaphoring a job'.$after,
'INFO' ); }
477 sub prelock_semaphore_for_update { # currently defunct, but may be needed to resolve situations of heavy load on semaphore/job tables
479 my $job_id = shift @_ or
return;
481 if(my $dbc = $self->dbc) {
482 if($dbc->driver ne
'sqlite') {
483 $self->dbc->protected_prepare_execute( [
"SELECT 1 FROM job WHERE job_id=? FOR UPDATE", $job_id ],
484 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message(
"prelocking semaphore job_id=$job_id".$after, 0 ); }
493 Arg [1] : $analysis_id
495 Description: updates the job.
status in the database
503 my ($self, $job) = @_;
505 my $job_id = $job->dbID;
507 my $sql =
"UPDATE job SET status='".$job->status.
"' ";
509 if($job->status eq
'DONE') {
510 $sql .=
",when_completed=CURRENT_TIMESTAMP";
511 $sql .=
",runtime_msec=".$job->runtime_msec;
512 $sql .=
",query_count=".$job->query_count;
513 } elsif($job->status eq
'PASSED_ON') {
514 $sql .=
", when_completed=CURRENT_TIMESTAMP";
515 } elsif($job->status eq
'READY') {
518 $sql .=
" WHERE job_id='$job_id' ";
520 # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
521 $self->dbc->protected_prepare_execute( [ $sql ],
522 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_job_message( $job_id,
"checking the job in".$after,
'INFO' ); }
527 =head2 store_out_files
531 Description: update locations of log files,
if present
538 sub store_out_files {
539 my ($self, $job) = @_;
541 # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
543 my $delete_sql =
'DELETE from job_file WHERE job_id=' . $job->
dbID .
' AND retry='.$job->retry_count;
544 $self->dbc->do( $delete_sql );
546 if($job->stdout_file or $job->stderr_file) {
547 my $insert_sql =
'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
548 my $insert_sth = $self->dbc->prepare($insert_sql);
549 $insert_sth->execute( $job->dbID, $job->retry_count, $job->role_id, $job->stdout_file, $job->stderr_file );
550 $insert_sth->finish();
555 =head2 reset_or_grab_job_by_dbID
557 Arg [1] :
int $job_id
558 Arg [2] :
int $role_id (optional)
559 Description: resets a job to to
'READY' (
if no $role_id given) or directly to
'CLAIMED' so it can be
run again, and fetches it.
560 NB: Will also reset a previously
'SEMAPHORED' job to READY.
561 The retry_count will be set to 1
for previously
run jobs (partially or wholly) to trigger PRE_CLEANUP
for them,
562 but will not change retry_count
if a job has never *really* started.
567 sub reset_or_grab_job_by_dbID {
568 my ($self, $job_id, $role_id) = @_;
570 my $new_status = $role_id ?
'CLAIMED' :
'READY';
572 # Note: the order of the fields being updated is critical!
575 SET retry_count = CASE WHEN (status=
'READY' OR status=
'CLAIMED') THEN retry_count ELSE 1 END
580 my @values = ($new_status, $role_id, $job_id);
582 my $sth = $self->prepare( $sql );
583 my $return_code = $sth->execute( @values )
584 or die "Could not
run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
587 my $job = $self->fetch_by_job_id_AND_status($job_id, $new_status) ;
593 =head2 grab_jobs_for_role
595 Arg [1] :
Bio::
EnsEMBL::Hive::Role
object $role
596 Arg [2] :
int $how_many_this_role
598 my $jobs = $job_adaptor->grab_jobs_for_role( $role, $how_many );
600 For the specified Role, it will search available jobs,
601 and using the how_many_this_batch parameter, claim/fetch that
602 number of jobs, and then return them.
604 reference to array of
Bio::
EnsEMBL::Hive::AnalysisJob objects
609 sub grab_jobs_for_role {
610 my ($self, $role, $how_many_this_batch) = @_;
612 return [] unless( $how_many_this_batch );
614 my $analysis_id = $role->analysis_id;
615 my $role_id = $role->dbID;
616 my $role_rank = $self->db->get_RoleAdaptor->get_role_rank( $role );
617 my $offset = $how_many_this_batch * $role_rank;
619 my $prefix_sql = ($self->dbc->driver eq
'mysql') ? qq{
624 WHERE analysis_id=
'$analysis_id'
628 SET role_id=
'$role_id', status=
'CLAIMED'
632 WHERE analysis_id=
'$analysis_id'
635 my $virgin_sql = qq{ AND retry_count=0 };
636 my $limit_sql = qq{ LIMIT $how_many_this_batch };
637 my $offset_sql = qq{ OFFSET $offset };
638 my $suffix_sql = ($self->dbc->driver eq
'mysql') ? qq{
641 SET j.role_id=
'$role_id', j.status=
'CLAIMED'
642 WHERE j.status=
'READY'
650 # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
651 if( 0 == ($claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql ],
652 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker,
"grabbing a virgin batch of offset jobs".$after,
'INFO' ); }
654 if( 0 == ($claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql . $limit_sql . $offset_sql . $suffix_sql ],
655 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker,
"grabbing a non-virgin batch of offset jobs".$after,
'INFO' ); }
657 $claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql . $limit_sql . $suffix_sql ],
658 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker,
"grabbing a non-virgin batch of non-offset jobs".$after,
'INFO' ); }
663 $self->db->get_AnalysisStatsAdaptor->increment_a_counter(
'ready_job_count', -$claim_count, $analysis_id );
665 return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id,
'CLAIMED') : [];
669 sub release_claimed_jobs_from_role {
670 my ($self, $role) = @_;
672 # previous value of role_id is not important, because that Role never had a chance to run the jobs
673 # TODO: If worker died during compilation, do not reset job here.
674 my $num_released_jobs = $self->dbc->protected_prepare_execute( [
"UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'", $role->dbID ],
675 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker,
"releasing claimed jobs from role".$after,
'INFO' ); }
678 my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
679 my $analysis_id = $role->analysis_id;
681 $analysis_stats_adaptor->increment_a_counter(
'ready_job_count', $num_released_jobs, $analysis_id );
683 # $analysis_stats_adaptor->update_status( $analysis_id, 'LOADING' );
687 =head2 release_undone_jobs_from_role
690 Arg [2] : optional message to be recorded in
'job_message' table
692 Description: If a Worker has died some of its jobs need to be reset back to
'READY'
693 so they can be rerun.
694 Jobs in state CLAIMED as simply reset back to READY.
695 If jobs was
'in progress' (see the $ALL_STATUSES_OF_RUNNING_JOBS variable)
696 the retry_count is increased and the status set back to READY.
697 If the retry_count >= $max_retry_count (3 by
default) the job is set
698 to
'FAILED' and not rerun again.
699 Exceptions : $role must be defined
704 sub release_undone_jobs_from_role {
705 my ($self, $role, $msg) = @_;
707 my $role_id = $role->dbID;
708 my $analysis = $role->analysis;
709 my $max_retry_count = $analysis->max_retry_count;
710 my $worker = $role->worker;
712 #first just reset the claimed jobs, these don't need a retry_count index increment:
713 $self->release_claimed_jobs_from_role( $role );
718 WHERE role_id=
'$role_id'
719 AND status in ($ALL_STATUSES_OF_TAKEN_JOBS)
723 my $cod = $worker->cause_of_death() ||
'UNKNOWN';
724 $msg ||=
"GarbageCollector: The worker died because of $cod";
726 my $resource_overusage = ($cod eq
'MEMLIMIT') || ($cod eq
'RUNLIMIT' and $worker->work_done()==0);
728 while(my ($job_id) = $sth->fetchrow_array()) {
730 my $passed_on = 0; # the flag indicating that the garbage_collection was attempted and was successful
732 if( $resource_overusage ) {
733 if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
734 $msg .=
', performing gc_dataflow';
738 if($passed_on = $self->gc_dataflow( $analysis, $job_id,
'ANYFAILURE' )) {
739 $msg .=
", performing 'ANYFAILURE' gc_dataflow";
743 $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $msg, $passed_on ?
'INFO' :
'WORKER_ERROR');
746 # We can not retry this job if it failed during compilation or due
747 # to a resource constraint. If a gc_dataflow is present, it will
748 # take care of that and create another job.
749 my $worker_status = $worker->status();
750 my $no_retry = ($cod eq
'COMPILATION') || $resource_overusage;
751 $self->release_and_age_job( $job_id, $max_retry_count, not $no_retry );
754 $role->register_attempt( 0 );
760 sub release_and_age_job {
761 my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;
766 $runtime_msec =
"NULL" unless(defined $runtime_msec);
768 # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
770 # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
774 .( ($self->dbc->driver eq
'pgsql')
775 ?
"SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS job_status), "
776 :
"SET status = CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
778 retry_count=retry_count+1,
779 runtime_msec=$runtime_msec
781 AND status in ($ALL_STATUSES_OF_TAKEN_JOBS)
784 # FIXME: move the decision making completely to the API side and so avoid the potential race condition.
785 my $job = $self->fetch_by_dbID( $job_id );
787 $self->db->get_AnalysisStatsAdaptor->increment_a_counter( ($job->status eq
'FAILED') ?
'failed_job_count' :
'ready_job_count', 1, $job->analysis_id );
793 Description: perform automatic dataflow from a dead job that overused resources
if a corresponding dataflow rule was provided
794 Should only be called once during garbage collection phase, when the job is definitely
'abandoned' and not being worked on.
799 my ($self, $analysis, $job_id, $branch_name) = @_;
803 unless( $analysis->dataflow_rules_by_branch->{$branch_code} ) {
804 return 0; # just
return if no corresponding gc_dataflow rule has been defined
807 my $job = $self->fetch_by_dbID($job_id);
808 $job->analysis( $analysis );
810 $job->load_parameters(); # input_id_templates still supported, however to a limited extent
812 $job->dataflow_output_id( undef, $branch_name );
814 $job->set_and_update_status(
'PASSED_ON');
816 # PASSED_ON jobs are included in done_job_count
817 $self->db->get_AnalysisStatsAdaptor->increment_a_counter(
'done_job_count', 1, $analysis->dbID );
819 if( my $controlled_semaphore = $job->controlled_semaphore ) {
820 $controlled_semaphore->decrease_by( [ $job ] );
827 =head2 reset_jobs_for_analysis_id
829 Arg [1] : arrayref of Analyses
830 Arg [2] : arrayref of job statuses $input_statuses
831 Description: Resets all the jobs of the selected analyses that have one of the
832 required statuses to
'READY' and their retry_count to 0.
833 Semaphores are updated accordingly.
834 Caller : beekeeper.pl and guiHive
838 sub reset_jobs_for_analysis_id {
839 my ($self, $list_of_analyses, $input_statuses) = @_;
841 return if !scalar(@$input_statuses); # No statuses to reset
843 my $analyses_filter =
'j.analysis_id IN ('.join(
',',
map { $_->dbID } @$list_of_analyses).
')';
844 my $statuses_filter =
'AND j.status IN ('.join(
', ',
map {
"'$_'" } @$input_statuses).
')';
846 # Get the list of semaphores, and by how much their local_jobs_counter should be increased.
847 # Only DONE and PASSED_ON jobs of the matching analyses and statuses should be counted
850 SELECT COUNT(*) AS local_delta, controlled_semaphore_id
852 WHERE controlled_semaphore_id IS NOT NULL
853 AND $analyses_filter $statuses_filter AND status IN ($ALL_STATUSES_OF_COMPLETE_JOBS)
854 GROUP BY controlled_semaphore_id
857 # Run in a transaction to ensure we see a consistent state of the job
858 # statuses and semaphore counts.
859 $self->dbc->run_in_transaction( sub {
861 my $semaphore_adaptor = $self->db->get_SemaphoreAdaptor;
863 # Update all the semaphored jobs one by one
864 my $sth1 = $self->prepare($sql1);
866 while (my ($local_delta, $semaphore_id) = $sth1->fetchrow_array()) {
868 my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
869 $semaphore->reblock_by( $local_delta ); # increase the local_jobs_counter, reblock recursively if needed
873 # change fan jobs
' statuses to 'READY
', if they are themselves not SEMAPHORED
874 my $sql3 = ($self->dbc->driver eq 'mysql
') ? qq{
876 LEFT JOIN semaphore s
877 ON (j.job_id=s.dependent_job_id)
878 SET j.retry_count = CASE WHEN j.status='READY
' THEN 0 ELSE 1 END,
879 j.status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED
' ELSE 'READY
' END").qq{
880 WHERE $analyses_filter $statuses_filter
881 } : ($self->dbc->driver eq 'pgsql
') ? qq{
883 SET retry_count = CASE WHEN j.status='READY
' THEN 0 ELSE 1 END,
884 status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED
' ELSE 'READY
' END").qq{
886 LEFT JOIN semaphore s
887 ON (j.job_id=s.dependent_job_id)
888 WHERE job.job_id=j.job_id AND $analyses_filter $statuses_filter
891 REPLACE INTO job (job_id, prev_job_id, analysis_id, input_id, param_id_stack, accu_id_stack, role_id, status, retry_count, when_completed, runtime_msec, query_count, controlled_semaphore_id)
899 CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED
' ELSE 'READY
' END,
900 CASE WHEN j.status='READY
' THEN 0 ELSE 1 END,
904 j.controlled_semaphore_id
906 LEFT JOIN semaphore s
907 ON (j.job_id=s.dependent_job_id)
908 WHERE $analyses_filter $statuses_filter
911 $self->dbc->protected_prepare_execute( [$sql3],
912 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'resetting jobs
'.$after, 'INFO
' ); }
915 foreach my $analysis ( @$list_of_analyses ) {
916 $self->db->get_AnalysisStatsAdaptor->update_status($analysis->dbID, 'LOADING
');
919 } ); # end of transaction
923 =head2 unblock_jobs_for_analysis_id
925 Arg [1] : list-ref of int $analysis_id
926 Description: Sets all the SEMAPHORED jobs of the given analyses to READY and also unblocks their upstream semaphores
927 Caller : beekeeper.pl and guiHive
931 sub unblock_jobs_for_analysis_id {
932 my ($self, $list_of_analyses) = @_;
934 my $analyses_filter = 'analysis_id IN (
'.join(',
', map { $_->dbID } @$list_of_analyses).')
';
936 # Get the list of semaphored jobs together with their semaphores, and unblock both (previously semaphored jobs become 'READY
')
938 if($self->dbc->driver eq 'mysql
') { # MySQL supports updating multiple tables at once
943 ON (j.job_id=s.dependent_job_id)
944 SET s.local_jobs_counter=0, s.remote_jobs_counter=0, j.status = 'READY
'
945 WHERE $analyses_filter AND j.status = 'SEMAPHORED
'
947 $self->dbc->protected_prepare_execute( [$sql],
948 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'unblocking jobs
'.$after, 'INFO
' ); }
951 } elsif ($self->dbc->driver eq 'pgsql
') {
955 SET local_jobs_counter=0, remote_jobs_counter=0
957 WHERE $analyses_filter AND j.job_id = s.dependent_job_id AND j.status = 'SEMAPHORED
'
959 $self->dbc->do($sql1);
964 WHERE $analyses_filter AND j.status = 'SEMAPHORED
'
966 $self->dbc->do($sql2);
971 REPLACE INTO semaphore (semaphore_id, local_jobs_counter, remote_jobs_counter, dependent_job_id, dependent_semaphore_url)
972 SELECT s.semaphore_id,
976 s.dependent_semaphore_url
979 ON (j.job_id = s.dependent_job_id)
980 WHERE $analyses_filter AND j.status = 'SEMAPHORED
'
982 $self->dbc->do($sql1);
987 WHERE $analyses_filter AND status = 'SEMAPHORED
'
989 $self->dbc->do($sql2);
992 foreach my $analysis ( @$list_of_analyses ) {
993 $self->db->get_AnalysisStatsAdaptor->update_status($analysis->dbID, 'LOADING
');
998 =head2 discard_jobs_for_analysis_id
1000 Arg [1] : list-ref of int $analysis_id
1001 Arg [2] : filter status
1002 Description: Resets all $input_status jobs of the matching analyses to DONE.
1003 Semaphores are updated accordingly.
1004 Caller : beekeeper.pl and guiHive
1008 sub discard_jobs_for_analysis_id {
1009 my ($self, $list_of_analyses, $input_status) = @_;
1011 $self->balance_semaphores( $list_of_analyses );
1013 my $analyses_filter = 'analysis_id IN (
'.join(',
', map { $_->dbID } @$list_of_analyses).')
';
1014 my $status_filter = $input_status ? " AND status = '$input_status
'" : "";
1016 # Get the list of semaphores, and by how much their local_jobs_counter should be decreased.
1018 SELECT controlled_semaphore_id, COUNT(*) AS local_delta
1020 WHERE controlled_semaphore_id IS NOT NULL
1021 AND $analyses_filter $status_filter
1022 GROUP BY controlled_semaphore_id
1028 WHERE controlled_semaphore_id = ?
1029 AND $analyses_filter $status_filter
1035 WHERE controlled_semaphore_id IS NULL
1036 AND $analyses_filter $status_filter
1039 # Run in a transaction to ensure we see a consistent state of the job
1040 # statuses and semaphore counts.
1041 $self->dbc->run_in_transaction( sub {
1043 # let's reset work on the jobs that don
't have a controlled_semaphore_id
1044 $self->dbc->protected_prepare_execute( [$sql3],
1045 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'discarding jobs
'.$after, 'INFO
' ); }
1048 my $semaphore_adaptor = $self->db->get_SemaphoreAdaptor;
1050 # Update all the semaphored jobs one-by-one
1051 my $sth1 = $self->prepare($sql1);
1052 my $sth2 = $self->prepare($sql2);
1054 while (my ($semaphore_id, $local_delta) = $sth1->fetchrow_array()) {
1056 $sth2->execute( $semaphore_id ); # First mark the jobs as DONE
1058 my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
1059 $semaphore->decrease_by( $local_delta ); # then decrease the local_jobs_counters, recursively releasing if ripe
1064 my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
1066 foreach my $analysis ( @$list_of_analyses ) {
1067 $analysis_stats_adaptor->update_status($analysis->dbID, 'LOADING
');
1070 } ); # end of transaction
1074 =head2 balance_semaphores
1076 Description: Reset all semaphore_counts to the numbers of unDONE semaphoring jobs.
1080 sub balance_semaphores {
1081 my ($self, $list_of_analyses) = @_;
1083 my $analysis_filter = $list_of_analyses
1084 ? "funnel.analysis_id IN (".join(',
', map { $_->dbID } @$list_of_analyses).") AND"
1089 SELECT s.semaphore_id, s.local_jobs_counter AS was, COALESCE(COUNT(CASE WHEN fan.status NOT IN ($ALL_STATUSES_OF_COMPLETE_JOBS) THEN 1 ELSE NULL END),0) AS should
1091 LEFT JOIN job fan ON (s.semaphore_id=fan.controlled_semaphore_id)
1092 LEFT JOIN job funnel ON (s.dependent_job_id=funnel.job_id)
1093 WHERE $analysis_filter
1094 funnel.status in ('SEMAPHORED
', 'READY
')
1095 GROUP BY s.semaphore_id
1096 ) AS internal WHERE was<>should OR should=0
1099 my $rebalanced_jobs_counter = 0;
1101 # Run in a transaction to ensure we see a consistent state of the job
1102 # statuses and semaphore counts.
1103 $self->dbc->run_in_transaction( sub {
1105 my $find_sth = $self->prepare($find_sql);
1106 my $semaphore_adaptor = $self->db->get_SemaphoreAdaptor;
1108 $find_sth->execute();
1109 while(my ($semaphore_id, $was, $should) = $find_sth->fetchrow_array()) {
1113 $msg = "Semaphore $semaphore_id local_jobs_counter has to be decreased $was -> $should, performing it now with a potential release";
1114 $self->db->get_LogMessageAdaptor->store_hive_message( $msg, 'PIPELINE_CAUTION
' );
1116 my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
1117 $semaphore->decrease_by( $was-$should ); # decrease the local_jobs_counter, recursively releasing if ripe
1119 $rebalanced_jobs_counter++;
1120 } elsif($was<$should) {
1122 $msg = "Semaphore $semaphore_id local_jobs_counter has to be increased $was -> $should, performing it now with a potential reblock";
1123 $self->db->get_LogMessageAdaptor->store_hive_message( $msg, 'PIPELINE_CAUTION
' );
1125 my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
1126 $semaphore->reblock_by( $should-$was ); # increase the local_jobs_counter, reblock recursively if needed
1128 $rebalanced_jobs_counter++;
1130 my $semaphore = $semaphore_adaptor->fetch_by_dbID( $semaphore_id );
1131 # check_if_ripe does the same but with an extra call to the database
1132 if( $semaphore->local_jobs_counter + $semaphore->remote_jobs_counter <= 0) {
1133 $msg = "Semaphore $semaphore_id is marked as blocked despite nothing blocking it, releasing it now";
1134 $self->db->get_LogMessageAdaptor->store_hive_message( $msg, 'PIPELINE_CAUTION
' );
1136 $semaphore->release_if_ripe();
1139 warn "[Semaphore $semaphore_id] $msg\n" if $msg; # TODO: integrate the STDERR diagnostic output with LogMessageAdaptor calls in general
1143 } ); # end of transaction
1145 return $rebalanced_jobs_counter;
1149 sub fetch_input_ids_for_job_ids {
1150 my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
1156 if( $job_ids_csv ) {
1158 my $sql = "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)";
1159 my $sth = $self->prepare( $sql );
1162 while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {
1163 if($input_id =~ /^_ext(?:\w+)_data_id (\d+)$/) {
1164 $input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_analysis_data_id_TO_data($1);
1166 $input_ids{$job_id * $id_scale + $id_offset} = $input_id;