The Queen of the Hive-based job control system is responsible for 'birthing' the
correct number of workers of the right type so that they can find jobs to do.
It will also free up jobs of Workers that died unexpectedly so that other workers
can take over the work.
Hive-based processing is a concept based on a more controlled version
of an autonomous agent type system.  Each worker is not told what to do
(like a centralized control system - like the current pipeline system)
but rather queries a central database for jobs (give me jobs).

Each worker is linked to an analysis_id, registers itself on creation
into the Hive, creates a RunnableDB instance of the Analysis->module,
gets $analysis->batch_size jobs from the job table, does its work,
creates the next layer of job entries by interfacing to
the DataflowRuleAdaptor to determine the analyses it needs to pass its
output data to and creates jobs on the next analysis database.
It repeats this cycle until it has lived its lifetime or until there are no
more jobs left to process.
The lifetime limit is just a safety limit to prevent these workers from
'infecting' a machine for too long.
The Queen's job is to simply birth Workers of the correct analysis_id to get the
work done.  The only other thing the Queen does is free up jobs that were
claimed by Workers that died unexpectedly so that other workers can take
over the work.
The Beekeeper is in charge of interfacing between the Queen and a compute resource
or 'compute farm'.  Its job is to query Queens if they need any workers and to
send the requested number of workers to open machines via the runWorker.pl script.
It is also responsible for interfacing with the Queen to identify workers which died
unexpectedly.
43 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
44 Copyright [2016-2024] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _
67 package Bio::EnsEMBL::Hive::Queen;
74 use
Bio::EnsEMBL::Hive::Utils (
'destringify',
'dir_revhash',
'whoami',
'print_aligned_fields'); # NB: some are needed by invisible code
82 use base (
'Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
# Name of the database table this adaptor manages (required by ObjectAdaptor).
sub default_table_name {
    return 'worker';
}
# Column expressions injected into SELECTs: exposes the age of the submission
# (in seconds) as a virtual 'seconds_since_when_submitted' column, using the
# driver-specific interval SQL provided by the DBConnection.
sub default_input_column_mapping {
    my $self = shift @_;

    return  {
        'when_submitted' => $self->dbc->_interval_seconds_sql('when_submitted') . ' seconds_since_when_submitted',
    };
}
# Columns that update() must never overwrite: 'when_submitted' is set once at
# submission time and must stay immutable afterwards.
sub do_not_update_columns {
    return ['when_submitted'];
}
# Class of the objects this adaptor fetches/stores (required by ObjectAdaptor).
sub object_class {
    return 'Bio::EnsEMBL::Hive::Worker';
}
108 ############################
112 ############################
=head2 create_new_worker

  Description: Creates an entry in the worker table,
               populates some non-storable attributes
               and returns a Worker object based on that insert.
               This guarantees that each worker registered in
               this Queen's hive is properly registered.
  Returntype : Bio::EnsEMBL::Hive::Worker
  Caller     : runWorker.pl

=cut
126 sub create_new_worker {
130 my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id,
131 $no_write, $debug, $worker_base_temp_dir, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize,
132 $worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files)
133 = @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id
134 -no_write -debug -worker_base_temp_dir -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize
135 -worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)};
137 sleep( $worker_delay_startup_seconds // 0 ); # NB: undefined parameter would have caused eternal sleep!
139 if( defined( $worker_crash_on_startup_prob ) ) {
140 if( rand(1) < $worker_crash_on_startup_prob ) {
141 die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)";
145 my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new(@$config_files);
146 my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami();
147 die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user);
148 my $meadow_type = $meadow->type;
149 my $meadow_name = $meadow->cached_name;
151 foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) {
152 # So far 'RELOCATED events
' has been detected on LSF 9.0 in response to sending signal #99 or #100
153 # Since I don't know how to avoid them, I am trying to
register them when they happen.
154 # The following snippet buries the previous incarnation of the
Worker before starting a
new one.
156 # FIXME: if GarabageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
157 # LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
158 # So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
160 $self->register_worker_death( $prev_worker_incarnation );
167 my $max_registration_seconds = $meadow->config_get(
'MaxRegistrationSeconds');
168 my $seconds_waited = 0;
169 my $seconds_more = 5; # step increment
171 until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
172 my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
173 if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
174 my $msg =
"Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
175 $log_message_adaptor->store_hive_message($msg,
'WORKER_ERROR' );
178 $log_message_adaptor->store_hive_message(
"Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...",
'WORKER_CAUTION' );
179 sleep($seconds_more);
180 $seconds_waited += $seconds_more;
184 # only update the fields that were not available at the time of submission:
185 $worker->meadow_host( $meadow_host );
186 $worker->meadow_user( $meadow_user );
187 $worker->when_born(
'CURRENT_TIMESTAMP' );
188 $worker->status(
'READY' );
190 $self->update( $worker );
195 if( defined($resource_class_name) ) {
196 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'name' => $resource_class_name)
197 or die
"resource_class with name='$resource_class_name' could not be fetched from the database";
198 } elsif( defined($resource_class_id) ) {
199 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'dbID', $resource_class_id)
200 or die
"resource_class with dbID='$resource_class_id' could not be fetched from the database";
204 'meadow_type' => $meadow_type,
205 'meadow_name' => $meadow_name,
206 'process_id' => $process_id,
207 'resource_class' => $resource_class,
208 'beekeeper_id' => $beekeeper_id,
210 'meadow_host' => $meadow_host,
211 'meadow_user' => $meadow_user,
215 $self->store( $worker );
217 $worker->when_born(
'CURRENT_TIMESTAMP' );
218 $self->update_when_born( $worker );
220 $self->refresh( $worker );
224 $worker->set_log_directory_name($hive_log_dir, $worker_log_dir);
225 $worker->set_temp_directory_name( $worker_base_temp_dir || $meadow->config_get(
'BaseTempDirectory') );
229 if(defined($job_limit)) {
230 $worker->job_limiter($job_limit);
231 $worker->life_span(0);
234 $worker->life_span($life_span * 60)
if($life_span); # $life_span min -> sec
236 $worker->execute_writes(0)
if($no_write);
238 $worker->perform_cleanup(0)
if($no_cleanup);
240 $worker->debug($debug)
if($debug);
242 $worker->retry_throwing_jobs($retry_throwing_jobs)
if(defined $retry_throwing_jobs);
244 $worker->can_respecialize($can_respecialize)
if(defined $can_respecialize);
=head2 specialize_worker

  Description: If analysis_id or logic_name is specified it will try to
               specialize the Worker into this analysis.
               If not specified the Queen will analyze the hive and pick the most suitable analysis.

=cut
258 sub specialize_worker {
260 my $worker = shift @_;
261 my $flags = shift @_;
263 my ($analyses_pattern, $job_id, $force)
264 = @$flags{qw(-analyses_pattern -job_id -force)};
266 if( $analyses_pattern and $job_id ) {
267 die
"At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
274 $worker->
worker_say(
"resetting and fetching job for job_id '$job_id'");
276 my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
278 my $job = $job_adaptor->fetch_by_dbID( $job_id )
279 or die
"Could not fetch job with dbID='$job_id'";
280 my $job_status = $job->status();
282 if ($Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor::ALL_STATUSES_OF_TAKEN_JOBS =~ /
'$job_status'/) {
283 die
"Job with dbID='$job_id' is already in progress, cannot run"; # FIXME:
try GC first, then complain
284 } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
285 die
"Job with dbID='$job_id' is $job_status, please use --force to override";
288 $analysis = $job->analysis;
289 if(($analysis->stats->status eq
'BLOCKED') and !$force) {
290 die
"Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
293 if(($job_status eq
'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
294 $worker->worker_say(
"Increasing the semaphore count of the dependent job");
295 $controlled_semaphore->increase_by( [ $job ] );
298 $analysis->stats->adaptor->increment_a_counter( $Bio::EnsEMBL::Hive::AnalysisStats::status2counter{$job->status}, -1, $job->analysis_id );
303 my $analyses_matching_pattern = $worker->hive_pipeline->collection_of(
'Analysis' )->find_all_by_pattern( $analyses_pattern );
305 # refresh the stats of matching analyses before re-specialization:
306 foreach my $analysis ( @$analyses_matching_pattern ) {
307 $analysis->stats->refresh();
309 $self->db->hive_pipeline->invalidate_hive_current_load;
313 unless( ref($analysis) ) {
315 $worker->cause_of_death(
'NO_ROLE');
324 'analysis' => $analysis,
326 $self->db->get_RoleAdaptor->store( $new_role );
327 $worker->current_role( $new_role );
329 my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
332 my $role_id = $new_role->
dbID;
333 if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
335 $worker->special_batch( [ $job ] );
337 die
"Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
340 }
else { # Note: special batch Workers should avoid flipping the status to
'WORKING' in
case the analysis is still
'BLOCKED'
342 $analysis_stats_adaptor->update_status($analysis->dbID,
'WORKING');
345 # The following increment used to be done only when no specific task was given to the worker,
346 # thereby excluding such "special task" workers from being counted in num_running_workers.
348 # However this may be tricky to emulate by triggers that know nothing about "special tasks",
349 # so I am (temporarily?) simplifying the accounting algorithm.
351 $analysis_stats_adaptor->increment_a_counter(
'num_running_workers', 1, $analysis->dbID );
=head2 register_worker_death

  Description: Marks the given Worker as DEAD in the database, finalizing its
               current Role (if any) first.  Undone jobs are released back for
               other workers only for causes of death that can strike mid-batch.
  Arg [1]    : Bio::EnsEMBL::Hive::Worker $worker
  Arg [2]    : Boolean $update_when_checked_in - also bump when_checked_in

=cut

sub register_worker_death {
    my ($self, $worker, $update_when_checked_in) = @_;

    my $worker_id       = $worker->dbID;
    my $work_done       = $worker->work_done;
    my $cause_of_death  = $worker->cause_of_death || 'UNKNOWN';  # make sure we do not attempt to insert a void
    my $worker_died     = $worker->when_died;

    my $current_role    = $worker->current_role;

    unless( $current_role ) {
        # The worker object may not carry its role (e.g. when buried by the GarbageCollector)
        $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
    }

    if( $current_role and !$current_role->when_finished() ) {
        # List of cause_of_death:
        #  only happen before or after a batch:     'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
        #  can happen whilst the worker is running a batch: 'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
        my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/);
        $current_role->worker($worker); # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
        $current_role->when_finished( $worker_died );
        $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
    }

    # NOTE(review): values are interpolated into the SQL string; they all come
    # from our own database/enumerations, but parameterization would be safer.
    my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
            . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
            . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
            . " WHERE worker_id='$worker_id' ";

    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 'INFO' ); }
    );
}
=head2 kill_all_workers

  Description: Attempts to kill every Worker not yet registered as DEAD,
               via the Meadow responsible for each one.  Workers in meadows
               that are unreachable, that cannot kill, or that belong to a
               different user are reported but left alone.
  Arg [1]    : Bio::EnsEMBL::Hive::Valley $valley

=cut

sub kill_all_workers {
    my ( $self, $valley ) = @_;

    my $this_meadow_user = whoami();
    my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD'" );
    foreach my $worker ( @{ $all_workers_considered_alive } ) {
        my $kill_status;

        my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker );
        if ( ! defined $meadow ) {
            # Most likely a meadow not reachable for the current beekeeper,
            # e.g. a LOCAL one started on a different host.
            $kill_status = 'meadow not reachable';
        }
        elsif ( ! $meadow->can('kill_worker') ) {
            $kill_status = 'killing workers not supported by the meadow';
        }
        elsif ( $worker->meadow_user eq $this_meadow_user ) {  # if I'm actually allowed to kill the worker...
            # The actual termination of a worker might well be asynchronous
            # but at least we check for obvious problems, e.g. insufficient
            # permissions to execute a kill.
            my $kill_return_value = $meadow->kill_worker( $worker, 1 );
            if ( $kill_return_value != 0 ) {
                $kill_status = "request failure (return code: ${kill_return_value})";
            }
            else {
                $kill_status = 'requested successfully';
                $worker->cause_of_death( 'KILLED_BY_USER' );
                $self->register_worker_death( $worker );
                if ( $worker->status ne 'SUBMITTED' ) {  # There is no worker_temp_directory before specialization
                    $meadow->cleanup_temp_directory( $worker );
                }
            }
        }
        else {
            $kill_status = 'could not kill, running under user ' . $worker->meadow_user;
        }

        print 'Killing worker ' . $worker->dbID() . ': '
            . $worker->toString( 1 ) . " -> $kill_status \n";
    }
}
# Lazily builds (and caches on the adaptor) a { resource_class_dbID => name } map
# from the pipeline's ResourceClass collection.
sub cached_resource_mapping {
    my $self = shift;

    $self->{'_cached_resource_mapping'} ||= { map { $_->dbID => $_->name } $self->db->hive_pipeline->collection_of('ResourceClass')->list };
    return $self->{'_cached_resource_mapping'};
}
# Returns the meadow-identifying attributes of all non-DEAD workers,
# keyed hierarchically (meadow_type/meadow_name/meadow_user/process_id => status).
sub registered_workers_attributes {
    my $self = shift @_;

    return $self->fetch_all("status!='DEAD'", 1, ['meadow_type', 'meadow_name', 'meadow_user', 'process_id'], 'status' );
}
=head2 get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user

  Description: Counts SUBMITTED workers belonging to the given user, grouped
               first by meadow_type and then by resource-class *name*
               (resolved via cached_resource_mapping; unknown ids map to
               '__undefined_rc_name__').
  Returntype : hashref { meadow_type => { rc_name => count } }

=cut

sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
    my ($self, $meadow_user) = @_;

    my $worker_counts_by_meadow_type_rc_id  = $self->count_all("status='SUBMITTED' AND meadow_user='$meadow_user'", ['meadow_type', 'resource_class_id'] );
    my $cached_resource_mapping             = $self->cached_resource_mapping;

    my %counts_by_meadow_type_rc_name       = ();

    while(my ($meadow_type, $counts_by_rc_id) = each %$worker_counts_by_meadow_type_rc_id) {
        while(my ($rc_id, $count) = each %$counts_by_rc_id) {
            my $rc_name = $cached_resource_mapping->{ $rc_id } || '__undefined_rc_name__';
            $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } = $count;
        }
    }

    return \%counts_by_meadow_type_rc_name;
}
469 sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachable ones)
470 my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;
472 my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose this parameter for easier tuning.
474 print "GarbageCollector:\tChecking for lost Workers...\n";
476 # all non-DEAD workers found in the database, with their meadow status
477 my $reconciled_worker_statuses = $valley->query_worker_statuses( $self->registered_workers_attributes );
478 # selects the workers available in this valley. does not query the database / meadow
479 my $signature_and_pid_to_worker_status = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
480 # this may pick up workers that have been created since the last fetch
481 my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds
483 if (@$queen_overdue_workers) {
484 print "GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n
";
486 print "GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n
";
489 my $this_meadow_user = whoami();
491 my %meadow_status_counts = ();
492 my %mt_and_pid_to_lost_worker = ();
493 foreach my $worker (@$queen_overdue_workers) {
495 my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;
496 if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) { # the whole Meadow subhash is either present or the Meadow is unreachable
498 my $meadow_type = $worker->meadow_type;
499 my $process_id = $worker->process_id;
500 my $status = $pid_to_worker_status->{$process_id} // 'DEFERRED_CHECK'; # Workers that have been created between registered_workers_attributes and fetch_overdue_workers
502 if($bury_unkwn_workers and ($status eq 'UNKWN')) {
503 if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
504 if($meadow->can('kill_worker')) {
505 if($worker->meadow_user eq $this_meadow_user) { # if I'm actually allowed to kill the worker...
506 print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id
";
508 $meadow->kill_worker($worker, 1);
515 $meadow_status_counts{$meadow_signature}{$status}++;
517 if(($status eq 'LOST') or ($status eq 'SUBMITTED')) {
519 $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
521 } elsif ($status eq 'DEFERRED_CHECK') {
523 # do nothing now, wait until the next pass to check on this worker
527 # RUN|PEND|xSUSP handling
528 my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id=
'".$worker->dbID."'";
529 $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ],
530 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker
".$after, 'INFO' ); }
534 $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++; # Worker is unreachable from this Valley
538 # print a quick summary report:
539 while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
540 print "GarbageCollector:\t[$meadow_signature Meadow:]\t
".join(', ', map { "$_:$status_count->{$_}
" } keys %$status_count )."\n\n
";
543 while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
544 my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
546 if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
547 print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n
";
551 if($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
552 my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
553 print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n
";
556 print "GarbageCollector:\tRecording workers
' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n";
557 while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
558 if(my $report_entry = $report_entries && $report_entries->{$process_id}) {
559 my @updated_attribs = ();
560 foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) {
561 if( defined( $report_entry->{$worker_attrib} ) ) {
562 $worker->$worker_attrib( $report_entry->{$worker_attrib} );
563 push @updated_attribs, $worker_attrib;
566 $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs));
569 my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds
') // 0; # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission.
570 # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker
' table.
572 if( ($worker->status ne 'SUBMITTED
')
573 || $worker->when_died # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids)
574 || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) { # SUBMITTED and Meadow-invisible for too long => we consider them LOST
576 $worker->cause_of_death('LIMBO
') if( ($worker->status eq 'SUBMITTED
') and !$worker->cause_of_death); # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more
578 $self->register_worker_death( $worker );
580 if( ($worker->status ne 'SUBMITTED
') # There is no worker_temp_directory before specialization
581 and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker...
582 $this_meadow->cleanup_temp_directory( $worker );
587 if( $report_entries && %$report_entries ) { # use the opportunity to also store resource
usage of the buried workers:
588 my $processid_2_workerid = {
map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
589 $self->store_resource_usage( $report_entries, $processid_2_workerid );
594 # the following bit is completely Meadow-agnostic and only restores database integrity:
595 if($check_buried_in_haste) {
596 my $role_adaptor = $self->db->get_RoleAdaptor;
597 my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
599 print
"GarbageCollector:\tChecking for orphan roles...\n";
600 my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers();
601 if(my $orphan_role_number = scalar @$orphan_roles) {
602 print
"GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
603 foreach my $orphan_role (@$orphan_roles) {
604 $role_adaptor->finalize_role( $orphan_role );
607 print
"GarbageCollector:\tfound none\n";
610 print
"GarbageCollector:\tChecking for roles buried in haste...\n";
611 my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs();
612 if(my $bih_number = scalar @$buried_in_haste_roles) {
613 print
"GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n";
614 foreach my $role (@$buried_in_haste_roles) {
615 $job_adaptor->release_undone_jobs_from_role( $role );
618 print
"GarbageCollector:\tfound none\n";
621 print
"GarbageCollector:\tChecking for orphan jobs...\n";
622 my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles();
623 if(my $sj_number = scalar @$orphan_jobs) {
624 print
"GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n";
625 foreach my $job (@$orphan_jobs) {
626 $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1);
629 print
"GarbageCollector:\tfound none\n";
# To tackle the RELOCATED event: this method checks whether there are already
# workers with these attributes (same meadow_type/meadow_name/process_id, not
# DEAD and past SUBMITTED).
sub find_previous_worker_incarnations {
    my ($self, $meadow_type, $meadow_name, $process_id) = @_;

    # This happens in standalone mode, when there is no database
    return [] unless ref($self);

    return $self->fetch_all( "status!='DEAD' AND status!='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" );
}
# Fetches the single SUBMITTED worker entry matching the given
# meadow_type/meadow_name/process_id, or undef if none is found yet.
sub fetch_preregistered_worker {
    my ($self, $meadow_type, $meadow_name, $process_id) = @_;

    # This happens in standalone mode, when there is no database
    return [] unless ref($self);

    my ($worker) = @{ $self->fetch_all( "status='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) };
    return $worker;
}
# a new version that both checks in and updates the status
sub check_in_worker {
    my ($self, $worker) = @_;

    # NOTE(review): worker-sourced values are interpolated into the SQL string;
    # they come from our own enumerations/counters, but parameterization would be safer.
    my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'";

    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker".$after, 'INFO' ); }
    );
}
=head2 reset_job_by_dbID_and_sync

  Example    : my $job = $queen->reset_job_by_dbID_and_sync($job_id);
  Description: For the specified job_id it will fetch just that job,
               reset it completely as if it has never run, and return it.
               Specifying a specific job bypasses the safety checks,
               thus multiple workers could be running the
               same job simultaneously (use only for debugging).
  Caller     : beekeeper.pl

=cut
sub reset_job_by_dbID_and_sync {
    my ($self, $job_id) = @_;

    # Reset the job as if it had never run (bypasses the usual safety checks).
    my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id);

    my $stats = $job->analysis->stats;

    # Bring the analysis_stats summary back in line with the job table.
    $self->synchronize_AnalysisStats($stats);

    return $job;
}
698 ######################################
700 # Public API interface for beekeeper
702 ######################################
# Note: asking for Queen->fetch_overdue_workers(0) essentially means
# "fetch all workers known to the Queen not to be officially dead"
sub fetch_overdue_workers {
    my ($self, $overdue_secs) = @_;

    $overdue_secs = 3600 unless(defined($overdue_secs));  # default: one hour

    # Workers that are not DEAD and either never checked in or have been
    # silent for longer than $overdue_secs.
    my $constraint = "status!='DEAD' AND (when_checked_in IS NULL OR ". $self->dbc->_interval_seconds_sql('when_checked_in') . " > $overdue_secs)";

    return $self->fetch_all( $constraint );
}
=head2 synchronize_hive

  Arg [1]    : $list_of_analyses
  Example    : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
  Description: Runs through all analyses in the given list and synchronizes
               the analysis_stats summary with the states in the job and worker tables.
               Then follows by checking all the blocking rules and blocks/unblocks analyses as needed.

=cut
sub synchronize_hive {
    my ($self, $list_of_analyses) = @_;

    my $start_time = time();

    print "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
    foreach my $analysis (@$list_of_analyses) {
        $self->synchronize_AnalysisStats($analysis->stats);
        # progress indicator: 'x' for BLOCKED analyses, 'o' otherwise
        print ( ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o');
    }
    print "\n";

    print ''.((time() - $start_time))." seconds to synchronize_hive\n\n";
}
=head2 safe_synchronize_AnalysisStats

  Example    : $self->safe_synchronize_AnalysisStats($stats);
  Description: Prewrapper around synchronize_AnalysisStats that does
               checks and grabs sync_lock before proceeding with sync.
               Used by distributed worker sync system to avoid contention.
               Returns 1 on success and 0 if the lock could not have been obtained,
               and so no sync was attempted.

=cut
761 sub safe_synchronize_AnalysisStats {
762 my ($self, $stats) = @_;
765 my $was_synching = $stats->sync_lock;
767 my $max_refresh_attempts = 5;
768 while($stats->sync_lock and $max_refresh_attempts--) { # another Worker/Beekeeper is synching
this analysis right now
769 # ToDo: it would be nice to report the detected collision
771 $stats->refresh(); # just
try to avoid collision
774 # The sync has just completed and we have the freshest stats
775 if ($was_synching && !$stats->sync_lock) {
776 return 'sync_done_by_friend';
779 unless( ($stats->status eq
'DONE')
780 or ( ($stats->status eq
'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {
782 # In case $stats->sync_lock is set, this is basically giving it one last chance
783 my $sql =
"UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
784 "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;
786 my $row_count = $self->dbc->do($sql); #
try to claim the sync_lock
788 if( $row_count == 1 ) { #
if we managed to obtain the lock, let
's go and perform the sync:
789 if ($stats->sync_lock) {
790 # Actually the sync has just been completed by another agent. Save time and load the stats it computed
792 # And release the lock
793 $stats->sync_lock(0);
794 $stats->adaptor->update_sync_lock($stats);
795 return 'sync_done_by_friend
';
797 $self->synchronize_AnalysisStats($stats, 1);
800 # otherwise assume it's locked and just
return un-updated
805 return $stats->sync_lock ? 0 :
'stats_fresh_enough';
=head2 synchronize_AnalysisStats

  Example    : $self->synchronize_AnalysisStats( $stats );
  Description: Queries the job and worker tables to get summary counts
               and rebuilds the AnalysisStats object.
               Then updates the analysis_stats table with the new summary info.

=cut
sub synchronize_AnalysisStats {
    my ($self, $stats, $has_refresh_just_been_done) = @_;

    # Silently no-op when given no stats (or stats without an analysis_id).
    if( $stats and $stats->analysis_id ) {

        $stats->refresh() unless $has_refresh_just_been_done;

        # Under triggers the counts are maintained by the database itself.
        my $job_counts = $stats->hive_pipeline->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );

        $stats->recalculate_from_job_counts( $job_counts );

        # $stats->sync_lock(0); ## do we perhaps need it here?
        $stats->update;  # update and release sync_lock
    }
}
=head2 check_nothing_to_run_but_semaphored

  Arg [1]    : $list_of_analyses
  Example    : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
  Description: Counts the number of immediately runnable jobs in the given analyses.

=cut
sub check_nothing_to_run_but_semaphored {   # make sure it is run after a recent sync
    my ($self, $list_of_analyses) = @_;

    my $only_semaphored_jobs_to_run = 1;
    my $total_semaphored_job_count  = 0;

    foreach my $analysis (@$list_of_analyses) {
        my $stats = $analysis->stats;

        # Any job not accounted for by done/failed/semaphored is runnable.
        $only_semaphored_jobs_to_run = 0 if( $stats->total_job_count != $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count );
        $total_semaphored_job_count += $stats->semaphored_job_count;
    }

    # True only when there ARE semaphored jobs and nothing else to run.
    return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run );
}
=head2 print_status_and_return_reasons_to_exit

  Arg [1]    : $list_of_analyses
  Example    : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
             : foreach my $reason_to_exit (@$reasons_to_exit) {
             :     my $exit_message = $reason_to_exit->{'message'};
             :     my $exit_status  = $reason_to_exit->{'exit_status'};
             : }
  Description: Runs through all analyses in the given list, reports failed analyses, and computes some totals.
             : It returns a list of exit messages and status codes. Each element of the list is a hashref,
             : with the exit message keyed by 'message' and the status code keyed by 'exit_status'.
             : Possible status codes include 'ANALYSIS_FAILED' and 'JOB_FAILED'.
             : If $debug is set, the list will contain all analyses. Otherwise, empty and done analyses
             : are not shown.
  Caller     : beekeeper.pl

=cut
sub print_status_and_return_reasons_to_exit {
    my ($self, $list_of_analyses, $debug) = @_;

    # Grand totals accumulated over all analyses:
    my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5;

    # Analyses hidden from the listing unless $debug is set, bucketed by status:
    my %skipped_analyses = ('EMPTY' => [], 'DONE' => []);
    my @analyses_to_display;
    my @reasons_to_exit;

    foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
        my $stats            = $analysis->stats;
        my $failed_job_count = $stats->failed_job_count;
        my $is_excluded      = $stats->is_excluded;

        # EMPTY and DONE analyses are hidden by default; $debug shows everything:
        if ($debug or !$skipped_analyses{$stats->status}) {
            push @analyses_to_display, $analysis;
        } else {
            push @{$skipped_analyses{$stats->status}}, $analysis;
        }

        if ($failed_job_count > 0) {
            # Re-sync so the FAILED-vs-has-failed-jobs decision is based on fresh counts:
            $self->synchronize_AnalysisStats($stats);
            $stats->determine_status();

            my $logic_name = $analysis->logic_name;
            my $tolerance  = $analysis->failed_job_tolerance;
            my ($exit_status, $failure_message);

            if( $stats->status eq 'FAILED') {
                # Failures exceeded the analysis' tolerance -> whole analysis failed:
                $exit_status     = 'ANALYSIS_FAILED';
                $failure_message = "### Analysis '$logic_name' has FAILED (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            } else {
                # Some failures, but still within tolerance:
                $exit_status     = 'JOB_FAILED';
                $failure_message = "### Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            }
            push (@reasons_to_exit, {'message'     => $failure_message,
                                     'exit_status' => $exit_status});
        }

        if ($is_excluded) {
            # Jobs neither done nor failed in an excluded analysis count as excluded:
            my $excluded_job_count = $stats->total_job_count - $stats->done_job_count - $failed_job_count;
            $total_excluded_jobs += $excluded_job_count;
            push @{$skipped_analyses{'EXCLUDED'}}, $analysis;
        }

        $total_done_jobs   += $stats->done_job_count;
        $total_failed_jobs += $failed_job_count;
        $total_jobs        += $stats->total_job_count;
        $cpumsec_to_do     += $stats->ready_job_count * $stats->avg_msec_per_job;
    }

    my $total_jobs_to_do     = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs; # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
    my $cpuhrs_to_do         = $cpumsec_to_do / (1000.0*60*60);
    my $percentage_completed = $total_jobs
                                    ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
                                    : 0.0;

    # We use print_aligned_fields instead of printing each AnalysisStats' toString(),
    # so that the fields are all vertically aligned.
    if (@analyses_to_display) {
        my $template   = $analyses_to_display[0]->stats->_toString_template;
        my @all_fields = map {$_->stats->_toString_fields} @analyses_to_display;
        print_aligned_fields(\@all_fields, $template);
    }

    if (@{$skipped_analyses{'EMPTY'}}) {
        printf("%d analyses not shown because they don't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY'}}));
    }
    if (@{$skipped_analyses{'DONE'}}) {
        printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE'}}));
    }
    printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n",
            scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs);

    unless( $total_jobs_to_do ) {
        # Nothing left to run: distinguish "blocked by exclusion" from genuinely finished.
        if ($total_excluded_jobs > 0) {
            push (@reasons_to_exit, {'message'     => "### Some analyses are excluded ###",
                                     'exit_status' => 'NO_WORK'});
        } else {
            push (@reasons_to_exit, {'message'     => "### No jobs left to do ###",
                                     'exit_status' => 'NO_WORK'});
        }
    }

    return \@reasons_to_exit;
}
=head2 register_all_workers_dead

  Example    : $queen->register_all_workers_dead();
  Description: Registers all workers dead
  Caller     : beekeeper.pl

=cut
sub register_all_workers_dead {
    my $self = shift;

    # Every worker not already marked DEAD gets its death registered,
    # which frees any jobs it had claimed.
    my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD'" );
    foreach my $worker (@{$all_workers_considered_alive}) {
        $self->register_worker_death( $worker );
    }
}
sub interval_workers_with_unknown_usage {
    my $self = shift;

    # { meadow_type => { meadow_name => { min_submitted, max_died, workers_count } } }
    my %meadow_to_interval = ();

    # For each meadow, find the submission/death window and count of workers
    # that have no row in worker_resource_usage yet.
    # NOTE(review): the FROM clause was restored from context — confirm table alias 'w'.
    my $sql_times = qq{
        SELECT meadow_type, meadow_name, MIN(when_submitted), IFNULL(max(when_died), MAX(when_submitted)), COUNT(*)
          FROM worker w
          LEFT JOIN worker_resource_usage u USING(worker_id)
         WHERE u.worker_id IS NULL
         GROUP BY meadow_type, meadow_name
    };

    my $sth_times = $self->prepare( $sql_times );
    $sth_times->execute();
    while( my ($meadow_type, $meadow_name, $min_submitted, $max_died, $workers_count) = $sth_times->fetchrow_array() ) {
        $meadow_to_interval{$meadow_type}{$meadow_name} = {
            'min_submitted' => $min_submitted,
            'max_died'      => $max_died,
            'workers_count' => $workers_count,
        };
    }
    $sth_times->finish();

    return \%meadow_to_interval;
}
sub store_resource_usage {
    my ($self, $report_entries, $processid_2_workerid) = @_;

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.

    # Delete-then-insert emulates an UPSERT per worker:
    my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?';
    my $sth_delete = $self->prepare( $sql_delete );

    my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
    my $sth_insert = $self->prepare( $sql_insert );

    # process_ids that could not be mapped to one of our workers:
    my @not_ours = ();

    while( my ($process_id, $report_entry) = each %$report_entries ) {

        if( my $worker_id = $processid_2_workerid->{$process_id} ) {
            $sth_delete->execute( $worker_id );

            eval {
                $sth_insert->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} ); # slicing hashref
                1;
            } or do {
                if($@ =~ /execute failed: Duplicate entry/s) {   # ignore the collision with another parallel beekeeper
                    $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 'WORKER_CAUTION' );
                } else {
                    die $@;     # any other DB error is fatal
                }
            };
        } else {
            push @not_ours, $process_id;
            #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
        }
    }
    $sth_delete->finish();
    $sth_insert->finish();
}