ensembl-hive  2.8.1
beekeeper.pl
Go to the documentation of this file.
1 #!/usr/bin/env perl
2 
3 use strict;
4 use warnings;
5 
6  # Finding out own path in order to reference own components (including own modules):
7 use Cwd ();
8 use File::Basename ();
BEGIN {
    # Unless the caller already provided EHIVE_ROOT_DIR, derive it as the parent of the
    # directory holding this script (resolved through symlinks by realpath), then make
    # that checkout's modules/ directory the first place Perl looks for our own modules.
    unless ( $ENV{'EHIVE_ROOT_DIR'} ) {
        my $script_dir = File::Basename::dirname( Cwd::realpath($0) );
        $ENV{'EHIVE_ROOT_DIR'} = File::Basename::dirname($script_dir);
    }
    unshift @INC, join( '/', $ENV{'EHIVE_ROOT_DIR'}, 'modules' );
}
13 
14 use File::Path 'make_path';
15 use Getopt::Long qw(:config no_auto_abbrev);
16 use Pod::Usage;
17 
18 use Bio::EnsEMBL::Hive::DBSQL::LogMessageAdaptor ('store_beekeeper_message');
19 use Bio::EnsEMBL::Hive::Utils ('destringify');
20 use Bio::EnsEMBL::Hive::Utils::Slack ('send_beekeeper_message_to_slack');
23 use Bio::EnsEMBL::Hive::Version ('report_versions');
28 
30 
31 
# Script entry point. main() normally terminates the process itself via exit();
# only the -big_red_button path returns (with 0) and lets the script fall through.
main();
33 
34 
# Entry point of beekeeper.pl.
#
# Parses the command line, connects to the eHive database (via -url or
# -reg_conf/-reg_alias), performs the requested one-off actions (Job resets,
# Worker kills, reports, ...) and then either runs the autonomous scheduling
# loop (-loop / -run / -job_id / -keep_alive / -loop_until) or prints the
# current pipeline status once.
#
# Terminates the process with exit() (status 1 when the loop stopped on a
# failure, 0 otherwise); the -big_red_button path returns 0 instead.
sub main {
    $| = 1;    # make STDOUT unbuffered (STDERR is unbuffered anyway)

    # ok this is a hack, but I'm going to pretend I've got an object here
    # by creating a hash ref and passing it around like an object
    # this is to avoid using global variables in functions, and to consolidate
    # the globals into a nice '$self' package
    my $self = {};

    my $help                           = 0;
    my $report_versions                = 0;
    my $loopit                         = 0;
    my $sync                           = 0;
    my $local                          = 0;
    my $show_failed_jobs               = 0;
    my $default_meadow_type            = undef;
    my $submit_workers_max             = undef;
    my $total_running_workers_max      = undef;
    my $submission_options             = undef;
    my $run                            = 0;
    my $run_job_id                     = undef;
    my $check_for_dead                 = 0;
    my $bury_unkwn_workers             = 0;
    my $all_dead                       = 0;
    my $balance_semaphores             = 0;
    my $job_id_for_output              = 0;
    my $show_worker_stats              = 0;
    my $kill_worker_id                 = 0;
    my $big_red_button                 = 0;
    my $keep_alive                     = 0;    # DEPRECATED
    my $reset_job_id                   = 0;
    my $reset_all_jobs_for_analysis    = 0;    # DEPRECATED
    my $reset_failed_jobs_for_analysis = 0;    # DEPRECATED
    my $reset_all_jobs                 = 0;    # Mark DONE, PASSED_ON and FAILED jobs to READY
    my $reset_failed_jobs              = 0;    # Mark FAILED jobs to READY
    my $reset_done_jobs                = 0;    # Mark DONE and PASSED_ON jobs to READY
    my $unblock_semaphored_jobs        = 0;    # Mark SEMAPHORED jobs to READY
    my $forgive_failed_jobs            = 0;    # Mark FAILED jobs to DONE
    my $discard_ready_jobs             = 0;    # Mark READY jobs to DONE
    # NB: -force is stored directly in $self->{'force'} and forwarded to the Workers.

    $self->{'url'}       = undef;
    $self->{'reg_conf'}  = undef;
    $self->{'reg_type'}  = undef;
    $self->{'reg_alias'} = undef;
    $self->{'nosqlvc'}   = undef;

    $self->{'config_files'} = [];

    $self->{'sleep_minutes'}                = 1;
    $self->{'max_loops'}                    = 0;
    $self->{'retry_throwing_jobs'}          = undef;
    $self->{'loop_until'}                   = undef;
    $self->{'can_respecialize'}             = 1;
    $self->{'hive_log_dir'}                 = undef;
    $self->{'submit_log_dir'}               = undef;
    $self->{'worker_delay_startup_seconds'} = undef;
    $self->{'worker_crash_on_startup_prob'} = undef;

    # store all the options passed on the command line for registration
    # as GetOptions modifies @ARGV
    my @original_argv = @ARGV;

    GetOptions(
        # connection parameters
        'url=s'                        => \$self->{'url'},
        'reg_conf|regfile|reg_file=s'  => \$self->{'reg_conf'},
        'reg_type=s'                   => \$self->{'reg_type'},
        'reg_alias|regname|reg_name=s' => \$self->{'reg_alias'},
        'nosqlvc'                      => \$self->{'nosqlvc'},    # using "nosqlvc" instead of "sqlvc!" for compatibility with modules where it is a propagated option

        # json config files
        'config_file=s@'               => $self->{'config_files'},

        # loop control
        'run'                          => \$run,
        'loop'                         => \$loopit,
        'max_loops=i'                  => \$self->{'max_loops'},
        'loop_until=s'                 => \$self->{'loop_until'},
        'keep_alive'                   => \$keep_alive,
        'job_id|run_job_id=i'          => \$run_job_id,
        'force!'                       => \$self->{'force'},
        'sleep=f'                      => \$self->{'sleep_minutes'},

        # meadow control
        'local!'                       => \$local,
        'meadow_type=s'                => \$default_meadow_type,
        'total_running_workers_max=i'  => \$total_running_workers_max,
        'submit_workers_max=i'         => \$submit_workers_max,
        'submission_options=s'         => \$submission_options,

        # worker control
        'job_limit=i'                         => \$self->{'job_limit'},
        'life_span|lifespan=i'                => \$self->{'life_span'},
        'logic_name=s'                        => \$self->{'logic_name'},
        'analyses_pattern=s'                  => \$self->{'analyses_pattern'},
        'hive_log_dir|hive_output_dir=s'      => \$self->{'hive_log_dir'},
        'retry_throwing_jobs!'                => \$self->{'retry_throwing_jobs'},
        'can_respecialize|can_respecialise!'  => \$self->{'can_respecialize'},
        'debug=i'                             => \$self->{'debug'},
        'submit_log_dir=s'                    => \$self->{'submit_log_dir'},
        'worker_delay_startup_seconds=i'      => \$self->{'worker_delay_startup_seconds'},
        'worker_crash_on_startup_prob=f'      => \$self->{'worker_crash_on_startup_prob'},

        # other commands/options
        'h|help!'                             => \$help,
        'v|version|versions!'                 => \$report_versions,
        'sync!'                               => \$sync,
        'dead!'                               => \$check_for_dead,
        'unkwn!'                              => \$bury_unkwn_workers,
        'killworker=i'                        => \$kill_worker_id,
        'big_red_button'                      => \$big_red_button,
        'alldead!'                            => \$all_dead,
        'balance_semaphores'                  => \$balance_semaphores,
        'worker_stats'                        => \$show_worker_stats,
        'failed_jobs'                         => \$show_failed_jobs,
        'reset_job_id=i'                      => \$reset_job_id,
        'reset_failed_jobs_for_analysis=s'    => \$reset_failed_jobs_for_analysis,
        'reset_all_jobs_for_analysis=s'       => \$reset_all_jobs_for_analysis,
        'reset_failed_jobs'                   => \$reset_failed_jobs,
        'reset_all_jobs'                      => \$reset_all_jobs,
        'reset_done_jobs'                     => \$reset_done_jobs,
        'discard_ready_jobs'                  => \$discard_ready_jobs,
        'forgive_failed_jobs'                 => \$forgive_failed_jobs,
        'unblock_semaphored_jobs'             => \$unblock_semaphored_jobs,
        'job_output=i'                        => \$job_id_for_output,
    ) or die "Error in command line arguments\n";

    if (@ARGV) {
        die "ERROR: There are invalid arguments on the command-line: ". join(" ", @ARGV). "\n";
    }

    if ($help) {
        pod2usage({-exitvalue => 0, -verbose => 2});
    }

    if ($report_versions) {
        report_versions();
        exit(0);
    }

    # Reject deprecated options up-front, before anything (e.g. -reset_job_id) touches the database.
    if ($reset_all_jobs_for_analysis) {
        die "Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
    }
    if ($reset_failed_jobs_for_analysis) {
        die "Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
    }

    my $config = Bio::EnsEMBL::Hive::Utils::Config->new(@{$self->{'config_files'}});

    # if -keep_alive passed, ensure looping is on and loop_until is forever
    if ($keep_alive) {
        $self->{'loop_until'} = 'FOREVER';
        $loopit = 1;
    }

    # if user has specified -loop_until, ensure looping is turned on
    if ($self->{'loop_until'}) {
        $loopit = 1;
    }

    # if loop_until hasn't been set by the user, or defaulted by a flag,
    # set it to ANALYSIS_FAILURE
    unless ($self->{'loop_until'}) {
        $self->{'loop_until'} = 'ANALYSIS_FAILURE';
    }

    if ($run or $run_job_id) {
        $self->{'max_loops'} = 1;
    } elsif ($loopit) {
        unless ($self->{'max_loops'}) {
            $self->{'max_loops'} = -1;    # unlimited
        }
    }

    if ($self->{'url'} or $self->{'reg_alias'}) {

        $self->{'pipeline'} = Bio::EnsEMBL::Hive::HivePipeline->new(
            -url                         => $self->{'url'},
            -reg_conf                    => $self->{'reg_conf'},
            -reg_type                    => $self->{'reg_type'},
            -reg_alias                   => $self->{'reg_alias'},
            -no_sql_schema_version_check => $self->{'nosqlvc'},
        );

        $self->{'dba'} = $self->{'pipeline'}->hive_dba();

    } else {
        die "\nERROR: Connection parameters (url or reg_conf+reg_alias) need to be specified\n";
    }

    $self->{'options'} = join(" ", @original_argv);

    # make -loop_until case insensitive
    $self->{'loop_until'} = uc($self->{'loop_until'});

    my @allowed_loop_until_values = qw(ANALYSIS_FAILURE FOREVER JOB_FAILURE NO_WORK);
    unless (grep { $_ eq $self->{'loop_until'} } @allowed_loop_until_values) {
        die sprintf('"%s" is not a recognized value for -loop_until. Use one of %s', $self->{'loop_until'}, join('/', @allowed_loop_until_values));
    }

    my $pipeline_name = $self->{'pipeline'}->hive_pipeline_name;

    if ($pipeline_name) {
        print "Pipeline name: $pipeline_name\n";
    } else {
        print STDERR "+---------------------------------------------------------------------+\n";
        print STDERR "!                                                                     !\n";
        print STDERR "!                  WARNING:                                           !\n";
        print STDERR "!                                                                     !\n";
        print STDERR "! At the moment your pipeline doesn't have 'pipeline_name' defined.   !\n";
        print STDERR "! This may seriously impair your beekeeping experience unless you are !\n";
        print STDERR "! the only farm user. The name should be set in your PipeConfig file, !\n";
        print STDERR "! or if you are running an old pipeline you can just set it by hand   !\n";
        print STDERR "! in the 'meta' table.                                                !\n";
        print STDERR "!                                                                     !\n";
        print STDERR "+---------------------------------------------------------------------+\n";
    }

    # Without write access, silently drop every action that would modify the database
    # and switch to read-only mode (no Beekeeper registration, no Worker submission).
    unless ($self->{'dba'}->dbc->has_write_access) {
        my $dbc = $self->{'dba'}->dbc;
        print STDERR "\n";
        print STDERR "*" x 70, "\n";
        print STDERR sprintf("* It appears that %s doesn't have INSERT/UPDATE/DELETE privileges\n", $dbc->username);
        print STDERR sprintf("* on this database (%s). Please check the credentials\n", $dbc->dbname);
        print STDERR "*\n";
        print STDERR "*" x 70, "\n";
        print STDERR "\n";
        undef $run_job_id;
        undef $reset_job_id;
        undef $reset_all_jobs;
        undef $reset_failed_jobs;
        undef $reset_done_jobs;
        undef $unblock_semaphored_jobs;
        undef $forgive_failed_jobs;
        undef $discard_ready_jobs;
        undef $kill_worker_id;
        undef $sync;
        $self->{'read_only'} = 1;
    }

    if ($run_job_id) {
        $submit_workers_max = 1;
    }

    $default_meadow_type = 'LOCAL' if ($local);
    my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
    $self->{'available_meadow_list'} = $valley->get_available_meadow_list();

    $valley->config_set('SubmitWorkersMax', $submit_workers_max) if (defined $submit_workers_max);

    my $default_meadow = $valley->get_default_meadow();
    print "Default meadow: ".$default_meadow->signature."\n\n";

    $default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if (defined $total_running_workers_max);
    $default_meadow->config_set('SubmissionOptions', $submission_options) if (defined $submission_options);

    my $queen = $self->{'dba'}->get_Queen;

    if ($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }

    if ($job_id_for_output) {
        printf("===== Job output\n");
        # fetch_by_dbID gives nothing back for an unknown id: report it instead of crashing on ->toString
        my $job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output)
            or die "Could not fetch Job with dbID=$job_id_for_output.\n";
        print $job->toString. "\n";
    }

    if ( $self->{'logic_name'} ) {    # FIXME: for now, logic_name will override analyses_pattern quietly
        warn "-logic_name is now deprecated, please use -analyses_pattern that extends the functionality of -logic_name .\n";
        $self->{'analyses_pattern'} = $self->{'logic_name'};
    }

    # May die if running within a non-LOCAL meadow
    unless ($self->{'read_only'}) {
        $self->{'beekeeper'} = register_beekeeper($valley, $self);
    }
    $self->{'logmessage_adaptor'} = $self->{'dba'}->get_LogMessageAdaptor();

    # Check other beekeepers in our meadow to see if they are still alive
    $self->{'beekeeper'}->adaptor->bury_other_beekeepers($self->{'beekeeper'}) unless $self->{'read_only'};

    if ($kill_worker_id) {
        my $kill_worker;
        eval { $kill_worker = $queen->fetch_by_dbID($kill_worker_id) or die };
        if ($@) {
            log_and_die($self, "Could not fetch Worker with dbID='$kill_worker_id' to kill");
        }

        unless ( $kill_worker->cause_of_death() ) {
            if ( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {

                if ( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
                    printf("Killing worker: %10d %35s %15s  : ",
                        $kill_worker->dbID, $kill_worker->meadow_host, $kill_worker->process_id);

                    $meadow->kill_worker($kill_worker);
                    $kill_worker->cause_of_death('KILLED_BY_USER');
                    $queen->register_worker_death($kill_worker);
                    # what about clean-up? Should we do it here or not?
                } else {
                    log_and_die($self, "According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
                }
            } else {
                log_and_die($self, "Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill");
            }
        } else {
            log_and_die($self, "According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
        }
    }

    if ( $big_red_button ) {
        return big_red_button( $self, $valley );
    }

    my $run_job;
    if ($run_job_id) {
        eval { $run_job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id ) or die };
        if ($@) {
            log_and_die($self, "Could not fetch Job with dbID=$run_job_id.\n");
        }
    }

    my $list_of_analyses = $run_job
        ? [ $run_job->analysis ]
        : $self->{'pipeline'}->collection_of('Analysis')->find_all_by_pattern( $self->{'analyses_pattern'} );

    if ( $self->{'analyses_pattern'} ) {
        if ( @$list_of_analyses ) {
            print "Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{'analyses_pattern'}."' : "
                . join(', ', map { $_->logic_name.'('.$_->dbID.')' } sort { $a->dbID <=> $b->dbID } @$list_of_analyses)
                . "\nBeekeeper : ", scalar($self->{'pipeline'}->collection_of('Analysis')->list())-scalar(@$list_of_analyses), " Analyses are not shown\n\n";
        } else {
            log_and_die($self, "Beekeeper : the -analyses_pattern '".$self->{'analyses_pattern'}."' did not match any Analyses.\n");
        }
    }

    my $has_task = ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs || $unblock_semaphored_jobs || $forgive_failed_jobs || $discard_ready_jobs);
    if ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs) {
        if (($reset_all_jobs || $reset_done_jobs) and not $self->{'analyses_pattern'}) {
            log_and_die($self, "Beekeeper : do you really want to reset *all* the Jobs ? If yes, add \"-analyses_pattern '%'\" to the command line\n");
        }
        # Several -reset_*_jobs flags may be combined: reset the union of the statuses they
        # select (previously -reset_failed_jobs silently overrode the other two).
        my %status_is_selected = (
            'DONE'      => ($reset_all_jobs || $reset_done_jobs),
            'FAILED'    => ($reset_all_jobs || $reset_failed_jobs),
            'PASSED_ON' => ($reset_all_jobs || $reset_done_jobs),
        );
        my $statuses_to_reset = [ grep { $status_is_selected{$_} } qw(DONE FAILED PASSED_ON) ];
        $self->{'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $statuses_to_reset );
    }

    if ($unblock_semaphored_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->unblock_jobs_for_analysis_id( $list_of_analyses );
    }

    if ($discard_ready_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'READY' );
    }

    if ($forgive_failed_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'FAILED' );
    }

    $queen->synchronize_hive( $list_of_analyses ) if $has_task;

    if ($all_dead)           { $queen->register_all_workers_dead(); }
    if ($check_for_dead)     { $queen->check_for_dead_workers($valley, 1); }
    if ($bury_unkwn_workers) { $queen->check_for_dead_workers($valley, 1, 1); }
    if ($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }

    my $has_error = 0;
    if ($self->{'max_loops'}) {    # positive $max_loop means limited, negative means unlimited

        $has_error = run_autonomously($self, $self->{'pipeline'}, $self->{'max_loops'}, $self->{'loop_until'}, $valley, $list_of_analyses, $self->{'analyses_pattern'}, $run_job_id);

    } else {
        # the output of several methods will look differently depending on $analysis being [un]defined

        if ($sync) {
            $queen->synchronize_hive( $list_of_analyses );
        }
        my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{'debug'} );

        if ($show_worker_stats) {
            print "\n===== List of live Workers according to the Queen: ======\n";
            foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
                print $worker->toString(1)."\n";
            }
        }
        $self->{'dba'}->get_RoleAdaptor->print_active_role_counts;

        Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses) unless $self->{'read_only'};    # show what would be submitted, but do not actually submit

        if ($show_failed_jobs) {
            print("===== failed Jobs\n");
            my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $list_of_analyses , 'FAILED');

            foreach my $job (@{$failed_job_list}) {
                print $job->toString. "\n";
            }
        }
        $self->{'beekeeper'}->set_cause_of_death('LOOP_LIMIT') unless $self->{'read_only'};
    }

    exit($has_error);
}
435 
436 #######################
437 #
438 # subroutines
439 #
440 #######################
441 
# Dies with $message after recording it against the registered Beekeeper.
#
# When a Beekeeper has been registered, the message is stored through the
# LogMessageAdaptor as a PIPELINE_ERROR and the Beekeeper's cause of death is
# set to TASK_FAILED. Without a registered Beekeeper (before registration in
# main, or in read-only mode where registration is skipped) it simply dies.
sub log_and_die {
    my ($self, $message) = @_;

    if ( my $beekeeper = $self->{'beekeeper'} ) {
        my $beekeeper_id = $beekeeper->dbID;
        $self->{'logmessage_adaptor'}->store_beekeeper_message( $beekeeper_id, $message, 'PIPELINE_ERROR', 'TASK_FAILED' );
        $beekeeper->set_cause_of_death('TASK_FAILED');
    }

    die $message;
}
452 
# Builds the runWorker.pl command line (without path and without any
# resource-class specific arguments) that the Beekeeper submits to meadows.
#
# Parameters:
#   $self             - the script's state hash; the connection options, Worker
#                       options, flags and config files it holds are forwarded
#   $analyses_pattern - if true (and no $run_job_id), forwarded as -analyses_pattern
#   $run_job_id       - if true, forwarded as -job_id (takes precedence over the pattern)
#
# Returns the command as a single string starting with 'runWorker.pl'.
sub generate_worker_cmd {
    my ($self, $analyses_pattern, $run_job_id) = @_;

    my $worker_cmd = 'runWorker.pl';

    # Options with a value: only forwarded when defined
    foreach my $worker_option ('url', 'reg_conf', 'reg_type', 'reg_alias', 'job_limit', 'life_span',
                               'worker_delay_startup_seconds', 'worker_crash_on_startup_prob', 'hive_log_dir', 'debug') {
        if (defined(my $value = $self->{$worker_option})) {
            $worker_cmd .= " -${worker_option} $value";
        }
    }

    # Negatable flags: forwarded as -flag / -noflag when explicitly set, omitted when undefined
    foreach my $worker_flag ('retry_throwing_jobs', 'can_respecialize', 'force', 'nosqlvc') {
        if (defined(my $value = $self->{$worker_flag})) {
            $worker_cmd .= $value ? " -${worker_flag}" : " -no${worker_flag}";
        }
    }

    # This option can have multiple values
    $worker_cmd .= " -config_file $_" for @{ $self->{'config_files'} || [] };

    # special task:
    if ($run_job_id) {
        $worker_cmd .= " -job_id $run_job_id";
    } elsif ($analyses_pattern) {
        # The pattern is single-quoted for the shell; escape embedded single quotes ('\'')
        # so a pattern containing one cannot break the command line.
        (my $quoted_pattern = $analyses_pattern) =~ s/'/'\\''/g;
        $worker_cmd .= " -analyses_pattern '".$quoted_pattern."'";
    }

    return $worker_cmd;
}
487 
# Creates this Beekeeper's database record and returns the new Beekeeper object.
#
# The object is built from the Valley (Beekeeper::new_from_Valley) plus the loop
# settings held in $self, then stored through the BeekeeperAdaptor. Its dbID is
# also cached in $self->{'beekeeper_id'}. Dies if no dbID was assigned.
sub register_beekeeper {
    my ($valley, $self) = @_;

    # max_loops == -1 means "loop without limit", recorded as an undefined loop_limit
    my $loop_limit = ($self->{'max_loops'} > -1) ? $self->{'max_loops'} : undef;

    my @meadow_signatures = map { $_->signature } @{ $self->{'available_meadow_list'} };

    # The new instance is partly initialised with the output of Valley::whereami()
    my $beekeeper = Bio::EnsEMBL::Hive::Beekeeper->new_from_Valley(
        $valley,
        'sleep_minutes'     => $self->{'sleep_minutes'},
        'analyses_pattern'  => $self->{'analyses_pattern'},
        'loop_limit'        => $loop_limit,
        'loop_until'        => $self->{'loop_until'},
        'options'           => $self->{'options'},
        'meadow_signatures' => join(',', @meadow_signatures),
    );

    $self->{'dba'}->get_BeekeeperAdaptor->store($beekeeper);

    $self->{'beekeeper_id'} = $beekeeper->dbID;
    die "There was a problem registering this Beekeeper with the eHive database." unless $self->{'beekeeper_id'};

    return $beekeeper;
}
515 
516 
# Emergency shutdown of the whole pipeline.
#
# First blocks every alive Beekeeper (so none of them submits new Workers while
# we are killing the existing ones), reports the Beekeepers that this call newly
# blocked (ignoring those already blocked beforehand, and this Beekeeper itself
# when one is registered), then asks the Queen to kill all Workers.
#
# Parameters:
#   $self   - the script's state hash (needs 'dba'; 'beekeeper' may be absent,
#             e.g. in read-only mode where no Beekeeper is registered)
#   $valley - the Valley giving access to the meadows the Workers run in
#
# Returns 0.
sub big_red_button {
    my ( $self, $valley ) = @_;

    my $bk_a = $self->{'dba'}->get_BeekeeperAdaptor();

    # Remember which beekeepers were blocked earlier so that we do not
    # mention them while reporting the current blocking.
    my %previously_blocked_ids = map { $_->dbID() => 1 } @{ $bk_a->fetch_all( 'is_blocked = 1' ) };

    # Begin the shutdown by blocking all registered beekeepers so that
    # none of them start spawning new workers just as this one tries to
    # kill all workers.
    $bk_a->block_all_alive_beekeepers();

    # Report which beekeepers, self excluded, we have just blocked.
    # No Beekeeper is registered in read-only mode, so there may be no "self" to exclude.
    my $my_dbid = $self->{'beekeeper'} ? $self->{'beekeeper'}->dbID() : undef;
    foreach my $blocked_bk ( @{ $bk_a->fetch_all( 'is_blocked = 1' ) } ) {
        my $blocked_id = $blocked_bk->dbID();
        next if $previously_blocked_ids{ $blocked_id };
        next if defined($my_dbid) && ( $blocked_id == $my_dbid );
        print 'Blocked beekeeper ' . $blocked_id . ': '
            . $blocked_bk->toString() . "\n";
    }

    # Next, kill all workers which are still alive.
    # FIXME: double-check correct job status:
    #   - running ones should be marked as 'failed'
    #   - claimed but unstarted ones should get back to 'unclaimed'
    my $queen = $self->{'dba'}->get_Queen();
    $queen->kill_all_workers( $valley );

    return 0;
}
557 
558 
# Autonomous scheduling loop of the Beekeeper.
#
# Each iteration: checks for dead Workers (unless read-only), prints the
# pipeline status and decides, according to $loop_until, whether to stop;
# otherwise asks the Scheduler how many Workers to submit per meadow type and
# resource class, submits them, pre-registers them in the database as
# SUBMITTED, and sleeps $self->{'sleep_minutes'} minutes (repeating the sleep
# while this Beekeeper is blocked).
#
# Parameters:
#   $self             - the script's state hash (see main)
#   $pipeline         - the HivePipeline being run
#   $max_loops        - number of iterations; -1 means unlimited
#   $loop_until       - one of ANALYSIS_FAILURE, FOREVER, JOB_FAILURE, NO_WORK
#   $valley           - the Valley giving access to the available meadows
#   $list_of_analyses - Analyses to work on (re-fetched after each sleep unless $run_job_id)
#   $analyses_pattern - pattern used to re-select the Analyses, forwarded to the Workers
#   $run_job_id       - if set, the Workers are asked to run only this Job
#
# Returns 1 if the loop stopped because of a failure matching $loop_until, 0 otherwise.
sub run_autonomously {
    my ($self, $pipeline, $max_loops, $loop_until, $valley, $list_of_analyses, $analyses_pattern, $run_job_id) = @_;

    my $hive_dba    = $pipeline->hive_dba;
    my $queen       = $hive_dba->get_Queen;
    my $meadow_user = $self->{'beekeeper'} && $self->{'beekeeper'}->meadow_user;

    my $pathless_resourceless_worker_cmd = generate_worker_cmd($self, $analyses_pattern, $run_job_id);

    # When connected through -reg_conf/-reg_alias there is no -url: show the alias
    # instead of concatenating an undefined value.
    my $pipeline_location = defined($self->{'url'})
        ? $self->{'url'}
        : "(registry alias '" . (defined($self->{'reg_alias'}) ? $self->{'reg_alias'} : '') . "')";

    my $iteration = 0;
    my $reasons_to_exit;

    BKLOOP: while ( ($iteration++ != $max_loops) or ($loop_until eq 'FOREVER') ) {    # NB: the order of conditions is important!

        print("\nBeekeeper : loop #$iteration ======================================================\n");

        my $pipeline_name = $pipeline->hive_pipeline_name;
        if ($pipeline_name) {
            print "\nPipeline name: $pipeline_name";
        }
        print "\nPipeline URL: ".$pipeline_location."\n\n";

        $queen->check_for_dead_workers($valley, 0) unless $self->{'read_only'};

        # this section is where the beekeeper decides whether or not to stop looping
        $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{'debug'} );
        my @job_fail_statuses      = grep { $_->{'exit_status'} eq 'JOB_FAILED' }      @$reasons_to_exit;
        my @analysis_fail_statuses = grep { $_->{'exit_status'} eq 'ANALYSIS_FAILED' } @$reasons_to_exit;
        my @no_work_statuses       = grep { $_->{'exit_status'} eq 'NO_WORK' }         @$reasons_to_exit;

        if ( ($loop_until eq 'JOB_FAILURE') and @job_fail_statuses ) {
            print "Beekeeper : last loop because at least one Job failed and loop-until mode is '$loop_until'\n";
            print "Beekeeper : details from analyses with failed Jobs:\n";
            print join("\n", map { $_->{'message'} } @job_fail_statuses) . "\n";
            last BKLOOP;
        }

        if (@analysis_fail_statuses) {
            # at least one analysis has hit its fault tolerance
            if ( ($loop_until eq 'FOREVER') or ($loop_until eq 'NO_WORK') ) {
                unless (@no_work_statuses) {
                    print "Beekeeper : detected the following exit condition(s), but staying alive because loop-until mode is set to '$loop_until' :\n" .
                        join(", ", map { $_->{'message'} } @analysis_fail_statuses) . "\n";
                }
            } else {
                # loop_until_mode is either job_failure or analysis_failure, and both of these exit on analysis failure
                print "Beekeeper : last loop because at least one analysis failed and loop-until mode is '$loop_until'\n";
                print "Beekeeper : details from analyses with failed Jobs:\n";
                print join("\n", map { $_->{'message'} } @analysis_fail_statuses) . "\n";
                last BKLOOP;
            }
        }

        if ( @no_work_statuses and ($loop_until ne 'FOREVER') ) {
            print "Beekeeper : last loop because there is no more work and loop-until mode is '$loop_until'\n";
            last BKLOOP;
        }

        # end of testing for loop end conditions

        $hive_dba->get_RoleAdaptor->print_active_role_counts;

        my $workers_to_submit_by_meadow_type_rc_name = $self->{'read_only'}
            ? {}
            : Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses);

        if ( keys %$workers_to_submit_by_meadow_type_rc_name ) {

            my $submit_log_subdir;

            if ( $self->{'submit_log_dir'} ) {
                $submit_log_subdir = $self->{'submit_log_dir'}."/submit_bk".$self->{'beekeeper_id'}."_iter${iteration}";
                make_path( $submit_log_subdir );
            }

            # create an "index" over the freshly loaded RC/RD collections:
            my %meadow_type_rc_name2resource_param_list = ();
            foreach my $rd ( $pipeline->collection_of('ResourceDescription')->list ) {
                my $rc_name = $rd->resource_class->name;
                $meadow_type_rc_name2resource_param_list{ $rd->meadow_type }{ $rc_name } = [ $rd->submission_cmd_args, $rd->worker_cmd_args ];
            }

            foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {

                my $this_meadow = $valley->available_meadow_hash->{$meadow_type};

                foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
                    my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};

                    my $submission_message = "submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature();
                    print "\nBeekeeper : $submission_message\n";
                    # Log to DB unless mode is Forever - assume we want no DB entry when we loop forever
                    if ($loop_until ne 'FOREVER') {
                        $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
                            "loop iteration $iteration, $submission_message",
                            'INFO', 'ALIVE');
                    }

                    my ($submission_cmd_args, $worker_cmd_args) = @{ $meadow_type_rc_name2resource_param_list{ $meadow_type }{ $rc_name } || [] };

                    my $specific_worker_cmd = $this_meadow->runWorker_path
                        . $pathless_resourceless_worker_cmd
                        . (defined($worker_cmd_args) ? " $worker_cmd_args" : '')
                        . ' -preregistered';

                    my $meadow_process_ids = [];
                    if ($this_meadow_rc_worker_count > 1 and !$this_meadow->config_get('CanSubmitJobArrays')) {
                        foreach my $i (1..$this_meadow_rc_worker_count) {
                            my $submitted_process_id = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, 1, $self->{'beekeeper_id'}.'_'."$iteration.$i", $rc_name, $submission_cmd_args || '', $submit_log_subdir);
                            push @$meadow_process_ids, $submitted_process_id->[0];
                        }
                    } else {
                        $meadow_process_ids = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, $this_meadow_rc_worker_count, $self->{'beekeeper_id'}.'_'.$iteration, $rc_name, $submission_cmd_args || '', $submit_log_subdir);
                    }

                    print "Submitted the following process_ids to ".$this_meadow->signature.": ".join(', ', @$meadow_process_ids)."\n";

                    my $resource_class = $pipeline->collection_of('ResourceClass')->find_one_by('name', $rc_name);
                    my $meadow_name    = $this_meadow->cached_name;

                    my @pre_allocated_workers = map {
                        Bio::EnsEMBL::Hive::Worker->new(
                            'meadow_type'    => $meadow_type,               # non-unique key components
                            'meadow_name'    => $meadow_name,
                            'meadow_user'    => $meadow_user,
                            'process_id'     => $_,

                            'resource_class' => $resource_class,            # non-key, but known at the time of pre-allocation
                            'beekeeper_id'   => $self->{'beekeeper_id'},

                            'status'         => 'SUBMITTED',
                        )
                    } @$meadow_process_ids;

                    $queen->store( \@pre_allocated_workers );
                }
            }

        } elsif ($self->{'read_only'}) {
            print STDERR "\n";
            print STDERR "*" x 70, "\n";
            print STDERR "* beekeeper.pl is running in read-only mode, i.e. it only\n";
            print STDERR "* prints the current status of the pipeline.\n";
            print STDERR "*\n";
            print STDERR "*" x 70, "\n";
            print STDERR "\n";

        } else {
            # Log to STDOUT and DB unless mode is Forever - assume we want things quiet when we loop forever
            if ($loop_until ne 'FOREVER') {
                print "\nBeekeeper : not submitting any workers this iteration\n";
                $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
                    "loop iteration $iteration, 0 workers submitted",
                    'INFO', 'ALIVE');
            }
        }

        # Skip the sleep only after the genuinely last iteration. In FOREVER mode the
        # loop carries on past $max_loops, so skipping the sleep there would start the
        # next iteration immediately.
        if ( ($loop_until eq 'FOREVER') or ($iteration != $max_loops) ) {
            while (1) {
                $hive_dba->dbc->disconnect_if_idle;
                printf("Beekeeper : going to sleep for %.2f minute(s). Expect next iteration at %s\n", $self->{'sleep_minutes'}, scalar localtime(time+$self->{'sleep_minutes'}*60));
                sleep($self->{'sleep_minutes'}*60);
                last if $self->{'read_only'};
                # this is a good time to check up on other beekeepers as well:
                $self->{'beekeeper'}->adaptor->bury_other_beekeepers($self->{'beekeeper'});
                if ($self->{'beekeeper'}->check_if_blocked()) {
                    print "Beekeeper : We have been blocked !\n".
                        "This can happen if a Job has explicitly required the Beekeeper to stop (have a look at log_message).\n".
                        "It may also happen if someone has set is_blocked=1 in the beekeeper table for beekeeper_id=".$self->{'beekeeper_id'}.".\n";
                } else {
                    last;
                }
            }

            # after waking up reload Resources and Analyses to stay current.
            unless ($run_job_id) {
                # reset all the collections so that fresher data will be used at this iteration:
                $pipeline->invalidate_collections();
                $pipeline->invalidate_hive_current_load();

                $list_of_analyses = $pipeline->collection_of('Analysis')->find_all_by_pattern( $analyses_pattern );
            }
        }
    }

    # in this section, the beekeeper determines why it exited, sets an appropriate cause of death,
    # and prints/logs an appropriate message
    my %exit_statuses;    # set of the unique exit statuses seen in the last iteration
    my @reason_messages;
    foreach my $reason_to_exit ( @{ $reasons_to_exit || [] } ) {
        $exit_statuses{ $reason_to_exit->{'exit_status'} } = 1;
        push @reason_messages, $reason_to_exit->{'message'};
    }

    my $stringified_reasons = join(", ", @reason_messages);

    # Exact status matches, consistent with the checks made inside the loop
    my $beekeeper_cause_of_death;
    my $cause_of_death_is_error = 0;
    if ( ($loop_until eq 'JOB_FAILURE') and $exit_statuses{'JOB_FAILED'} ) {
        $beekeeper_cause_of_death = 'JOB_FAILED';
        $cause_of_death_is_error  = 1;
    } elsif ( ($loop_until eq 'ANALYSIS_FAILURE') and $exit_statuses{'ANALYSIS_FAILED'} ) {
        $beekeeper_cause_of_death = 'ANALYSIS_FAILED';
        $cause_of_death_is_error  = 1;
    } elsif ( $exit_statuses{'NO_WORK'} ) {
        $beekeeper_cause_of_death = 'NO_WORK';
    } else {
        $beekeeper_cause_of_death = 'LOOP_LIMIT';
    }

    $self->{'logmessage_adaptor'}->store_beekeeper_message($self->{'beekeeper_id'},
        "stopped looping because of $stringified_reasons",
        $cause_of_death_is_error ? 'PIPELINE_ERROR' : 'INFO',
        $beekeeper_cause_of_death) unless $self->{'read_only'};

    if ($reasons_to_exit and $ENV{EHIVE_SLACK_WEBHOOK}) {
        send_beekeeper_message_to_slack($ENV{EHIVE_SLACK_WEBHOOK}, $self->{'pipeline'}, $cause_of_death_is_error, 1, $stringified_reasons, $loop_until);
    }

    $self->{'beekeeper'}->set_cause_of_death($beekeeper_cause_of_death) unless $self->{'read_only'};
    printf("Beekeeper: dbc %d disconnect cycles\n", $hive_dba->dbc->disconnect_count);
    return $cause_of_death_is_error;
}
802 
803 
804 __DATA__
805 
806 =pod
807 
808 =head1 NAME
809 
810 beekeeper.pl [options]
811 
812 =head1 DESCRIPTION
813 
814 The Beekeeper is in charge of interfacing between the eHive database and a compute resource or 'compute farm'.
815 Its job is to synchronise both, to assess the compute requirements of the pipeline
816 and to send the requested number of workers to open machines via the runWorker.pl script.
817 
818 It is also responsible for identifying workers which died
819 unexpectedly so that dead workers can be released and unfinished Jobs reclaimed.
820 
821 =head1 USAGE EXAMPLES
822 
823  # Usually run after the pipeline has been created to calculate the internal statistics necessary for eHive functioning
824  beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname -sync
825 
826  # Do not run any additional Workers, just check for the current status of the pipeline:
827  beekeeper.pl -url mysql://username:secret@hostname:port/ehive_dbname
828 
829  # Run the pipeline in automatic mode (-loop), run all the workers locally (-meadow_type LOCAL) and allow for 3 parallel workers (-total_running_workers_max 3)
830  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -meadow_type LOCAL -total_running_workers_max 3 -loop
831 
832  # Run in automatic mode, restricted to blast-related analyses but excluding analyses 4..6
833  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -analyses_pattern 'blast%-4..6' -loop
834 
835  # Restrict the normal execution to one iteration only - can be used for testing a newly set up pipeline
836  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -run
837 
838  # Reset failed 'buggy_analysis' Jobs to 'READY' state, so that they can be run again
839  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -analyses_pattern buggy_analysis -reset_failed_jobs
840 
841  # Do a cleanup: find and bury dead workers, reclaim their Jobs
842  beekeeper.pl -url mysql://username:secret@hostname:port/long_mult_test -dead
843 
844 =head1 OPTIONS
845 
846 =head2 Connection parameters
847 
848 =over
849 
850 =item --reg_conf <path>
851 
852 Path to a Registry configuration file
853 
854 =item --reg_type <string>
855 
856 Type of the registry entry ("hive", "core", "compara", etc. - defaults to "hive")
857 
858 =item --reg_alias <string>
859 
860 Species / alias name for the eHive DBAdaptor
861 
862 =item --url <url string>
863 
864 URL defining where eHive database is located
865 
866 =item --nosqlvc
867 
868 "No SQL Version Check" - set if you want to force working with a database created by a potentially schema-incompatible API
869 
870 =back
871 
872 =head2 Configs overriding
873 
874 =over
875 
876 =item --config_file <string>
877 
878 JSON file (with absolute path) to override the default configurations (could be multiple)
879 
880 =back
881 
882 =head2 Looping control
883 
884 =over
885 
886 =item --loop
887 
888 run autonomously, looping and sleeping between iterations. Equivalent to -loop_until ANALYSIS_FAILURE
889 
890 =item --loop_until <string>
891 
892 sets the level of event that will cause the Beekeeper to stop looping:
893 
894 =over
895 
896 =item JOB_FAILURE
897 
898 stop looping if any Job fails
899 
900 =item ANALYSIS_FAILURE
901 
902 stop looping if any Analysis has Job failures exceeding its fault tolerance
903 
904 =item NO_WORK
905 
906 ignore Job and Analysis failures, keep looping until there is no work
907 
908 =item FOREVER
909 
910 ignore failures and no work, keep looping
911 
912 =back
913 
914 =item --keep_alive
915 
916 (Deprecated) alias for -loop_until FOREVER
917 
918 =item --max_loops <num>
919 
920 perform at most this number of loops in autonomous mode. The Beekeeper will stop when
921 it has performed max_loops loops, even in FOREVER mode
922 
923 =item --job_id <job_id>
924 
925 run one iteration for this job_id
926 
927 =item --run
928 
929 run one iteration of automation loop
930 
931 =item --sleep <num>
932 
933 when looping, sleep <num> minutes (default 1 min)
934 
935 =back
936 
937 =head2 Current Meadow control
938 
939 =over
940 
941 =item --meadow_type <string>
942 
943 the desired Meadow class name, such as 'LSF' or 'LOCAL'
944 
945 =item --total_running_workers_max <num>
946 
947 max # workers to be running in parallel
948 
949 =item --submit_workers_max <num>
950 
951 max # workers to create per loop iteration
952 
953 =item --submission_options <string>
954 
955 passes <string> to the Meadow submission command as <options> (formerly lsf_options)
956 
957 =item --submit_log_dir <dir>
958 
959 record submission output+error streams into files under the given directory (to see why some workers fail after submission)
960 
961 =back
962 
963 =head2 Worker control
964 
965 =over
966 
967 =item --analyses_pattern <string>
968 
969 restrict the sync operation, printing of stats or looping of the Beekeeper to the specified subset of Analyses
970 
971 =item --nocan_respecialize
972 
973 prevent workers from re-specializing into another Analysis (within resource_class) after their previous Analysis is exhausted
974 
975 =item --force
976 
977 run all workers with -force (see runWorker.pl)
978 
979 =item --killworker <worker_id>
980 
981 kill Worker by worker_id
982 
983 =item --life_span <num>
984 
985 number of minutes each Worker is allowed to run
986 
987 =item --job_limit <num>
988 
989 Number of Jobs to run before Worker can die naturally
990 
991 =item --retry_throwing_jobs
992 
993 if a Job dies *knowingly* (e.g. by encountering a die statement in the Runnable), should we retry it by default?
994 
995 =item --hive_log_dir <path>
996 
997 directory where stdout/stderr of the eHive is redirected
998 
999 =item --worker_delay_startup_seconds <number>
1000 
1001 number of seconds each Worker has to wait before first talking to the database (0 by default, useful for debugging)
1002 
1003 =item --worker_crash_on_startup_prob <float>
1004 
1005 probability of each Worker failing at startup (0 by default, useful for debugging)
1006 
1007 =item --debug <debug_level>
1008 
1009 set debug level of the workers
1010 
1011 =back
1012 
1013 =head2 Other commands/options
1014 
1015 =over
1016 
1017 =item --help
1018 
1019 print this help
1020 
1021 =item --versions
1022 
1023 report both eHive code version and eHive database schema version
1024 
1025 =item --dead
1026 
1027 detect all unaccounted dead workers and reset their Jobs for resubmission
1028 
1029 =item --sync
1030 
1031 re-synchronise the eHive pipeline (recalculate the internal statistics necessary for eHive functioning)
1032 
1033 =item --unkwn
1034 
1035 detect all workers in UNKWN state and reset their Jobs for resubmission (careful, they *may* reincarnate!)
1036 
1037 =item --big_red_button
1038 
1039 shut everything down: block all beekeepers connected to the pipeline and terminate workers
1040 
1041 =item --alldead
1042 
1043 tell the database all workers are dead (no checks are performed in this mode, so be very careful!)
1044 
1045 =item --balance_semaphores
1046 
1047 set all Semaphore counters to the numbers of unDONE fan Jobs (emergency use only)
1048 
1049 =item --worker_stats
1050 
1051 show status of each running Worker
1052 
1053 =item --failed_jobs
1054 
1055 show all failed Jobs
1056 
1057 =item --job_output <job_id>
1058 
1059 print details for one Job
1060 
1061 =item --reset_job_id <num>
1062 
1063 reset a Job back to READY so it can be rerun
1064 
1065 =item --reset_failed_jobs
1066 
1067 reset FAILED Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1068 
1069 =item --reset_done_jobs
1070 
1071 reset DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1072 
1073 =item --reset_all_jobs
1074 
1075 reset FAILED, DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1076 
1077 =item --forgive_failed_jobs
1078 
1079 mark FAILED Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1080 
1081 =item --discard_ready_jobs
1082 
1083 mark READY Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1084 
1085 =item --unblock_semaphored_jobs
1086 
1087 set SEMAPHORED Jobs of analyses matching -analyses_pattern to READY so they can start
1088 
1089 =back
1090 
1091 =head1 LICENSE
1092 
1093  See the NOTICE file distributed with this work for additional information
1094  regarding copyright ownership.
1095 
1096  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
1097  You may obtain a copy of the License at
1098 
1099  http://www.apache.org/licenses/LICENSE-2.0
1100 
1101  Unless required by applicable law or agreed to in writing, software distributed under the License
1102  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1103  See the License for the specific language governing permissions and limitations under the License.
1104 
1105 =head1 CONTACT
1106 
1107 Please subscribe to the eHive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss eHive-related questions or to be notified of our updates
1108 
1109 =cut
1110 
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
Bio::EnsEMBL::Hive::Utils::URL::hide_url_password
public Void hide_url_password()
Bio::EnsEMBL::Hive::Utils::URL
Definition: URL.pm:11
map
public map()
Bio::EnsEMBL::Hive::Beekeeper::new_from_Valley
public Bio::EnsEMBL::Hive::Beekeeper new_from_Valley()
log_and_die
public log_and_die()
Bio::EnsEMBL::Hive::Utils::Config::new
public new()
run_autonomously
public run_autonomously()
Bio::EnsEMBL::Hive::Valley::new
public new()
Bio::EnsEMBL::Hive::Utils::Config
Definition: Config.pm:12
Bio::EnsEMBL::Hive::HivePipeline::new
public new()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::Beekeeper
Definition: Beekeeper.pm:13
Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary
public schedule_workers_resync_if_necessary()
Bio::EnsEMBL::Hive::Storable::new
public Bio::EnsEMBL::Hive::Storable new()
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
debug
public debug()
Bio::EnsEMBL::Hive::HivePipeline
Definition: HivePipeline.pm:13
main
public main()
register_beekeeper
public register_beekeeper()
BEGIN
public BEGIN()
run
public run()
generate_worker_cmd
public generate_worker_cmd()
Bio::EnsEMBL::Hive::Valley
Definition: Valley.pm:16
Bio::EnsEMBL::Hive::Scheduler
Definition: Scheduler.pm:15
Bio::EnsEMBL::Hive::DBSQL::LogMessageAdaptor
Definition: LogMessageAdaptor.pm:22
Bio::EnsEMBL::Hive::Utils::Slack
Definition: Slack.pm:10
big_red_button
public big_red_button()