ensembl-hive  2.6
Queen.pm
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Queen
6 
7 =head1 DESCRIPTION
8 
9  The Queen of the Hive-based job control system is responsible for 'birthing' the
10  correct number of workers of the right type so that they can find jobs to do.
11  It will also free up the jobs of Workers that died unexpectedly so that other workers
12  can claim and finish them.
13 
14  Hive-based processing is a concept based on a more controlled version
15  of an autonomous-agent system. Each worker is not told what to do
16  (as it would be in a centralized control system, such as the current pipeline system)
17  but rather queries a central database for jobs ("give me jobs").
18 
19  Each worker is linked to an analysis_id, registers itself in the Hive
20  on creation, creates a RunnableDB instance of the Analysis->module,
21  gets $analysis->batch_size jobs from the job table, does its work, and
22  creates the next layer of job entries by interfacing with
23  the DataflowRuleAdaptor to determine which analyses its
24  output data should flow to, creating jobs on those analyses.
25  It repeats this cycle until it has lived out its lifetime or until there are no
26  more jobs left.
27  The lifetime limit is just a safety limit to prevent these workers from 'infecting'
28  a system.
29 
30  The Queen's job is simply to birth Workers of the correct analysis_id to get the
31  work done. The only other thing the Queen does is free up jobs that were
32  claimed by Workers that died unexpectedly, so that other workers can take
33  over the work.
34 
35  The Beekeeper is in charge of interfacing between the Queen and a compute resource
36  or 'compute farm'. Its job is to ask Queens whether they need any workers and to
37  send the requested number of workers to open machines via the runWorker.pl script.
38  It is also responsible for interfacing with the Queen to identify workers which died
39  unexpectedly.
40 
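 As an illustration only (this snippet is not part of the module; the resource
 class name and analyses pattern are placeholder values, and $hive_dba is assumed
 to be an already-connected Bio::EnsEMBL::Hive::DBSQL::DBAdaptor), runWorker.pl
 drives the cycle described above roughly like this:

     my $queen  = $hive_dba->get_Queen;                                      # the Queen doubles as the Worker adaptor
     my $worker = $queen->create_new_worker( -resource_class_name => 'default' );
     $queen->specialize_worker( $worker, { -analyses_pattern => '%' } );     # pick (or be assigned) an analysis
     $worker->run();                                                         # claim and run batches of jobs until a limit is hit
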
41 =head1 LICENSE
42 
43  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
44  Copyright [2016-2024] EMBL-European Bioinformatics Institute
45 
46  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
47  You may obtain a copy of the License at
48 
49  http://www.apache.org/licenses/LICENSE-2.0
50 
51  Unless required by applicable law or agreed to in writing, software distributed under the License
52  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
53  See the License for the specific language governing permissions and limitations under the License.
54 
55 =head1 CONTACT
56 
57  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
58 
59 =head1 APPENDIX
60 
61  The rest of the documentation details each of the object methods.
62  Internal method names are usually prefixed with an underscore (_).
63 
64 =cut
65 
66 
67 package Bio::EnsEMBL::Hive::Queen;
68 
69 use strict;
70 use warnings;
71 
74 use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash', 'whoami', 'print_aligned_fields'); # NB: some are needed by invisible code
# 'use' statements for the Hive classes instantiated or called directly below:
use Bio::EnsEMBL::Hive::Utils::Config;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Scheduler;
use Bio::EnsEMBL::Hive::Role;
use Bio::EnsEMBL::Hive::Worker;
79 
81 
82 use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
83 
84 sub default_table_name {
85  return 'worker';
86 }
87 
88 
89 sub default_input_column_mapping {
90  my $self = shift @_;
91  return {
92  'when_submitted' => $self->dbc->_interval_seconds_sql('when_submitted') . ' seconds_since_when_submitted',
93  };
94 }
95 
96 
97 sub do_not_update_columns {
98  return ['when_submitted'];
99 }
100 
101 
102 
103 sub object_class {
104  return 'Bio::EnsEMBL::Hive::Worker';
105 }
106 
107 
108 ############################
109 #
110 # PUBLIC API
111 #
112 ############################
113 
114 
115 =head2 create_new_worker
116 
117  Description: Creates an entry in the worker table,
118  populates some non-storable attributes
119  and returns a Worker object based on that insert.
120  This guarantees that each worker running in this Queen's hive has a corresponding entry in the worker table.
121  Returntype : Bio::EnsEMBL::Hive::Worker
122  Caller : runWorker.pl
123 
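 Example    : an illustrative call from a worker-launching script (the flag values are assumptions, not defaults):
                  my $worker = $queen->create_new_worker( -resource_class_name => 'default', -job_limit => 10 );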
124 =cut
125 
126 sub create_new_worker {
127  my $self = shift @_;
128  my %flags = @_;
129 
130  my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id,
131  $no_write, $debug, $worker_base_temp_dir, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize,
132  $worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files)
133  = @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id
134  -no_write -debug -worker_base_temp_dir -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize
135  -worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)};
136 
137  sleep( $worker_delay_startup_seconds // 0 ); # NB: undefined parameter would have caused eternal sleep!
138 
139  if( defined( $worker_crash_on_startup_prob ) ) {
140  if( rand(1) < $worker_crash_on_startup_prob ) {
141  die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)";
142  }
143  }
144 
145  my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new(@$config_files);
146  my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami();
147  die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user);
148  my $meadow_type = $meadow->type;
149  my $meadow_name = $meadow->cached_name;
150 
151  foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) {
152  # So far, 'RELOCATED' events have been detected on LSF 9.0 in response to sending signal #99 or #100.
153  # Since I don't know how to avoid them, I am trying to register them when they happen.
154  # The following snippet buries the previous incarnation of the Worker before starting a new one.
155  #
156  # FIXME: if the GarbageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
157  # LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
158  # So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
159  $prev_worker_incarnation->cause_of_death( 'RELOCATED' );
160  $self->register_worker_death( $prev_worker_incarnation );
161  }
162 
163  my $worker;
164 
165  if($preregistered) {
166 
167  my $max_registration_seconds = $meadow->config_get('MaxRegistrationSeconds');
168  my $seconds_waited = 0;
169  my $seconds_more = 5; # step increment
170 
171  until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
172  my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
173  if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
174  my $msg = "Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
175  $log_message_adaptor->store_hive_message($msg, 'WORKER_ERROR' );
176  die $msg;
177  } else {
178  $log_message_adaptor->store_hive_message("Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...", 'WORKER_CAUTION' );
179  sleep($seconds_more);
180  $seconds_waited += $seconds_more;
181  }
182  }
183 
184  # only update the fields that were not available at the time of submission:
185  $worker->meadow_host( $meadow_host );
186  $worker->meadow_user( $meadow_user );
187  $worker->when_born( 'CURRENT_TIMESTAMP' );
188  $worker->status( 'READY' );
189 
190  $self->update( $worker );
191 
192  } else {
193  my $resource_class;
194 
195  if( defined($resource_class_name) ) {
196  $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('name' => $resource_class_name)
197  or die "resource_class with name='$resource_class_name' could not be fetched from the database";
198  } elsif( defined($resource_class_id) ) {
199  $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('dbID', $resource_class_id)
200  or die "resource_class with dbID='$resource_class_id' could not be fetched from the database";
201  }
202 
203  $worker = Bio::EnsEMBL::Hive::Worker->new(
204  'meadow_type' => $meadow_type,
205  'meadow_name' => $meadow_name,
206  'process_id' => $process_id,
207  'resource_class' => $resource_class,
208  'beekeeper_id' => $beekeeper_id,
209 
210  'meadow_host' => $meadow_host,
211  'meadow_user' => $meadow_user,
212  );
213 
214  if (ref($self)) {
215  $self->store( $worker );
216 
217  $worker->when_born( 'CURRENT_TIMESTAMP' );
218  $self->update_when_born( $worker );
219 
220  $self->refresh( $worker );
221  }
222  }
223 
224  $worker->set_log_directory_name($hive_log_dir, $worker_log_dir);
225  $worker->set_temp_directory_name( $worker_base_temp_dir || $meadow->config_get('BaseTempDirectory') );
226 
227  $worker->init;
228 
229  if(defined($job_limit)) {
230  $worker->job_limiter($job_limit);
231  $worker->life_span(0);
232  }
233 
234  $worker->life_span($life_span * 60) if($life_span); # $life_span min -> sec
235 
236  $worker->execute_writes(0) if($no_write);
237 
238  $worker->perform_cleanup(0) if($no_cleanup);
239 
240  $worker->debug($debug) if($debug);
241 
242  $worker->retry_throwing_jobs($retry_throwing_jobs) if(defined $retry_throwing_jobs);
243 
244  $worker->can_respecialize($can_respecialize) if(defined $can_respecialize);
245 
246  return $worker;
247 }
248 
249 
250 =head2 specialize_worker
251 
252  Description: If -job_id or -analyses_pattern is specified, it will try to specialize the Worker accordingly.
253  If neither is specified, the Queen will analyze the hive and pick the most suitable analysis.
255 
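 Example    : an illustrative call (the analyses pattern is a placeholder); the flags hashref matches what the code below expects:
                  $queen->specialize_worker( $worker, { -analyses_pattern => 'blast%' } );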
256 =cut
257 
258 sub specialize_worker {
259  my $self = shift @_;
260  my $worker = shift @_;
261  my $flags = shift @_;
262 
263  my ($analyses_pattern, $job_id, $force)
264  = @$flags{qw(-analyses_pattern -job_id -force)};
265 
266  if( $analyses_pattern and $job_id ) {
267  die "At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
268  }
269 
270  my $analysis;
271 
272  if( $job_id ) {
273 
274  $worker->worker_say("resetting and fetching job for job_id '$job_id'");
275 
276  my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
277 
278  my $job = $job_adaptor->fetch_by_dbID( $job_id )
279  or die "Could not fetch job with dbID='$job_id'";
280  my $job_status = $job->status();
281 
282  if ($Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor::ALL_STATUSES_OF_TAKEN_JOBS =~ /'$job_status'/) {
283  die "Job with dbID='$job_id' is already in progress, cannot run"; # FIXME: try GC first, then complain
284  } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
285  die "Job with dbID='$job_id' is $job_status, please use --force to override";
286  }
287 
288  $analysis = $job->analysis;
289  if(($analysis->stats->status eq 'BLOCKED') and !$force) {
290  die "Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
291  }
292 
293  if(($job_status eq 'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
294  $worker->worker_say("Increasing the semaphore count of the dependent job");
295  $controlled_semaphore->increase_by( [ $job ] );
296  }
297 
298  $analysis->stats->adaptor->increment_a_counter( $Bio::EnsEMBL::Hive::AnalysisStats::status2counter{$job->status}, -1, $job->analysis_id );
299 
300  } else {
301 
302  $analyses_pattern //= '%'; # for printing
303  my $analyses_matching_pattern = $worker->hive_pipeline->collection_of( 'Analysis' )->find_all_by_pattern( $analyses_pattern );
304 
305  # refresh the stats of matching analyses before re-specialization:
306  foreach my $analysis ( @$analyses_matching_pattern ) {
307  $analysis->stats->refresh();
308  }
309  $self->db->hive_pipeline->invalidate_hive_current_load;
310 
311  $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_matching_pattern, $analyses_pattern);
312 
313  unless( ref($analysis) ) {
314 
315  $worker->cause_of_death('NO_ROLE');
316 
317  my $msg = $analysis // "No analysis suitable for the worker was found";
318  die "$msg\n";
319  }
320  }
321 
322  my $new_role = Bio::EnsEMBL::Hive::Role->new(
323  'worker' => $worker,
324  'analysis' => $analysis,
325  );
326  $self->db->get_RoleAdaptor->store( $new_role );
327  $worker->current_role( $new_role );
328 
329  my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
330 
331  if($job_id) {
332  my $role_id = $new_role->dbID;
333  if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
334 
335  $worker->special_batch( [ $job ] );
336  } else {
337  die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
338  }
339 
340  } else { # Note: special batch Workers should avoid flipping the status to 'WORKING' in case the analysis is still 'BLOCKED'
341 
342  $analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING');
343  }
344 
345  # The following increment used to be done only when no specific task was given to the worker,
346  # thereby excluding such "special task" workers from being counted in num_running_workers.
347  #
348  # However this may be tricky to emulate by triggers that know nothing about "special tasks",
349  # so I am (temporarily?) simplifying the accounting algorithm.
350  #
351  $analysis_stats_adaptor->increment_a_counter( 'num_running_workers', 1, $analysis->dbID );
352 }
353 
354 
355 sub register_worker_death {
356  my ($self, $worker, $update_when_checked_in) = @_;
357 
358  my $worker_id = $worker->dbID;
359  my $work_done = $worker->work_done;
360  my $cause_of_death = $worker->cause_of_death || 'UNKNOWN'; # make sure we do not attempt to insert a void
361  my $worker_died = $worker->when_died;
362 
363  my $current_role = $worker->current_role;
364 
365  unless( $current_role ) {
366  $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
367  }
368 
369  if( $current_role and !$current_role->when_finished() ) {
370  # List of cause_of_death:
371  # only happen before or after a batch: 'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
372  # can happen whilst the worker is running a batch: 'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
373  my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/);
374  $current_role->worker($worker); # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
375  $current_role->when_finished( $worker_died );
376  $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
377  }
378 
379  my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
380  . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
381  . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
382  . " WHERE worker_id='$worker_id' ";
383 
384  $self->dbc->protected_prepare_execute( [ $sql ],
385  sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 'INFO' ); }
386  );
387 }
388 
389 
390 sub kill_all_workers {
391  my ( $self, $valley ) = @_;
392 
393  my $this_meadow_user = whoami();
394  my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD'" );
395  foreach my $worker ( @{ $all_workers_considered_alive } ) {
396  my $kill_status;
397 
398  my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker );
399  if ( ! defined $meadow ) {
400  # Most likely a meadow not reachable for the current beekeeper,
401  # e.g. a LOCAL one started on a different host.
402  $kill_status = 'meadow not reachable';
403  }
404  elsif ( ! $meadow->can('kill_worker') ) {
405  $kill_status = 'killing workers not supported by the meadow';
406  }
407  elsif ( $worker->meadow_user eq $this_meadow_user ) { # if I'm actually allowed to kill the worker...
408  # The actual termination of a worker might well be asynchronous
409  # but at least we check for obvious problems, e.g. insufficient
410  # permissions to execute a kill.
411  my $kill_return_value = $meadow->kill_worker( $worker, 1 );
412  if ( $kill_return_value != 0 ) {
413  $kill_status = "request failure (return code: ${kill_return_value})";
414  }
415  else {
416  $kill_status = 'requested successfully';
417  $worker->cause_of_death( 'KILLED_BY_USER' );
418  $self->register_worker_death( $worker );
419  if ( $worker->status ne 'SUBMITTED' ) { # There is no worker_temp_directory before specialization
420  $meadow->cleanup_temp_directory( $worker );
421  }
422  }
423  }
424  else {
425  $kill_status = 'could not kill, running under user ' . $worker->meadow_user;
426  }
427 
428  print 'Killing worker ' . $worker->dbID() . ': '
429  . $worker->toString( 1 ) . " -> $kill_status \n";
430  }
431 
432  return;
433 }
434 
435 
436 sub cached_resource_mapping {
437  my $self = shift;
438  $self->{'_cached_resource_mapping'} ||= { map { $_->dbID => $_->name } $self->db->hive_pipeline->collection_of('ResourceClass')->list };
439  return $self->{'_cached_resource_mapping'};
440 }
441 
442 
443 sub registered_workers_attributes {
444  my $self = shift @_;
445 
446  return $self->fetch_all("status!='DEAD'", 1, ['meadow_type', 'meadow_name', 'meadow_user', 'process_id'], 'status' );
447 }
448 
449 
450 sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
451  my ($self, $meadow_user) = @_;
452 
453  my $worker_counts_by_meadow_type_rc_id = $self->count_all("status='SUBMITTED' AND meadow_user='$meadow_user'", ['meadow_type', 'resource_class_id'] );
454  my $cached_resource_mapping = $self->cached_resource_mapping;
455 
456  my %counts_by_meadow_type_rc_name = ();
457 
458  while(my ($meadow_type, $counts_by_rc_id) = each %$worker_counts_by_meadow_type_rc_id) {
459  while(my ($rc_id, $count) = each %$counts_by_rc_id) {
460  my $rc_name = $cached_resource_mapping->{ $rc_id } || '__undefined_rc_name__';
461  $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } = $count;
462  }
463  }
464 
465  return \%counts_by_meadow_type_rc_name;
466 }
467 
468 
469 sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachable ones)
470  my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;
471 
472  my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose this parameter for easier tuning.
473 
474  print "GarbageCollector:\tChecking for lost Workers...\n";
475 
476  # all non-DEAD workers found in the database, with their meadow status
477  my $reconciled_worker_statuses = $valley->query_worker_statuses( $self->registered_workers_attributes );
478  # selects the workers available in this valley. does not query the database / meadow
479  my $signature_and_pid_to_worker_status = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
480  # this may pick up workers that have been created since the last fetch
481  my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds
482 
483  if (@$queen_overdue_workers) {
484  print "GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";
485  } else {
486  print "GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n";
487  }
488 
489  my $this_meadow_user = whoami();
490 
491  my %meadow_status_counts = ();
492  my %mt_and_pid_to_lost_worker = ();
493  foreach my $worker (@$queen_overdue_workers) {
494 
495  my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;
496  if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) { # the whole Meadow subhash is either present or the Meadow is unreachable
497 
498  my $meadow_type = $worker->meadow_type;
499  my $process_id = $worker->process_id;
500  my $status = $pid_to_worker_status->{$process_id} // 'DEFERRED_CHECK'; # Workers that have been created between registered_workers_attributes and fetch_overdue_workers
501 
502  if($bury_unkwn_workers and ($status eq 'UNKWN')) {
503  if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
504  if($meadow->can('kill_worker')) {
505  if($worker->meadow_user eq $this_meadow_user) { # if I'm actually allowed to kill the worker...
506  print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id";
507 
508  $meadow->kill_worker($worker, 1);
509  $status = 'LOST';
510  }
511  }
512  }
513  }
514 
515  $meadow_status_counts{$meadow_signature}{$status}++;
516 
517  if(($status eq 'LOST') or ($status eq 'SUBMITTED')) {
518 
519  $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
520 
521  } elsif ($status eq 'DEFERRED_CHECK') {
522 
523  # do nothing now, wait until the next pass to check on this worker
524 
525  } else {
526 
527  # RUN|PEND|xSUSP handling
528  my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id='".$worker->dbID."'";
529  $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ],
530  sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker".$after, 'INFO' ); }
531  );
532  }
533  } else {
534  $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++; # Worker is unreachable from this Valley
535  }
536  }
537 
538  # print a quick summary report:
539  while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
540  print "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(', ', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n";
541  }
542 
543  while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
544  my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
545 
546  if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
547  print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";
548 
549  my $report_entries;
550 
551  if($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
552  my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
553  print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
554  }
555 
556  print "GarbageCollector:\tRecording workers' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n";
557  while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
558  if(my $report_entry = $report_entries && $report_entries->{$process_id}) {
559  my @updated_attribs = ();
560  foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) {
561  if( defined( $report_entry->{$worker_attrib} ) ) {
562  $worker->$worker_attrib( $report_entry->{$worker_attrib} );
563  push @updated_attribs, $worker_attrib;
564  }
565  }
566  $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs));
567  }
568 
569  my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds') // 0; # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission.
570  # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker' table.
571 
572  if( ($worker->status ne 'SUBMITTED')
573  || $worker->when_died # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids)
574  || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) { # SUBMITTED and Meadow-invisible for too long => we consider them LOST
575 
576  $worker->cause_of_death('LIMBO') if( ($worker->status eq 'SUBMITTED') and !$worker->cause_of_death); # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more
577 
578  $self->register_worker_death( $worker );
579 
580  if( ($worker->status ne 'SUBMITTED') # There is no worker_temp_directory before specialization
581  and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker...
582  $this_meadow->cleanup_temp_directory( $worker );
583  }
584  }
585  }
586 
587  if( $report_entries && %$report_entries ) { # use the opportunity to also store resource usage of the buried workers:
588  my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
589  $self->store_resource_usage( $report_entries, $processid_2_workerid );
590  }
591  }
592  }
593 
594  # the following bit is completely Meadow-agnostic and only restores database integrity:
595  if($check_buried_in_haste) {
596  my $role_adaptor = $self->db->get_RoleAdaptor;
597  my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
598 
599  print "GarbageCollector:\tChecking for orphan roles...\n";
600  my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers();
601  if(my $orphan_role_number = scalar @$orphan_roles) {
602  print "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
603  foreach my $orphan_role (@$orphan_roles) {
604  $role_adaptor->finalize_role( $orphan_role );
605  }
606  } else {
607  print "GarbageCollector:\tfound none\n";
608  }
609 
610  print "GarbageCollector:\tChecking for roles buried in haste...\n";
611  my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs();
612  if(my $bih_number = scalar @$buried_in_haste_roles) {
613  print "GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n";
614  foreach my $role (@$buried_in_haste_roles) {
615  $job_adaptor->release_undone_jobs_from_role( $role );
616  }
617  } else {
618  print "GarbageCollector:\tfound none\n";
619  }
620 
621  print "GarbageCollector:\tChecking for orphan jobs...\n";
622  my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles();
623  if(my $sj_number = scalar @$orphan_jobs) {
624  print "GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n";
625  foreach my $job (@$orphan_jobs) {
626  $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1);
627  }
628  } else {
629  print "GarbageCollector:\tfound none\n";
630  }
631  }
632 }
633 
634 
635  # To tackle the RELOCATED event: this method checks whether there are already workers with these attributes
636 sub find_previous_worker_incarnations {
637  my ($self, $meadow_type, $meadow_name, $process_id) = @_;
638 
639  # This happens in standalone mode, when there is no database
640  return [] unless ref($self);
641 
642  return $self->fetch_all( "status!='DEAD' AND status!='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" );
643 }
644 
645 
646 sub fetch_preregistered_worker {
647  my ($self, $meadow_type, $meadow_name, $process_id) = @_;
648 
649  # This happens in standalone mode, when there is no database
650  return [] unless ref($self);
651 
652  my ($worker) = @{ $self->fetch_all( "status='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) };
653 
654  return $worker;
655 }
656 
657 
658  # a new version that both checks in and updates the status
659 sub check_in_worker {
660  my ($self, $worker) = @_;
661 
662  my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'";
663 
664  $self->dbc->protected_prepare_execute( [ $sql ],
665  sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker".$after, 'INFO' ); }
666  );
667 }
668 
669 
670 =head2 reset_job_by_dbID_and_sync
671 
672  Arg [1]: int $job_id
673  Example:
674  $queen->reset_job_by_dbID_and_sync($job_id);
675  Description:
676  For the specified job_id it will fetch just that job,
677  reset it completely as if it had never run, and then synchronize the stats of its analysis.
678  Specifying a specific job bypasses the safety checks,
679  thus multiple workers could be running the
680  same job simultaneously (use only for debugging).
681  Returntype : none
682  Exceptions :
683  Caller : beekeeper.pl
684 
685 =cut
686 
687 sub reset_job_by_dbID_and_sync {
688  my ($self, $job_id) = @_;
689 
690  my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id);
691 
692  my $stats = $job->analysis->stats;
693 
694  $self->synchronize_AnalysisStats($stats);
695 }
696 
697 
698 ######################################
699 #
700 # Public API interface for beekeeper
701 #
702 ######################################
703 
704 
705  # Note: asking for Queen->fetch_overdue_workers(0) essentially means
706  # "fetch all workers known to the Queen not to be officially dead"
707  #
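  # An illustrative call (the threshold value is arbitrary):
  #     my $possibly_lost = $queen->fetch_overdue_workers( 3600 );    # workers that have not checked in for over an hour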
708 sub fetch_overdue_workers {
709  my ($self,$overdue_secs) = @_;
710 
711  $overdue_secs = 3600 unless(defined($overdue_secs));
712 
713  my $constraint = "status!='DEAD' AND (when_checked_in IS NULL OR ". $self->dbc->_interval_seconds_sql('when_checked_in') . " > $overdue_secs)";
714 
715  return $self->fetch_all( $constraint );
716 }
717 
718 
719 =head2 synchronize_hive
720 
721  Arg [1] : $list_of_analyses
722  Example : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
723  Description: Runs through all analyses in the given list and synchronizes
724  the analysis_stats summary with the states in the job and worker tables.
725  It then checks all the blocking rules and blocks/unblocks analyses as needed.
726  Exceptions : none
727  Caller : general
728 
729 =cut
730 
731 sub synchronize_hive {
732  my ($self, $list_of_analyses) = @_;
733 
734  my $start_time = time();
735 
736  print "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
737  foreach my $analysis (@$list_of_analyses) {
738  $self->synchronize_AnalysisStats($analysis->stats);
739  print ( ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o');
740  }
741  print "\n";
742 
743  print ''.((time() - $start_time))." seconds to synchronize_hive\n\n";
744 }
745 
746 
747 =head2 safe_synchronize_AnalysisStats
748 
749  Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
750  Example : $self->safe_synchronize_AnalysisStats($stats);
751  Description: A wrapper around synchronize_AnalysisStats that performs
752  checks and grabs the sync_lock before proceeding with the sync.
753  Used by the distributed worker sync system to avoid contention.
754  Returns a true status string ('sync_done', 'sync_done_by_friend' or 'stats_fresh_enough') on success,
755  and 0 if the lock could not be obtained, in which case no sync was attempted.
756  Returntype : a truthy status string, or 0 on failure to obtain the lock
757  Caller : general
758 
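 Example    : an illustrative way to check the return value (the status strings are the ones returned by the code below):
                  my $sync_result = $queen->safe_synchronize_AnalysisStats($stats);
                  print "sync skipped, another agent holds the lock\n" unless $sync_result;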
759 =cut
760 
761 sub safe_synchronize_AnalysisStats {
762  my ($self, $stats) = @_;
763 
764  $stats->refresh();
765  my $was_synching = $stats->sync_lock;
766 
767  my $max_refresh_attempts = 5;
768  while($stats->sync_lock and $max_refresh_attempts--) { # another Worker/Beekeeper is synching this analysis right now
769  # ToDo: it would be nice to report the detected collision
770  sleep(1);
771  $stats->refresh(); # just try to avoid collision
772  }
773 
774  # The sync has just completed and we have the freshest stats
775  if ($was_synching && !$stats->sync_lock) {
776  return 'sync_done_by_friend';
777  }
778 
779  unless( ($stats->status eq 'DONE')
780  or ( ($stats->status eq 'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {
781 
782  # In case $stats->sync_lock is set, this is basically giving it one last chance
783  my $sql = "UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
784  "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;
785 
786  my $row_count = $self->dbc->do($sql); # try to claim the sync_lock
787 
788  if( $row_count == 1 ) { # if we managed to obtain the lock, let's go and perform the sync:
789  if ($stats->sync_lock) {
790  # Actually the sync has just been completed by another agent. Save time and load the stats it computed
791  $stats->refresh();
792  # And release the lock
793  $stats->sync_lock(0);
794  $stats->adaptor->update_sync_lock($stats);
795  return 'sync_done_by_friend';
796  }
797  $self->synchronize_AnalysisStats($stats, 1);
798  return 'sync_done';
799  } else {
800  # otherwise assume it's locked and just return un-updated
801  return 0;
802  }
803  }
804 
805  return $stats->sync_lock ? 0 : 'stats_fresh_enough';
806 }
807 
808 
809 =head2 synchronize_AnalysisStats
810 
811  Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
812  Example : $self->synchronize_AnalysisStats( $stats );
813  Description: Queries the job and worker tables to get summary counts
814  and rebuilds the AnalysisStats object.
815  Then updates the analysis_stats table with the new summary info.
816  Exceptions : none
817  Caller : general
818 
819 =cut
820 
821 sub synchronize_AnalysisStats {
822  my ($self, $stats, $has_refresh_just_been_done) = @_;
823 
824  if( $stats and $stats->analysis_id ) {
825 
826  $stats->refresh() unless $has_refresh_just_been_done;
827 
828  my $job_counts = $stats->hive_pipeline->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );
829 
830  $stats->recalculate_from_job_counts( $job_counts );
831 
832  # $stats->sync_lock(0); ## do we perhaps need it here?
833  $stats->update; #update and release sync_lock
834  }
835 }
836 
837 
838 =head2 check_nothing_to_run_but_semaphored
839 
840  Arg [1] : $list_of_analyses
841  Example : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
842  Description: Checks whether the given analyses have no jobs ready to run, only semaphored ones (returns true in that case).
843  Exceptions : none
844  Caller : Scheduler
845 
846 =cut
847 
848 sub check_nothing_to_run_but_semaphored { # make sure it is run after a recent sync
849  my ($self, $list_of_analyses) = @_;
850 
851  my $only_semaphored_jobs_to_run = 1;
852  my $total_semaphored_job_count = 0;
853 
854  foreach my $analysis (@$list_of_analyses) {
855  my $stats = $analysis->stats;
856 
857  $only_semaphored_jobs_to_run = 0 if( $stats->total_job_count != $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count );
858  $total_semaphored_job_count += $stats->semaphored_job_count;
859  }
860 
861  return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run );
862 }
863 
864 
865 =head2 print_status_and_return_reasons_to_exit
866 
867  Arg [1] : $list_of_analyses
868  Arg [2] : $debug
869  Example : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
870  : foreach my $reason_to_exit (@$reasons_to_exit) {
871  : my $exit_message = $reason_to_exit->{'message'};
872  : my $exit_status = $reason_to_exit->{'exit_status'};
  : }
873  Description: Runs through all analyses in the given list, reports failed analyses, and computes some totals.
874  : It returns a list of exit messages and status codes. Each element of the list is a hashref,
875  : with the exit message keyed by 'message' and the status code keyed by 'exit_status'
876  :
877  : Possible status codes are:
878  : 'JOB_FAILED'
879  : 'ANALYSIS_FAILED'
880  : 'NO_WORK'
881  :
882  : If $debug is set, the list will contain all analyses. Otherwise, empty and done analyses
883  : will not be listed
884  Exceptions : none
885  Caller : beekeeper.pl
886 
887 =cut
888 
889 sub print_status_and_return_reasons_to_exit {
890  my ($self, $list_of_analyses, $debug) = @_;
891 
892  my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5;
893  my %skipped_analyses = ('EMPTY' => [], 'DONE' => []);
894  my @analyses_to_display;
895  my @reasons_to_exit;
896 
897  foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
898  my $stats = $analysis->stats;
899  my $failed_job_count = $stats->failed_job_count;
900  my $is_excluded = $stats->is_excluded;
901 
902  if ($debug or !$skipped_analyses{$stats->status}) {
903  push @analyses_to_display, $analysis;
904  } else {
905  push @{$skipped_analyses{$stats->status}}, $analysis;
906  }
907 
908  if ($failed_job_count > 0) {
909  $self->synchronize_AnalysisStats($stats);
910  $stats->determine_status();
911  my $exit_status;
912  my $failure_message;
913  my $logic_name = $analysis->logic_name;
914  my $tolerance = $analysis->failed_job_tolerance;
915  if( $stats->status eq 'FAILED') {
916  $exit_status = 'ANALYSIS_FAILED';
917  $failure_message = "### Analysis '$logic_name' has FAILED (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
918  } else {
919  $exit_status = 'JOB_FAILED';
920  $failure_message = "### Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
921  }
922  push (@reasons_to_exit, {'message' => $failure_message,
923  'exit_status' => $exit_status});
924  }
925 
926  if ($is_excluded) {
927  my $excluded_job_count = $stats->total_job_count - $stats->done_job_count - $failed_job_count;
928  $total_excluded_jobs += $excluded_job_count;
929  push @{$skipped_analyses{'EXCLUDED'}}, $analysis;
930  }
931  $total_done_jobs += $stats->done_job_count;
932  $total_failed_jobs += $failed_job_count;
933  $total_jobs += $stats->total_job_count;
934  $cpumsec_to_do += $stats->ready_job_count * $stats->avg_msec_per_job;
935  }
936 
937  my $total_jobs_to_do = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs; # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
938  my $cpuhrs_to_do = $cpumsec_to_do / (1000.0*60*60);
939  my $percentage_completed = $total_jobs
940  ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
941  : 0.0;
942 
943  # We use print_aligned_fields instead of printing each AnalysisStats' toString(),
944  # so that the fields are all vertically aligned.
945  if (@analyses_to_display) {
946  my $template = $analyses_to_display[0]->stats->_toString_template;
947  my @all_fields = map {$_->stats->_toString_fields} @analyses_to_display;
948  print_aligned_fields(\@all_fields, $template);
949  }
950  print "\n";
951 
952  if (@{$skipped_analyses{'EMPTY'}}) {
953  printf("%d analyses not shown because they don't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY'}}));
954  }
955  if (@{$skipped_analyses{'DONE'}}) {
956  printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE'}}));
957  }
958  printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n",
959  scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs);
960 
961  unless( $total_jobs_to_do ) {
962  if ($total_excluded_jobs > 0) {
963  push (@reasons_to_exit, {'message' => "### Some analyses are excluded ###",
964  'exit_status' => 'NO_WORK'});
965  }
966  push (@reasons_to_exit, {'message' => "### No jobs left to do ###",
967  'exit_status' => 'NO_WORK'});
968  }
969 
970  return \@reasons_to_exit;
971 }
972 
973 
974 =head2 register_all_workers_dead
975 
976  Example : $queen->register_all_workers_dead();
977  Description: Registers all workers as dead.
978  Exceptions : none
979  Caller : beekeeper.pl
980 
981 =cut
982 
983 sub register_all_workers_dead {
984  my $self = shift;
985 
986  my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD'" );
987  foreach my $worker (@{$all_workers_considered_alive}) {
988  $self->register_worker_death( $worker );
989  }
990 }
991 
992 
993 sub interval_workers_with_unknown_usage {
994  my $self = shift @_;
995 
996  my %meadow_to_interval = ();
997 
998  my $sql_times = qq{
999  SELECT meadow_type, meadow_name, MIN(when_submitted), IFNULL(max(when_died), MAX(when_submitted)), COUNT(*)
1000  FROM worker w
1001  LEFT JOIN worker_resource_usage u USING(worker_id)
1002  WHERE u.worker_id IS NULL
1003  GROUP BY meadow_type, meadow_name
1004  };
1005  my $sth_times = $self->prepare( $sql_times );
1006  $sth_times->execute();
1007  while( my ($meadow_type, $meadow_name, $min_submitted, $max_died, $workers_count) = $sth_times->fetchrow_array() ) {
1008  $meadow_to_interval{$meadow_type}{$meadow_name} = {
1009  'min_submitted' => $min_submitted,
1010  'max_died' => $max_died,
1011  'workers_count' => $workers_count,
1012  };
1013  }
1014  $sth_times->finish();
1015 
1016  return \%meadow_to_interval;
1017 }
1018 
1019 
1020 sub store_resource_usage {
1021  my ($self, $report_entries, $processid_2_workerid) = @_;
1022 
1023  # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
1024 
1025  my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?';
1026  my $sth_delete = $self->prepare( $sql_delete );
1027 
1028  my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
1029  my $sth_insert = $self->prepare( $sql_insert );
1030 
1031  my @not_ours = ();
1032 
1033  while( my ($process_id, $report_entry) = each %$report_entries ) {
1034 
1035  if( my $worker_id = $processid_2_workerid->{$process_id} ) {
1036  $sth_delete->execute( $worker_id );
1037 
1038  eval {
1039  $sth_insert->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} ); # slicing hashref
1040  1;
1041  } or do {
1042  if($@ =~ /execute failed: Duplicate entry/s) { # ignore the collision with another parallel beekeeper
1043  $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 'WORKER_CAUTION' );
1044  } else {
1045  die $@;
1046  }
1047  };
1048  } else {
1049  push @not_ours, $process_id;
1050  #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
1051  }
1052  }
1053  $sth_delete->finish();
1054  $sth_insert->finish();
1055 }
1056 
1057 
1058 1;