9 The Queen of the Hive based job control system is responsible for
'birthing' the
10 correct number of workers of the right type so that they can find jobs to
do.
11 It will also free up jobs of Workers that died unexpectedly so that other workers
14 Hive based processing is a concept based on a more controlled version
15 of an autonomous agent type system. Each worker is not told what to
do
16 (like a centralized control system - like the current pipeline system)
17 but rather queries a central database
for jobs (give me jobs).
19 Each worker is linked to an analysis_id, registers
itself on creation
20 into the Hive, creates a RunnableDB instance of the Analysis->module,
21 gets $analysis->batch_size jobs from the job table, does its work,
22 creates the next layer of job entries by interfacing to
23 the DataflowRuleAdaptor to determine the analyses it needs to pass its
24 output data to and creates jobs on the next analysis database.
25 It repeats
this cycle until it has lived its lifetime or until there are no
27 The lifetime limit is just a safety limit to prevent these from
'infecting'
30 The Queen's job is simply to birth Workers of the correct analysis_id to get the
31 work done. The only other thing the Queen does is free up jobs that were
32 claimed by Workers that died unexpectedly so that other workers can take
35 The Beekeeper is in charge of interfacing between the Queen and a compute resource
36 or
'compute farm'. Its job is to query Queens
if they need any workers and to
37 send the requested number of workers to open machines via the runWorker.pl script.
38 It is also responsible
for interfacing with the Queen to identify workers which died
43 See the NOTICE file distributed with
this work
for additional information
44 regarding copyright ownership.
46 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
47 You may obtain a copy of the License at
51 Unless required by applicable law or agreed to in writing, software distributed under the License
52 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
53 See the License
for the specific language governing permissions and limitations under the License.
57 Please subscribe to the
Hive mailing list: http:
61 The rest of the documentation details each of the
object methods.
62 Internal methods are usually preceded with a _
67 package Bio::EnsEMBL::Hive::Queen;
74 use
Bio::EnsEMBL::Hive::Utils (
'destringify',
'dir_revhash',
'whoami',
'print_aligned_fields'); # NB: some are needed by invisible code
82 use base (
'Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
84 sub default_table_name {
# Column mapping used when reading worker rows: the 'when_submitted' column is
# replaced by the SQL fragment returned by dbc->_interval_seconds_sql('when_submitted')
# followed by ' seconds_since_when_submitted' (presumably a column alias -- TODO confirm).
89 sub default_input_column_mapping {
92 'when_submitted' => $self->dbc->_interval_seconds_sql(
'when_submitted') .
' seconds_since_when_submitted',
# Returns an arrayref holding the single column name 'when_submitted'.
# NOTE(review): judging by the sub's name these columns are presumably skipped
# by the base adaptor when it builds UPDATE statements -- confirm in ObjectAdaptor.
97 sub do_not_update_columns {
98 return [
'when_submitted'];
104 return 'Bio::EnsEMBL::Hive::Worker';
108 ############################
112 ############################
115 =head2 create_new_worker
117 Description: Creates an entry in the worker table,
118 populates some non-storable attributes
119 and returns a
Worker object based on that insert.
120 This guarantees that each worker registered in
this Queen's hive is properly registered.
121 Returntype : Bio::EnsEMBL::Hive::Worker
122 Caller : runWorker.pl
# Registers the calling process as a Worker and configures the resulting object.
#
# Named flags read from %flags (its assignment is not visible here):
#   -preregistered, -resource_class_id, -resource_class_name, -beekeeper_id,
#   -no_write, -debug, -worker_base_temp_dir, -worker_log_dir, -hive_log_dir,
#   -job_limit, -life_span, -no_cleanup, -retry_throwing_jobs, -can_respecialize,
#   -worker_delay_startup_seconds, -worker_crash_on_startup_prob, -config_files
#
# Visible steps, in order:
#   * sleeps for -worker_delay_startup_seconds (0 when undefined) and, when
#     -worker_crash_on_startup_prob is defined, dies with that probability;
#   * builds a Config from -config_files and asks a Valley's whereami() for the
#     meadow, process id, host and user; dies unless all four are true values;
#   * calls register_worker_death() on every worker returned by
#     find_previous_worker_incarnations() for the same meadow type/name/process id;
#   * polls fetch_preregistered_worker() in 5-second steps, logging a
#     WORKER_CAUTION message per wait and a WORKER_ERROR message once the
#     meadow's 'MaxRegistrationSeconds' (if defined) has been exceeded, then
#     fills in meadow_host, meadow_user, when_born and status 'READY' and calls
#     update() -- presumably only on the -preregistered path (the branching
#     lines are not visible; TODO confirm);
#   * otherwise looks up the ResourceClass by name or by dbID (dying when it
#     cannot be found), stores the new worker, sets when_born and refreshes it;
#   * sets the log and temp directories (temp dir falls back to the meadow's
#     'BaseTempDirectory' setting) and applies the optional limits and switches.
#     A defined -job_limit also sets life_span to 0; -life_span is given in
#     minutes and converted to seconds.
126 sub create_new_worker {
130 my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id,
131 $no_write, $debug, $worker_base_temp_dir, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize,
132 $worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files)
133 = @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id
134 -no_write -debug -worker_base_temp_dir -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize
135 -worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)};
137 sleep( $worker_delay_startup_seconds // 0 ); # NB: undefined parameter would have caused eternal sleep!
139 if( defined( $worker_crash_on_startup_prob ) ) {
140 if( rand(1) < $worker_crash_on_startup_prob ) {
141 die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)";
145 my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new(@$config_files);
146 my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami();
147 die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user);
148 my $meadow_type = $meadow->type;
149 my $meadow_name = $meadow->cached_name;
151 foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) {
152 # So far 'RELOCATED events
' has been detected on LSF 9.0 in response to sending signal #99 or #100
153 # Since I don't know how to avoid them, I am trying to
register them when they happen.
154 # The following snippet buries the previous incarnation of the
Worker before starting a
new one.
156 # FIXME: if GarabageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
157 # LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
158 # So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
160 $self->register_worker_death( $prev_worker_incarnation );
167 my $max_registration_seconds = $meadow->config_get(
'MaxRegistrationSeconds');
168 my $seconds_waited = 0;
169 my $seconds_more = 5; # step increment
171 until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
172 my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
173 if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
174 my $msg =
"Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
175 $log_message_adaptor->store_hive_message($msg,
'WORKER_ERROR' );
178 $log_message_adaptor->store_hive_message(
"Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...",
'WORKER_CAUTION' );
179 sleep($seconds_more);
180 $seconds_waited += $seconds_more;
184 # only update the fields that were not available at the time of submission:
185 $worker->meadow_host( $meadow_host );
186 $worker->meadow_user( $meadow_user );
187 $worker->when_born(
'CURRENT_TIMESTAMP' );
188 $worker->status(
'READY' );
190 $self->update( $worker );
195 if( defined($resource_class_name) ) {
196 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'name' => $resource_class_name)
197 or die
"resource_class with name='$resource_class_name' could not be fetched from the database";
198 } elsif( defined($resource_class_id) ) {
199 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'dbID', $resource_class_id)
200 or die
"resource_class with dbID='$resource_class_id' could not be fetched from the database";
204 'meadow_type' => $meadow_type,
205 'meadow_name' => $meadow_name,
206 'process_id' => $process_id,
207 'resource_class' => $resource_class,
208 'beekeeper_id' => $beekeeper_id,
210 'meadow_host' => $meadow_host,
211 'meadow_user' => $meadow_user,
215 $self->store( $worker );
217 $worker->when_born(
'CURRENT_TIMESTAMP' );
218 $self->update_when_born( $worker );
220 $self->refresh( $worker );
224 $worker->set_log_directory_name($hive_log_dir, $worker_log_dir);
225 $worker->set_temp_directory_name( $worker_base_temp_dir || $meadow->config_get(
'BaseTempDirectory') );
229 if(defined($job_limit)) {
230 $worker->job_limiter($job_limit);
231 $worker->life_span(0);
234 $worker->life_span($life_span * 60)
if($life_span); # $life_span min -> sec
236 $worker->execute_writes(0)
if($no_write);
238 $worker->perform_cleanup(0)
if($no_cleanup);
240 $worker->debug($debug)
if($debug);
242 $worker->retry_throwing_jobs($retry_throwing_jobs)
if(defined $retry_throwing_jobs);
244 $worker->can_respecialize($can_respecialize)
if(defined $can_respecialize);
250 =head2 specialize_worker
252 Description: If analysis_id or logic_name is specified it will
try to specialize the Worker into
this analysis.
253 If not specified the Queen will analyze the hive and pick the most suitable analysis.
# Binds a Worker to an analysis by creating a Role for it.
#
# Args visible here: $worker, then a hashref of flags from which
# -analyses_pattern, -job_id and -force are read. Setting both
# -analyses_pattern and -job_id is fatal.
#
# Lines handling a given job id (the enclosing condition is not visible):
#   * fetches the job (dies when it cannot be fetched);
#   * dies when its status matches ALL_STATUSES_OF_TAKEN_JOBS, or when it is
#     DONE/SEMAPHORED and -force is not set;
#   * dies when the job's analysis is 'BLOCKED' and -force is not set;
#   * for a DONE job that controls a semaphore, calls increase_by([$job]) on it;
#   * decrements the analysis_stats counter that corresponds to the job's status.
#
# Lines handling an analyses pattern: collects the matching analyses, refreshes
# the stats of each and invalidates the cached hive load. The step that picks
# the analysis is not visible. When no analysis object results, the worker's
# cause_of_death is set to 'NO_ROLE'.
#
# Finally the new Role is stored and made the worker's current_role. With a
# job id the job is claimed through reset_or_grab_job_by_dbID() and becomes the
# worker's special batch (dies when it cannot be claimed); otherwise the
# analysis status is set to 'WORKING'. The num_running_workers counter of the
# analysis is incremented by 1.
258 sub specialize_worker {
260 my $worker = shift @_;
261 my $flags = shift @_;
263 my ($analyses_pattern, $job_id, $force)
264 = @$flags{qw(-analyses_pattern -job_id -force)};
266 if( $analyses_pattern and $job_id ) {
267 die
"At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
274 $worker->
worker_say(
"resetting and fetching job for job_id '$job_id'");
276 my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
278 my $job = $job_adaptor->fetch_by_dbID( $job_id )
279 or die
"Could not fetch job with dbID='$job_id'";
280 my $job_status = $job->status();
282 if ($Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor::ALL_STATUSES_OF_TAKEN_JOBS =~ /
'$job_status'/) {
283 die
"Job with dbID='$job_id' is already in progress, cannot run"; # FIXME:
try GC first, then complain
284 } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
285 die
"Job with dbID='$job_id' is $job_status, please use --force to override";
288 $analysis = $job->analysis;
289 if(($analysis->stats->status eq
'BLOCKED') and !$force) {
290 die
"Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
293 if(($job_status eq
'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
294 $worker->worker_say(
"Increasing the semaphore count of the dependent job");
295 $controlled_semaphore->increase_by( [ $job ] );
298 $analysis->stats->adaptor->increment_a_counter( $Bio::EnsEMBL::Hive::AnalysisStats::status2counter{$job->status}, -1, $job->analysis_id );
303 my $analyses_matching_pattern = $worker->hive_pipeline->collection_of(
'Analysis' )->find_all_by_pattern( $analyses_pattern );
305 # refresh the stats of matching analyses before re-specialization:
306 foreach my $analysis ( @$analyses_matching_pattern ) {
307 $analysis->stats->refresh();
309 $self->db->hive_pipeline->invalidate_hive_current_load;
313 unless( ref($analysis) ) {
315 $worker->cause_of_death(
'NO_ROLE');
324 'analysis' => $analysis,
326 $self->db->get_RoleAdaptor->store( $new_role );
327 $worker->current_role( $new_role );
329 my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
332 my $role_id = $new_role->
dbID;
333 if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
335 $worker->special_batch( [ $job ] );
337 die
"Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
340 }
else { # Note: special batch Workers should avoid flipping the status to
'WORKING' in
case the analysis is still
'BLOCKED'
342 $analysis_stats_adaptor->update_status($analysis->dbID,
'WORKING');
345 # The following increment used to be done only when no specific task was given to the worker,
346 # thereby excluding such "special task" workers from being counted in num_running_workers.
348 # However this may be tricky to emulate by triggers that know nothing about "special tasks",
349 # so I am (temporarily?) simplifying the accounting algorithm.
351 $analysis_stats_adaptor->increment_a_counter(
'num_running_workers', 1, $analysis->dbID );
# Marks a Worker as DEAD in the 'worker' table and closes its unfinished Role.
#
# Args:
#   $worker                 - the Worker being buried
#   $update_when_checked_in - when true, when_checked_in is also set to CURRENT_TIMESTAMP
#
# A missing cause_of_death is recorded as 'UNKNOWN'. When the Worker object has
# no current_role, the last unfinished role is fetched by worker id. A role
# that has not finished yet gets when_finished set to the worker's when_died
# and is passed to finalize_role(); the second argument is true only for the
# causes of death matched by the regex below (those that can strike mid-batch).
# when_died falls back to CURRENT_TIMESTAMP when the Worker carries no value.
# The UPDATE runs through protected_prepare_execute() with a callback that
# stores an INFO worker message.
#
# NOTE(review): the UPDATE is assembled by string interpolation of
# work_done, cause_of_death, when_died and worker_id -- consider bind
# parameters if any of these can carry external text.
355 sub register_worker_death {
356 my ($self, $worker, $update_when_checked_in) = @_;
358 my $worker_id = $worker->dbID;
359 my $work_done = $worker->work_done;
360 my $cause_of_death = $worker->cause_of_death ||
'UNKNOWN'; # make sure we
do not attempt to insert a
void
361 my $worker_died = $worker->when_died;
363 my $current_role = $worker->current_role;
365 unless( $current_role ) {
366 $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
369 if( $current_role and !$current_role->when_finished() ) {
370 # List of cause_of_death:
371 # only happen before or after a batch: 'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
372 # can happen whilst the worker is running a batch: 'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
373 my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/);
374 $current_role->worker($worker); # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
375 $current_role->when_finished( $worker_died );
376 $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
379 my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
380 . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
381 . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
382 . " WHERE worker_id='$worker_id' ";
384 $self->dbc->protected_prepare_execute( [ $sql ],
385 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker,
"register_worker_death".$after,
'INFO' ); }
# Tries to kill every Worker whose status is not 'DEAD' and prints one line
# per worker with the outcome.
#
# Args:
#   $valley - used to find the meadow responsible for each worker
#
# Outcome per worker ($kill_status):
#   * no meadow found          -> 'meadow not reachable'
#   * meadow lacks kill_worker -> 'killing workers not supported by the meadow'
#   * worker's meadow_user equals whoami() -> kill_worker($worker, 1) is called;
#     a non-zero return value is reported as a request failure, otherwise the
#     worker gets cause_of_death KILLED_BY_USER, its death is registered and,
#     unless its status is SUBMITTED, its temp directory is cleaned up
#   * any other user           -> 'could not kill, running under user ...'
390 sub kill_all_workers {
391 my ( $self, $valley ) = @_;
393 my $this_meadow_user = whoami();
394 my $all_workers_considered_alive = $self->fetch_all(
"status!='DEAD'" );
395 foreach my $worker ( @{ $all_workers_considered_alive } ) {
398 my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker );
399 if ( ! defined $meadow ) {
400 # Most likely a meadow not reachable for the current beekeeper,
401 # e.g. a LOCAL one started on a different host.
402 $kill_status =
'meadow not reachable';
404 elsif ( ! $meadow->can(
'kill_worker') ) {
405 $kill_status =
'killing workers not supported by the meadow';
407 elsif ( $worker->meadow_user eq $this_meadow_user ) { #
if I
'm actually allowed to kill the worker...
408 # The actual termination of a worker might well be asynchronous
409 # but at least we check for obvious problems, e.g. insufficient
410 # permissions to execute a kill.
411 my $kill_return_value = $meadow->kill_worker( $worker, 1 );
412 if ( $kill_return_value != 0 ) {
413 $kill_status = "request failure (return code: ${kill_return_value})";
416 $kill_status = 'requested successfully
';
417 $worker->cause_of_death( 'KILLED_BY_USER
' );
418 $self->register_worker_death( $worker );
419 if ( $worker->status ne 'SUBMITTED
' ) { # There is no worker_temp_directory before specialization
420 $meadow->cleanup_temp_directory( $worker );
425 $kill_status = 'could not kill, running under user
' . $worker->meadow_user;
428 print 'Killing worker
' . $worker->dbID() . ':
'
429 . $worker->toString( 1 ) . " -> $kill_status \n";
# Returns a hashref mapping ResourceClass dbID to ResourceClass name.
# The map is built from the pipeline's 'ResourceClass' collection on first use
# and kept on the Queen object, so later calls reuse the stored hashref.
436 sub cached_resource_mapping {
438 $self->{'_cached_resource_mapping
'} ||= { map { $_->dbID => $_->name } $self->db->hive_pipeline->collection_of('ResourceClass
')->list };
439 return $self->{'_cached_resource_mapping
'};
# Returns the result of fetch_all() restricted to workers whose status is not
# 'DEAD', passing 1, the key list (meadow_type, meadow_name, meadow_user,
# process_id) and 'status' as extra arguments.
# NOTE(review): presumably this yields a nested hash keyed by those four
# columns with the status as leaf value -- confirm against ObjectAdaptor::fetch_all.
443 sub registered_workers_attributes {
446 return $self->fetch_all("status!='DEAD
'", 1, ['meadow_type
', 'meadow_name
', 'meadow_user
', 'process_id
'], 'status
' );
# Counts the SUBMITTED workers of one meadow user, grouped by meadow type and
# resource class name.
#
# Args:
#   $meadow_user - user name interpolated into the SQL constraint
# Returns: hashref of the form { meadow_type => { rc_name => count } }.
#
# count_all() groups by (meadow_type, resource_class_id); each id is then
# translated through cached_resource_mapping(), and ids without a name are
# reported under '__undefined_rc_name__'.
450 sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
451 my ($self, $meadow_user) = @_;
453 my $worker_counts_by_meadow_type_rc_id = $self->count_all("status='SUBMITTED
' AND meadow_user='$meadow_user
'", ['meadow_type
', 'resource_class_id
'] );
454 my $cached_resource_mapping = $self->cached_resource_mapping;
456 my %counts_by_meadow_type_rc_name = ();
458 while(my ($meadow_type, $counts_by_rc_id) = each %$worker_counts_by_meadow_type_rc_id) {
459 while(my ($rc_id, $count) = each %$counts_by_rc_id) {
460 my $rc_name = $cached_resource_mapping->{ $rc_id } || '__undefined_rc_name__
';
461 $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } = $count;
465 return \%counts_by_meadow_type_rc_name;
# Garbage collection pass: finds Workers that stopped checking in, buries the
# ones the Meadows consider lost and optionally repairs roles and jobs.
#
# Args:
#   $valley                - gives access to the reachable Meadows
#   $check_buried_in_haste - when true, also runs the role/job integrity checks at the end
#   $bury_unkwn_workers    - when true, workers whose meadow status is 'UNKWN' are killed
#
# Phase 1: asks the Valley for the meadow status of all registered non-DEAD
# workers, then fetches the workers that have not checked in during the last
# 5 seconds. Each such worker on a reachable meadow is classified by status;
# a worker missing from the meadow listing is tagged 'DEFERRED_CHECK' and left
# for the next pass. LOST, SUBMITTED and COMPILATION workers are collected as
# lost; the remaining statuses get when_seen refreshed in the database.
# Workers on meadows absent from the status hash are counted as 'UNREACHABLE'.
#
# Phase 2: per meadow type with lost workers, asks the meadow for report
# entries, copies any of when_born, meadow_host, when_died and cause_of_death
# found there onto the worker and stores them. A worker is buried when it is
# not SUBMITTED, or has a when_died, or has been SUBMITTED for longer than the
# meadow's 'MaxLimboSeconds' (0 when unset); a still-SUBMITTED worker without a
# cause gets 'LIMBO'. Temp directories are cleaned only for non-SUBMITTED
# workers owned by the current user. Non-empty report entries are also passed
# to store_resource_usage().
#
# Phase 3 (only with $check_buried_in_haste): finalizes unfinished roles of
# dead workers, releases undone jobs of finished roles, and releases/ages
# unfinished jobs that have no role.
469 sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachable ones)
470 my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;
472 my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose this parameter for easier tuning.
474 print "GarbageCollector:\tChecking for lost Workers...\n";
476 # all non-DEAD workers found in the database, with their meadow status
477 my $reconciled_worker_statuses = $valley->query_worker_statuses( $self->registered_workers_attributes );
478 # selects the workers available in this valley. does not query the database / meadow
479 my $signature_and_pid_to_worker_status = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
480 # this may pick up workers that have been created since the last fetch
481 my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds
483 if (@$queen_overdue_workers) {
484 print "GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n
";
486 print "GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n
";
489 my $this_meadow_user = whoami();
491 my %meadow_status_counts = ();
492 my %mt_and_pid_to_lost_worker = ();
493 foreach my $worker (@$queen_overdue_workers) {
495 my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;
496 if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) { # the whole Meadow subhash is either present or the Meadow is unreachable
498 my $meadow_type = $worker->meadow_type;
499 my $process_id = $worker->process_id;
500 my $status = $pid_to_worker_status->{$process_id} // 'DEFERRED_CHECK'; # Workers that have been created between registered_workers_attributes and fetch_overdue_workers
502 if($bury_unkwn_workers and ($status eq 'UNKWN')) {
503 if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
504 if($meadow->can('kill_worker')) {
505 if($worker->meadow_user eq $this_meadow_user) { # if I'm actually allowed to kill the worker...
506 print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id\n
";
508 $meadow->kill_worker($worker, 1);
515 $meadow_status_counts{$meadow_signature}{$status}++;
517 if(($status eq 'LOST') or ($status eq 'SUBMITTED') or ($status eq 'COMPILATION')) {
519 $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
521 } elsif ($status eq 'DEFERRED_CHECK') {
523 # do nothing now, wait until the next pass to check on this worker
527 # RUN|PEND|xSUSP handling
528 my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id=
'".$worker->dbID."'";
529 $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ],
530 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker
".$after, 'INFO' ); }
534 $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++; # Worker is unreachable from this Valley
538 # print a quick summary report:
539 while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
540 print "GarbageCollector:\t[$meadow_signature Meadow:]\t
".join(', ', map { "$_:$status_count->{$_}
" } keys %$status_count )."\n\n
";
543 while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
544 my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
546 if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
547 print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n
";
551 if($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
552 my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
553 print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type workers died:\n
";
554 print "" . join (",
", (map { $_->{'cause_of_death'} } values %$report_entries)) . "\n
";
556 print "GarbageCollector:\tUnknown why $lost_this_meadow $meadow_type workers died\n
";
559 print "GarbageCollector:\tRecording workers
' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n";
560 while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
561 if(my $report_entry = $report_entries && $report_entries->{$process_id}) {
562 my @updated_attribs = ();
563 foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) {
564 if( defined( $report_entry->{$worker_attrib} ) ) {
565 $worker->$worker_attrib( $report_entry->{$worker_attrib} );
566 push @updated_attribs, $worker_attrib;
569 $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs));
572 my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds
') // 0; # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission.
573 # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker
' table.
575 if( ($worker->status ne 'SUBMITTED
')
576 || $worker->when_died # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids)
577 || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) { # SUBMITTED and Meadow-invisible for too long => we consider them LOST
579 $worker->cause_of_death('LIMBO
') if( ($worker->status eq 'SUBMITTED
') and !$worker->cause_of_death); # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more
581 $self->register_worker_death( $worker );
583 if( ($worker->status ne 'SUBMITTED
') # There is no worker_temp_directory before specialization
584 and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker...
585 $this_meadow->cleanup_temp_directory( $worker );
590 if( $report_entries && %$report_entries ) { # use the opportunity to also store resource
usage of the buried workers:
591 my $processid_2_workerid = {
map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
592 $self->store_resource_usage( $report_entries, $processid_2_workerid );
597 # the following bit is completely Meadow-agnostic and only restores database integrity:
598 if($check_buried_in_haste) {
599 my $role_adaptor = $self->db->get_RoleAdaptor;
600 my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
602 print
"GarbageCollector:\tChecking for orphan roles...\n";
603 my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers();
604 if(my $orphan_role_number = scalar @$orphan_roles) {
605 print
"GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
606 foreach my $orphan_role (@$orphan_roles) {
607 $role_adaptor->finalize_role( $orphan_role );
610 print
"GarbageCollector:\tfound none\n";
613 print
"GarbageCollector:\tChecking for roles buried in haste...\n";
614 my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs();
615 if(my $bih_number = scalar @$buried_in_haste_roles) {
616 print
"GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n";
617 foreach my $role (@$buried_in_haste_roles) {
618 $job_adaptor->release_undone_jobs_from_role( $role );
621 print
"GarbageCollector:\tfound none\n";
624 print
"GarbageCollector:\tChecking for orphan jobs...\n";
625 my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles();
626 if(my $sj_number = scalar @$orphan_jobs) {
627 print
"GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n";
628 foreach my $job (@$orphan_jobs) {
629 $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1);
632 print
"GarbageCollector:\tfound none\n";
638 # To tackle the RELOCATED event: this method checks whether there are already workers with these attributes
# Args: $meadow_type, $meadow_name, $process_id -- interpolated into the SQL constraint.
# Returns an arrayref of workers that are neither DEAD nor SUBMITTED and match
# all three attributes; returns an empty arrayref when called on a non-object
# (no database available).
639 sub find_previous_worker_incarnations {
640 my ($self, $meadow_type, $meadow_name, $process_id) = @_;
642 # This happens in standalone mode, when there is no database
643 return [] unless ref($self);
645 return $self->fetch_all(
"status!='DEAD' AND status!='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" );
# Looks up the SUBMITTED worker row that matches the given meadow type,
# meadow name and process id, keeping the first element of the fetch_all() result.
# The statement that returns $worker is not visible in this excerpt.
# NOTE(review): on a non-object invocant this returns [] (an empty arrayref),
# which is a true value and not a Worker -- callers that test the result for
# truth (e.g. an 'until' polling loop) will treat it as found; confirm this is intended.
649 sub fetch_preregistered_worker {
650 my ($self, $meadow_type, $meadow_name, $process_id) = @_;
652 # This happens in standalone mode, when there is no database
653 return [] unless ref($self);
655 my ($worker) = @{ $self->fetch_all(
"status='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) };
661 # a new version that both checks in and updates the status
# Writes the worker's current status and work_done to the 'worker' table and
# sets when_checked_in to CURRENT_TIMESTAMP for the row with the worker's dbID.
# The UPDATE runs through protected_prepare_execute() with a callback that
# stores an INFO worker message.
662 sub check_in_worker {
663 my ($self, $worker) = @_;
665 my $sql =
"UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='".$worker->status.
"', work_done='".$worker->work_done.
"' WHERE worker_id='".$worker->dbID.
"'";
667 $self->dbc->protected_prepare_execute( [ $sql ],
668 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker,
"check_in_worker".$after,
'INFO' ); }
673 =head2 reset_job_by_dbID_and_sync
677 my $job = $queen->reset_job_by_dbID_and_sync($job_id);
679 For the specified job_id it will fetch just that job,
680 reset it completely as
if it has never
run, and
return it.
681 Specifying a specific job bypasses the safety checks,
682 thus multiple workers could be running the
683 same job simultaneously (use only
for debugging).
686 Caller : beekeeper.pl
# Resets (or grabs) the job with the given dbID through the AnalysisJobAdaptor
# and then re-synchronizes the stats of that job's analysis.
# Args: $job_id - dbID of the job to reset.
# The return statement is not visible in this excerpt.
690 sub reset_job_by_dbID_and_sync {
691 my ($self, $job_id) = @_;
693 my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id);
695 my $stats = $job->analysis->stats;
697 $self->synchronize_AnalysisStats($stats);
701 ######################################
703 # Public API interface for beekeeper
705 ######################################
708 # Note: asking for Queen->fetch_overdue_workers(0) essentially means
709 # "fetch all workers known to the Queen not to be officially dead"
# Args: $overdue_secs - threshold in seconds; defaults to 3600 when undefined.
# Returns the fetch_all() result for non-DEAD workers that either have never
# checked in (when_checked_in IS NULL) or whose when_checked_in interval, as
# computed by dbc->_interval_seconds_sql(), exceeds the threshold.
711 sub fetch_overdue_workers {
712 my ($self,$overdue_secs) = @_;
714 $overdue_secs = 3600 unless(defined($overdue_secs));
716 my $constraint =
"status!='DEAD' AND (when_checked_in IS NULL OR ". $self->dbc->_interval_seconds_sql(
'when_checked_in') .
" > $overdue_secs)";
718 return $self->fetch_all( $constraint );
722 =head2 synchronize_hive
724 Arg [1] : $list_of_analyses
725 Example : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
726 Description: Runs through all analyses in the given list and synchronizes
727 the analysis_stats summary with the states in the job and worker tables.
728 Then follows by checking all the blocking rules and blocks/unblocks analyses as needed.
# Synchronizes the stats of every analysis in the given list and reports progress.
# Args: $list_of_analyses - arrayref of analysis objects.
# Prints one character per analysis after syncing it ('x' when its status is
# 'BLOCKED', 'o' otherwise), then the elapsed wall-clock seconds.
734 sub synchronize_hive {
735 my ($self, $list_of_analyses) = @_;
737 my $start_time = time();
739 print
"\nSynchronizing the hive (".scalar(@$list_of_analyses).
" analyses this time):\n";
740 foreach my $analysis (@$list_of_analyses) {
741 $self->synchronize_AnalysisStats($analysis->stats);
742 print ( ($analysis->stats()->status eq
'BLOCKED') ?
'x' :
'o');
746 print
''.((time() - $start_time)).
" seconds to synchronize_hive\n\n";
750 =head2 safe_synchronize_AnalysisStats
753 Example : $self->safe_synchronize_AnalysisStats($stats);
754 Description: Prewrapper around synchronize_AnalysisStats that does
755 checks and grabs sync_lock before proceeding with sync.
756 Used by distributed worker sync system to avoid contention.
757 Returns 1 on success and 0
if the lock could not have been obtained,
758 and so no sync was attempted.
# Lock-protected wrapper around synchronize_AnalysisStats().
# Args: $stats - the AnalysisStats object to synchronize.
#
# Visible flow:
#   * while the stats show sync_lock set, refreshes them up to 5 times;
#   * returns 'sync_done_by_friend' when the lock was set on entry and has
#     been released in the meantime;
#   * skips the sync when the status is 'DONE', or is 'WORKING' with an update
#     less than 3 minutes old;
#   * otherwise tries to claim the lock with an UPDATE restricted to
#     sync_lock=0; when exactly one row changed it either releases the lock and
#     returns 'sync_done_by_friend' (the in-memory stats still show sync_lock)
#     or runs synchronize_AnalysisStats($stats, 1);
#   * the last visible return yields 0 when sync_lock is still set and
#     'stats_fresh_enough' otherwise.
# The return value after a successful sync is not visible in this excerpt.
764 sub safe_synchronize_AnalysisStats {
765 my ($self, $stats) = @_;
768 my $was_synching = $stats->sync_lock;
770 my $max_refresh_attempts = 5;
771 while($stats->sync_lock and $max_refresh_attempts--) { # another Worker/Beekeeper is synching
this analysis right now
772 # ToDo: it would be nice to report the detected collision
774 $stats->refresh(); # just
try to avoid collision
777 # The sync has just completed and we have the freshest stats
778 if ($was_synching && !$stats->sync_lock) {
779 return 'sync_done_by_friend';
782 unless( ($stats->status eq
'DONE')
783 or ( ($stats->status eq
'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {
785 # In case $stats->sync_lock is set, this is basically giving it one last chance
786 my $sql =
"UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
787 "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;
789 my $row_count = $self->dbc->do($sql); #
try to claim the sync_lock
791 if( $row_count == 1 ) { #
if we managed to obtain the lock, let
's go and perform the sync:
792 if ($stats->sync_lock) {
793 # Actually the sync has just been completed by another agent. Save time and load the stats it computed
795 # And release the lock
796 $stats->sync_lock(0);
797 $stats->adaptor->update_sync_lock($stats);
798 return 'sync_done_by_friend
';
800 $self->synchronize_AnalysisStats($stats, 1);
803 # otherwise assume it's locked and just
return un-updated
808 return $stats->sync_lock ? 0 :
'stats_fresh_enough';
812 =head2 synchronize_AnalysisStats
815 Example : $self->synchronize_AnalysisStats( $stats );
816 Description: Queries the job and worker tables to get summary counts
817 and rebuilds the AnalysisStats
object.
818 Then updates the analysis_stats table with the
new summary
info.
# Recomputes an AnalysisStats object and writes it back.
# Args:
#   $stats                       - the AnalysisStats object; nothing is done
#                                  unless it is true and has an analysis_id
#   $has_refresh_just_been_done  - when true, the initial refresh() is skipped
# Job counts are fetched per status from the AnalysisJobAdaptor unless the
# pipeline reports hive_use_triggers(), in which case undef is passed to
# recalculate_from_job_counts(). The final update() also releases the sync lock
# (per the inline comment).
824 sub synchronize_AnalysisStats {
825 my ($self, $stats, $has_refresh_just_been_done) = @_;
827 if( $stats and $stats->analysis_id ) {
829 $stats->refresh() unless $has_refresh_just_been_done;
831 my $job_counts = $stats->hive_pipeline->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );
833 $stats->recalculate_from_job_counts( $job_counts );
835 # $stats->sync_lock(0); ## do we perhaps need it here?
836 $stats->update; #update and release sync_lock
841 =head2 check_nothing_to_run_but_semaphored
843 Arg [1] : $list_of_analyses
844 Example : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
845 Description: Counts the number of immediately runnable jobs in the given analyses.
# Tells whether the only jobs left in the given analyses are semaphored ones.
# Args: $list_of_analyses - arrayref of analysis objects (stats are read as-is,
#       so they should have been synchronized recently).
# Returns a true value only when the total semaphored job count is non-zero AND
# every analysis satisfies total == done + failed + semaphored; otherwise a
# false value (0 when there are no semaphored jobs at all).
851 sub check_nothing_to_run_but_semaphored { # make sure it is
run after a recent sync
852 my ($self, $list_of_analyses) = @_;
854 my $only_semaphored_jobs_to_run = 1;
855 my $total_semaphored_job_count = 0;
857 foreach my $analysis (@$list_of_analyses) {
858 my $stats = $analysis->stats;
860 $only_semaphored_jobs_to_run = 0
if( $stats->total_job_count != $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count );
861 $total_semaphored_job_count += $stats->semaphored_job_count;
864 return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run );
=head2 print_status_and_return_reasons_to_exit

    Arg [1]     : $list_of_analyses
    Arg [2]     : $debug (optional)
    Example     : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
                : foreach my $reason_to_exit (@$reasons_to_exit) {
                :     my $exit_message = $reason_to_exit->{'message'};
                :     my $exit_status  = $reason_to_exit->{'exit_status'};
                : }
    Description : Runs through all analyses in the given list, reports failed analyses, and computes some totals.
                : It returns a list of exit messages and status codes. Each element of the list is a hashref,
                : with the exit message keyed by 'message' and the status code keyed by 'exit_status'
                :
                : Possible status codes are:
                :     'ANALYSIS_FAILED' - the analysis has failed jobs and its status is FAILED
                :     'JOB_FAILED'      - the analysis has failed jobs but its status is not FAILED
                :     'NO_WORK'         - there are no jobs left to do
                :
                : If $debug is set, the printed table will contain all analyses. Otherwise, empty and done analyses
                : are left out of the table and only counted in a summary line.
    Returntype  : arrayref of hashrefs (possibly empty)
    Caller      : beekeeper.pl

=cut

sub print_status_and_return_reasons_to_exit {
    my ($self, $list_of_analyses, $debug) = @_;

    my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5;
    my %skipped_analyses = ('EMPTY' => [], 'DONE' => []);   # statuses that are hidden from the table unless $debug
    my @analyses_to_display;
    my @reasons_to_exit;

    foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
        my $stats               = $analysis->stats;
        my $failed_job_count    = $stats->failed_job_count;
        my $is_excluded         = $stats->is_excluded;

        if ($debug or !$skipped_analyses{$stats->status}) {
            push @analyses_to_display, $analysis;
        } else {
            push @{$skipped_analyses{$stats->status}}, $analysis;
        }

        if ($failed_job_count > 0) {
            $self->synchronize_AnalysisStats($stats);
            $stats->determine_status();

            # The sync may have changed the counters. Re-read the failed count so that the message
            # and the totals below are built from the same (post-sync) snapshot as done_job_count
            # and total_job_count, instead of mixing stale and fresh numbers.
            $failed_job_count = $stats->failed_job_count;
        }

        if ($failed_job_count > 0) {
            my ($exit_status, $failure_message);
            my $logic_name = $analysis->logic_name;
            my $tolerance  = $analysis->failed_job_tolerance;
            if( $stats->status eq 'FAILED') {
                $exit_status     = 'ANALYSIS_FAILED';
                $failure_message = "### Analysis '$logic_name' has FAILED (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            } else {
                $exit_status     = 'JOB_FAILED';
                $failure_message = "### Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            }
            push (@reasons_to_exit, {'message'     => $failure_message,
                                     'exit_status' => $exit_status});
        }

        if ($is_excluded) {
            # Whatever is neither done nor failed in an excluded analysis will not be run
            my $excluded_job_count = $stats->total_job_count - $stats->done_job_count - $failed_job_count;
            $total_excluded_jobs += $excluded_job_count;
            push @{$skipped_analyses{'EXCLUDED'}}, $analysis;
        }

        $total_done_jobs    += $stats->done_job_count;
        $total_failed_jobs  += $failed_job_count;
        $total_jobs         += $stats->total_job_count;
        $cpumsec_to_do      += $stats->ready_job_count * $stats->avg_msec_per_job;
    }

    my $total_jobs_to_do        = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs;   # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
    my $cpuhrs_to_do            = $cpumsec_to_do / (1000.0*60*60);
    my $percentage_completed    = $total_jobs
                                    ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
                                    : 0.0;

    # We use print_aligned_fields instead of printing each AnalysisStats' toString(),
    # so that the fields are all vertically aligned.
    if (@analyses_to_display) {
        my $template   = $analyses_to_display[0]->stats->_toString_template;
        my @all_fields = map {$_->stats->_toString_fields} @analyses_to_display;
        print_aligned_fields(\@all_fields, $template);
    }
    print "\n";

    if (@{$skipped_analyses{'EMPTY'}}) {
        printf("%d analyses not shown because they don't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY'}}));
    }
    if (@{$skipped_analyses{'DONE'}}) {
        printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE'}}));
    }
    printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n",
                scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs);

    unless( $total_jobs_to_do ) {
        if ($total_excluded_jobs > 0) {
            push (@reasons_to_exit, {'message'     => "### Some analyses are excluded ###",
                                     'exit_status' => 'NO_WORK'});
        }
        push (@reasons_to_exit, {'message'     => "### No jobs left to do ###",
                                 'exit_status' => 'NO_WORK'});
    }

    return \@reasons_to_exit;
}
=head2 register_all_workers_dead

    Example     : $queen->register_all_workers_dead();
    Description : Fetches every worker whose status is not 'DEAD' and registers the death of each of them
                : by calling register_worker_death() on it.
    Caller      : beekeeper.pl

=cut

sub register_all_workers_dead {
    my ($self) = @_;

    # Anything that is not already marked DEAD is still considered alive
    my $workers_not_yet_dead = $self->fetch_all( "status!='DEAD'" );

    for my $live_worker (@$workers_not_yet_dead) {
        $self->register_worker_death( $live_worker );
    }
}
# Finds the workers that have no row in worker_resource_usage and summarizes them per meadow.
#
# Returns a hashref of the form
#     { $meadow_type => { $meadow_name => { 'min_submitted' => ..., 'max_died' => ..., 'workers_count' => ... } } }
# where 'min_submitted' is the earliest when_submitted of the group, 'max_died' is the latest when_died
# (or, when that is NULL, the latest when_submitted) and 'workers_count' is the number of such workers.
sub interval_workers_with_unknown_usage {
    my ($self) = @_;

    my $sql_times = qq{
        SELECT meadow_type, meadow_name, MIN(when_submitted), IFNULL(max(when_died), MAX(when_submitted)), COUNT(*)
        FROM worker w
        LEFT JOIN worker_resource_usage u USING(worker_id)
        WHERE u.worker_id IS NULL
        GROUP BY meadow_type, meadow_name
    };

    my $sth_times = $self->prepare( $sql_times );
    $sth_times->execute();

    my %interval_for;
    while( my ($type, $name, @summary) = $sth_times->fetchrow_array() ) {
        # the three remaining columns arrive in the order of the SELECT list
        my ($first_submitted, $last_died, $how_many) = @summary;

        $interval_for{$type}{$name} = {
            'min_submitted' => $first_submitted,
            'max_died'      => $last_died,
            'workers_count' => $how_many,
        };
    }
    $sth_times->finish();

    return \%interval_for;
}
1023 sub store_resource_usage {
1024 my ($self, $report_entries, $processid_2_workerid) = @_;
1026 # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
1028 my $sql_delete =
'DELETE FROM worker_resource_usage WHERE worker_id=?';
1029 my $sth_delete = $self->prepare( $sql_delete );
1031 my $sql_insert =
'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
1032 my $sth_insert = $self->prepare( $sql_insert );
1036 while( my ($process_id, $report_entry) = each %$report_entries ) {
1038 if( my $worker_id = $processid_2_workerid->{$process_id} ) {
1039 $sth_delete->execute( $worker_id );
1042 $sth_insert->execute( $worker_id, @$report_entry{
'exit_status',
'mem_megs',
'swap_megs',
'pending_sec',
'cpu_sec',
'lifespan_sec',
'exception_status'} ); # slicing hashref
1045 if($@ =~ /execute failed: Duplicate entry/s) { # ignore the collision with another parallel beekeeper
1046 $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id,
"Collision detected when storing resource_usage",
'WORKER_CAUTION' );
1052 push @not_ours, $process_id;
1053 #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
1056 $sth_delete->finish();
1057 $sth_insert->finish();