ensembl-hive  2.5
Queen.pm
Go to the documentation of this file.
1 =pod
2 
=head1 NAME

    Bio::EnsEMBL::Hive::Queen

7 =head1 DESCRIPTION
8 
 The Queen of the Hive based job control system is responsible for 'birthing' the
 correct number of workers of the right type so that they can find jobs to do.
 It will also free up jobs of Workers that died unexpectedly so that other workers
 can claim them to do.
13 
14  Hive based processing is a concept based on a more controlled version
15  of an autonomous agent type system. Each worker is not told what to do
16  (like a centralized control system - like the current pipeline system)
17  but rather queries a central database for jobs (give me jobs).
18 
 Each worker is linked to an analysis_id, registers itself on creation
20  into the Hive, creates a RunnableDB instance of the Analysis->module,
21  gets $analysis->batch_size jobs from the job table, does its work,
22  creates the next layer of job entries by interfacing to
23  the DataflowRuleAdaptor to determine the analyses it needs to pass its
24  output data to and creates jobs on the next analysis database.
25  It repeats this cycle until it has lived its lifetime or until there are no
26  more jobs left.
27  The lifetime limit is just a safety limit to prevent these from 'infecting'
28  a system.
29 
 The Queen's job is to simply birth Workers of the correct analysis_id to get the
 work done. The only other thing the Queen does is free up jobs that were
 claimed by Workers that died unexpectedly so that other workers can take
 over the work.
34 
35  The Beekeeper is in charge of interfacing between the Queen and a compute resource
36  or 'compute farm'. Its job is to query Queens if they need any workers and to
37  send the requested number of workers to open machines via the runWorker.pl script.
 It is also responsible for interfacing with the Queen to identify workers which died
 unexpectedly.
40 
41 =head1 LICENSE
42 
43  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
44  Copyright [2016-2022] EMBL-European Bioinformatics Institute
45 
46  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
47  You may obtain a copy of the License at
48 
49  http://www.apache.org/licenses/LICENSE-2.0
50 
51  Unless required by applicable law or agreed to in writing, software distributed under the License
52  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
53  See the License for the specific language governing permissions and limitations under the License.
54 
55 =head1 CONTACT
56 
57  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
58 
59 =head1 APPENDIX
60 
61  The rest of the documentation details each of the object methods.
62  Internal methods are usually preceded with a _
63 
64 =cut
65 
66 
67 package Bio::EnsEMBL::Hive::Queen;
68 
69 use strict;
70 use warnings;
71 use File::Path 'make_path';
72 use List::Util qw(max);
73 
75 use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash', 'whoami'); # NB: needed by invisible code
80 
81 use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
82 
sub default_table_name {
    # All Queen operations are backed by the 'worker' table.
    my $table = 'worker';
    return $table;
}
86 
87 
sub default_input_column_mapping {
    my $self = shift @_;

    # Driver-specific SQL expressions that convert the stored 'when_submitted'
    # timestamp into a derived 'seconds_since_when_submitted' column at fetch time.
    my %age_sql_for_driver = (
        'mysql'     => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(when_submitted) seconds_since_when_submitted ",
        'sqlite'    => "strftime('%s','now')-strftime('%s',when_submitted) seconds_since_when_submitted ",
        'pgsql'     => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - when_submitted) seconds_since_when_submitted ",
    );

    return {
        'when_submitted' => $age_sql_for_driver{ $self->dbc->driver() },
    };
}
99 
100 
sub do_not_update_columns {
    # 'when_submitted' is set once at submission time and must never be
    # overwritten by generic update() calls.
    my @frozen_columns = ('when_submitted');
    return \@frozen_columns;
}
104 
105 
106 
sub object_class {
    # Class that fetched rows are inflated into by this adaptor.
    my $class_name = 'Bio::EnsEMBL::Hive::Worker';
    return $class_name;
}
110 
111 
112 ############################
113 #
114 # PUBLIC API
115 #
116 ############################
117 
118 
119 =head2 create_new_worker
120 
121  Description: Creates an entry in the worker table,
122  populates some non-storable attributes
123  and returns a Worker object based on that insert.
124  This guarantees that each worker registered in this Queen's hive is properly registered.
125  Returntype : Bio::EnsEMBL::Hive::Worker
126  Caller : runWorker.pl
127 
128 =cut
129 
sub create_new_worker {
    my $self    = shift @_;
    my %flags   = @_;

    # Unpack the '-option => value' style flags passed by runWorker.pl:
    my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id,
        $no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize,
        $worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files)
     = @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id
        -no_write -debug -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize
        -worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)};

    sleep( $worker_delay_startup_seconds // 0 );    # NB: undefined parameter would have caused eternal sleep!

    # Testing aid: optionally simulate a crash right at startup with the given probability:
    if( defined( $worker_crash_on_startup_prob ) ) {
        if( rand(1) < $worker_crash_on_startup_prob ) {
            die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)";
        }
    }

    # Identify where this process is running (Meadow, process id, host and user):
    my $default_config = Bio::EnsEMBL::Hive::Utils::Config->new(@$config_files);
    my ($meadow, $process_id, $meadow_host, $meadow_user) = Bio::EnsEMBL::Hive::Valley->new( $default_config )->whereami();
    die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user);
    my $meadow_type = $meadow->type;
    my $meadow_name = $meadow->cached_name;

    foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) {
        # So far 'RELOCATED events' has been detected on LSF 9.0 in response to sending signal #99 or #100
        # Since I don't know how to avoid them, I am trying to register them when they happen.
        # The following snippet buries the previous incarnation of the Worker before starting a new one.
        #
        # FIXME: if GarabageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
        #        LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
        #        So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
        $prev_worker_incarnation->cause_of_death( 'RELOCATED' );
        $self->register_worker_death( $prev_worker_incarnation );
    }

    my $worker;

    if($preregistered) {
        # The 'worker' table entry was created at submission time;
        # poll until this process can find (and occupy) its own entry:
        my $max_registration_seconds = $meadow->config_get('MaxRegistrationSeconds');
        my $seconds_waited = 0;
        my $seconds_more = 5;   # step increment

        until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
            my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
            if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
                my $msg = "Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
                $log_message_adaptor->store_hive_message($msg, 'WORKER_ERROR' );
                die $msg;
            } else {
                $log_message_adaptor->store_hive_message("Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...", 'WORKER_CAUTION' );
                sleep($seconds_more);
                $seconds_waited += $seconds_more;
            }
        }

        # only update the fields that were not available at the time of submission:
        $worker->meadow_host( $meadow_host );
        $worker->meadow_user( $meadow_user );
        $worker->when_born( 'CURRENT_TIMESTAMP' );
        $worker->status( 'READY' );

        $self->update( $worker );

    } else {
        # Not preregistered: build and store a brand-new worker entry.
        my $resource_class;

        # Resolve the resource class either by name or by dbID (name takes precedence):
        if( defined($resource_class_name) ) {
            $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('name' => $resource_class_name)
                or die "resource_class with name='$resource_class_name' could not be fetched from the database";
        } elsif( defined($resource_class_id) ) {
            $resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('dbID', $resource_class_id)
                or die "resource_class with dbID='$resource_class_id' could not be fetched from the database";
        }

        $worker = Bio::EnsEMBL::Hive::Worker->new(
            'meadow_type'       => $meadow_type,
            'meadow_name'       => $meadow_name,
            'process_id'        => $process_id,
            'resource_class'    => $resource_class,
            'beekeeper_id'      => $beekeeper_id,

            'meadow_host'       => $meadow_host,
            'meadow_user'       => $meadow_user,
        );

        if (ref($self)) {   # skip storing in standalone (no-database) mode
            $self->store( $worker );

            $worker->when_born( 'CURRENT_TIMESTAMP' );
            $self->update_when_born( $worker );

            $self->refresh( $worker );
        }
    }

    # Post-registration configuration of non-storable Worker attributes:
    $worker->set_log_directory_name($hive_log_dir, $worker_log_dir);

    $worker->init;

    if(defined($job_limit)) {
        $worker->job_limiter($job_limit);
        $worker->life_span(0);  # a job-limited worker is not time-limited (unless -life_span is also given below)
    }

    $worker->life_span($life_span * 60) if($life_span); # $life_span min -> sec

    $worker->execute_writes(0) if($no_write);

    $worker->perform_cleanup(0) if($no_cleanup);

    $worker->debug($debug) if($debug);

    $worker->retry_throwing_jobs($retry_throwing_jobs) if(defined $retry_throwing_jobs);

    $worker->can_respecialize($can_respecialize) if(defined $can_respecialize);

    return $worker;
}
251 
252 
253 =head2 specialize_worker
254 
 Description: If -job_id or an -analyses_pattern is specified, it will try to specialize the Worker accordingly.
              Otherwise the Queen will analyze the hive and pick the most suitable analysis.
258 
259 =cut
260 
sub specialize_worker {
    my $self    = shift @_;
    my $worker  = shift @_;
    my $flags   = shift @_;

    my ($analyses_pattern, $job_id, $force)
        = @$flags{qw(-analyses_pattern -job_id -force)};

    # The two pre-specialization modes are mutually exclusive:
    if( $analyses_pattern and $job_id ) {
        die "At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
    }

    my $analysis;

    if( $job_id ) {
        # Mode 1: specialize into the analysis of one specific job (debugging aid).

        $worker->worker_say("resetting and fetching job for job_id '$job_id'");

        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;

        my $job = $job_adaptor->fetch_by_dbID( $job_id )
            or die "Could not fetch job with dbID='$job_id'";
        my $job_status = $job->status();

        if($job_status =~/(CLAIMED|PRE_CLEANUP|FETCH_INPUT|RUN|WRITE_OUTPUT|POST_HEALTHCHECK|POST_CLEANUP)/ ) {
            die "Job with dbID='$job_id' is already in progress, cannot run";   # FIXME: try GC first, then complain
        } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
            die "Job with dbID='$job_id' is $job_status, please use --force to override";
        }

        $analysis = $job->analysis;
        if(($analysis->stats->status eq 'BLOCKED') and !$force) {
            die "Analysis is BLOCKED, can't specialize a worker. Please use --force to override";
        }

        # Re-running a DONE job means its downstream semaphore has to be re-raised:
        if(($job_status eq 'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
            $worker->worker_say("Increasing the semaphore count of the dependent job");
            $controlled_semaphore->increase_by( [ $job ] );
        }

        # The job is about to leave its current status, so decrement the matching per-analysis counter:
        my %status2counter = ('FAILED' => 'failed_job_count', 'READY' => 'ready_job_count', 'DONE' => 'done_job_count', 'PASSED_ON' => 'done_job_count', 'SEMAPHORED' => 'semaphored_job_count');
        $analysis->stats->adaptor->increment_a_counter( $status2counter{$job->status}, -1, $job->analysis_id );

    } else {
        # Mode 2: let the Scheduler suggest the most suitable analysis,
        # optionally constrained by an analyses pattern.

        $analyses_pattern //= '%';  # for printing
        my $analyses_matching_pattern = $worker->hive_pipeline->collection_of( 'Analysis' )->find_all_by_pattern( $analyses_pattern );

        # refresh the stats of matching analyses before re-specialization:
        foreach my $analysis ( @$analyses_matching_pattern ) {
            $analysis->stats->refresh();
        }
        $self->db->hive_pipeline->invalidate_hive_current_load;

        $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_matching_pattern, $analyses_pattern);

        unless( ref($analysis) ) {
            # Here $analysis is either undef or a plain-string explanation of why nothing was suggested:
            $worker->cause_of_death('NO_ROLE');

            my $msg = $analysis // "No analysis suitable for the worker was found";
            die "$msg\n";
        }
    }

    # Record the new Role (the worker <-> analysis link) in the database:
    my $new_role = Bio::EnsEMBL::Hive::Role->new(
        'worker'    => $worker,
        'analysis'  => $analysis,
    );
    $self->db->get_RoleAdaptor->store( $new_role );
    $worker->current_role( $new_role );

    my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

    if($job_id) {
        # Claim the specific job for the new Role and make it the Worker's "special batch":
        my $role_id = $new_role->dbID;
        if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {

            $worker->special_batch( [ $job ] );
        } else {
            die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
        }

    } else {    # Note: special batch Workers should avoid flipping the status to 'WORKING' in case the analysis is still 'BLOCKED'

        $analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING');
    }

    # The following increment used to be done only when no specific task was given to the worker,
    # thereby excluding such "special task" workers from being counted in num_running_workers.
    #
    # However this may be tricky to emulate by triggers that know nothing about "special tasks",
    # so I am (temporarily?) simplifying the accounting algorithm.
    #
    $analysis_stats_adaptor->increment_a_counter( 'num_running_workers', 1, $analysis->dbID );
}
357 
358 
sub register_worker_death {
    # Marks a Worker as DEAD in the database, finalizing its unfinished Role
    # (which may release its undone jobs back to the pool, depending on the cause of death).
    my ($self, $worker, $update_when_checked_in) = @_;

    my $worker_id       = $worker->dbID;
    my $work_done       = $worker->work_done;
    my $cause_of_death  = $worker->cause_of_death || 'UNKNOWN';  # make sure we do not attempt to insert a void
    my $worker_died     = $worker->when_died;

    my $current_role = $worker->current_role;

    # If the in-memory Worker carries no Role, try recovering the last unfinished one from the database:
    unless( $current_role ) {
        $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
    }

    if( $current_role and !$current_role->when_finished() ) {
        # List of cause_of_death:
        #  only happen before or after a batch:  'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
        #  can happen whilst the worker is running a batch:  'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
        my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN|SEE_EXIT_STATUS)$/);
        $current_role->worker($worker); # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
        $current_role->when_finished( $worker_died );
        $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
    }

    # Bury the worker; when_died falls back to the database clock if the Worker did not record it:
    my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
            . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
            . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
            . " WHERE worker_id='$worker_id' ";

    # protected_prepare_execute() retries on transient DB failures and logs via the callback:
    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 'INFO' ); }
    );
}
392 
393 
sub cached_resource_mapping {
    my $self = shift;

    # Lazily build and memoize the ResourceClass dbID -> name map:
    unless( $self->{'_cached_resource_mapping'} ) {
        my %id2name;
        foreach my $rc ( $self->db->hive_pipeline->collection_of('ResourceClass')->list ) {
            $id2name{ $rc->dbID } = $rc->name;
        }
        $self->{'_cached_resource_mapping'} = \%id2name;
    }

    return $self->{'_cached_resource_mapping'};
}
399 
400 
sub registered_workers_attributes {
    my $self = shift @_;

    # Fetch all non-DEAD workers, keyed by their meadow coordinates and then by status:
    my @key_columns = ('meadow_type', 'meadow_name', 'meadow_user', 'process_id');
    return $self->fetch_all( "status!='DEAD'", 1, \@key_columns, 'status' );
}
406 
407 
sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
    my ($self, $meadow_user) = @_;

    # Counts of this user's SUBMITTED workers, keyed by meadow_type and resource_class_id:
    my $counts_by_meadow_type_rc_id = $self->count_all("status='SUBMITTED' AND meadow_user='$meadow_user'", ['meadow_type', 'resource_class_id'] );
    my $rc_id2name                  = $self->cached_resource_mapping;

    # Re-key the inner level from resource_class_id to resource_class_name:
    my %counts_by_meadow_type_rc_name;

    foreach my $meadow_type (keys %$counts_by_meadow_type_rc_id) {
        my $counts_by_rc_id = $counts_by_meadow_type_rc_id->{$meadow_type};
        foreach my $rc_id (keys %$counts_by_rc_id) {
            my $rc_name = $rc_id2name->{ $rc_id } || '__undefined_rc_name__';
            $counts_by_meadow_type_rc_name{ $meadow_type }{ $rc_name } = $counts_by_rc_id->{$rc_id};
        }
    }

    return \%counts_by_meadow_type_rc_name;
}
425 
426 
sub check_for_dead_workers {    # scans the whole Valley for lost Workers (but ignores unreachable ones)
    my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;

    my $last_few_seconds = 5;   # FIXME: It is probably a good idea to expose this parameter for easier tuning.

    print "GarbageCollector:\tChecking for lost Workers...\n";

    # all non-DEAD workers found in the database, with their meadow status
    my $reconciled_worker_statuses          = $valley->query_worker_statuses( $self->registered_workers_attributes );
    # selects the workers available in this valley. does not query the database / meadow
    my $signature_and_pid_to_worker_status  = $valley->status_of_all_our_workers_by_meadow_signature( $reconciled_worker_statuses );
    # this may pick up workers that have been created since the last fetch
    my $queen_overdue_workers               = $self->fetch_overdue_workers( $last_few_seconds );    # check the workers we have not seen active during the $last_few_seconds

    if (@$queen_overdue_workers) {
        print "GarbageCollector:\tOut of the ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";
    } else {
        print "GarbageCollector:\tfound none (all have checked in during the last $last_few_seconds seconds)\n";
    }

    my $this_meadow_user = whoami();

    # First pass: classify each overdue worker by its meadow-reported status.
    my %meadow_status_counts        = ();
    my %mt_and_pid_to_lost_worker   = ();
    foreach my $worker (@$queen_overdue_workers) {

        my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;
        if(my $pid_to_worker_status = $signature_and_pid_to_worker_status->{$meadow_signature}) {   # the whole Meadow subhash is either present or the Meadow is unreachable

            my $meadow_type = $worker->meadow_type;
            my $process_id  = $worker->process_id;
            my $status      = $pid_to_worker_status->{$process_id} // 'DEFERRED_CHECK'; # Workers that have been created between registered_workers_attributes and fetch_overdue_workers

            # Optionally kill-and-bury our own workers stuck in UNKWN meadow state:
            if($bury_unkwn_workers and ($status eq 'UNKWN')) {
                if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
                    if($meadow->can('kill_worker')) {
                        if($worker->meadow_user eq $this_meadow_user) {     # if I'm actually allowed to kill the worker...
                            print "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id";

                            $meadow->kill_worker($worker, 1);
                            $status = 'LOST';
                        }
                    }
                }
            }

            $meadow_status_counts{$meadow_signature}{$status}++;

            if(($status eq 'LOST') or ($status eq 'SUBMITTED')) {
                # candidates for burial; handled in the second pass below:
                $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;

            } elsif ($status eq 'DEFERRED_CHECK') {

                # do nothing now, wait until the next pass to check on this worker

            } else {

                # RUN|PEND|xSUSP handling: the worker is alive in the meadow, so just record the sighting
                my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id='".$worker->dbID."'";
                $self->dbc->protected_prepare_execute( [ $update_when_seen_sql ],
                    sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "see_worker".$after, 'INFO' ); }
                );
            }
        } else {
            $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++;   # Worker is unreachable from this Valley
        }
    }

    # print a quick summary report:
    while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
        print "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(', ', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n";
    }

    # Second pass: bury the LOST/SUBMITTED workers, meadow by meadow.
    while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
        my $this_meadow = $valley->available_meadow_hash->{$meadow_type};

        if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
            print "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";

            my $report_entries;

            # Ask the meadow (if it supports it) why/when these processes died:
            if($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
                my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
                print "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
            }

            print "GarbageCollector:\tRecording workers' missing attributes, registering their death, releasing their jobs and cleaning up temp directories\n";
            while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
                # Backfill meadow-reported attributes (birth/death times, host, cause) into the database:
                if(my $report_entry = $report_entries && $report_entries->{$process_id}) {
                    my @updated_attribs = ();
                    foreach my $worker_attrib ( qw(when_born meadow_host when_died cause_of_death) ) {
                        if( defined( $report_entry->{$worker_attrib} ) ) {
                            $worker->$worker_attrib( $report_entry->{$worker_attrib} );
                            push @updated_attribs, $worker_attrib;
                        }
                    }
                    $self->update( $worker, @updated_attribs ) if(scalar(@updated_attribs));
                }

                my $max_limbo_seconds = $this_meadow->config_get('MaxLimboSeconds') // 0;   # The maximum time for a Meadow to start showing the Worker (even in PEND state) after submission.
                                                                                            # We use it as a timeout for burying SUBMITTED and Meadow-invisible entries in the 'worker' table.

                if( ($worker->status ne 'SUBMITTED')
                 || $worker->when_died                                                  # reported by Meadow as DEAD (only if Meadow supports get_report_entries_for_process_ids)
                 || ($worker->seconds_since_when_submitted > $max_limbo_seconds) ) {    # SUBMITTED and Meadow-invisible for too long => we consider them LOST

                    $worker->cause_of_death('LIMBO') if( ($worker->status eq 'SUBMITTED') and !$worker->cause_of_death);    # LIMBO cause_of_death means: found in SUBMITTED state, exceeded the timeout, Meadow did not tell us more

                    $self->register_worker_death( $worker );

                    if( ($worker->status ne 'SUBMITTED')                # There is no worker_temp_directory before specialization
                    and ($worker->meadow_user eq $this_meadow_user) ) { # if I'm actually allowed to kill the worker...
                        $valley->cleanup_left_temp_directory( $worker );
                    }
                }
            }

            if( $report_entries && %$report_entries ) {     # use the opportunity to also store resource usage of the buried workers:
                my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
                $self->store_resource_usage( $report_entries, $processid_2_workerid );
            }
        }
    }

    # the following bit is completely Meadow-agnostic and only restores database integrity:
    if($check_buried_in_haste) {
        my $role_adaptor = $self->db->get_RoleAdaptor;
        my $job_adaptor  = $self->db->get_AnalysisJobAdaptor;

        print "GarbageCollector:\tChecking for orphan roles...\n";
        my $orphan_roles = $role_adaptor->fetch_all_unfinished_roles_of_dead_workers();
        if(my $orphan_role_number = scalar @$orphan_roles) {
            print "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
            foreach my $orphan_role (@$orphan_roles) {
                $role_adaptor->finalize_role( $orphan_role );
            }
        } else {
            print "GarbageCollector:\tfound none\n";
        }

        print "GarbageCollector:\tChecking for roles buried in haste...\n";
        my $buried_in_haste_roles = $role_adaptor->fetch_all_finished_roles_with_unfinished_jobs();
        if(my $bih_number = scalar @$buried_in_haste_roles) {
            print "GarbageCollector:\tfound $bih_number buried roles with unfinished jobs, reclaiming.\n\n";
            foreach my $role (@$buried_in_haste_roles) {
                $job_adaptor->release_undone_jobs_from_role( $role );
            }
        } else {
            print "GarbageCollector:\tfound none\n";
        }

        print "GarbageCollector:\tChecking for orphan jobs...\n";
        my $orphan_jobs = $job_adaptor->fetch_all_unfinished_jobs_with_no_roles();
        if(my $sj_number = scalar @$orphan_jobs) {
            print "GarbageCollector:\tfound $sj_number unfinished jobs with no roles, reclaiming.\n\n";
            foreach my $job (@$orphan_jobs) {
                $job_adaptor->release_and_age_job($job->dbID, $job->analysis->max_retry_count, 1);
            }
        } else {
            print "GarbageCollector:\tfound none\n";
        }
    }
}
591 
592 
593  # To tackle the RELOCATED event: this method checks whether there are already workers with these attributes
594 sub find_previous_worker_incarnations {
595  my ($self, $meadow_type, $meadow_name, $process_id) = @_;
596 
597  # This happens in standalone mode, when there is no database
598  return [] unless ref($self);
599 
600  return $self->fetch_all( "status!='DEAD' AND status!='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" );
601 }
602 
603 
sub fetch_preregistered_worker {
    # Returns the (single) SUBMITTED worker entry with the given meadow coordinates,
    # or undef when there is none.
    my ($self, $meadow_type, $meadow_name, $process_id) = @_;

    # This happens in standalone mode, when there is no database.
    # NB: a bare return (undef in scalar context) is used here rather than the
    # former arrayref, because callers treat the result as a single Worker object
    # and a truthy [] would wrongly satisfy checks such as
    # "until( $worker = $self->fetch_preregistered_worker(...) )".
    return unless ref($self);

    # There is at most one SUBMITTED entry with these coordinates:
    my ($worker) = @{ $self->fetch_all( "status='SUBMITTED' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) };

    return $worker;
}
614 
615 
    # a new version that both checks in and updates the status
sub check_in_worker {
    my ($self, $worker) = @_;

    # Touch when_checked_in and persist the Worker's current status and work counter in one statement:
    my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'";

    # protected_prepare_execute() retries on transient DB failures and logs via the callback:
    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker".$after, 'INFO' ); }
    );
}
626 
627 
628 =head2 reset_job_by_dbID_and_sync
629 
630  Arg [1]: int $job_id
631  Example:
632  my $job = $queen->reset_job_by_dbID_and_sync($job_id);
633  Description:
634  For the specified job_id it will fetch just that job,
635  reset it completely as if it has never run, and return it.
636  Specifying a specific job bypasses the safety checks,
637  thus multiple workers could be running the
638  same job simultaneously (use only for debugging).
639  Returntype : none
640  Exceptions :
641  Caller : beekeeper.pl
642 
643 =cut
644 
sub reset_job_by_dbID_and_sync {
    my ($self, $job_id) = @_;

    # Forcibly reset (or grab) the given job, then bring its analysis' stats up to date:
    my $reset_job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id);

    $self->synchronize_AnalysisStats( $reset_job->analysis->stats );
}
654 
655 
656 ######################################
657 #
658 # Public API interface for beekeeper
659 #
660 ######################################
661 
662 
663  # Note: asking for Queen->fetch_overdue_workers(0) essentially means
664  # "fetch all workers known to the Queen not to be officially dead"
665  #
666 sub fetch_overdue_workers {
667  my ($self,$overdue_secs) = @_;
668 
669  $overdue_secs = 3600 unless(defined($overdue_secs));
670 
671  my $constraint = "status!='DEAD' AND (when_checked_in IS NULL OR ".{
672  'mysql' => "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(when_checked_in)) > $overdue_secs",
673  'sqlite' => "(strftime('%s','now')-strftime('%s',when_checked_in)) > $overdue_secs",
674  'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - when_checked_in) > $overdue_secs",
675  }->{ $self->dbc->driver }.' )';
676 
677  return $self->fetch_all( $constraint );
678 }
679 
680 
681 =head2 synchronize_hive
682 
683  Arg [1] : $list_of_analyses
684  Example : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
685  Description: Runs through all analyses in the given list and synchronizes
686  the analysis_stats summary with the states in the job and worker tables.
687  Then follows by checking all the blocking rules and blocks/unblocks analyses as needed.
688  Exceptions : none
689  Caller : general
690 
691 =cut
692 
sub synchronize_hive {
    my ($self, $list_of_analyses) = @_;

    my $started_at     = time();
    my $analysis_count = scalar(@$list_of_analyses);

    # One marker per analysis: 'x' for BLOCKED, 'o' otherwise.
    print "\nSynchronizing the hive ($analysis_count analyses this time):\n";
    foreach my $analysis (@$list_of_analyses) {
        $self->synchronize_AnalysisStats($analysis->stats);
        my $marker = ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o';
        print $marker;
    }
    print "\n";

    my $elapsed = time() - $started_at;
    print "$elapsed seconds to synchronize_hive\n\n";
}
707 
708 
709 =head2 safe_synchronize_AnalysisStats
710 
711  Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
712  Example : $self->safe_synchronize_AnalysisStats($stats);
713  Description: Prewrapper around synchronize_AnalysisStats that does
714  checks and grabs sync_lock before proceeding with sync.
715  Used by distributed worker sync system to avoid contention.
  Returns a true string describing the outcome ('sync_done', 'sync_done_by_friend'
  or 'stats_fresh_enough') on success, and 0 if the lock could not be obtained
  and so no sync was attempted.
718  Returntype : boolean
719  Caller : general
720 
721 =cut
722 
sub safe_synchronize_AnalysisStats {
    my ($self, $stats) = @_;

    $stats->refresh();
    my $was_synching = $stats->sync_lock;   # remember whether somebody else was already synching when we arrived

    # If another agent holds the sync_lock, wait (up to ~5 seconds) for it to finish:
    my $max_refresh_attempts = 5;
    while($stats->sync_lock and $max_refresh_attempts--) {  # another Worker/Beekeeper is synching this analysis right now
        # ToDo: it would be nice to report the detected collision
        sleep(1);
        $stats->refresh();  # just try to avoid collision
    }

    # The sync has just completed and we have the freshest stats
    if ($was_synching && !$stats->sync_lock) {
        return 'sync_done_by_friend';
    }

    # Only attempt a sync when the stats are neither final (DONE)
    # nor recently refreshed (WORKING and updated within the last 3 minutes):
    unless( ($stats->status eq 'DONE')
         or ( ($stats->status eq 'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {

        # In case $stats->sync_lock is set, this is basically giving it one last chance
        my $sql = "UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
                  "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;

        my $row_count = $self->dbc->do($sql);   # try to claim the sync_lock

        if( $row_count == 1 ) {     # if we managed to obtain the lock, let's go and perform the sync:
            if ($stats->sync_lock) {
                # Actually the sync has just been completed by another agent. Save time and load the stats it computed
                $stats->refresh();
                # And release the lock
                $stats->sync_lock(0);
                $stats->adaptor->update_sync_lock($stats);
                return 'sync_done_by_friend';
            }
            $self->synchronize_AnalysisStats($stats, 1);    # '1' tells it the refresh has just been done
            return 'sync_done';
        } else {
            # otherwise assume it's locked and just return un-updated
            return 0;
        }
    }

    return $stats->sync_lock ? 0 : 'stats_fresh_enough';
}
769 
770 
771 =head2 synchronize_AnalysisStats
772 
773  Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
774  Example : $self->synchronize_AnalysisStats( $stats );
775  Description: Queries the job and worker tables to get summary counts
776  and rebuilds the AnalysisStats object.
777  Then updates the analysis_stats table with the new summary info.
778  Exceptions : none
779  Caller : general
780 
781 =cut
782 
sub synchronize_AnalysisStats {
    my ($self, $stats, $has_refresh_just_been_done) = @_;

    # Nothing to do for an undefined or unsaved stats object.
    return unless $stats and $stats->analysis_id;

    # Make sure we start from the current database state, unless the caller
    # has just refreshed the object for us.
    $stats->refresh() unless $has_refresh_just_been_done;

    # When the pipeline maintains counts via triggers, pass undef so that the
    # trigger-maintained counters are kept; otherwise recount from the job table.
    my $job_counts;
    unless ($stats->hive_pipeline->hive_use_triggers()) {
        $job_counts = $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );
    }

    $stats->recalculate_from_job_counts( $job_counts );

    # $stats->sync_lock(0); ## do we perhaps need it here?
    $stats->update;     # update and release sync_lock
}
798 
799 
800 =head2 check_nothing_to_run_but_semaphored
801 
802  Arg [1] : $list_of_analyses
803  Example : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
 Description: Returns true if the given analyses have semaphored jobs waiting
            : but no immediately runnable jobs left; returns false otherwise.
805  Exceptions : none
806  Caller : Scheduler
807 
808 =cut
809 
sub check_nothing_to_run_but_semaphored {    # make sure it is run after a recent sync
    my ($self, $list_of_analyses) = @_;

    # Flag stays 1 only while every unfinished job we see is semaphored.
    my $only_semaphored_jobs_to_run = 1;
    my $total_semaphored_job_count  = 0;

    foreach my $one_analysis (@$list_of_analyses) {
        my $one_stats = $one_analysis->stats;

        # Jobs that are neither done, failed nor semaphored are runnable now.
        my $accounted_for = $one_stats->done_job_count
                          + $one_stats->failed_job_count
                          + $one_stats->semaphored_job_count;
        $only_semaphored_jobs_to_run = 0 if $one_stats->total_job_count != $accounted_for;

        $total_semaphored_job_count += $one_stats->semaphored_job_count;
    }

    # True only when there ARE semaphored jobs and nothing else is runnable.
    return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run );
}
825 
826 
827 =head2 print_status_and_return_reasons_to_exit
828 
829  Arg [1] : $list_of_analyses
830  Arg [2] : $debug
831  Example : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
832  : foreach my $reason_to_exit (@$reasons_to_exit) {
833  : my $exit_message = $reason_to_exit->{'message'};
834  : my $exit_status = $reason_to_exit->{'exit_status'};
835  Description: Runs through all analyses in the given list, reports failed analyses, and computes some totals.
836  : It returns a list of exit messages and status codes. Each element of the list is a hashref,
837  : with the exit message keyed by 'message' and the status code keyed by 'exit_status'
838  :
839  : Possible status codes are:
840  : 'JOB_FAILED'
841  : 'ANALYSIS_FAILED'
842  : 'NO_WORK'
843  :
844  : If $debug is set, the list will contain all analyses. Otherwise, empty and done analyses
845  : will not be listed
846  Exceptions : none
847  Caller : beekeeper.pl
848 
849 =cut
850 
sub print_status_and_return_reasons_to_exit {
    my ($self, $list_of_analyses, $debug) = @_;

    # Running totals across all analyses, plus buckets of analyses we may skip
    # from the printed report (EMPTY and DONE are hidden unless $debug is set).
    my ($total_done_jobs, $total_failed_jobs, $total_jobs, $total_excluded_jobs, $cpumsec_to_do) = (0) x 5;
    my %skipped_analyses = ('EMPTY' => [], 'DONE' => []);
    my @analyses_to_display;
    my @reasons_to_exit;

    foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
        my $stats             = $analysis->stats;
        my $failed_job_count  = $stats->failed_job_count;
        my $is_excluded       = $stats->is_excluded;

        # Show the analysis unless its status is one of the skippable buckets
        # (and we are not debugging).
        if ($debug or !$skipped_analyses{$stats->status}) {
            push @analyses_to_display, $analysis;
        } else {
            push @{$skipped_analyses{$stats->status}}, $analysis;
        }

        if ($failed_job_count > 0) {
            # Re-sync so the FAILED/failed-jobs verdict is based on fresh counts.
            $self->synchronize_AnalysisStats($stats);
            $stats->determine_status();
            my $exit_status;
            my $failure_message;
            my $logic_name = $analysis->logic_name;
            my $tolerance  = $analysis->failed_job_tolerance;
            # FAILED means the failure ratio exceeded the analysis' tolerance;
            # otherwise there are failed jobs but still within tolerance.
            if( $stats->status eq 'FAILED') {
                $exit_status = 'ANALYSIS_FAILED';
                $failure_message = "### Analysis '$logic_name' has FAILED  (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            } else {
                $exit_status = 'JOB_FAILED';
                $failure_message = "### Analysis '$logic_name' has failed jobs (failed jobs: $failed_job_count, tolerance: $tolerance\%) ###";
            }
            push (@reasons_to_exit, {'message'     => $failure_message,
                                     'exit_status' => $exit_status});
        }

        if ($is_excluded) {
            # Jobs of an excluded analysis that are neither done nor failed
            # count as "excluded" in the totals below.
            my $excluded_job_count = $stats->total_job_count - $stats->done_job_count - $failed_job_count;
            $total_excluded_jobs += $excluded_job_count;
            push @{$skipped_analyses{'EXCLUDED'}}, $analysis;
        }
        $total_done_jobs    += $stats->done_job_count;
        $total_failed_jobs  += $failed_job_count;
        $total_jobs         += $stats->total_job_count;
        $cpumsec_to_do      += $stats->ready_job_count * $stats->avg_msec_per_job;
    }

    my $total_jobs_to_do        = $total_jobs - $total_done_jobs - $total_failed_jobs - $total_excluded_jobs;   # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
    my $cpuhrs_to_do            = $cpumsec_to_do / (1000.0*60*60);
    my $percentage_completed    = $total_jobs
                                    ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
                                    : 0.0;

    # Align the per-analysis report on the longest logic_name.
    my $max_logic_name_length = max(map {length($_->logic_name)} @analyses_to_display);
    foreach my $analysis (@analyses_to_display) {
        print $analysis->stats->toString($max_logic_name_length) . "\n";
    }
    print "\n";
    if (@{$skipped_analyses{'EMPTY'}}) {
        printf("%d analyses not shown because they don't have any jobs.\n", scalar(@{$skipped_analyses{'EMPTY'}}));
    }
    if (@{$skipped_analyses{'DONE'}}) {
        printf("%d analyses not shown because all their jobs are done.\n", scalar(@{$skipped_analyses{'DONE'}}));
    }
    printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed + %d excluded = %d total)\n",
            scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_excluded_jobs, $total_jobs);

    # When nothing remains to do, report NO_WORK (with an extra note if some
    # of the remaining jobs were only "done" by virtue of being excluded).
    unless( $total_jobs_to_do ) {
        if ($total_excluded_jobs > 0) {
            push (@reasons_to_exit, {'message'     => "### Some analyses are excluded ###",
                                     'exit_status' => 'NO_WORK'});
        }
        push (@reasons_to_exit, {'message'     => "### No jobs left to do ###",
                                 'exit_status' => 'NO_WORK'});
    }

    return \@reasons_to_exit;
}
930 
931 
932 =head2 register_all_workers_dead
933 
934  Example : $queen->register_all_workers_dead();
 Description: Registers as dead every worker that is not already marked DEAD
936  Exceptions : none
937  Caller : beekeeper.pl
938 
939 =cut
940 
sub register_all_workers_dead {
    my ($self) = @_;

    # Any worker whose status is not already DEAD gets its death registered.
    foreach my $live_worker ( @{ $self->fetch_all( "status!='DEAD'" ) } ) {
        $self->register_worker_death( $live_worker );
    }
}
949 
950 
sub interval_workers_with_unknown_usage {
    my ($self) = @_;

    # For every meadow, find the time window spanned by workers that have no
    # entry in worker_resource_usage yet, together with how many such workers
    # there are.  Workers still alive have no when_died, hence the IFNULL
    # fallback to the latest submission time.
    my $sql_times = qq{
        SELECT meadow_type, meadow_name, MIN(when_submitted), IFNULL(max(when_died), MAX(when_submitted)), COUNT(*)
        FROM worker w
        LEFT JOIN worker_resource_usage u USING(worker_id)
        WHERE u.worker_id IS NULL
        GROUP BY meadow_type, meadow_name
    };

    my $sth_times = $self->prepare( $sql_times );
    $sth_times->execute();

    # Nested hash: {meadow_type}{meadow_name} => interval summary.
    my %meadow_to_interval;
    while ( my @row = $sth_times->fetchrow_array() ) {
        my ($meadow_type, $meadow_name, $min_submitted, $max_died, $workers_count) = @row;
        $meadow_to_interval{$meadow_type}{$meadow_name} = {
            'min_submitted' => $min_submitted,
            'max_died'      => $max_died,
            'workers_count' => $workers_count,
        };
    }
    $sth_times->finish();

    return \%meadow_to_interval;
}
976 
977 
sub store_resource_usage {
    my ($self, $report_entries, $processid_2_workerid) = @_;
    # $report_entries       : hashref { process_id => { exit_status, mem_megs, ... } }
    # $processid_2_workerid : hashref mapping meadow process_ids to hive worker_ids

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
    # Instead we emulate it with a DELETE followed by an INSERT per worker.

    my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?';
    my $sth_delete = $self->prepare( $sql_delete );

    my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
    my $sth_insert = $self->prepare( $sql_insert );

    # process_ids that could not be mapped to one of our workers.
    my @not_ours = ();

    while( my ($process_id, $report_entry) = each %$report_entries ) {

        if( my $worker_id = $processid_2_workerid->{$process_id} ) {
            # Remove any previous record before inserting the new one (see FIXME above).
            $sth_delete->execute( $worker_id );

            eval {
                $sth_insert->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} );  # slicing hashref
                1;
            } or do {
                # A parallel beekeeper may have inserted the row between our
                # DELETE and INSERT; log that benign race, re-throw anything else.
                if($@ =~ /execute failed: Duplicate entry/s) {     # ignore the collision with another parallel beekeeper
                    $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 'WORKER_CAUTION' );
                } else {
                    die $@;
                }
            };
        } else {
            push @not_ours, $process_id;
            #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
        }
    }
    $sth_delete->finish();
    $sth_insert->finish();
}
1014 
1015 
1016 1;
public suggest_analysis_to_specialize_a_worker()
public Bio::EnsEMBL::Hive::Storable new()