9 The Queen of the Hive based job control system is responsible to
'birthing' the
10 correct number of workers of the right type so that they can find jobs to
do.
11 It will also free up jobs of Workers that died unexpectedly so that other workers
14 Hive based processing is a concept based on a more controlled version
15 of an autonomous agent type system. Each worker is not told what to
do 16 (like a centralized control system - like the current pipeline system)
17 but rather queries a central database
for jobs (give me jobs).
19 Each worker is linked to an analysis_id, registers its
self on creation
20 into the Hive, creates a RunnableDB instance of the Analysis->module,
21 gets $analysis->batch_size jobs from the job table, does its work,
22 creates the next layer of job entries by interfacing to
23 the DataflowRuleAdaptor to determine the analyses it needs to pass its
24 output data to and creates jobs on the next analysis database.
25 It repeats
this cycle until it has lived its lifetime or until there are no
27 The lifetime limit is just a safety limit to prevent these from
'infecting' 30 The Queen's job is to simply birth Workers of the correct analysis_id to
get the
31 work done. The only other thing the Queen does is free up jobs that were
32 claimed by Workers that died unexpectedly so that other workers can take
35 The Beekeeper is in charge of interfacing between the Queen and a compute resource
36 or
'compute farm'. Its job is to query Queens
if they need any workers and to
37 send the requested number of workers to open machines via the runWorker.pl script.
38 It is also responsible
for interfacing with the Queen to identify worker which died
43 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
44 Copyright [2016-2022] EMBL-European Bioinformatics Institute
46 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
47 You may obtain a copy of the License at
51 Unless required by applicable law or agreed to in writing, software distributed under the License
52 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
53 See the License
for the specific language governing permissions and limitations under the License.
57 Please subscribe to the Hive mailing list: http:
61 The rest of the documentation details each of the
object methods.
62 Internal methods are usually preceded with a _
67 package Bio::EnsEMBL::Hive::Queen;
71 use File::Path
'make_path';
72 use List::Util qw(max);
81 use base (
'Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
83 sub default_table_name {
sub default_input_column_mapping {
    my $self = shift @_;

    # Elapsed-time-since-submission must be computed by the database server,
    # and each supported driver spells the timestamp arithmetic differently.
    my $driver = $self->dbc->driver();

    # NOTE(review): the extraction corrupted this sub's structure; restored the
    # conventional "virtual column" mapping shape used by ObjectAdaptor subclasses
    # (a derived 'seconds_since_when_submitted' column) -- confirm against VCS history.
    return {
        'when_submitted' => {
            'mysql'     => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(when_submitted) seconds_since_when_submitted ",
            'sqlite'    => "strftime('%s','now')-strftime('%s',when_submitted) seconds_since_when_submitted ",
            'pgsql'     => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - when_submitted) seconds_since_when_submitted ",
        }->{$driver},
    };
}
# Columns that update() must never overwrite: when_submitted is set once at
# submission time and later used to compute the worker's time in limbo.
sub do_not_update_columns {
    return ['when_submitted'];
}
108 return 'Bio::EnsEMBL::Hive::Worker';
112 ############################ 116 ############################ 119 =head2 create_new_worker
121 Description: Creates an entry in the worker table,
122 populates some non-storable attributes
123 and returns a
Worker object based on that insert.
124 This guarantees that each worker registered in
this Queen's hive is properly registered. 125 Returntype : Bio::EnsEMBL::Hive::Worker 126 Caller : runWorker.pl 130 sub create_new_worker { 134 my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id, 135 $no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize, 136 $worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files) 137 = @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id 138 -no_write -debug -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize 139 -worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)}; 141 sleep( $worker_delay_startup_seconds // 0 ); # NB: undefined parameter would have caused eternal sleep! 143 if( defined( $worker_crash_on_startup_prob ) ) { 144 if( rand(1) < $worker_crash_on_startup_prob ) { 145 die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)"; 149 my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new(@$config_files); 150 my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami(); 151 die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user); 152 my $meadow_type = $meadow->type; 153 my $meadow_name = $meadow->cached_name; 155 foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) { 156 # So far 'RELOCATED events
' has been detected on LSF 9.0 in response to sending signal #99 or #100 157 # Since I don't know how to avoid them, I am trying to
register them when they happen.
158 # The following snippet buries the previous incarnation of the
Worker before starting a
new one.
160 # FIXME: if GarabageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN. 161 # LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'. 162 # So parsing 'bhist' output would probably yield the most accurate & confident registration of these events. 164 $self->register_worker_death( $prev_worker_incarnation );
171 my $max_registration_seconds = $meadow->config_get(
'MaxRegistrationSeconds');
172 my $seconds_waited = 0;
173 my $seconds_more = 5; # step increment
175 until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
176 my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
177 if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
178 my $msg =
"Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
179 $log_message_adaptor->store_hive_message($msg,
'WORKER_ERROR' );
182 $log_message_adaptor->store_hive_message(
"Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...",
'WORKER_CAUTION' );
183 sleep($seconds_more);
184 $seconds_waited += $seconds_more;
188 # only update the fields that were not available at the time of submission: 189 $worker->meadow_host( $meadow_host );
190 $worker->meadow_user( $meadow_user );
191 $worker->when_born(
'CURRENT_TIMESTAMP' );
192 $worker->status(
'READY' );
194 $self->update( $worker );
199 if( defined($resource_class_name) ) {
200 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'name' => $resource_class_name)
201 or die
"resource_class with name='$resource_class_name' could not be fetched from the database";
202 } elsif( defined($resource_class_id) ) {
203 $resource_class = $self->db->hive_pipeline->collection_of(
'ResourceClass')->find_one_by(
'dbID', $resource_class_id)
204 or die
"resource_class with dbID='$resource_class_id' could not be fetched from the database";
208 'meadow_type' => $meadow_type,
209 'meadow_name' => $meadow_name,
210 'process_id' => $process_id,
211 'resource_class' => $resource_class,
212 'beekeeper_id' => $beekeeper_id,
214 'meadow_host' => $meadow_host,
215 'meadow_user' => $meadow_user,
219 $self->store( $worker );
221 $worker->when_born(
'CURRENT_TIMESTAMP' );
222 $self->update_when_born( $worker );
224 $self->refresh( $worker );
228 $worker->set_log_directory_name($hive_log_dir, $worker_log_dir);
232 if(defined($job_limit)) {
233 $worker->job_limiter($job_limit);
234 $worker->life_span(0);
237 $worker->life_span($life_span * 60)
if($life_span); # $life_span min -> sec
239 $worker->execute_writes(0)
if($no_write);
241 $worker->perform_cleanup(0)
if($no_cleanup);
243 $worker->debug($debug)
if($debug);
245 $worker->retry_throwing_jobs($retry_throwing_jobs)
if(defined $retry_throwing_jobs);
247 $worker->can_respecialize($can_respecialize)
if(defined $can_respecialize);
253 =head2 specialize_worker
255 Description: If analysis_id or logic_name is specified it will
try to specialize the
Worker into
this analysis.
256 If not specified the
Queen will analyze the hive and pick the most suitable analysis.
261 sub specialize_worker {
263 my $worker = shift @_;
264 my $flags = shift @_;
266 my ($analyses_pattern, $job_id, $force)
267 = @$flags{qw(-analyses_pattern -job_id -force)};
269 if( $analyses_pattern and $job_id ) {
270 die
"At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
277 $worker->
worker_say(
"resetting and fetching job for job_id '$job_id'");
279 my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
281 my $job = $job_adaptor->fetch_by_dbID( $job_id )
282 or die
"Could not fetch job with dbID='$job_id'";
283 my $job_status = $job->status();
285 if($job_status =~/(CLAIMED|PRE_CLEANUP|FETCH_INPUT|RUN|WRITE_OUTPUT|POST_HEALTHCHECK|POST_CLEANUP)/ ) {
286 die
"Job with dbID='$job_id' is already in progress, cannot run"; # FIXME:
try GC first, then complain
287 } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
288 die
"Job with dbID='$job_id' is $job_status, please use --force to override";
291 $analysis = $job->analysis;
292 if(($analysis->stats->status eq
'BLOCKED') and !$force) {
293 die
"Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
296 if(($job_status eq
'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
297 $worker->worker_say(
"Increasing the semaphore count of the dependent job");
298 $controlled_semaphore->increase_by( [ $job ] );
301 my %status2counter = (
'FAILED' =>
'failed_job_count',
'READY' =>
'ready_job_count',
'DONE' =>
'done_job_count',
'PASSED_ON' =>
'done_job_count',
'SEMAPHORED' =>
'semaphored_job_count');
302 $analysis->stats->adaptor->increment_a_counter( $status2counter{$job->status}, -1, $job->analysis_id );
307 my $analyses_matching_pattern = $worker->hive_pipeline->collection_of(
'Analysis' )->find_all_by_pattern( $analyses_pattern );
309 # refresh the stats of matching analyses before re-specialization: 310 foreach my $analysis ( @$analyses_matching_pattern ) {
311 $analysis->stats->refresh();
313 $self->db->hive_pipeline->invalidate_hive_current_load;
317 unless( ref($analysis) ) {
319 $worker->cause_of_death(
'NO_ROLE');
328 'analysis' => $analysis,
330 $self->db->get_RoleAdaptor->store( $new_role );
331 $worker->current_role( $new_role );
333 my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
336 my $role_id = $new_role->
dbID;
337 if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
339 $worker->special_batch( [ $job ] );
341 die
"Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
344 }
else { # Note: special batch Workers should avoid flipping the status to
'WORKING' in
case the analysis is still
'BLOCKED' 346 $analysis_stats_adaptor->update_status($analysis->dbID,
'WORKING');
349 # The following increment used to be done only when no specific task was given to the worker, 350 # thereby excluding such "special task" workers from being counted in num_running_workers. 352 # However this may be tricky to emulate by triggers that know nothing about "special tasks", 353 # so I am (temporarily?) simplifying the accounting algorithm. 355 $analysis_stats_adaptor->increment_a_counter(
'num_running_workers', 1, $analysis->dbID );
# Record a Worker's death in the database: finalize its unfinished Role (which
# may release the jobs it had claimed back to the pool) and flip the worker row
# to DEAD with the appropriate timestamps and cause_of_death.
sub register_worker_death {
    my ($self, $worker, $update_when_checked_in) = @_;

    my $worker_id       = $worker->dbID;
    my $work_done       = $worker->work_done;
    my $cause_of_death  = $worker->cause_of_death || 'UNKNOWN';     # make sure we do not attempt to insert a void
    my $worker_died     = $worker->when_died;

    my $current_role    = $worker->current_role;

    unless( $current_role ) {
        # The in-memory Worker may not carry its Role; fall back to the last
        # unfinished Role recorded for this worker_id in the database.
        $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
    }

    if( $current_role and !$current_role->when_finished() ) {
        # List of cause_of_death:
        #   only happen before or after a batch:
        #       'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
        #   can happen whilst the worker is running a batch:
        #       'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
        # Only the latter group means jobs may have been left half-done and must be released.
        my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/);
        $current_role->worker($worker);     # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
        $current_role->when_finished( $worker_died );
        $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
    }

    # NOTE(review): values are interpolated straight into the SQL; they originate
    # from our own objects rather than user input, but bind parameters would be safer.
    my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
            . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
            . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
            . " WHERE worker_id='$worker_id' ";

    $self->dbc->protected_prepare_execute( [ $sql ],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 'INFO' ); }
    );
}
# Lazily build and memoize a { resource_class_dbID => resource_class_name }
# lookup from the pipeline's ResourceClass collection.
sub cached_resource_mapping {
    my $self = shift;

    $self->{'_cached_resource_mapping'} ||= { map { $_->dbID => $_->name } $self->db->hive_pipeline->collection_of('ResourceClass')->list };

    return $self->{'_cached_resource_mapping'};
}
# Fetch the meadow coordinates (meadow_type/name/user/process_id) of every
# non-DEAD worker known to this hive's database.
# NOTE(review): the extra fetch_all() arguments appear to request a keyed,
# column-restricted result -- confirm against ObjectAdaptor::fetch_all.
sub registered_workers_attributes {
    my $self = shift @_;

    return $self->fetch_all( "status!='DEAD'", 1, ['meadow_type', 'meadow_name', 'meadow_user', 'process_id'], 'status' );
}
# Count SUBMITTED workers belonging to $meadow_user, grouped first by
# meadow_type and then by resource-class *name* (translated from rc dbID via
# the memoized cached_resource_mapping).
# Returns: hashref of { meadow_type => { rc_name => count } }.
sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
    my ($self, $meadow_user) = @_;

    # NOTE(review): $meadow_user is interpolated directly into the SQL
    # constraint; it comes from the local system user name, but a bind
    # parameter would be safer.
    my $worker_counts_by_meadow_type_rc_id = $self->count_all( "status='SUBMITTED' AND meadow_user='$meadow_user'", ['meadow_type', 'resource_class_id'] );
    my $cached_resource_mapping = $self->cached_resource_mapping;

    my %counts_by_meadow_type_rc_name = ();

    while(my ($meadow_type, $counts_by_rc_id) = each %$worker_counts_by_meadow_type_rc_id) {
        while(my ($rc_id, $count) = each %$counts_by_rc_id) {
            # Workers whose resource class is no longer defined get a sentinel bucket:
            my $rc_name = $cached_resource_mapping->{ $rc_id } || '__undefined_rc_name__';
            $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } = $count;
        }
    }

    return \%counts_by_meadow_type_rc_name;
}
427 sub check_for_dead_workers { # scans the whole
Valley for lost Workers (but ignores unreachable ones)
428 my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;
430 my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose
this parameter
for easier tuning.
432 print
"GarbageCollector:\tChecking for lost Workers...\n";
434 # all non-DEAD workers found in the database, with their meadow status 435 my $reconciled_worker_statuses = $valley->
query_worker_statuses( $self->registered_workers_attributes );
436 # selects the workers available in this valley. does not query the database / meadow 437 my $signature_and_pid_to_worker_status = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
438 # this may pick up workers that have been created since the last fetch 439 my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds
441 if (@$queen_overdue_workers) {
442 print
"GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers).
" Workers that haven't checked in during the last $last_few_seconds seconds...\n";
444 print
"GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n";
447 my $this_meadow_user = whoami();
449 my %meadow_status_counts = ();
450 my %mt_and_pid_to_lost_worker = ();
451 foreach my $worker (@$queen_overdue_workers) {
453 my $meadow_signature = $worker->meadow_type.
'/'.$worker->meadow_name;
454 if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) { # the whole
Meadow subhash is either present or the
Meadow is unreachable
456 my $meadow_type = $worker->meadow_type;
457 my $process_id = $worker->process_id;
458 my $status = $pid_to_worker_status->{$process_id}
460 if($bury_unkwn_workers and ($status eq
'UNKWN')) {
461 if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
462 if($meadow->can(
'kill_worker')) {
463 if($worker->meadow_user eq $this_meadow_user) { #
if I
'm actually allowed to kill the worker... 464 print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id"; 466 $meadow->kill_worker($worker, 1); 473 $meadow_status_counts{$meadow_signature}{$status}++; 475 if(($status eq 'LOST
') or ($status eq 'SUBMITTED
')) { 477 $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker; 479 } elsif ($status eq 'DEFERRED_CHECK
') { 481 # do nothing now, wait until the next pass to check on this worker 485 # RUN|PEND|xSUSP handling 486 my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id='".$worker->dbID."'"; 487 $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ], 488 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker".$after, 'INFO
' ); } 492 $meadow_status_counts{$meadow_signature}{'UNREACHABLE
'}++; # Worker is unreachable from this Valley 496 # print a quick summary report: 497 while(my ($meadow_signature, $status_count) = each %meadow_status_counts) { 498 print "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(',
', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n"; 501 while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) { 502 my $this_meadow = $valley->available_meadow_hash->{$meadow_type}; 504 if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) { 505 print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n"; 509 if($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) { 510 my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death
'} } values %$report_entries); 511 print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n"; 514 print "GarbageCollector:\tRecording workers' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n
"; 515 while(my ($process_id, $worker) = each %$pid_to_lost_worker) { 516 if(my $report_entry = $report_entries && $report_entries->{$process_id}) { 517 my @updated_attribs = (); 518 foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) { 519 if( defined( $report_entry->{$worker_attrib} ) ) { 520 $worker->$worker_attrib( $report_entry->{$worker_attrib} ); 521 push @updated_attribs, $worker_attrib; 524 $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs)); 527 my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds') // 0; # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission. 528 # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker' table. 530 if( ($worker->status ne 'SUBMITTED') 531 || $worker->when_died # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids) 532 || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) { # SUBMITTED and Meadow-invisible for too long => we consider them LOST 534 $worker->cause_of_death('LIMBO') if( ($worker->status eq 'SUBMITTED') and !$worker->cause_of_death); # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more 536 $self->register_worker_death( $worker ); 538 if( ($worker->status ne 'SUBMITTED') # There is no worker_temp_directory before specialization 539 and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker... 
540 $valley->cleanup_left_temp_directory( $worker ); 545 if( $report_entries && %$report_entries ) { # use the opportunity to also store resource usage of the buried workers: 546 my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker }; 547 $self->store_resource_usage( $report_entries, $processid_2_workerid ); 552 # the following bit is completely Meadow-agnostic and only restores database integrity: 553 if($check_buried_in_haste) { 554 my $role_adaptor = $self->db->get_RoleAdaptor; 555 my $job_adaptor = $self->db->get_AnalysisJobAdaptor; 557 print "GarbageCollector:\tChecking
for orphan roles...\n
"; 558 my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers(); 559 if(my $orphan_role_number = scalar @$orphan_roles) { 560 print "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n
"; 561 foreach my $orphan_role (@$orphan_roles) { 562 $role_adaptor->finalize_role( $orphan_role ); 565 print "GarbageCollector:\tfound none\n
"; 568 print "GarbageCollector:\tChecking
for roles buried in haste...\n
"; 569 my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs(); 570 if(my $bih_number = scalar @$buried_in_haste_roles) { 571 print "GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n
"; 572 foreach my $role (@$buried_in_haste_roles) { 573 $job_adaptor->release_undone_jobs_from_role( $role ); 576 print "GarbageCollector:\tfound none\n
"; 579 print "GarbageCollector:\tChecking
for orphan jobs...\n
"; 580 my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles(); 581 if(my $sj_number = scalar @$orphan_jobs) { 582 print "GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n
"; 583 foreach my $job (@$orphan_jobs) { 584 $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1); 587 print "GarbageCollector:\tfound none\n
"; 593 # To tackle the RELOCATED event: this method checks whether there are already workers with these attributes 594 sub find_previous_worker_incarnations { 595 my ($self, $meadow_type, $meadow_name, $process_id) = @_; 597 # This happens in standalone mode, when there is no database 598 return [] unless ref($self); 600 return $self->fetch_all( "status!=
'DEAD' AND status!=
'SUBMITTED' AND meadow_type=
'$meadow_type' AND meadow_name=
'$meadow_name' AND process_id=
'$process_id'" ); 604 sub fetch_preregistered_worker { 605 my ($self, $meadow_type, $meadow_name, $process_id) = @_; 607 # This happens in standalone mode, when there is no database 608 return [] unless ref($self); 610 my ($worker) = @{ $self->fetch_all( "status=
'SUBMITTED' AND meadow_type=
'$meadow_type' AND meadow_name=
'$meadow_name' AND process_id=
'$process_id'" ) }; 616 # a new version that both checks in and updates the status 617 sub check_in_worker { 618 my ($self, $worker) = @_; 620 my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status=
'".$worker->status."', work_done=
'".$worker->work_done."' WHERE worker_id=
'".$worker->dbID."'"; 622 $self->dbc->protected_prepare_execute( [ $sql ], 623 sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker
".$after, 'INFO' ); } 628 =head2 reset_job_by_dbID_and_sync 632 my $job = $queen->reset_job_by_dbID_and_sync($job_id); 634 For the specified job_id it will fetch just that job, 635 reset it completely as if it has never run, and return it. 636 Specifying a specific job bypasses the safety checks, 637 thus multiple workers could be running the 638 same job simultaneously (use only for debugging). 641 Caller : beekeeper.pl 645 sub reset_job_by_dbID_and_sync { 646 my ($self, $job_id) = @_; 648 my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id); 650 my $stats = $job->analysis->stats; 652 $self->synchronize_AnalysisStats($stats); 656 ###################################### 658 # Public API interface for beekeeper 660 ###################################### 663 # Note: asking for Queen->fetch_overdue_workers(0) essentially means 664 # "fetch all workers known to the
Queen not to be officially dead
" 666 sub fetch_overdue_workers { 667 my ($self,$overdue_secs) = @_; 669 $overdue_secs = 3600 unless(defined($overdue_secs)); 671 my $constraint = "status!=
'DEAD' AND (when_checked_in IS NULL OR
".{ 672 'mysql' => "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(when_checked_in)) > $overdue_secs
", 673 'sqlite' => "(strftime(
'%s',
'now')-strftime(
'%s',when_checked_in)) > $overdue_secs
", 674 'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - when_checked_in) > $overdue_secs
", 675 }->{ $self->dbc->driver }.' )'; 677 return $self->fetch_all( $constraint ); 681 =head2 synchronize_hive 683 Arg [1] : $list_of_analyses 684 Example : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] ); 685 Description: Runs through all analyses in the given list and synchronizes 686 the analysis_stats summary with the states in the job and worker tables. 687 Then follows by checking all the blocking rules and blocks/unblocks analyses as needed. 693 sub synchronize_hive { 694 my ($self, $list_of_analyses) = @_; 696 my $start_time = time(); 698 print "\nSynchronizing the hive (
".scalar(@$list_of_analyses)." analyses
this time):\n
"; 699 foreach my $analysis (@$list_of_analyses) { 700 $self->synchronize_AnalysisStats($analysis->stats); 701 print ( ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o'); 705 print ''.((time() - $start_time))." seconds to synchronize_hive\n\n
"; 709 =head2 safe_synchronize_AnalysisStats 711 Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object 712 Example : $self->safe_synchronize_AnalysisStats($stats); 713 Description: Prewrapper around synchronize_AnalysisStats that does 714 checks and grabs sync_lock before proceeding with sync. 715 Used by distributed worker sync system to avoid contention. 716 Returns 1 on success and 0 if the lock could not have been obtained, 717 and so no sync was attempted. 723 sub safe_synchronize_AnalysisStats { 724 my ($self, $stats) = @_; 727 my $was_synching = $stats->sync_lock; 729 my $max_refresh_attempts = 5; 730 while($stats->sync_lock and $max_refresh_attempts--) { # another Worker/Beekeeper is synching this analysis right now 731 # ToDo: it would be nice to report the detected collision 733 $stats->refresh(); # just try to avoid collision 736 # The sync has just completed and we have the freshest stats 737 if ($was_synching && !$stats->sync_lock) { 738 return 'sync_done_by_friend'; 741 unless( ($stats->status eq 'DONE') 742 or ( ($stats->status eq 'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) { 744 # In case $stats->sync_lock is set, this is basically giving it one last chance 745 my $sql = "UPDATE analysis_stats SET status=
'SYNCHING', sync_lock=1
". 746 "WHERE sync_lock=0 and analysis_id=
" . $stats->analysis_id; 748 my $row_count = $self->dbc->do($sql); # try to claim the sync_lock 750 if( $row_count == 1 ) { # if we managed to obtain the lock, let's go and perform the sync: 751 if ($stats->sync_lock) { 752 # Actually the sync has just been completed by another agent. Save time and load the stats it computed 754 # And release the lock 755 $stats->sync_lock(0); 756 $stats->adaptor->update_sync_lock($stats); 757 return 'sync_done_by_friend'; 759 $self->synchronize_AnalysisStats($stats, 1); 762 # otherwise assume it's locked and just return un-updated 767 return $stats->sync_lock ? 0 : 'stats_fresh_enough'; 771 =head2 synchronize_AnalysisStats 773 Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object 774 Example : $self->synchronize_AnalysisStats( $stats ); 775 Description: Queries the job and worker tables to get summary counts 776 and rebuilds the AnalysisStats object. 777 Then updates the analysis_stats table with the new summary info. 783 sub synchronize_AnalysisStats { 784 my ($self, $stats, $has_refresh_just_been_done) = @_; 786 if( $stats and $stats->analysis_id ) { 788 $stats->refresh() unless $has_refresh_just_been_done; 790 my $job_counts = $stats->hive_pipeline->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id ); 792 $stats->recalculate_from_job_counts( $job_counts ); 794 # $stats->sync_lock(0); ## do we perhaps need it here? 795 $stats->update; #update and release sync_lock 800 =head2 check_nothing_to_run_but_semaphored 802 Arg [1] : $list_of_analyses 803 Example : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] ); 804 Description: Counts the number of immediately runnable jobs in the given analyses. 
810 sub check_nothing_to_run_but_semaphored { # make sure it is run after a recent sync 811 my ($self, $list_of_analyses) = @_; 813 my $only_semaphored_jobs_to_run = 1; 814 my $total_semaphored_job_count = 0; 816 foreach my $analysis (@$list_of_analyses) { 817 my $stats = $analysis->stats; 819 $only_semaphored_jobs_to_run = 0 if( $stats->total_job_count != $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count ); 820 $total_semaphored_job_count += $stats->semaphored_job_count; 823 return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run ); 827 =head2 print_status_and_return_reasons_to_exit 829 Arg [1] : $list_of_analyses 831 Example : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] ); 832 : foreach my $reason_to_exit (@$reasons_to_exit) { 833 : my $exit_message = $reason_to_exit->{'message'}; 834 : my $exit_status = $reason_to_exit->{'exit_status'}; 835 Description: Runs through all analyses in the given list, reports failed analyses, and computes some totals. 836 : It returns a list of exit messages and status codes. Each element of the list is a hashref, 837 : with the exit message keyed by 'message' and the status code keyed by 'exit_status' 839 : Possible status codes are: 844 : If $debug is set, the list will contain all analyses. 
Otherwise, empty and done analyses 847 Caller : beekeeper.pl 851 sub print_status_and_return_reasons_to_exit { 852 my ($self, $list_of_analyses, $debug) = @_; 854 my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5; 855 my %skipped_analyses = ('EMPTY' => [], 'DONE' => []); 856 my @analyses_to_display; 859 foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) { 860 my $stats = $analysis->stats; 861 my $failed_job_count = $stats->failed_job_count; 862 my $is_excluded = $stats->is_excluded; 864 if ($debug or !$skipped_analyses{$stats->status}) { 865 push @analyses_to_display, $analysis; 867 push @{$skipped_analyses{$stats->status}}, $analysis; 870 if ($failed_job_count > 0) { 871 $self->synchronize_AnalysisStats($stats); 872 $stats->determine_status(); 875 my $logic_name = $analysis->logic_name; 876 my $tolerance = $analysis->failed_job_tolerance; 877 if( $stats->status eq 'FAILED') { 878 $exit_status = 'ANALYSIS_FAILED'; 879 $failure_message = "###
Analysis '$logic_name' has FAILED (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###
"; 881 $exit_status = 'JOB_FAILED'; 882 $failure_message = "###
Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###
"; 884 push (@reasons_to_exit, {'message' => $failure_message, 885 'exit_status' => $exit_status}); 889 my $excluded_job_count = $stats->total_job_count - $stats->done_job_count - $failed_job_count; 890 $total_excluded_jobs += $excluded_job_count; 891 push @{$skipped_analyses{'EXCLUDED'}}, $analysis; 893 $total_done_jobs += $stats->done_job_count; 894 $total_failed_jobs += $failed_job_count; 895 $total_jobs += $stats->total_job_count; 896 $cpumsec_to_do += $stats->ready_job_count * $stats->avg_msec_per_job; 899 my $total_jobs_to_do = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs; # includes SEMAPHORED, READY, CLAIMED, INPROGRESS 900 my $cpuhrs_to_do = $cpumsec_to_do / (1000.0*60*60); 901 my $percentage_completed = $total_jobs 902 ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs) 905 my $max_logic_name_length = max(map {length($_->logic_name)} @analyses_to_display); 906 foreach my $analysis (@analyses_to_display) { 907 print $analysis->stats->toString($max_logic_name_length) . "\n
"; 910 if (@{$skipped_analyses{'EMPTY'}}) { 911 printf("%d analyses not shown because they don
't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY
'}})); 913 if (@{$skipped_analyses{'DONE
'}}) { 914 printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE
'}})); 916 printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n", 917 scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs); 919 unless( $total_jobs_to_do ) { 920 if ($total_excluded_jobs > 0) { 921 push (@reasons_to_exit, {'message
' => "### Some analyses are excluded ###", 922 'exit_status
' => 'NO_WORK
'}); 924 push (@reasons_to_exit, {'message
' => "### No jobs left to do ###", 925 'exit_status
' => 'NO_WORK
'}); 928 return \@reasons_to_exit; 932 =head2 register_all_workers_dead 934 Example : $queen->register_all_workers_dead(); 935 Description: Registers all workers dead 937 Caller : beekeeper.pl 941 sub register_all_workers_dead { 944 my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD
'" ); 945 foreach my $worker (@{$all_workers_considered_alive}) { 946 $self->register_worker_death( $worker ); 951 sub interval_workers_with_unknown_usage { 954 my %meadow_to_interval = (); 957 SELECT meadow_type, meadow_name, MIN(when_submitted), IFNULL(max(when_died), MAX(when_submitted)), COUNT(*) 959 LEFT JOIN worker_resource_usage u USING(worker_id) 960 WHERE u.worker_id IS NULL 961 GROUP BY meadow_type, meadow_name 963 my $sth_times = $self->prepare( $sql_times ); 964 $sth_times->execute(); 965 while( my ($meadow_type, $meadow_name, $min_submitted, $max_died, $workers_count) = $sth_times->fetchrow_array() ) { 966 $meadow_to_interval{$meadow_type}{$meadow_name} = { 967 'min_submitted
' => $min_submitted, 968 'max_died
' => $max_died, 969 'workers_count
' => $workers_count, 972 $sth_times->finish(); 974 return \%meadow_to_interval; 978 sub store_resource_usage { 979 my ($self, $report_entries, $processid_2_workerid) = @_; 981 # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet. 983 my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?
'; 984 my $sth_delete = $self->prepare( $sql_delete ); 986 my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
'; 987 my $sth_insert = $self->prepare( $sql_insert ); 991 while( my ($process_id, $report_entry) = each %$report_entries ) { 993 if( my $worker_id = $processid_2_workerid->{$process_id} ) { 994 $sth_delete->execute( $worker_id ); 997 $sth_insert->execute( $worker_id, @$report_entry{'exit_status
', 'mem_megs
', 'swap_megs
', 'pending_sec
', 'cpu_sec
', 'lifespan_sec
', 'exception_status
'} ); # slicing hashref 1000 if($@ =~ /execute failed: Duplicate entry/s) { # ignore the collision with another parallel beekeeper 1001 $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 'WORKER_CAUTION
' ); 1007 push @not_ours, $process_id; 1008 #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n"; 1011 $sth_delete->finish(); 1012 $sth_insert->finish();
public suggest_analysis_to_specialize_a_worker()
public query_worker_statuses()
public Bio::EnsEMBL::Hive::Storable new()