ensembl-hive  2.6
beekeeper.pl
1 #!/usr/bin/env perl
2 
3 use strict;
4 use warnings;
5 
6  # Finding out own path in order to reference own components (including own modules):
7 use Cwd ();
8 use File::Basename ();
9 BEGIN {
10  $ENV{'EHIVE_ROOT_DIR'} ||= File::Basename::dirname( File::Basename::dirname( Cwd::realpath($0) ) );
11  unshift @INC, $ENV{'EHIVE_ROOT_DIR'}.'/modules';
12 }
13 
14 use File::Path 'make_path';
15 use Getopt::Long qw(:config no_auto_abbrev);
16 use Pod::Usage;
17 
18 use Bio::EnsEMBL::Hive::DBSQL::LogMessageAdaptor ('store_beekeeper_message');
19 use Bio::EnsEMBL::Hive::Utils ('destringify');
20 use Bio::EnsEMBL::Hive::Utils::Slack ('send_beekeeper_message_to_slack');
21 use Bio::EnsEMBL::Hive::Utils::Config;
22 use Bio::EnsEMBL::Hive::Utils::URL;
23 use Bio::EnsEMBL::Hive::Version ('report_versions');
24 use Bio::EnsEMBL::Hive::Beekeeper;
25 use Bio::EnsEMBL::Hive::HivePipeline;
26 use Bio::EnsEMBL::Hive::Scheduler;
27 use Bio::EnsEMBL::Hive::Valley;
28 
29 use Bio::EnsEMBL::Hive::Worker;
30 
31 
32 main();
33 
34 
35 sub main {
36  $|=1; # make STDOUT unbuffered (STDERR is unbuffered anyway)
37 
38  # OK, this is a hack, but I'm going to pretend I've got an object here
39  # by creating a hash ref and passing it around like an object.
40  # This is to avoid using global variables in functions, and to consolidate
41  # the globals into a nice '$self' package.
42  my $self = {};
43 
44  my $help = 0;
45  my $report_versions = 0;
46  my $loopit = 0;
47  my $sync = 0;
48  my $local = 0;
49  my $show_failed_jobs = 0;
50  my $default_meadow_type = undef;
51  my $submit_workers_max = undef;
52  my $total_running_workers_max = undef;
53  my $submission_options = undef;
54  my $run = 0;
55  my $run_job_id = undef;
56  my $force = undef;
57  my $check_for_dead = 0;
58  my $bury_unkwn_workers = 0;
59  my $all_dead = 0;
60  my $balance_semaphores = 0;
61  my $job_id_for_output = 0;
62  my $show_worker_stats = 0;
63  my $kill_worker_id = 0;
64  my $big_red_button = 0;
65  my $keep_alive = 0; # DEPRECATED
66  my $reset_job_id = 0;
67  my $reset_all_jobs_for_analysis = 0; # DEPRECATED
68  my $reset_failed_jobs_for_analysis = 0; # DEPRECATED
69  my $reset_all_jobs = 0; # Mark DONE, PASSED_ON and FAILED jobs to READY
70  my $reset_failed_jobs = 0; # Mark FAILED jobs to READY
71  my $reset_done_jobs = 0; # Mark DONE and PASSED_ON jobs to READY
72  my $unblock_semaphored_jobs = 0; # Mark SEMAPHORED jobs to READY
73  my $forgive_failed_jobs = 0; # Mark FAILED jobs to DONE
74  my $discard_ready_jobs = 0; # Mark READY jobs to DONE
75 
76  $self->{'url'} = undef;
77  $self->{'reg_conf'} = undef;
78  $self->{'reg_type'} = undef;
79  $self->{'reg_alias'} = undef;
80  $self->{'nosqlvc'} = undef;
81 
82  $self->{'config_files'} = [];
83 
84  $self->{'sleep_minutes'} = 1;
85  $self->{'max_loops'} = 0;
86  $self->{'retry_throwing_jobs'} = undef;
87  $self->{'loop_until'} = undef;
88  $self->{'can_respecialize'} = 1;
89  $self->{'hive_log_dir'} = undef;
90  $self->{'submit_log_dir'} = undef;
91  $self->{'worker_delay_startup_seconds'} = undef;
92  $self->{'worker_crash_on_startup_prob'} = undef;
93 
94  # store all the options passed on the command line for registration
95  # as GetOptions modifies @ARGV
96  my @original_argv = @ARGV;
97 
98  GetOptions(
99  # connection parameters
100  'url=s' => \$self->{'url'},
101  'reg_conf|regfile|reg_file=s' => \$self->{'reg_conf'},
102  'reg_type=s' => \$self->{'reg_type'},
103  'reg_alias|regname|reg_name=s' => \$self->{'reg_alias'},
104  'nosqlvc' => \$self->{'nosqlvc'}, # using "nosqlvc" instead of "sqlvc!" for compatibility with modules where it is a propagated option
105 
106  # json config files
107  'config_file=s@' => $self->{'config_files'},
108 
109  # loop control
110  'run' => \$run,
111  'loop' => \$loopit,
112  'max_loops=i' => \$self->{'max_loops'},
113  'loop_until=s' => \$self->{'loop_until'},
114  'keep_alive' => \$keep_alive,
115  'job_id|run_job_id=i'=> \$run_job_id,
116  'force!' => \$self->{'force'},
117  'sleep=f' => \$self->{'sleep_minutes'},
118 
119  # meadow control
120  'local!' => \$local,
121  'meadow_type=s' => \$default_meadow_type,
122  'total_running_workers_max=i' => \$total_running_workers_max,
123  'submit_workers_max=i' => \$submit_workers_max,
124  'submission_options=s' => \$submission_options,
125 
126  # worker control
127  'job_limit=i' => \$self->{'job_limit'},
128  'life_span|lifespan=i' => \$self->{'life_span'},
129  'logic_name=s' => \$self->{'logic_name'},
130  'analyses_pattern=s' => \$self->{'analyses_pattern'},
131  'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
132  'retry_throwing_jobs!' => \$self->{'retry_throwing_jobs'},
133  'can_respecialize|can_respecialise!' => \$self->{'can_respecialize'},
134  'debug=i' => \$self->{'debug'},
135  'submit_log_dir=s' => \$self->{'submit_log_dir'},
136  'worker_delay_startup_seconds=i' => \$self->{'worker_delay_startup_seconds'},
137  'worker_crash_on_startup_prob=f' => \$self->{'worker_crash_on_startup_prob'},
138 
139  # other commands/options
140  'h|help!' => \$help,
141  'v|version|versions!' => \$report_versions,
142  'sync!' => \$sync,
143  'dead!' => \$check_for_dead,
144  'unkwn!' => \$bury_unkwn_workers,
145  'killworker=i' => \$kill_worker_id,
146  'big_red_button' => \$big_red_button,
147  'alldead!' => \$all_dead,
148  'balance_semaphores'=> \$balance_semaphores,
149  'worker_stats' => \$show_worker_stats,
150  'failed_jobs' => \$show_failed_jobs,
151  'reset_job_id=i' => \$reset_job_id,
152  'reset_failed_jobs_for_analysis=s' => \$reset_failed_jobs_for_analysis,
153  'reset_all_jobs_for_analysis=s' => \$reset_all_jobs_for_analysis,
154  'reset_failed_jobs' => \$reset_failed_jobs,
155  'reset_all_jobs' => \$reset_all_jobs,
156  'reset_done_jobs' => \$reset_done_jobs,
157  'discard_ready_jobs' => \$discard_ready_jobs,
158  'forgive_failed_jobs' => \$forgive_failed_jobs,
159  'unblock_semaphored_jobs' => \$unblock_semaphored_jobs,
160  'job_output=i' => \$job_id_for_output,
161  ) or die "Error in command line arguments\n";
162 
163  if (@ARGV) {
164  die "ERROR: There are invalid arguments on the command-line: ". join(" ", @ARGV). "\n";
165  }
166 
167  if ($help) {
168  pod2usage({-exitvalue => 0, -verbose => 2});
169  }
170 
171  if($report_versions) {
172  report_versions();
173  exit(0);
174  }
175 
176  my $config = Bio::EnsEMBL::Hive::Utils::Config->new(@{$self->{'config_files'}});
177 
178  # if -keep_alive passed, ensure looping is on and loop_until is forever
179  if ($keep_alive) {
180  $self->{'loop_until'} = 'FOREVER';
181  $loopit = 1;
182  }
183 
184  # if user has specified -loop_until, ensure looping is turned on
185  if ($self->{'loop_until'}) {
186  $loopit = 1;
187  }
188 
189  # if loop_until hasn't been set by the user, or defaulted by a flag,
190  # set it to ANALYSIS_FAILURE
191  unless ($self->{'loop_until'}) {
192  $self->{'loop_until'} = 'ANALYSIS_FAILURE';
193  }
194 
195  if($run or $run_job_id) {
196  $self->{'max_loops'} = 1;
197  } elsif ($loopit) {
198  unless($self->{'max_loops'}) {
199  $self->{'max_loops'} = -1; # unlimited
200  }
201  }
202 
203  if($self->{'url'} or $self->{'reg_alias'}) {
204 
205  $self->{'pipeline'} = Bio::EnsEMBL::Hive::HivePipeline->new(
206  -url => $self->{'url'},
207  -reg_conf => $self->{'reg_conf'},
208  -reg_type => $self->{'reg_type'},
209  -reg_alias => $self->{'reg_alias'},
210  -no_sql_schema_version_check => $self->{'nosqlvc'},
211  );
212 
213  $self->{'dba'} = $self->{'pipeline'}->hive_dba();
214 
215  } else {
216  die "\nERROR: Connection parameters (url or reg_conf+reg_alias) need to be specified\n";
217  }
218 
219  $self->{'options'} = join(" ", @original_argv);
220 
221  # make -loop_until case insensitive
222  $self->{'loop_until'} = uc($self->{'loop_until'});
223 
224  my @allowed_loop_until_values = qw(ANALYSIS_FAILURE FOREVER JOB_FAILURE NO_WORK);
225  unless (grep {$_ eq $self->{'loop_until'}} @allowed_loop_until_values) {
226  die sprintf('"%s" is not a recognized value for -loop_until. Use one of %s', $self->{'loop_until'}, join('/', @allowed_loop_until_values));
227  }
228 
229  my $pipeline_name = $self->{'pipeline'}->hive_pipeline_name;
230 
231  if($pipeline_name) {
232  print "Pipeline name: $pipeline_name\n";
233  } else {
234  print STDERR "+---------------------------------------------------------------------+\n";
235  print STDERR "!                                                                     !\n";
236  print STDERR "!                              WARNING:                               !\n";
237  print STDERR "!                                                                     !\n";
238  print STDERR "! At the moment your pipeline doesn't have 'pipeline_name' defined.   !\n";
239  print STDERR "! This may seriously impair your beekeeping experience unless you are !\n";
240  print STDERR "! the only farm user. The name should be set in your PipeConfig file, !\n";
241  print STDERR "! or if you are running an old pipeline you can just set it by hand   !\n";
242  print STDERR "! in the 'meta' table.                                                !\n";
243  print STDERR "!                                                                     !\n";
244  print STDERR "+---------------------------------------------------------------------+\n";
245  }
246 
247  unless ($self->{'dba'}->dbc->has_write_access) {
248  my $dbc = $self->{'dba'}->dbc;
249  print STDERR "\n";
250  print STDERR "*" x 70, "\n";
251  print STDERR sprintf("* It appears that %s doesn't have INSERT/UPDATE/DELETE privileges\n", $dbc->username);
252  print STDERR sprintf("* on this database (%s). Please check the credentials\n", $dbc->dbname);
253  print STDERR "*\n";
254  print STDERR "*" x 70, "\n";
255  print STDERR "\n";
256  undef $run_job_id;
257  undef $reset_job_id;
258  undef $reset_all_jobs;
259  undef $reset_failed_jobs;
260  undef $reset_done_jobs;
261  undef $unblock_semaphored_jobs;
262  undef $forgive_failed_jobs;
263  undef $discard_ready_jobs;
264  undef $kill_worker_id;
265  undef $sync;
266  $self->{'read_only'} = 1;
267  }
268 
269  if($run_job_id) {
270  $submit_workers_max = 1;
271  }
272 
273  $default_meadow_type = 'LOCAL' if($local);
274  my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
275  $self->{'available_meadow_list'} = $valley->get_available_meadow_list();
276 
277  $valley->config_set('SubmitWorkersMax', $submit_workers_max) if(defined $submit_workers_max);
278 
279  my $default_meadow = $valley->get_default_meadow();
280  print "Default meadow: ".$default_meadow->signature."\n\n";
281 
282  $default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
283  $default_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
284 
285  my $queen = $self->{'dba'}->get_Queen;
286 
287  if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
288 
289  if($job_id_for_output) {
290  printf("===== Job output\n");
291  my $job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output);
292  print $job->toString. "\n";
293  }
294 
295  if($reset_all_jobs_for_analysis) {
296  die "Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
297  }
298  if($reset_failed_jobs_for_analysis) {
299  die "Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
300  }
301 
302  if( $self->{'logic_name'} ) { # FIXME: for now, logic_name will override analyses_pattern quietly
303  warn "-logic_name is now deprecated, please use -analyses_pattern that extends the functionality of -logic_name .\n";
304  $self->{'analyses_pattern'} = $self->{'logic_name'};
305  }
306 
307  # May die if running within a non-LOCAL meadow
308  unless ($self->{'read_only'}) {
309  $self->{'beekeeper'} = register_beekeeper($valley, $self);
310  }
311  $self->{'logmessage_adaptor'} = $self->{'dba'}->get_LogMessageAdaptor();
312 
313  # Check other beekeepers in our meadow to see if they are still alive
314  $self->{'beekeeper'}->adaptor->bury_other_beekeepers($self->{'beekeeper'}) unless $self->{'read_only'};
315 
316  if ($kill_worker_id) {
317  my $kill_worker;
318  eval {$kill_worker = $queen->fetch_by_dbID($kill_worker_id) or die};
319  if ($@) {
320  log_and_die($self, "Could not fetch Worker with dbID='$kill_worker_id' to kill");
321  }
322 
323  unless( $kill_worker->cause_of_death() ) {
324  if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {
325 
326  if( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
327  printf("Killing worker: %10d %35s %15s : ",
328  $kill_worker->dbID, $kill_worker->meadow_host, $kill_worker->process_id);
329 
330  $meadow->kill_worker($kill_worker);
331  $kill_worker->cause_of_death('KILLED_BY_USER');
332  $queen->register_worker_death($kill_worker);
333  # what about clean-up? Should we do it here or not?
334  } else {
335  log_and_die($self, "According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
336  }
337  } else {
338  log_and_die($self, "Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill");
339  }
340  } else {
341  log_and_die($self, "According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
342  }
343  }
344 
345  if ( $big_red_button ) {
346  return big_red_button( $self, $valley );
347  }
348 
349  my $run_job;
350  if($run_job_id) {
351  eval {$run_job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id ) or die};
352  if ($@) {
353  log_and_die($self, "Could not fetch Job with dbID=$run_job_id.\n");
354  }
355  }
356 
357  my $list_of_analyses = $run_job
358  ? [ $run_job->analysis ]
359  : $self->{'pipeline'}->collection_of('Analysis')->find_all_by_pattern( $self->{'analyses_pattern'} );
360 
361  if( $self->{'analyses_pattern'} ) {
362  if( @$list_of_analyses ) {
363  print "Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{'analyses_pattern'}."' : "
364  . join(', ', map { $_->logic_name.'('.$_->dbID.')' } sort {$a->dbID <=> $b->dbID} @$list_of_analyses)
365  . "\nBeekeeper : ", scalar($self->{'pipeline'}->collection_of('Analysis')->list())-scalar(@$list_of_analyses), " Analyses are not shown\n\n";
366  } else {
367  log_and_die($self, "Beekeeper : the -analyses_pattern '".$self->{'analyses_pattern'}."' did not match any Analyses.\n");
368  }
369  }
370 
371  my $has_task = ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs || $unblock_semaphored_jobs || $forgive_failed_jobs || $discard_ready_jobs);
372  if($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs) {
373  if (($reset_all_jobs || $reset_done_jobs) and not $self->{'analyses_pattern'}) {
374  log_and_die($self, "Beekeeper : do you really want to reset *all* the Jobs ? If yes, add \"-analyses_pattern '%'\" to the command line\n");
375  }
376  my $statuses_to_reset = $reset_failed_jobs ? [ 'FAILED' ] : ($reset_done_jobs ? [ 'DONE', 'PASSED_ON' ] : [ 'DONE', 'FAILED', 'PASSED_ON' ]);
377  $self->{'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $statuses_to_reset );
378  }
379 
380  if ($unblock_semaphored_jobs) {
381  $self->{'dba'}->get_AnalysisJobAdaptor->unblock_jobs_for_analysis_id( $list_of_analyses );
382  }
383 
384  if ($discard_ready_jobs) {
385  $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'READY' );
386  }
387 
388  if ($forgive_failed_jobs) {
389  $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'FAILED' );
390  }
391 
392  $queen->synchronize_hive( $list_of_analyses ) if $has_task;
393 
394  if($all_dead) { $queen->register_all_workers_dead(); }
395  if($check_for_dead) { $queen->check_for_dead_workers($valley, 1); }
396  if($bury_unkwn_workers) { $queen->check_for_dead_workers($valley, 1, 1); }
397  if($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }
398 
399  my $has_error = 0;
400  if ($self->{'max_loops'}) { # positive $max_loop means limited, negative means unlimited
401 
402  $has_error = run_autonomously($self, $self->{'pipeline'}, $self->{'max_loops'}, $self->{'loop_until'}, $valley, $list_of_analyses, $self->{'analyses_pattern'}, $run_job_id);
403 
404  } else {
405  # the output of several methods will look different depending on $analysis being [un]defined
406 
407  if($sync) {
408  $queen->synchronize_hive( $list_of_analyses );
409  }
410  my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{'debug'} );
411 
412  if($show_worker_stats) {
413  print "\n===== List of live Workers according to the Queen: ======\n";
414  foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
415  print $worker->toString(1)."\n";
416  }
417  }
418  $self->{'dba'}->get_RoleAdaptor->print_active_role_counts;
419 
420  Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses) unless $self->{'read_only'}; # show what would be submitted, but do not actually submit
421 
422  if($show_failed_jobs) {
423  print("===== failed Jobs\n");
424  my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $list_of_analyses , 'FAILED');
425 
426  foreach my $job (@{$failed_job_list}) {
427  print $job->toString. "\n";
428  }
429  }
430  $self->{'beekeeper'}->set_cause_of_death('LOOP_LIMIT') unless $self->{'read_only'};
431  }
432 
433  exit($has_error);
434 }
435 
436 #######################
437 #
438 # subroutines
439 #
440 #######################
441 
442 sub log_and_die {
443  my ($self, $message) = @_;
444 
445  my $beekeeper = $self->{'beekeeper'};
446  if ($beekeeper) {
447  $self->{'logmessage_adaptor'}->store_beekeeper_message($beekeeper->dbID, $message, 'PIPELINE_ERROR', 'TASK_FAILED');
448  $beekeeper->set_cause_of_death('TASK_FAILED');
449  }
450  die $message;
451 }
452 
453 sub generate_worker_cmd {
454  my ($self, $analyses_pattern, $run_job_id) = @_;
455 
456  my $worker_cmd = 'runWorker.pl';
457 
458  foreach my $worker_option ('url', 'reg_conf', 'reg_type', 'reg_alias', 'job_limit', 'life_span',
459  'worker_delay_startup_seconds', 'worker_crash_on_startup_prob', 'hive_log_dir', 'debug') {
460  if(defined(my $value = $self->{$worker_option})) {
461  $worker_cmd .= " -${worker_option} $value";
462  }
463  }
464 
465  foreach my $worker_flag ('retry_throwing_jobs', 'can_respecialize', 'force', 'nosqlvc') {
466  if(defined(my $value = $self->{$worker_flag})) {
467  if ($value == 0) {
468  $worker_cmd .= " -no${worker_flag}";
469  } else {
470  $worker_cmd .= " -${worker_flag}";
471  }
472  }
473  }
474 
475  # This option can have multiple values
476  $worker_cmd .= " -config_file $_" for @{$self->{'config_files'}};
477 
478  # special task:
479  if ($run_job_id) {
480  $worker_cmd .= " -job_id $run_job_id";
481  } elsif ($analyses_pattern) {
482  $worker_cmd .= " -analyses_pattern '".$analyses_pattern."'";
483  }
484 
485  return $worker_cmd;
486 }
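
# Illustration only (all option values below are hypothetical): with -url and -job_limit
# given on the beekeeper command line, the string returned above would look roughly like
#     runWorker.pl -url mysql://user:secret@host:3306/ehive_db -job_limit 10 -can_respecialize
# run_autonomously() later prepends the meadow-specific path to runWorker.pl and appends any
# per-resource worker_cmd_args plus the -preregistered flag before submitting it to the meadow.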
487 
488 sub register_beekeeper {
489  my ($valley, $self) = @_;
490 
491  my $loop_limit = undef;
492  if ($self->{'max_loops'} > -1) {
493  $loop_limit = $self->{'max_loops'};
494  }
495 
496  my $meadow_signatures = join(",",
497  map {$_->signature} @{$self->{'available_meadow_list'}});
498 
499  # The new instance is partly initialised with the output of Valley::whereami()
500  my $beekeeper = Bio::EnsEMBL::Hive::Beekeeper->new_from_Valley($valley,
501  'sleep_minutes' => $self->{'sleep_minutes'},
502  'analyses_pattern' => $self->{'analyses_pattern'},
503  'loop_limit' => $loop_limit,
504  'loop_until' => $self->{'loop_until'},
505  'options' => $self->{'options'},
506  'meadow_signatures' => $meadow_signatures,
507  );
508 
509  $self->{'dba'}->get_BeekeeperAdaptor->store($beekeeper);
510  unless ($self->{'beekeeper_id'} = $beekeeper->dbID) {
511  die "There was a problem registering this Beekeeper with the eHive database.";
512  }
513  return $beekeeper;
514 }
515 
516 
517 sub big_red_button {
518  my ( $self, $valley ) = @_;
519 
520  my $bk_a = $self->{dba}->get_BeekeeperAdaptor();
521  my $blocked_beekeepers;
522 
523  # Save a list of IDs of beekeepers which were blocked earlier so
524  # that we can avoid mentioning them when reporting the current blocking.
525  $blocked_beekeepers = $bk_a->fetch_all( 'is_blocked = 1' );
526  my %previously_blocked_ids;
527  while ( my $blocked_bk = shift @{ $blocked_beekeepers } ) {
528  $previously_blocked_ids{ $blocked_bk->dbID() } = 1;
529  }
530 
531  # Begin the shutdown by blocking all registered beekeepers so that
532  # none of them start spawning new workers just as this one tries to
533  # kill all workers.
534  $bk_a->block_all_alive_beekeepers();
535 
536  # Report which beekeepers, self excluded, we have just blocked
537  $blocked_beekeepers = $bk_a->fetch_all( 'is_blocked = 1' );
538  my $my_dbid = $self->{'beekeeper'}->dbID();
539  my @newly_blocked = grep {
540  ( ! exists $previously_blocked_ids{ $_->dbID() } )
541  && ( $_->dbID() != $my_dbid )
542  } @{ $blocked_beekeepers };
543  while ( my $blocked_bk = shift @newly_blocked ) {
544  print 'Blocked beekeeper ' . $blocked_bk->dbID() . ': '
545  . $blocked_bk->toString() . "\n";
546  }
547 
548  # Next, kill all workers which are still alive.
549  # FIXME: double-check correct job status:
550  # - running ones should be marked as 'failed'
551  # - claimed but unstarted ones should get back to 'unclaimed'
552  my $queen = $self->{'dba'}->get_Queen();
553  $queen->kill_all_workers( $valley );
554 
555  return 0;
556 }
557 
558 
559 sub run_autonomously {
560  my ($self, $pipeline, $max_loops, $loop_until, $valley, $list_of_analyses, $analyses_pattern, $run_job_id) = @_;
561 
562  my $hive_dba = $pipeline->hive_dba;
563  my $queen = $hive_dba->get_Queen;
564  my $meadow_user = $self->{'beekeeper'} && $self->{'beekeeper'}->meadow_user;
565 
566  my $pathless_resourceless_worker_cmd = generate_worker_cmd($self, $analyses_pattern, $run_job_id);
567 
568  my $iteration=0;
569  my $reasons_to_exit;
570 
571  BKLOOP: while( ($iteration++ != $max_loops) or ($loop_until eq 'FOREVER') ) { # NB: the order of conditions is important!
572 
573  print("\nBeekeeper : loop #$iteration ======================================================\n");
574 
575  my $pipeline_name = $pipeline->hive_pipeline_name;
576  if ($pipeline_name) {
577  print "\nPipeline name: $pipeline_name";
578  }
579  print "\nPipeline URL: ".$self->{'url'}."\n\n";
580 
581  $queen->check_for_dead_workers($valley, 0) unless $self->{'read_only'};
582 
583  # this section is where the beekeeper decides whether or not to stop looping
584  $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{'debug'});
585  my @job_fail_statuses = grep({$_->{'exit_status'} eq 'JOB_FAILED'} @$reasons_to_exit);
586  my @analysis_fail_statuses = grep({$_->{'exit_status'} eq 'ANALYSIS_FAILED'} @$reasons_to_exit);
587  my @no_work_statuses = grep({$_->{'exit_status'} eq 'NO_WORK'} @$reasons_to_exit);
588 
589  my $found_reason_to_exit = 0;
590 
591  if (($loop_until eq 'JOB_FAILURE') &&
592  (scalar(@job_fail_statuses)) > 0) {
593  print "Beekeeper : last loop because at least one Job failed and loop-until mode is '$loop_until'\n";
594  print "Beekeeper : details from analyses with failed Jobs:\n";
595  print join("\n", map {$_->{'message'}} @job_fail_statuses) . "\n";
596  $found_reason_to_exit = 1;
597  last BKLOOP;
598  }
599 
600  if (scalar(@analysis_fail_statuses) > 0) {
601  # at least one analysis has hit its fault tolerance
602  if (($loop_until eq 'FOREVER') ||
603  ($loop_until eq 'NO_WORK')) {
604  if (scalar(@no_work_statuses) == 0) {
605  print "Beekeeper : detected the following exit condition(s), but staying alive because loop-until mode is set to '$loop_until' :\n" .
606  join(", ", map {$_->{'message'}} @analysis_fail_statuses) . "\n";
607  }
608  } else {
609  # loop_until_mode is either job_failure or analysis_failure, and both of these exit on analysis failure
610  unless ($found_reason_to_exit) {
611  print "Beekeeper : last loop because at least one analysis failed and loop-until mode is '$loop_until'\n";
612  print "Beekeeper : details from analyses with failed Jobs:\n";
613  print join("\n", map {$_->{'message'}} @analysis_fail_statuses) . "\n";
614  $found_reason_to_exit = 1;
615  last BKLOOP;
616  }
617  }
618  }
619 
620  if ((scalar(@no_work_statuses) > 0) &&
621  ($loop_until ne 'FOREVER')) {
622  print "Beekeeper : last loop because there is no more work and loop-until mode is '$loop_until'\n"
623  unless ($found_reason_to_exit);
624  last BKLOOP;
625  }
626 
627  # end of testing for loop end conditions
628 
629  $hive_dba->get_RoleAdaptor->print_active_role_counts;
630 
631  my $workers_to_submit_by_meadow_type_rc_name = $self->{'read_only'} ? {} :
632  Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses);
633 
634  if( keys %$workers_to_submit_by_meadow_type_rc_name ) {
635 
636  my $submit_log_subdir;
637 
638  if( $self->{'submit_log_dir'} ) {
639  $submit_log_subdir = $self->{'submit_log_dir'}."/submit_bk".$self->{'beekeeper_id'}."_iter${iteration}";
640  make_path( $submit_log_subdir );
641  }
642 
643  # create an "index" over the freshly loaded RC/RD collections:
644  my %meadow_type_rc_name2resource_param_list = ();
645  foreach my $rd ( $pipeline->collection_of('ResourceDescription')->list ) {
646  my $rc_name = $rd->resource_class->name;
647  $meadow_type_rc_name2resource_param_list{ $rd->meadow_type }{ $rc_name } = [ $rd->submission_cmd_args, $rd->worker_cmd_args ];
648  }
649 
650  foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
651 
652  my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
653 
654  foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
655  my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};
656 
657  my $submission_message = "submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature();
658  print "\nBeekeeper : $submission_message\n";
659  $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
660  "loop iteration $iteration, $submission_message",
661  'INFO', 'ALIVE');
662 
663  my ($submission_cmd_args, $worker_cmd_args) = @{ $meadow_type_rc_name2resource_param_list{ $meadow_type }{ $rc_name } || [] };
664 
665  my $specific_worker_cmd = $this_meadow->runWorker_path
666  . $pathless_resourceless_worker_cmd
667  . (defined($worker_cmd_args) ? " $worker_cmd_args" : '')
668  . ' -preregistered';
669 
670  my $meadow_process_ids = [];
671  if ($this_meadow_rc_worker_count > 1 and !$this_meadow->config_get('CanSubmitJobArrays')) {
672  foreach my $i (1..$this_meadow_rc_worker_count) {
673  my $submitted_process_id = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, 1, $self->{'beekeeper_id'}.'_'."$iteration.$i", $rc_name, $submission_cmd_args || '', $submit_log_subdir);
674  push @$meadow_process_ids, $submitted_process_id->[0];
675  }
676  } else {
677  $meadow_process_ids = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, $this_meadow_rc_worker_count, $self->{'beekeeper_id'}.'_'.$iteration, $rc_name, $submission_cmd_args || '', $submit_log_subdir);
678  }
679 
680  print "Submitted the following process_ids to ".$this_meadow->signature.": ".join(', ', @$meadow_process_ids)."\n";
681 
682  my $resource_class = $pipeline->collection_of('ResourceClass')->find_one_by('name', $rc_name);
683  my $meadow_name = $this_meadow->cached_name;
684 
685  my @pre_allocated_workers = map {
686  Bio::EnsEMBL::Hive::Worker->new(
687  'meadow_type' => $meadow_type, # non-unique key components
688  'meadow_name' => $meadow_name,
689  'meadow_user' => $meadow_user,
690  'process_id' => $_,
691 
692  'resource_class' => $resource_class, # non-key, but known at the time of pre-allocation
693  'beekeeper_id' => $self->{'beekeeper_id'},
694 
695  'status' => 'SUBMITTED',
696  )
697  } @$meadow_process_ids;
698 
699  $queen->store( \@pre_allocated_workers );
700  }
701  }
702 
703  } elsif ($self->{'read_only'}) {
704  print STDERR "\n";
705  print STDERR "*" x 70, "\n";
706  print STDERR "* beekeeper.pl is running in read-only mode, i.e. it only\n";
707  print STDERR "* prints the current status of the pipeline.\n";
708  print STDERR "*\n";
709  print STDERR "*" x 70, "\n";
710  print STDERR "\n";
711 
712  } else {
713  print "\nBeekeeper : not submitting any workers this iteration\n";
714  $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
715  "loop iteration $iteration, 0 workers submitted",
716  'INFO', 'ALIVE');
717  }
718 
719  if( $iteration != $max_loops ) { # skip the last sleep
720  while (1) {
721  $hive_dba->dbc->disconnect_if_idle;
722  printf("Beekeeper : going to sleep for %.2f minute(s). Expect next iteration at %s\n", $self->{'sleep_minutes'}, scalar localtime(time+$self->{'sleep_minutes'}*60));
723  sleep($self->{'sleep_minutes'}*60);
724  last if $self->{'read_only'};
725  # this is a good time to check up on other beekeepers as well:
726  $self->{'beekeeper'}->adaptor->bury_other_beekeepers($self->{'beekeeper'});
727  if ($self->{'beekeeper'}->check_if_blocked()) {
728  print "Beekeeper : We have been blocked !\n".
729  "This can happen if a Job has explicitly required the Beekeeper to stop (have a look at log_message).\n".
730  "It may also happen if someone has set is_blocked=1 in the beekeeper table for beekeeper_id=".$self->{'beekeeper_id'}.".\n";
731  } else {
732  last;
733  }
734  }
735 
736  # after waking up reload Resources and Analyses to stay current.
737  unless($run_job_id) {
738  # reset all the collections so that fresher data will be used at this iteration:
739  $pipeline->invalidate_collections();
740  $pipeline->invalidate_hive_current_load();
741 
742  $list_of_analyses = $pipeline->collection_of('Analysis')->find_all_by_pattern( $analyses_pattern );
743  }
744  }
745  }
746 
747  # in this section, the beekeeper determines why it exited, sets an appropriate cause of death,
748  # and prints/logs an appropriate message
749  my @stringified_reasons_builder;
750  my $beekeeper_cause_of_death;
751  my $cause_of_death_is_error;
752  my %exit_statuses; # keep a set of unique exit statuses seen
753  if ($reasons_to_exit) {
754  foreach my $reason_to_exit (@$reasons_to_exit) {
755  $exit_statuses{$reason_to_exit->{'exit_status'}} = 1;
756  push(@stringified_reasons_builder, $reason_to_exit->{'message'});
757  }
758  }
759 
760  my $stringified_reasons = join(", ", @stringified_reasons_builder);
761 
762  if (($loop_until eq 'JOB_FAILURE') &&
763  (grep(/JOB_FAILED/, keys(%exit_statuses)))) {
764  $beekeeper_cause_of_death = 'JOB_FAILED';
765  $cause_of_death_is_error = 1;
766  }
767 
768  if (($loop_until eq 'ANALYSIS_FAILURE') &&
769  (grep(/ANALYSIS_FAILED/, keys(%exit_statuses)))) {
770  $beekeeper_cause_of_death = 'ANALYSIS_FAILED';
771  $cause_of_death_is_error = 1;
772  }
773 
774  if (!$beekeeper_cause_of_death) {
775  if (grep(/NO_WORK/, keys(%exit_statuses))) {
776  $beekeeper_cause_of_death = 'NO_WORK';
777  } else {
778  $beekeeper_cause_of_death = 'LOOP_LIMIT';
779  }
780  $cause_of_death_is_error = 0;
781  }
782 
783  $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
784  "stopped looping because of $stringified_reasons",
785  $cause_of_death_is_error ? 'PIPELINE_ERROR' : 'INFO',
786  $beekeeper_cause_of_death) unless $self->{'read_only'};
787 
788  if ($reasons_to_exit and $ENV{EHIVE_SLACK_WEBHOOK}) {
789  send_beekeeper_message_to_slack($ENV{EHIVE_SLACK_WEBHOOK}, $self->{'pipeline'}, $cause_of_death_is_error, 1, $stringified_reasons, $loop_until);
790  }
791 
792  $self->{'beekeeper'}->set_cause_of_death($beekeeper_cause_of_death) unless $self->{'read_only'};
793  printf("Beekeeper: dbc %d disconnect cycles\n", $hive_dba->dbc->disconnect_count);
794  return $cause_of_death_is_error;
795 }
796 
797 
798 __DATA__
799 
800 =pod
801 
802 =head1 NAME
803 
804 beekeeper.pl [options]
805 
806 =head1 DESCRIPTION
807 
808 The Beekeeper is in charge of interfacing between the eHive database and a compute resource or 'compute farm'.
809 Its Job is to synchronise both, to assess the compute requirements of the pipeline
810 and to send the requested number of workers to open machines via the runWorker.pl script.
811 
812 It is also responsible for identifying workers which died
813 unexpectedly so that dead workers can be released and unfinished Jobs reclaimed.
814 
815 =head1 USAGE EXAMPLES
816 
817  # Usually run after the pipeline has been created to calculate the internal statistics necessary for eHive functioning
818  beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -sync
819 
820  # Do not run any additional Workers, just check for the current status of the pipeline:
821  beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname
822 
823  # Run the pipeline in automatic mode (-loop), run all the workers locally (-meadow_type LOCAL) and allow for 3 parallel workers (-total_running_workers_max 3)
824  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -meadow_type LOCAL -total_running_workers_max 3 -loop
825 
826  # Run in automatic mode, but only restrict to running blast-related analyses with the exception of analyses 4..6
827  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -analyses_pattern 'blast%-4..6' -loop
828 
829  # Restrict the normal execution to one iteration only - can be used for testing a newly set up pipeline
830  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -run
831 
832  # Reset failed 'buggy_analysis' Jobs to 'READY' state, so that they can be run again
833  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -analyses_pattern buggy_analysis -reset_failed_jobs
834 
835  # Do a cleanup: find and bury dead workers, reclaim their Jobs
836  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -dead
837 
838 =head1 OPTIONS
839 
840 =head2 Connection parameters
841 
842 =over
843 
844 =item --reg_conf <path>
845 
846 Path to a Registry configuration file
847 
848 =item --reg_type <string>
849 
850 Type of the registry entry ("hive", "core", "compara", etc. - defaults to "hive")
851 
852 =item --reg_alias <string>
853 
854 Species / alias name for the eHive DBAdaptor
855 
856 =item --url <url string>
857 
858 URL defining where eHive database is located
859 
860 =item --nosqlvc
861 
862 "No SQL Version Check" - set if you want to force working with a database created by a potentially schema-incompatible API
863 
864 =back
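
For example, the same pipeline can be addressed either directly by URL or via a Registry file (the paths, aliases and credentials below are only placeholders):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -sync
    beekeeper.pl -reg_conf /path/to/registry.conf.pl -reg_alias my_pipeline_db -sync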
865 
866 =head2 Configs overriding
867 
868 =over
869 
870 =item --config_file <string>
871 
872 JSON file (with absolute path) to override the default configurations (the option can be given multiple times)
873 
874 =back
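
For example, to layer two JSON configuration files on top of the defaults (the paths here are hypothetical):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -config_file /path/to/site.json -config_file /path/to/pipeline.json -loop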
875 
876 =head2 Looping control
877 
878 =over
879 
880 =item --loop
881 
882 run autonomously, loops and sleeps. Equivalent to -loop_until ANALYSIS_FAILURE
883 
884 =item --loop_until
885 
886 sets the level of event that will cause the Beekeeper to stop looping:
887 
888 =over
889 
890 =item JOB_FAILURE
891 
892 stop looping if any Job fails
893 
894 =item ANALYSIS_FAILURE
895 
896 stop looping if any Analysis has Job failures exceeding its fault tolerance
897 
898 =item NO_WORK
899 
900 ignore Job and Analysis failures, keep looping until there is no work
901 
902 =item FOREVER
903 
904 ignore failures and no work, keep looping
905 
906 =back
907 
908 =item --keep_alive
909 
910 (Deprecated) alias for -loop_until FOREVER
911 
912 =item --max_loops <num>
913 
914 perform at most this number of loops in autonomous mode. The Beekeeper will stop when
915 it has performed max_loops loops, even in FOREVER mode
916 
917 =item --job_id <job_id>
918 
919 run one iteration for this job_id
920 
921 =item --run
922 
923 run one iteration of automation loop
924 
925 =item --sleep <num>
926 
927 when looping, sleep <num> minutes (default 1 min)
928 
929 =back
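
For example, to test a new pipeline with a single iteration first and then run it unattended until there is no work left, checking the database every two minutes (the URL is a placeholder):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -run
    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -loop_until NO_WORK -sleep 2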
930 
931 =head2 Current Meadow control
932 
933 =over
934 
935 =item --meadow_type <string>
936 
937 the desired Meadow class name, such as 'LSF' or 'LOCAL'
938 
939 =item --total_running_workers_max <num>
940 
941 max # workers to be running in parallel
942 
943 =item --submit_workers_max <num>
944 
945 max # workers to create per loop iteration
946 
947 =item --submission_options <string>
948 
949 passes <string> to the Meadow submission command as <options> (formerly lsf_options)
950 
951 =item --submit_log_dir <dir>
952 
953 record submission output+error streams into files under the given directory (to see why some workers fail after submission)
954 
955 =back
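
For example, a throttled run on an LSF meadow might look like this (the queue name, limits and log directory are illustrative only):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -meadow_type LSF -total_running_workers_max 100 -submit_workers_max 20 -submission_options '-q production' -submit_log_dir /path/to/submit_logs -loop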
956 
957 =head2 Worker control
958 
959 =over
960 
961 =item --analyses_pattern <string>
962 
963 restrict the sync operation, printing of stats or looping of the Beekeeper to the specified subset of Analyses
964 
965 =item --nocan_respecialize
966 
967 prevent workers from re-specializing into another Analysis (within resource_class) after their previous Analysis is exhausted
968 
969 =item --force
970 
971 run all workers with -force (see runWorker.pl)
972 
973 =item --killworker <worker_id>
974 
975 kill Worker by worker_id
976 
977 =item --life_span <num>
978 
979 number of minutes each Worker is allowed to run
980 
981 =item --job_limit <num>
982 
983 Number of Jobs to run before Worker can die naturally
984 
985 =item --retry_throwing_jobs
986 
987 if a Job dies *knowingly* (e.g. by encountering a die statement in the Runnable), should we retry it by default?
988 
989 =item --hive_log_dir <path>
990 
991 directory where stdout/stderr of the eHive is redirected
992 
993 =item --worker_delay_startup_seconds <number>
994 
995 number of seconds each Worker has to wait before first talking to the database (0 by default, useful for debugging)
996 
997 =item --worker_crash_on_startup_prob <float>
998 
999 probability of each Worker failing at startup (0 by default, useful for debugging)
1000 
1001 =item --debug <debug_level>
1002 
1003 set debug level of the workers
1004 
1005 =back
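
For example, to loop over only the blast-related Analyses with shorter-lived, more verbose Workers (the pattern and numbers are illustrative only):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -analyses_pattern 'blast%' -life_span 60 -job_limit 10 -hive_log_dir /path/to/hive_logs -debug 1 -loop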
1006 
1007 =head2 Other commands/options
1008 
1009 =over
1010 
1011 =item --help
1012 
1013 print this help
1014 
1015 =item --versions
1016 
1017 report both eHive code version and eHive database schema version
1018 
1019 =item --dead
1020 
1021 detect all unaccounted dead workers and reset their Jobs for resubmission
1022 
1023 =item --sync
1024 
1025 re-synchronise the eHive
1026 
1027 =item --unkwn
1028 
1029 detect all workers in UNKWN state and reset their Jobs for resubmission (careful, they *may* reincarnate!)
1030 
1031 =item --big_red_button
1032 
1033 shut everything down: block all beekeepers connected to the pipeline and terminate workers
1034 
1035 =item --alldead
1036 
1037 tell the database all workers are dead (no checks are performed in this mode, so be very careful!)
1038 
1039 =item --balance_semaphores
1040 
1041 set all Semaphore counters to the numbers of unDONE fan Jobs (emergency use only)
1042 
1043 =item --worker_stats
1044 
1045 show status of each running Worker
1046 
1047 =item --failed_jobs
1048 
1049 show all failed Jobs
1050 
1051 =item --job_output <job_id>
1052 
1053 print details for one Job
1054 
1055 =item --reset_job_id <num>
1056 
1057 reset a Job back to READY so it can be rerun
1058 
1059 =item --reset_failed_jobs
1060 
1061 reset FAILED Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1062 
1063 =item --reset_done_jobs
1064 
1065 reset DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1066 
1067 =item --reset_all_jobs
1068 
1069 reset FAILED, DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1070 
1071 =item --forgive_failed_jobs
1072 
1073 mark FAILED Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1074 
1075 =item --discard_ready_jobs
1076 
1077 mark READY Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1078 
1079 =item --unblock_semaphored_jobs
1080 
1081 set SEMAPHORED Jobs of analyses matching -analyses_pattern to READY so they can start
1082 
1083 =back
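
For example, a recovery session after a crashed farm run could combine several of these commands (the Job id and pattern are placeholders):

    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -dead
    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -job_output 1234
    beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -analyses_pattern 'buggy_analysis%' -reset_failed_jobs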
1084 
1085 =head1 LICENSE
1086 
1087  Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
1088  Copyright [2016-2024] EMBL-European Bioinformatics Institute
1089 
1090  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
1091  You may obtain a copy of the License at
1092 
1093  http://www.apache.org/licenses/LICENSE-2.0
1094 
1095  Unless required by applicable law or agreed to in writing, software distributed under the License
1096  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1097  See the License for the specific language governing permissions and limitations under the License.
1098 
1099 =head1 CONTACT
1100 
1101 Please subscribe to the eHive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss eHive-related questions or to be notified of our updates
1102 
1103 =cut
1104 