ensembl-hive  2.8.1
Queen.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME

 Bio::EnsEMBL::Hive::Queen

7 =head1 DESCRIPTION
8 
 The Queen of the Hive based job control system is responsible for 'birthing' the
 correct number of workers of the right type so that they can find jobs to do.
 It will also free up jobs of Workers that died unexpectedly so that other workers
 can claim them.
13 
14  Hive based processing is a concept based on a more controlled version
15  of an autonomous agent type system. Each worker is not told what to do
16  (like a centralized control system - like the current pipeline system)
17  but rather queries a central database for jobs (give me jobs).
18 
 Each worker is linked to an analysis_id, registers itself on creation
20  into the Hive, creates a RunnableDB instance of the Analysis->module,
21  gets $analysis->batch_size jobs from the job table, does its work,
22  creates the next layer of job entries by interfacing to
23  the DataflowRuleAdaptor to determine the analyses it needs to pass its
24  output data to and creates jobs on the next analysis database.
25  It repeats this cycle until it has lived its lifetime or until there are no
26  more jobs left.
27  The lifetime limit is just a safety limit to prevent these from 'infecting'
28  a system.
29 
 The Queen's job is simply to birth Workers of the correct analysis_id to get the
 work done. The only other thing the Queen does is free up jobs that were
 claimed by Workers that died unexpectedly so that other workers can take
 over the work.
34 
 The Beekeeper is in charge of interfacing between the Queen and a compute resource
 or 'compute farm'. Its job is to query Queens if they need any workers and to
 send the requested number of workers to open machines via the runWorker.pl script.
 It is also responsible for interfacing with the Queen to identify workers which died
 unexpectedly.
40 
41 =head1 LICENSE
42 
43  See the NOTICE file distributed with this work for additional information
44  regarding copyright ownership.
45 
46  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
47  You may obtain a copy of the License at
48 
49  http://www.apache.org/licenses/LICENSE-2.0
50 
51  Unless required by applicable law or agreed to in writing, software distributed under the License
52  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
53  See the License for the specific language governing permissions and limitations under the License.
54 
55 =head1 CONTACT
56 
57  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
58 
59 =head1 APPENDIX
60 
61  The rest of the documentation details each of the object methods.
62  Internal methods are usually preceded with a _
63 
64 =cut
65 
66 
67 package Bio::EnsEMBL::Hive::Queen;
68 
69 use strict;
70 use warnings;
71 
74 use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash', 'whoami', 'print_aligned_fields'); # NB: some are needed by invisible code
79 
81 
82 use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
83 
# Name of the database table whose rows this adaptor manages.
sub default_table_name {
    my $table_name = 'worker';
    return $table_name;
}
87 
88 
# Column-to-SQL-expression mapping applied when reading 'worker' rows:
# when_submitted is fetched as an age in seconds, aliased as seconds_since_when_submitted.
sub default_input_column_mapping {
    my ($self) = @_;

    my $age_in_seconds_sql = $self->dbc->_interval_seconds_sql('when_submitted');

    my %mapping;
    $mapping{'when_submitted'} = $age_in_seconds_sql . ' seconds_since_when_submitted';
    return \%mapping;
}
95 
96 
# Columns that must be left untouched when a Worker row is updated:
# when_submitted is only ever written at submission time.
sub do_not_update_columns {
    my @frozen_columns = ('when_submitted');
    return [ @frozen_columns ];
}
100 
101 
102 
# Class of the objects this adaptor builds from the rows it fetches.
sub object_class {
    my $class_name = 'Bio::EnsEMBL::Hive::Worker';
    return $class_name;
}
106 
107 
108 ############################
109 #
110 # PUBLIC API
111 #
112 ############################
113 
114 
=head2 create_new_worker

  Arg [...]  : named flags: -preregistered, -resource_class_id, -resource_class_name, -beekeeper_id,
               -no_write, -debug, -worker_base_temp_dir, -worker_log_dir, -hive_log_dir,
               -job_limit, -life_span (in minutes), -no_cleanup, -retry_throwing_jobs,
               -can_respecialize, -worker_delay_startup_seconds, -worker_crash_on_startup_prob,
               -config_files (arrayref)
  Description: Creates an entry in the worker table (or, with -preregistered, waits for and takes over
               the SUBMITTED entry already registered for this process),
               populates some non-storable attributes
               and returns a Worker object based on that insert.
               This guarantees that each worker registered in this Queen's hive is properly registered.
               When called on the class rather than on an instance (standalone mode), a new Worker
               is not stored in the database.
  Returntype : Bio::EnsEMBL::Hive::Worker
  Exceptions : dies on a requested startup crash, if the Valley cannot fully locate this process,
               if a preregistered entry does not appear within MaxRegistrationSeconds,
               or if the requested resource class cannot be fetched
  Caller     : runWorker.pl

=cut

sub create_new_worker {
    my ($self, %flags) = @_;

    # NB: undefined parameter would have caused eternal sleep!
    sleep( $flags{'-worker_delay_startup_seconds'} // 0 );

    my $crash_probability = $flags{'-worker_crash_on_startup_prob'};
    if( defined($crash_probability) and rand(1) < $crash_probability ) {
        die "This is a requested crash of the Worker (with probability=$crash_probability)";
    }

    # Find out where (Meadow, process, host, user) this Worker is running
    my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new( @{ $flags{'-config_files'} } );
    my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami();
    unless( $meadow && $process_id && $meadow_host && $meadow_user ) {
        die "Valley is not fully defined";
    }
    my $meadow_type = $meadow->type;
    my $meadow_name = $meadow->cached_name;

    # So far 'RELOCATED events' have been detected on LSF 9.0 in response to sending signal #99 or #100.
    # Since we don't know how to avoid them, they are registered when they happen:
    # any previous incarnation of this Worker is buried before the new one starts.
    #
    # FIXME: if GarabageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
    # LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
    # So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
    my $previous_incarnations = $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id);
    foreach my $buried_worker (@$previous_incarnations) {
        $buried_worker->cause_of_death( 'RELOCATED' );
        $self->register_worker_death( $buried_worker );
    }

    my $worker;

    if( $flags{'-preregistered'} ) {
        # Poll every few seconds until the SUBMITTED entry for this process can be fetched
        my $registration_timeout = $meadow->config_get('MaxRegistrationSeconds');
        my $poll_interval        = 5;
        my $waited_so_far        = 0;

        until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
            my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
            my $timed_out = defined($registration_timeout) && ($waited_so_far > $registration_timeout);

            if( $timed_out ) {
                my $msg = "Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
                $log_message_adaptor->store_hive_message($msg, 'WORKER_ERROR' );
                die $msg;
            }

            $log_message_adaptor->store_hive_message("Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $poll_interval more seconds to fetch itself...", 'WORKER_CAUTION' );
            sleep($poll_interval);
            $waited_so_far += $poll_interval;
        }

        # only update the fields that were not available at the time of submission:
        $worker->meadow_host( $meadow_host );
        $worker->meadow_user( $meadow_user );
        $worker->when_born( 'CURRENT_TIMESTAMP' );
        $worker->status( 'READY' );

        $self->update( $worker );
    }
    else {
        my $rc_name = $flags{'-resource_class_name'};
        my $rc_id   = $flags{'-resource_class_id'};
        my $resource_class;

        if( defined($rc_name) ) {
            $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('name' => $rc_name)
                or die "resource_class with name='$rc_name' could not be fetched from the database";
        }
        elsif( defined($rc_id) ) {
            $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('dbID', $rc_id)
                or die "resource_class with dbID='$rc_id' could not be fetched from the database";
        }

        $worker = Bio::EnsEMBL::Hive::Worker->new(
            'meadow_type'       => $meadow_type,
            'meadow_name'       => $meadow_name,
            'process_id'        => $process_id,
            'resource_class'    => $resource_class,
            'beekeeper_id'      => $flags{'-beekeeper_id'},

            'meadow_host'       => $meadow_host,
            'meadow_user'       => $meadow_user,
        );

        # In standalone mode (called on the class) there is no database to register the Worker in
        if( ref($self) ) {
            $self->store( $worker );

            $worker->when_born( 'CURRENT_TIMESTAMP' );
            $self->update_when_born( $worker );

            $self->refresh( $worker );
        }
    }

    $worker->set_log_directory_name( $flags{'-hive_log_dir'}, $flags{'-worker_log_dir'} );
    $worker->set_temp_directory_name( $flags{'-worker_base_temp_dir'} || $meadow->config_get('BaseTempDirectory') );

    $worker->init;

    # With a job limit the life span is reset to 0; an explicit -life_span is applied afterwards and so takes precedence
    my $job_limit = $flags{'-job_limit'};
    if( defined($job_limit) ) {
        $worker->job_limiter($job_limit);
        $worker->life_span(0);
    }

    my $life_span_minutes = $flags{'-life_span'};
    $worker->life_span($life_span_minutes * 60) if $life_span_minutes;    # minutes -> seconds

    $worker->execute_writes(0) if $flags{'-no_write'};

    $worker->perform_cleanup(0) if $flags{'-no_cleanup'};

    $worker->debug( $flags{'-debug'} ) if $flags{'-debug'};

    my $retry_throwing_jobs = $flags{'-retry_throwing_jobs'};
    $worker->retry_throwing_jobs($retry_throwing_jobs) if defined $retry_throwing_jobs;

    my $can_respecialize = $flags{'-can_respecialize'};
    $worker->can_respecialize($can_respecialize) if defined $can_respecialize;

    return $worker;
}
248 
249 
=head2 specialize_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker $worker
  Arg [2]    : hashref of flags: {-analyses_pattern, -job_id, -force}
  Description: Gives the Worker a Role, i.e. an analysis to work on.
               If -job_id is specified, the Worker is specialized into the analysis of that job,
               and the job is reset/claimed to become the Worker's special batch.
               Otherwise the Scheduler picks the most suitable analysis among those matching
               -analyses_pattern (which defaults to '%'), and that analysis is flipped to 'WORKING'.
               In both cases the new Role is stored and the analysis' num_running_workers
               counter is incremented.
  Exceptions : dies if both -analyses_pattern and -job_id are given; if the job cannot be fetched
               or claimed; if the job is already taken; if the job is DONE or SEMAPHORED, or its
               analysis is BLOCKED, and -force is not set; or if no suitable analysis is found
               (cause_of_death is then set to 'NO_ROLE').

=cut

sub specialize_worker {
    my $self    = shift @_;
    my $worker  = shift @_;
    my $flags   = shift @_;

    my ($analyses_pattern, $job_id, $force)
        = @$flags{qw(-analyses_pattern -job_id -force)};

    if( $analyses_pattern and $job_id ) {
        die "At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
    }

    my $analysis;

    if( $job_id ) {

        $worker->worker_say("resetting and fetching job for job_id '$job_id'");

        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;

        my $job = $job_adaptor->fetch_by_dbID( $job_id )
            or die "Could not fetch job with dbID='$job_id'";
        my $job_status = $job->status();

        # ALL_STATUSES_OF_TAKEN_JOBS appears to be a list of single-quoted statuses, so look for the whole
        # quoted status; \Q...\E makes sure the status is matched literally, not as a regex.
        if ($Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor::ALL_STATUSES_OF_TAKEN_JOBS =~ /'\Q$job_status\E'/) {
            die "Job with dbID='$job_id' is already in progress, cannot run";   # FIXME: try GC first, then complain
        } elsif( ($job_status eq 'DONE' or $job_status eq 'SEMAPHORED') and !$force ) {
            # exact comparison: the former unanchored /(DONE|SEMAPHORED)/ also matched any status merely containing these words
            die "Job with dbID='$job_id' is $job_status, please use --force to override";
        }

        $analysis = $job->analysis;
        if( ($analysis->stats->status eq 'BLOCKED') and !$force ) {
            die "Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
        }

        # Re-running a DONE job: raise its controlled semaphore again (if any)
        if( $job_status eq 'DONE' ) {
            if( my $controlled_semaphore = $job->controlled_semaphore ) {
                $worker->worker_say("Increasing the semaphore count of the dependent job");
                $controlled_semaphore->increase_by( [ $job ] );
            }
        }

        # The job is about to be reset/claimed (below), so take it out of the counter of its current status
        $analysis->stats->adaptor->increment_a_counter( $Bio::EnsEMBL::Hive::AnalysisStats::status2counter{$job->status}, -1, $job->analysis_id );

    } else {

        $analyses_pattern //= '%';   # for printing
        my $analyses_matching_pattern = $worker->hive_pipeline->collection_of( 'Analysis' )->find_all_by_pattern( $analyses_pattern );

        # refresh the stats of matching analyses before re-specialization:
        foreach my $candidate_analysis ( @$analyses_matching_pattern ) {
            $candidate_analysis->stats->refresh();
        }
        $self->db->hive_pipeline->invalidate_hive_current_load;

        $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_matching_pattern, $analyses_pattern);

        # A non-reference result is either an explanatory message or undef
        unless( ref($analysis) ) {

            $worker->cause_of_death('NO_ROLE');

            my $msg = $analysis // "No analysis suitable for the worker was found";
            die "$msg\n";
        }
    }

    my $new_role = Bio::EnsEMBL::Hive::Role->new(
        'worker'        => $worker,
        'analysis'      => $analysis,
    );
    $self->db->get_RoleAdaptor->store( $new_role );
    $worker->current_role( $new_role );

    my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

    if($job_id) {
        my $role_id = $new_role->dbID;
        if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {

            $worker->special_batch( [ $job ] );
        } else {
            die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
        }

    } else {    # Note: special batch Workers should avoid flipping the status to 'WORKING' in case the analysis is still 'BLOCKED'

        $analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING');
    }

    # The following increment used to be done only when no specific task was given to the worker,
    # thereby excluding such "special task" workers from being counted in num_running_workers.
    #
    # However this may be tricky to emulate by triggers that know nothing about "special tasks",
    # so I am (temporarily?) simplifying the accounting algorithm.
    #
    $analysis_stats_adaptor->increment_a_counter( 'num_running_workers', 1, $analysis->dbID );
}
353 
354 
# Marks the Worker as DEAD in the 'worker' table, recording its work_done, cause_of_death
# ('UNKNOWN' if unset) and when_died (the current time if the Worker has none).
# If the Worker still has an unfinished Role (looked up in the database when none is attached
# to the Worker), that Role is finalized first; its undone jobs are released only for the causes
# of death that can interrupt a running batch.
# Arg [3] $update_when_checked_in: if true, when_checked_in is also set to the current time.
sub register_worker_death {
    my ($self, $worker, $update_when_checked_in) = @_;

    my $worker_id      = $worker->dbID;
    my $work_done      = $worker->work_done;
    my $cause_of_death = $worker->cause_of_death || 'UNKNOWN';     # make sure we do not attempt to insert a void
    my $worker_died    = $worker->when_died;

    my $current_role = $worker->current_role;
    if ( !$current_role ) {
        $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id );
        $worker->current_role( $current_role );
    }

    if ( $current_role && !$current_role->when_finished() ) {
        # cause_of_death that only happen before or after a batch: 'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
        # cause_of_death that can happen whilst the worker is running a batch (listed below):
        my $interrupts_a_batch  = qr/^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/;
        my $release_undone_jobs = ($cause_of_death =~ $interrupts_a_batch);

        $current_role->worker($worker);     # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
        $current_role->when_finished( $worker_died );
        $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
    }

    my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'";
    $sql .= ', when_checked_in=CURRENT_TIMESTAMP ' if $update_when_checked_in;
    $sql .= $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP';
    $sql .= " WHERE worker_id='$worker_id' ";

    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 'INFO' );
    };
    $self->dbc->protected_prepare_execute( [ $sql ], $log_callback );
}
388 
389 
# Asks the responsible Meadow to kill every Worker that is not registered as DEAD, registers the
# successfully-requested ones as KILLED_BY_USER (cleaning up their temp directory unless they are
# still SUBMITTED), and prints one status line per Worker.
# Workers whose Meadow is unreachable, cannot kill, or that run under another user are left alone.
sub kill_all_workers {
    my ( $self, $valley ) = @_;

    my $current_user = whoami();
    my $live_workers = $self->fetch_all( "status!='DEAD'" );

    foreach my $worker ( @$live_workers ) {
        my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker );
        my $kill_status;

        if ( !defined($meadow) ) {
            # Most likely a meadow not reachable for the current beekeeper,
            # e.g. a LOCAL one started on a different host.
            $kill_status = 'meadow not reachable';
        }
        elsif ( !$meadow->can('kill_worker') ) {
            $kill_status = 'killing workers not supported by the meadow';
        }
        elsif ( $worker->meadow_user ne $current_user ) {
            # only the user running the Worker is allowed to kill it
            $kill_status = 'could not kill, running under user ' . $worker->meadow_user;
        }
        else {
            # The actual termination of a worker might well be asynchronous,
            # but at least obvious problems (e.g. insufficient permissions) are caught here.
            my $kill_return_value = $meadow->kill_worker( $worker, 1 );
            if ( $kill_return_value == 0 ) {
                $kill_status = 'requested successfully';
                $worker->cause_of_death( 'KILLED_BY_USER' );
                $self->register_worker_death( $worker );
                # There is no worker_temp_directory before specialization
                $meadow->cleanup_temp_directory( $worker ) unless $worker->status eq 'SUBMITTED';
            }
            else {
                $kill_status = "request failure (return code: ${kill_return_value})";
            }
        }

        print 'Killing worker ' . $worker->dbID() . ': ' . $worker->toString( 1 ) . " -> $kill_status \n";
    }

    return;
}
434 
435 
# Returns a hashref mapping resource_class dbID => resource_class name.
# Built from the pipeline's ResourceClass collection on first call and cached in the adaptor afterwards.
sub cached_resource_mapping {
    my $self = shift;

    unless ( $self->{'_cached_resource_mapping'} ) {
        my %name_of_rc_id;
        foreach my $resource_class ( $self->db->hive_pipeline->collection_of('ResourceClass')->list ) {
            my ($rc_id, $rc_name) = ( $resource_class->dbID, $resource_class->name );
            $name_of_rc_id{$rc_id} = $rc_name;
        }
        $self->{'_cached_resource_mapping'} = \%name_of_rc_id;
    }

    return $self->{'_cached_resource_mapping'};
}
441 
442 
# Fetches the meadow_type, meadow_name, meadow_user, process_id and status of every Worker
# that is not registered as DEAD (arguments passed through to ObjectAdaptor::fetch_all).
sub registered_workers_attributes {
    my ($self) = @_;

    my @attribute_columns = ('meadow_type', 'meadow_name', 'meadow_user', 'process_id');
    return $self->fetch_all( "status!='DEAD'", 1, \@attribute_columns, 'status' );
}
448 
449 
# Counts the SUBMITTED Workers of the given meadow_user, grouped by meadow_type and resource-class name.
# Resource-class ids without a known name (presumably NULL or deleted ones) are grouped under
# '__undefined_rc_name__'.
# Returns: hashref { meadow_type => { rc_name => count } }
sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
    my ($self, $meadow_user) = @_;

    my $worker_counts_by_meadow_type_rc_id = $self->count_all("status='SUBMITTED' AND meadow_user='$meadow_user'", ['meadow_type', 'resource_class_id'] );
    my $cached_resource_mapping = $self->cached_resource_mapping;

    my %counts_by_meadow_type_rc_name = ();

    foreach my $meadow_type (keys %$worker_counts_by_meadow_type_rc_id) {
        my $counts_by_rc_id = $worker_counts_by_meadow_type_rc_id->{$meadow_type};

        foreach my $rc_id (keys %$counts_by_rc_id) {
            my $rc_name = $cached_resource_mapping->{ $rc_id } || '__undefined_rc_name__';
            # Several rc_ids can end up under the same name (all unknown ones share the fallback name),
            # so the counts must be accumulated rather than overwritten.
            $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } += $counts_by_rc_id->{ $rc_id };
        }
    }

    return \%counts_by_meadow_type_rc_name;
}
467 
468 
# Garbage-collects lost Workers: compares the non-DEAD Workers registered in the database with what
# the reachable Meadows of $valley report, refreshes when_seen for the ones still known to a Meadow,
# and buries the lost ones.
# Arg [2] $check_buried_in_haste : also run the Meadow-agnostic database-integrity checks
# Arg [3] $bury_unkwn_workers    : kill and bury the Workers reported as UNKWN (only those run by the current user)
sub check_for_dead_workers {    # scans the whole Valley for lost Workers (but ignores unreachable ones)
    my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;

    my $last_few_seconds = 5;   # FIXME: It is probably a good idea to expose this parameter for easier tuning.

    print "GarbageCollector:\tChecking for lost Workers...\n";

    # all non-DEAD workers found in the database, with their meadow status
    my $reconciled_worker_statuses = $valley->query_worker_statuses( $self->registered_workers_attributes );
    # selects the workers available in this valley. does not query the database / meadow
    my $signature_and_pid_to_worker_status = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
    # this may pick up workers that have been created since the last fetch
    my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds );    # check the workers we have not seen active during the $last_few_seconds

    if (@$queen_overdue_workers) {
        print "GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";
    } else {
        print "GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n";
    }

    my $this_meadow_user = whoami();

    my %meadow_status_counts = ();
    my %mt_and_pid_to_lost_worker = ();
    foreach my $worker (@$queen_overdue_workers) {

        my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;
        if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) {   # the whole Meadow subhash is either present or the Meadow is unreachable

            my $meadow_type = $worker->meadow_type;
            my $process_id  = $worker->process_id;
            my $status      = $pid_to_worker_status->{$process_id} // 'DEFERRED_CHECK';    # Workers that have been created between registered_workers_attributes and fetch_overdue_workers

            if($bury_unkwn_workers and ($status eq 'UNKWN')) {
                if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
                    if($meadow->can('kill_worker')) {
                        if($worker->meadow_user eq $this_meadow_user) {  # if I'm actually allowed to kill the worker...
                            print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id\n";

                            $meadow->kill_worker($worker, 1);
                            $status = 'LOST';
                        }
                    }
                }
            }

            $meadow_status_counts{$meadow_signature}{$status}++;

            if(($status eq 'LOST') or ($status eq 'SUBMITTED') or ($status eq 'COMPILATION')) {

                $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;

            } elsif ($status eq 'DEFERRED_CHECK') {

                # do nothing now, wait until the next pass to check on this worker

            } else {

                # RUN|PEND|xSUSP handling
                my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id='".$worker->dbID."'";
                $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ],
                    sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker".$after, 'INFO' ); }
                );
            }
        } else {
            $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++;   # Worker is unreachable from this Valley
        }
    }

    # print a quick summary report:
    while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
        print "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(', ', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n";
    }

    while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
        my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
        $self->_bury_lost_workers_of_meadow( $this_meadow, $meadow_type, $pid_to_lost_worker, $this_meadow_user );
    }

    # the following bit is completely Meadow-agnostic and only restores database integrity:
    if($check_buried_in_haste) {
        $self->_restore_integrity_after_hasty_burials();
    }
}


# Buries the lost Workers of one Meadow: copies over what the Meadow can still report about them
# (when_born, meadow_host, when_died, cause_of_death), registers the death of those known to be gone
# (or SUBMITTED and invisible for longer than MaxLimboSeconds), cleans up their temp directories
# and stores their resource usage.
sub _bury_lost_workers_of_meadow {
    my ($self, $this_meadow, $meadow_type, $pid_to_lost_worker, $this_meadow_user) = @_;

    my $lost_this_meadow = scalar(keys %$pid_to_lost_worker);
    return unless $lost_this_meadow;

    print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";

    my $report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker );
    if($report_entries) {
        my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
        print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type workers died:\n";
        print "" . join (", ", (map { $_->{'cause_of_death'} } values %$report_entries)) . "\n";
    } else {
        print "GarbageCollector:\tUnknown why $lost_this_meadow $meadow_type workers died\n";
    }

    # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission.
    # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker' table.
    # It does not depend on the Worker, so it is read once per Meadow rather than once per Worker.
    my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds') // 0;

    print "GarbageCollector:\tRecording workers' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n";
    foreach my $process_id (keys %$pid_to_lost_worker) {
        my $worker = $pid_to_lost_worker->{$process_id};

        if(my $report_entry = $report_entries && $report_entries->{$process_id}) {
            my @updated_attribs = ();
            foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) {
                if( defined( $report_entry->{$worker_attrib} ) ) {
                    $worker->$worker_attrib( $report_entry->{$worker_attrib} );
                    push @updated_attribs, $worker_attrib;
                }
            }
            $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs));
        }

        if( ($worker->status ne 'SUBMITTED')
         || $worker->when_died                                                  # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids)
         || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) {     # SUBMITTED and Meadow-invisible for too long => we consider them LOST

            # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more
            $worker->cause_of_death('LIMBO') if( ($worker->status eq 'SUBMITTED') and !$worker->cause_of_death);

            $self->register_worker_death( $worker );

            if( ($worker->status ne 'SUBMITTED')                    # There is no worker_temp_directory before specialization
                and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker...
                $this_meadow->cleanup_temp_directory( $worker );
            }
        }
    }

    if( $report_entries && %$report_entries ) {     # use the opportunity to also store resource usage of the buried workers:
        my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
        $self->store_resource_usage( $report_entries, $processid_2_workerid );
    }
}


# Meadow-agnostic database-integrity checks: finalizes the unfinished Roles of dead Workers,
# releases the unfinished jobs of finished Roles, and reclaims unfinished jobs that have no Role.
sub _restore_integrity_after_hasty_burials {
    my ($self) = @_;

    my $role_adaptor = $self->db->get_RoleAdaptor;
    my $job_adaptor  = $self->db->get_AnalysisJobAdaptor;

    print "GarbageCollector:\tChecking for orphan roles...\n";
    my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers();
    if(my $orphan_role_number = scalar @$orphan_roles) {
        print "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
        foreach my $orphan_role (@$orphan_roles) {
            $role_adaptor->finalize_role( $orphan_role );
        }
    } else {
        print "GarbageCollector:\tfound none\n";
    }

    print "GarbageCollector:\tChecking for roles buried in haste...\n";
    my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs();
    if(my $bih_number = scalar @$buried_in_haste_roles) {
        print "GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n";
        foreach my $role (@$buried_in_haste_roles) {
            $job_adaptor->release_undone_jobs_from_role( $role );
        }
    } else {
        print "GarbageCollector:\tfound none\n";
    }

    print "GarbageCollector:\tChecking for orphan jobs...\n";
    my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles();
    if(my $sj_number = scalar @$orphan_jobs) {
        print "GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n";
        foreach my $job (@$orphan_jobs) {
            $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1);
        }
    } else {
        print "GarbageCollector:\tfound none\n";
    }
}
636 
637 
# Guards against RELOCATED events: returns (arrayref) the Workers already registered under the same
# meadow_type / meadow_name / process_id that are neither DEAD nor SUBMITTED.
sub find_previous_worker_incarnations {
    my ($self, $meadow_type, $meadow_name, $process_id) = @_;

    # Standalone mode (called on the class, no database): nothing can have been registered
    if ( !ref($self) ) {
        return [];
    }

    my $constraint = join( ' AND ',
        "status!='DEAD'",
        "status!='SUBMITTED'",
        "meadow_type='$meadow_type'",
        "meadow_name='$meadow_name'",
        "process_id='$process_id'",
    );
    return $self->fetch_all( $constraint );
}
647 
648 
# Returns the SUBMITTED Worker entry registered for this meadow_type / meadow_name / process_id,
# or undef if there is none (yet).
sub fetch_preregistered_worker {
    my ($self, $meadow_type, $meadow_name, $process_id) = @_;

    # Standalone mode (called on the class, no database): there cannot be a preregistered entry.
    # NB: this must be a false value -- callers test the result for truth and then call Worker
    # methods on it, so the previous '[]' (a true, unblessed arrayref) made them crash.
    return unless ref($self);

    my ($worker) = @{ $self->fetch_all( "status='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) };

    return $worker;
}
659 
660 
# Checks the Worker in (when_checked_in = now) and, in the same UPDATE, saves its current
# status and work_done counter.
sub check_in_worker {
    my ($self, $worker) = @_;

    my ($status, $work_done, $worker_id) = ( $worker->status, $worker->work_done, $worker->dbID );
    my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='$status', work_done='$work_done' WHERE worker_id='$worker_id'";

    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker".$after, 'INFO' );
    };
    $self->dbc->protected_prepare_execute( [ $sql ], $log_callback );
}
671 
672 
=head2 reset_job_by_dbID_and_sync

  Arg [1]    : int $job_id
  Example    :
               my $job = $queen->reset_job_by_dbID_and_sync($job_id);
  Description:
               For the specified job_id it will fetch just that job,
               reset it completely as if it has never run, synchronize
               the stats of its analysis, and return it.
               Specifying a specific job bypasses the safety checks,
               thus multiple workers could be running the
               same job simultaneously (use only for debugging).
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : dies if no job with this dbID could be reset
  Caller     : beekeeper.pl

=cut

sub reset_job_by_dbID_and_sync {
    my ($self, $job_id) = @_;

    # Fail with a clear message instead of "Can't call method on an undefined value" further down
    my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id)
        or die "Could not reset job with dbID='$job_id'";

    $self->synchronize_AnalysisStats( $job->analysis->stats );

    return $job;
}
699 
700 
701 ######################################
702 #
703 # Public API interface for beekeeper
704 #
705 ######################################
706 
707 
# Fetches the Workers that are not DEAD and either never checked in or have not checked in
# during the last $overdue_secs seconds (default: 3600).
# Note: asking for Queen->fetch_overdue_workers(0) essentially means
#       "fetch all workers known to the Queen not to be officially dead"
sub fetch_overdue_workers {
    my ($self, $overdue_secs) = @_;

    $overdue_secs //= 3600;

    my $checked_in_age_sql = $self->dbc->_interval_seconds_sql('when_checked_in');
    my $constraint = "status!='DEAD' AND (when_checked_in IS NULL OR $checked_in_age_sql > $overdue_secs)";

    return $self->fetch_all( $constraint );
}
720 
721 
=head2 synchronize_hive

  Arg [1]    : $list_of_analyses (arrayref)
  Example    : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
  Description: Synchronizes the analysis_stats of every analysis in the list
               (see synchronize_AnalysisStats), printing one character per analysis:
               'x' if its stats end up BLOCKED, 'o' otherwise, followed by the elapsed time.
  Exceptions : none
  Caller     : general

=cut

sub synchronize_hive {
    my ($self, $list_of_analyses) = @_;

    my $started_at = time();
    my $analysis_count = scalar(@$list_of_analyses);

    print "\nSynchronizing the hive (" . $analysis_count . " analyses this time):\n";
    for my $analysis (@$list_of_analyses) {
        $self->synchronize_AnalysisStats( $analysis->stats );
        my $progress_mark = ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o';
        print $progress_mark;
    }
    print "\n";

    my $elapsed_seconds = time() - $started_at;
    print "$elapsed_seconds seconds to synchronize_hive\n\n";
}
748 
749 
=head2 safe_synchronize_AnalysisStats

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $self->safe_synchronize_AnalysisStats($stats);
  Description: Prewrapper around synchronize_AnalysisStats that does
               checks and grabs sync_lock before proceeding with sync.
               Used by distributed worker sync system to avoid contention.
               If another agent holds the lock, waits (up to 5 refreshes, 1 second apart)
               for it to be released.
               Stats that are DONE, or WORKING and updated less than 3 minutes ago,
               are considered fresh enough and are not re-synchronized.
  Returntype : 'sync_done'            if the lock was obtained and the sync performed;
               'sync_done_by_friend'  if another agent has just completed the sync
                                      (the stats have been refreshed from its result);
               'stats_fresh_enough'   if no sync was needed;
               0                      if the lock could not be obtained (no sync attempted).
  Caller     : general

=cut

sub safe_synchronize_AnalysisStats {
    my ($self, $stats) = @_;

    $stats->refresh();
    my $was_synching = $stats->sync_lock;

    my $max_refresh_attempts = 5;
    while($stats->sync_lock and $max_refresh_attempts--) {   # another Worker/Beekeeper is synching this analysis right now
        # ToDo: it would be nice to report the detected collision
        sleep(1);
        $stats->refresh();  # just try to avoid collision
    }

    # The sync has just completed and we have the freshest stats
    if ($was_synching && !$stats->sync_lock) {
        return 'sync_done_by_friend';
    }

    unless( ($stats->status eq 'DONE')
         or ( ($stats->status eq 'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {

        # In case $stats->sync_lock is set, this is basically giving it one last chance
        my $sql = "UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
                  "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;

        # try to claim the sync_lock. With DBI, do() returns '0E0' (numerically 0) when no row
        # was updated and undef on error; the defined() check avoids a warning in the latter case.
        my $row_count = $self->dbc->do($sql);

        if( defined($row_count) and $row_count == 1 ) {     # if we managed to obtain the lock, let's go and perform the sync:
            if ($stats->sync_lock) {
                # Actually the sync has just been completed by another agent. Save time and load the stats it computed
                $stats->refresh();
                # And release the lock
                $stats->sync_lock(0);
                $stats->adaptor->update_sync_lock($stats);
                return 'sync_done_by_friend';
            }
            $self->synchronize_AnalysisStats($stats, 1);
            return 'sync_done';
        } else {
            # otherwise assume it's locked and just return un-updated
            return 0;
        }
    }

    return $stats->sync_lock ? 0 : 'stats_fresh_enough';
}
810 
811 
812 =head2 synchronize_AnalysisStats
813 
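    Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
    Arg [2]    : (optional) boolean $has_refresh_just_been_done; if true, the initial $stats->refresh() is skipped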
815  Example : $self->synchronize_AnalysisStats( $stats );
816  Description: Queries the job and worker tables to get summary counts
817  and rebuilds the AnalysisStats object.
818  Then updates the analysis_stats table with the new summary info.
819  Exceptions : none
820  Caller : general
821 
822 =cut
823 
sub synchronize_AnalysisStats {
    my ($self, $stats, $has_refresh_just_been_done) = @_;

    # Only stats attached to an analysis can be synchronized; otherwise hand back the falsy guard value.
    my $syncable = $stats && $stats->analysis_id;
    return $syncable unless $syncable;

    $stats->refresh() if !$has_refresh_just_been_done;

    # Job counts are only fetched when the pipeline does not use triggers; otherwise undef is
    # passed on (presumably the triggers keep the counters up to date - see recalculate_from_job_counts).
    my $job_counts;
    if ( !$stats->hive_pipeline->hive_use_triggers() ) {
        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
        $job_counts = $job_adaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );
    }

    $stats->recalculate_from_job_counts($job_counts);

    # $stats->sync_lock(0); ## do we perhaps need it here?
    return $stats->update;    # writes the new summary back (and, per the original note, releases sync_lock)
}
839 
840 
841 =head2 check_nothing_to_run_but_semaphored
842 
843  Arg [1] : $list_of_analyses
844  Example : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
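    Description: Returns true if the given analyses have at least one SEMAPHORED job and all their
                 other jobs are either DONE or FAILED, i.e. the only jobs left are semaphored ones.
                 Returns false otherwise (including for an empty list). The stats are used as they
                 are, so this should be run after a recent sync.
    Returntype : boolean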
846  Exceptions : none
847  Caller : Scheduler
848 
849 =cut
850 
sub check_nothing_to_run_but_semaphored {   # make sure it is run after a recent sync
    my ($self, $list_of_analyses) = @_;

    my $semaphored_total         = 0;
    my $everything_else_finished = 1;

    for my $analysis (@$list_of_analyses) {
        my $stats = $analysis->stats;

        # Any job that is neither done, failed nor semaphored could still run right now
        my $accounted_for = $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count;
        $everything_else_finished = 0 unless $stats->total_job_count == $accounted_for;

        $semaphored_total += $stats->semaphored_job_count;
    }

    return ( $semaphored_total && $everything_else_finished );
}
866 
867 
868 =head2 print_status_and_return_reasons_to_exit
869 
870  Arg [1] : $list_of_analyses
871  Arg [2] : $debug
872  Example : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
873  : foreach my $reason_to_exit (@$reasons_to_exit) {
874  : my $exit_message = $reason_to_exit->{'message'};
875  : my $exit_status = $reason_to_exit->{'exit_status'};
876  Description: Runs through all analyses in the given list, reports failed analyses, and computes some totals.
877  : It returns a list of exit messages and status codes. Each element of the list is a hashref,
878  : with the exit message keyed by 'message' and the status code keyed by 'exit_status'
879  :
880  : Possible status codes are:
881  : 'JOB_FAILED'
882  : 'ANALYSIS_FAILED'
883  : 'NO_WORK'
884  :
                 : If $debug is set, the printed table contains all analyses. Otherwise, EMPTY and DONE
                 : analyses are not listed in it (only their number is reported)
887  Exceptions : none
888  Caller : beekeeper.pl
889 
890 =cut
891 
sub print_status_and_return_reasons_to_exit {
    my ($self, $list_of_analyses, $debug) = @_;

    my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5;

    # Analyses whose status is a key here are not displayed (unless $debug); they are only counted.
    my %skipped_analyses = ('EMPTY' => [], 'DONE' => []);
    my @analyses_to_display;
    my @reasons_to_exit;

    foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
        my $stats       = $analysis->stats;
        my $is_excluded = $stats->is_excluded;

        if ($debug or !$skipped_analyses{$stats->status}) {
            push @analyses_to_display, $analysis;
        } else {
            push @{$skipped_analyses{$stats->status}}, $analysis;
        }

        if ($stats->failed_job_count > 0) {
            # Some jobs have failed: resynchronize so that determine_status() decides
            # between "has failed jobs" and "has FAILED" on up-to-date counts.
            $self->synchronize_AnalysisStats($stats);
            $stats->determine_status();
        }

        # Read the failed count only after the possible sync above, so that the report
        # and the totals agree with the other (freshly synchronized) counters.
        my $failed_job_count = $stats->failed_job_count;

        if ($failed_job_count > 0) {
            my $logic_name = $analysis->logic_name;
            my $tolerance  = $analysis->failed_job_tolerance;
            my ($exit_status, $failure_message);
            if ($stats->status eq 'FAILED') {
                $exit_status     = 'ANALYSIS_FAILED';
                $failure_message = "### Analysis '$logic_name' has FAILED (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            } else {
                $exit_status     = 'JOB_FAILED';
                $failure_message = "### Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            }
            push @reasons_to_exit, {'message' => $failure_message, 'exit_status' => $exit_status};
        }

        if ($is_excluded) {
            # Unfinished jobs of an excluded analysis are counted as "excluded" rather than "to_do"
            $total_excluded_jobs += $stats->total_job_count - $stats->done_job_count - $failed_job_count;
        }
        $total_done_jobs   += $stats->done_job_count;
        $total_failed_jobs += $failed_job_count;
        $total_jobs        += $stats->total_job_count;
        $cpumsec_to_do     += $stats->ready_job_count * $stats->avg_msec_per_job;
    }

    my $total_jobs_to_do = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs; # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
    my $cpuhrs_to_do = $cpumsec_to_do / (1000.0*60*60);
    my $percentage_completed = $total_jobs
                                ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
                                : 0.0;

    # We use print_aligned_fields instead of printing each AnalysisStats' toString(),
    # so that the fields are all vertically aligned.
    if (@analyses_to_display) {
        my $template   = $analyses_to_display[0]->stats->_toString_template;
        my @all_fields = map {$_->stats->_toString_fields} @analyses_to_display;
        print_aligned_fields(\@all_fields, $template);
    }
    print "\n";

    if (@{$skipped_analyses{'EMPTY'}}) {
        printf("%d analyses not shown because they don't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY'}}));
    }
    if (@{$skipped_analyses{'DONE'}}) {
        printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE'}}));
    }
    printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n",
            scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs);

    unless( $total_jobs_to_do ) {
        if ($total_excluded_jobs > 0) {
            push (@reasons_to_exit, {'message'     => "### Some analyses are excluded ###",
                                     'exit_status' => 'NO_WORK'});
        }
        push (@reasons_to_exit, {'message'     => "### No jobs left to do ###",
                                 'exit_status' => 'NO_WORK'});
    }

    return \@reasons_to_exit;
}
975 
976 
977 =head2 register_all_workers_dead
978 
979  Example : $queen->register_all_workers_dead();
980  Description: Registers all workers dead
981  Exceptions : none
982  Caller : beekeeper.pl
983 
984 =cut
985 
sub register_all_workers_dead {
    my ($self) = @_;

    # Every Worker the database does not already consider DEAD gets its death registered now.
    my @still_alive = @{ $self->fetch_all( "status!='DEAD'" ) };

    foreach my $worker_to_bury (@still_alive) {
        $self->register_worker_death( $worker_to_bury );
    }
}
994 
995 
=head2 interval_workers_with_unknown_usage

    Example    : my $intervals = $queen->interval_workers_with_unknown_usage();
    Description: For every meadow, looks at the Workers that have no row in worker_resource_usage yet,
                 and reports the earliest when_submitted, the latest when_died (falling back to the
                 latest when_submitted when none of them has a when_died) and how many such Workers there are.
    Returntype : Hashref { meadow_type => { meadow_name => { 'min_submitted', 'max_died', 'workers_count' } } }
    Exceptions : none

=cut

sub interval_workers_with_unknown_usage {
    my $self = shift @_;

    my %meadow_to_interval = ();

    # COALESCE is standard SQL (MySQL, PostgreSQL and SQLite all support it),
    # whereas IFNULL is not available in PostgreSQL.
    my $sql_times = qq{
        SELECT meadow_type, meadow_name, MIN(when_submitted), COALESCE(MAX(when_died), MAX(when_submitted)), COUNT(*)
        FROM worker w
        LEFT JOIN worker_resource_usage u USING(worker_id)
        WHERE u.worker_id IS NULL
        GROUP BY meadow_type, meadow_name
    };
    my $sth_times = $self->prepare( $sql_times );
    $sth_times->execute();
    while( my ($meadow_type, $meadow_name, $min_submitted, $max_died, $workers_count) = $sth_times->fetchrow_array() ) {
        $meadow_to_interval{$meadow_type}{$meadow_name} = {
            'min_submitted' => $min_submitted,
            'max_died'      => $max_died,
            'workers_count' => $workers_count,
        };
    }
    $sth_times->finish();

    return \%meadow_to_interval;
}
1021 
1022 
=head2 store_resource_usage

    Arg [1]    : Hashref $report_entries, keyed by process_id. Each value is a hashref with the keys
                 'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'
    Arg [2]    : Hashref $processid_2_workerid, mapping process_id to worker_id
    Description: (Re)writes the worker_resource_usage row of every reported process that maps to a Worker.
                 A unique-key collision on insert (another beekeeper storing the same row in parallel)
                 is logged as a WORKER_CAUTION message instead of dying; any other insert error is re-thrown.
    Returntype : Arrayref of the process_ids that could not be mapped to a Worker (probably not ours)

=cut

sub store_resource_usage {
    my ($self, $report_entries, $processid_2_workerid) = @_;

    # FIXME: An UPSERT would be better than DELETE+INSERT, but its syntax differs between the supported
    # drivers (MySQL: ON DUPLICATE KEY UPDATE; PostgreSQL >= 9.5 and SQLite >= 3.24: ON CONFLICT).

    my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?';
    my $sth_delete = $self->prepare( $sql_delete );

    my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
    my $sth_insert = $self->prepare( $sql_insert );

    # Unique-key violation as reported by DBD::mysql, DBD::Pg and DBD::SQLite respectively
    my $duplicate_key_re = qr/execute failed: (?:ERROR:\s+)?(?:Duplicate entry|duplicate key value violates unique constraint|UNIQUE constraint failed)/;

    my @not_ours = ();

    # keys() rather than each(): does not depend on (nor leave behind) the iterator state of the caller's hash
    foreach my $process_id (keys %$report_entries) {

        my $worker_id = $processid_2_workerid->{$process_id};
        unless ($worker_id) {
            push @not_ours, $process_id;    # probably not ours, as it could not be mapped to a Worker
            next;
        }
        my $report_entry = $report_entries->{$process_id};

        $sth_delete->execute( $worker_id );

        eval {
            $sth_insert->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} ); # slicing hashref
            1;
        } or do {
            my $error = $@;
            if ($error =~ $duplicate_key_re) {  # ignore the collision with another parallel beekeeper
                $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 'WORKER_CAUTION' );
            } else {
                die $error;
            }
        };
    }
    $sth_delete->finish();
    $sth_insert->finish();

    return \@not_ours;
}
1059 
1060 
1061 1;
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
usage
public usage()
Bio::EnsEMBL::Hive::Storable::dbID
public Int dbID()
Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker
public suggest_analysis_to_specialize_a_worker()
Bio::EnsEMBL::Hive::Worker::cause_of_death
public cause_of_death()
map
public map()
Bio::EnsEMBL::Hive::Role
Definition: Role.pm:9
Bio::EnsEMBL::Hive::Utils::Config
Definition: Config.pm:12
Bio::EnsEMBL::Hive::Worker::worker_say
public worker_say()
Bio::EnsEMBL::Hive::Storable::new
public Bio::EnsEMBL::Hive::Storable new()
Bio::EnsEMBL::Hive::AnalysisStats
Definition: AnalysisStats.pm:12
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
Bio::EnsEMBL::Hive::Queen
Definition: Queen.pm:47
run
public run()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
Definition: AnalysisJobAdaptor.pm:22
Bio::EnsEMBL::Hive::AnalysisStats::refresh
public refresh()
info
public info()
Bio::EnsEMBL::Hive::Valley
Definition: Valley.pm:16
Bio::EnsEMBL::Hive::Scheduler
Definition: Scheduler.pm:15