6 # Finding out own path in order to reference own components (including own modules):
10 $ENV{
'EHIVE_ROOT_DIR'} ||= File::Basename::dirname( File::Basename::dirname( Cwd::realpath($0) ) );
11 unshift @INC, $ENV{
'EHIVE_ROOT_DIR'}.
'/modules';
14 use File::Path
'make_path';
15 use Getopt::Long qw(:config no_auto_abbrev);
36 $|=1; # make STDOUT unbuffered (STDERR is unbuffered anyway)
38 # ok this is a hack, but I'm going to pretend I've got an object here
39 # by creating a hash ref and passing it around like an object
40 # this is to avoid using global variables in functions, and to consolidate
41 # the globals into a nice '$self' package
45 my $report_versions = 0;
49 my $show_failed_jobs = 0;
50 my $default_meadow_type = undef;
51 my $submit_workers_max = undef;
52 my $total_running_workers_max = undef;
53 my $submission_options = undef;
55 my $run_job_id = undef;
57 my $check_for_dead = 0;
58 my $bury_unkwn_workers = 0;
60 my $balance_semaphores = 0;
61 my $job_id_for_output = 0;
62 my $show_worker_stats = 0;
63 my $kill_worker_id = 0;
64 my $big_red_button = 0;
65 my $keep_alive = 0; # DEPRECATED
67 my $reset_all_jobs_for_analysis = 0; # DEPRECATED
68 my $reset_failed_jobs_for_analysis = 0; # DEPRECATED
69 my $reset_all_jobs = 0; # Mark DONE, PASSED_ON and FAILED jobs to READY
70 my $reset_failed_jobs = 0; # Mark FAILED jobs to READY
71 my $reset_done_jobs = 0; # Mark DONE and PASSED_ON jobs to READY
72 my $unblock_semaphored_jobs = 0; # Mark SEMAPHORED jobs to READY
73 my $forgive_failed_jobs = 0; # Mark FAILED jobs to DONE
74 my $discard_ready_jobs = 0; # Mark READY jobs to DONE
76 $self->{
'url'} = undef;
77 $self->{
'reg_conf'} = undef;
78 $self->{
'reg_type'} = undef;
79 $self->{
'reg_alias'} = undef;
80 $self->{
'nosqlvc'} = undef;
82 $self->{
'config_files'} = [];
84 $self->{
'sleep_minutes'} = 1;
85 $self->{
'max_loops'} = 0;
86 $self->{
'retry_throwing_jobs'} = undef;
87 $self->{
'loop_until'} = undef;
88 $self->{
'can_respecialize'} = 1;
89 $self->{
'hive_log_dir'} = undef;
90 $self->{
'submit_log_dir'} = undef;
91 $self->{
'worker_delay_startup_seconds'} = undef;
92 $self->{
'worker_crash_on_startup_prob'} = undef;
94 # store all the options passed on the command line for registration
95 # as GetOptions modifies @ARGV
96 my @original_argv = @ARGV;
99 # connection parameters
100 'url=s' => \$self->{
'url'},
101 'reg_conf|regfile|reg_file=s' => \$self->{
'reg_conf'},
102 'reg_type=s' => \$self->{
'reg_type'},
103 'reg_alias|regname|reg_name=s' => \$self->{
'reg_alias'},
104 'nosqlvc' => \$self->{
'nosqlvc'}, #
using "nosqlvc" instead of
"sqlvc!" for compatibility with modules where it is a propagated option
107 'config_file=s@' => $self->{
'config_files'},
112 'max_loops=i' => \$self->{
'max_loops'},
113 'loop_until=s' => \$self->{
'loop_until'},
114 'keep_alive' => \$keep_alive,
115 'job_id|run_job_id=i'=> \$run_job_id,
116 'force!' => \$self->{
'force'},
117 'sleep=f' => \$self->{
'sleep_minutes'},
121 'meadow_type=s' => \$default_meadow_type,
122 'total_running_workers_max=i' => \$total_running_workers_max,
123 'submit_workers_max=i' => \$submit_workers_max,
124 'submission_options=s' => \$submission_options,
127 'job_limit=i' => \$self->{
'job_limit'},
128 'life_span|lifespan=i' => \$self->{
'life_span'},
129 'logic_name=s' => \$self->{
'logic_name'},
130 'analyses_pattern=s' => \$self->{
'analyses_pattern'},
131 'hive_log_dir|hive_output_dir=s' => \$self->{
'hive_log_dir'},
132 'retry_throwing_jobs!' => \$self->{
'retry_throwing_jobs'},
133 'can_respecialize|can_respecialise!' => \$self->{
'can_respecialize'},
134 'debug=i' => \$self->{
'debug'},
135 'submit_log_dir=s' => \$self->{
'submit_log_dir'},
136 'worker_delay_startup_seconds=i' => \$self->{
'worker_delay_startup_seconds'},
137 'worker_crash_on_startup_prob=f' => \$self->{
'worker_crash_on_startup_prob'},
139 # other commands/options
141 'v|version|versions!' => \$report_versions,
143 'dead!' => \$check_for_dead,
144 'unkwn!' => \$bury_unkwn_workers,
145 'killworker=i' => \$kill_worker_id,
146 'big_red_button' => \$big_red_button,
147 'alldead!' => \$all_dead,
148 'balance_semaphores'=> \$balance_semaphores,
149 'worker_stats' => \$show_worker_stats,
150 'failed_jobs' => \$show_failed_jobs,
151 'reset_job_id=i' => \$reset_job_id,
152 'reset_failed_jobs_for_analysis=s' => \$reset_failed_jobs_for_analysis,
153 'reset_all_jobs_for_analysis=s' => \$reset_all_jobs_for_analysis,
154 'reset_failed_jobs' => \$reset_failed_jobs,
155 'reset_all_jobs' => \$reset_all_jobs,
156 'reset_done_jobs' => \$reset_done_jobs,
157 'discard_ready_jobs' => \$discard_ready_jobs,
158 'forgive_failed_jobs' => \$forgive_failed_jobs,
159 'unblock_semaphored_jobs' => \$unblock_semaphored_jobs,
160 'job_output=i' => \$job_id_for_output,
161 ) or die
"Error in command line arguments\n";
164 die
"ERROR: There are invalid arguments on the command-line: ". join(
" ", @ARGV).
"\n";
168 pod2usage({-exitvalue => 0, -verbose => 2});
171 if($report_versions) {
178 # if -keep_alive passed, ensure looping is on and loop_until is forever
180 $self->{
'loop_until'} =
'FOREVER';
184 # if user has specified -loop_until, ensure looping is turned on
185 if ($self->{
'loop_until'}) {
189 # if loop_until hasn't been set by the user, or defaulted by a flag,
190 # set it to ANALYSIS_FAILURE
191 unless ($self->{
'loop_until'}) {
192 $self->{
'loop_until'} =
'ANALYSIS_FAILURE';
195 if($run or $run_job_id) {
196 $self->{
'max_loops'} = 1;
198 unless($self->{
'max_loops'}) {
199 $self->{
'max_loops'} = -1; # unlimited
203 if($self->{
'url'} or $self->{
'reg_alias'}) {
206 -url => $self->{
'url'},
207 -reg_conf => $self->{
'reg_conf'},
208 -reg_type => $self->{
'reg_type'},
209 -reg_alias => $self->{
'reg_alias'},
210 -no_sql_schema_version_check => $self->{
'nosqlvc'},
213 $self->{
'dba'} = $self->{
'pipeline'}->hive_dba();
216 die
"\nERROR: Connection parameters (url or reg_conf+reg_alias) need to be specified\n";
219 $self->{
'options'} = join(
" ", @original_argv);
221 # make -loop_until case insensitive
222 $self->{
'loop_until'} = uc($self->{
'loop_until'});
224 my @allowed_loop_until_values = qw(ANALYSIS_FAILURE FOREVER JOB_FAILURE NO_WORK);
225 unless (grep {$_ eq $self->{
'loop_until'}} @allowed_loop_until_values) {
226 die sprintf(
'"%s" is not a recognized value for -loop_until. Use one of %s', $self->{
'loop_until'}, join(
'/', @allowed_loop_until_values));
229 my $pipeline_name = $self->{
'pipeline'}->hive_pipeline_name;
232 print
"Pipeline name: $pipeline_name\n";
234 print STDERR
"+---------------------------------------------------------------------+\n";
235 print STDERR
"! !\n";
236 print STDERR
"! WARNING: !\n";
237 print STDERR
"! !\n";
238 print STDERR
"! At the moment your pipeline doesn't have 'pipeline_name' defined. !\n";
239 print STDERR
"! This may seriously impair your beekeeping experience unless you are !\n";
240 print STDERR
"! the only farm user. The name should be set in your PipeConfig file, !\n";
241 print STDERR
"! or if you are running an old pipeline you can just set it by hand !\n";
242 print STDERR
"! in the 'meta' table. !\n";
243 print STDERR
"! !\n";
244 print STDERR
"+---------------------------------------------------------------------+\n";
247 unless ($self->{
'dba'}->dbc->has_write_access) {
248 my $dbc = $self->{
'dba'}->dbc;
250 print STDERR
"*" x 70,
"\n";
251 print STDERR sprintf(
"* It appears that %s doesn't have INSERT/UPDATE/DELETE privileges\n", $dbc->username);
252 print STDERR sprintf(
"* on this database (%s). Please check the credentials\n", $dbc->dbname);
254 print STDERR
"*" x 70,
"\n";
258 undef $reset_all_jobs;
259 undef $reset_failed_jobs;
260 undef $reset_done_jobs;
261 undef $unblock_semaphored_jobs;
262 undef $forgive_failed_jobs;
263 undef $discard_ready_jobs;
264 undef $kill_worker_id;
266 $self->{
'read_only'} = 1;
270 $submit_workers_max = 1;
273 $default_meadow_type =
'LOCAL' if($local);
275 $self->{
'available_meadow_list'} = $valley->get_available_meadow_list();
277 $valley->config_set(
'SubmitWorkersMax', $submit_workers_max)
if(defined $submit_workers_max);
279 my $default_meadow = $valley->get_default_meadow();
280 print
"Default meadow: ".$default_meadow->signature.
"\n\n";
282 $default_meadow->config_set(
'TotalRunningWorkersMax', $total_running_workers_max)
if(defined $total_running_workers_max);
283 $default_meadow->config_set(
'SubmissionOptions', $submission_options)
if(defined $submission_options);
285 my $queen = $self->{
'dba'}->get_Queen;
287 if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
289 if($job_id_for_output) {
290 printf(
"===== Job output\n");
291 my $job = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output);
292 print $job->toString.
"\n";
295 if($reset_all_jobs_for_analysis) {
296 die
"Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
298 if($reset_failed_jobs_for_analysis) {
299 die
"Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
302 if( $self->{
'logic_name'} ) { # FIXME:
for now, logic_name will
override analyses_pattern quietly
303 warn
"-logic_name is now deprecated, please use -analyses_pattern that extends the functionality of -logic_name .\n";
304 $self->{
'analyses_pattern'} = $self->{
'logic_name'};
307 # May die if running within a non-LOCAL meadow
308 unless ($self->{
'read_only'}) {
311 $self->{
'logmessage_adaptor'} = $self->{
'dba'}->get_LogMessageAdaptor();
313 # Check other beekeepers in our meadow to see if they are still alive
314 $self->{
'beekeeper'}->adaptor->bury_other_beekeepers($self->{
'beekeeper'}) unless $self->{
'read_only'};
316 if ($kill_worker_id) {
318 eval {$kill_worker = $queen->fetch_by_dbID($kill_worker_id) or die};
320 log_and_die($self,
"Could not fetch Worker with dbID='$kill_worker_id' to kill");
323 unless( $kill_worker->cause_of_death() ) {
324 if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {
326 if( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
327 printf(
"Killing worker: %10d %35s %15s : ",
328 $kill_worker->dbID, $kill_worker->meadow_host, $kill_worker->process_id);
330 $meadow->kill_worker($kill_worker);
331 $kill_worker->cause_of_death(
'KILLED_BY_USER');
332 $queen->register_worker_death($kill_worker);
333 # what about clean-up? Should we do it here or not?
335 log_and_die($self,
"According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
338 log_and_die($self,
"Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill");
341 log_and_die($self,
"According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
345 if ( $big_red_button ) {
351 eval {$run_job = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id ) or die};
353 log_and_die($self,
"Could not fetch Job with dbID=$run_job_id.\n");
357 my $list_of_analyses = $run_job
358 ? [ $run_job->analysis ]
359 : $self->{
'pipeline'}->collection_of(
'Analysis')->find_all_by_pattern( $self->{
'analyses_pattern'} );
361 if( $self->{
'analyses_pattern'} ) {
362 if( @$list_of_analyses ) {
363 print
"Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{
'analyses_pattern'}.
"' : "
364 . join(
', ',
map { $_->logic_name.
'('.$_->dbID.
')' } sort {$a->dbID <=> $b->dbID} @$list_of_analyses)
365 .
"\nBeekeeper : ", scalar($self->{
'pipeline'}->collection_of(
'Analysis')->list())-scalar(@$list_of_analyses),
" Analyses are not shown\n\n";
367 log_and_die($self,
"Beekeeper : the -analyses_pattern '".$self->{
'analyses_pattern'}.
"' did not match any Analyses.\n");
371 my $has_task = ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs || $unblock_semaphored_jobs || $forgive_failed_jobs || $discard_ready_jobs);
372 if($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs) {
373 if (($reset_all_jobs || $reset_done_jobs) and not $self->{
'analyses_pattern'}) {
374 log_and_die($self,
"Beekeeper : do you really want to reset *all* the Jobs ? If yes, add \"-analyses_pattern '%'\" to the command line\n");
376 my $statuses_to_reset = $reset_failed_jobs ? [
'FAILED' ] : ($reset_done_jobs ? [
'DONE',
'PASSED_ON' ] : [
'DONE',
'FAILED',
'PASSED_ON' ]);
377 $self->{
'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $statuses_to_reset );
380 if ($unblock_semaphored_jobs) {
381 $self->{
'dba'}->get_AnalysisJobAdaptor->unblock_jobs_for_analysis_id( $list_of_analyses );
384 if ($discard_ready_jobs) {
385 $self->{
'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses,
'READY' );
388 if ($forgive_failed_jobs) {
389 $self->{
'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses,
'FAILED' );
392 $queen->synchronize_hive( $list_of_analyses )
if $has_task;
394 if($all_dead) { $queen->register_all_workers_dead(); }
395 if($check_for_dead) { $queen->check_for_dead_workers($valley, 1); }
396 if($bury_unkwn_workers) { $queen->check_for_dead_workers($valley, 1, 1); }
397 if($balance_semaphores) { $self->{
'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }
400 if ($self->{
'max_loops'}) { # positive $max_loop means limited, negative means unlimited
402 $has_error =
run_autonomously($self, $self->{
'pipeline'}, $self->{
'max_loops'}, $self->{
'loop_until'}, $valley, $list_of_analyses, $self->{
'analyses_pattern'}, $run_job_id);
405 # the output of several methods will look differently depending on $analysis being [un]defined
408 $queen->synchronize_hive( $list_of_analyses );
410 my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{
'debug'} );
412 if($show_worker_stats) {
413 print
"\n===== List of live Workers according to the Queen: ======\n";
414 foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
415 print $worker->toString(1).
"\n";
418 $self->{
'dba'}->get_RoleAdaptor->print_active_role_counts;
422 if($show_failed_jobs) {
423 print(
"===== failed Jobs\n");
424 my $failed_job_list = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $list_of_analyses ,
'FAILED');
426 foreach my $job (@{$failed_job_list}) {
427 print $job->toString.
"\n";
430 $self->{
'beekeeper'}->set_cause_of_death(
'LOOP_LIMIT') unless $self->{
'read_only'};
436 #######################
440 #######################
# log_and_die($self, $message)
# Report a fatal Beekeeper-level error: store $message through
# $self->{'logmessage_adaptor'} against this Beekeeper's dbID (message type
# PIPELINE_ERROR, status TASK_FAILED), then set the Beekeeper's cause of
# death to TASK_FAILED.
# NOTE(review): the statement that actually dies is not visible in this
# view; the name and the call sites (unfetchable -killworker / -job_id,
# unmatched -analyses_pattern) imply it ends with die($message) -- confirm.
# NOTE(review): 'logmessage_adaptor' (and apparently 'beekeeper') are only
# set up when not in read-only mode, yet e.g. the unmatched -analyses_pattern
# path can call this in read-only mode; there it would fail calling a method
# on undef instead of reporting $message -- confirm and guard if needed.
443 my ($self, $message) = @_;
445 my $beekeeper = $self->{
'beekeeper'};
# Record the error in the database before changing this Beekeeper's state.
447 $self->{
'logmessage_adaptor'}->store_beekeeper_message($beekeeper->dbID, $message,
'PIPELINE_ERROR',
'TASK_FAILED');
448 $beekeeper->set_cause_of_death(
'TASK_FAILED');
# generate_worker_cmd($self, $analyses_pattern, $run_job_id)
# Build the runWorker.pl command-line string shared by every Worker this
# Beekeeper submits. The caller prepends the Meadow's runWorker_path and
# appends any resource-specific worker_cmd_args, so this string contains
# neither a path nor resource options.
#   $analyses_pattern - forwarded as -analyses_pattern when no Job id is used
#   $run_job_id       - forwarded as -job_id (this branch is checked first)
# NOTE(review): option values are concatenated without shell quoting; only
# -analyses_pattern is wrapped in single quotes, and a pattern containing a
# single quote would break that. If this string is executed through a shell,
# values with spaces or metacharacters (e.g. a -hive_log_dir path) need
# escaping -- confirm how the Meadow runs it.
454 my ($self, $analyses_pattern, $run_job_id) = @_;
456 my $worker_cmd =
'runWorker.pl';
# Valued options: appended as "-name value" whenever set in $self.
458 foreach my $worker_option (
'url',
'reg_conf',
'reg_type',
'reg_alias',
'job_limit',
'life_span',
459 'worker_delay_startup_seconds',
'worker_crash_on_startup_prob',
'hive_log_dir',
'debug') {
460 if(defined(my $value = $self->{$worker_option})) {
461 $worker_cmd .=
" -${worker_option} $value";
# Boolean flags: any defined value is forwarded either as "-noNAME" or as
# "-NAME" (the condition choosing between them is not visible here;
# presumably the negated form is used for a false value).
465 foreach my $worker_flag (
'retry_throwing_jobs',
'can_respecialize',
'force',
'nosqlvc') {
466 if(defined(my $value = $self->{$worker_flag})) {
468 $worker_cmd .=
" -no${worker_flag}";
470 $worker_cmd .=
" -${worker_flag}";
475 # This option can have multiple values
476 $worker_cmd .=
" -config_file $_" for @{$self->{
'config_files'}};
# Either pin the Worker to a single Job or restrict it to the requested Analyses.
480 $worker_cmd .=
" -job_id $run_job_id";
481 } elsif ($analyses_pattern) {
482 $worker_cmd .=
" -analyses_pattern '".$analyses_pattern.
"'";
# register_beekeeper($valley, $self)
# Create a Beekeeper record describing this run (sleep interval, analyses
# pattern, loop limit and mode, original command-line options and the
# signatures of all available Meadows), store it via the BeekeeperAdaptor and
# remember its dbID in $self->{'beekeeper_id'}. Dies if storing did not
# yield a dbID.
489 my ($valley, $self) = @_;
# max_loops == -1 means "unlimited" (set during option post-processing);
# that is recorded as an undefined loop_limit.
491 my $loop_limit = undef;
492 if ($self->{
'max_loops'} > -1) {
493 $loop_limit = $self->{
'max_loops'};
# Comma-separated signatures of every Meadow available to this Beekeeper.
496 my $meadow_signatures = join(
",",
497 map {$_->signature} @{$self->{
'available_meadow_list'}});
499 # The new instance is partly initialised with the output of Valley::whereami()
501 'sleep_minutes' => $self->{
'sleep_minutes'},
502 'analyses_pattern' => $self->{
'analyses_pattern'},
503 'loop_limit' => $loop_limit,
504 'loop_until' => $self->{
'loop_until'},
505 'options' => $self->{
'options'},
506 'meadow_signatures' => $meadow_signatures,
509 $self->{
'dba'}->get_BeekeeperAdaptor->store($beekeeper);
# Keep our dbID for later log messages and Worker submission names;
# a false dbID means the store did not succeed.
510 unless ($self->{
'beekeeper_id'} = $beekeeper->dbID) {
511 die
"There was a problem registering this Beekeeper with the eHive database.";
# big_red_button($self, $valley)
# Emergency shutdown of the whole pipeline: block every alive Beekeeper
# registered in the database, print the ones this call newly blocked (other
# than ourselves), then ask the Queen to kill all Workers that are still alive.
# NOTE(review): this reads $self->{'beekeeper'}->dbID(); the Beekeeper object
# appears to be registered only outside read-only mode, and -big_red_button is
# not among the options disabled in read-only mode -- confirm the caller
# cannot reach this without a registered Beekeeper.
518 my ( $self, $valley ) = @_;
520 my $bk_a = $self->{dba}->get_BeekeeperAdaptor();
521 my $blocked_beekeepers;
523 # Save a list of IDs of beekeepers which were blocked earlier so
524 # that we can leave them out when reporting the current blocking.
525 $blocked_beekeepers = $bk_a->fetch_all(
'is_blocked = 1' );
526 my %previously_blocked_ids;
527 while ( my $blocked_bk = shift @{ $blocked_beekeepers } ) {
528 $previously_blocked_ids{ $blocked_bk->dbID() } = 1;
531 # Begin the shutdown by blocking all alive beekeepers so that
532 # none of them start spawning new workers while this one is killing them.
534 $bk_a->block_all_alive_beekeepers();
536 # Report which beekeepers, self excluded, we have just blocked
537 $blocked_beekeepers = $bk_a->fetch_all(
'is_blocked = 1' );
538 my $my_dbid = $self->{
'beekeeper'}->dbID();
539 my @newly_blocked = grep {
540 ( ! exists $previously_blocked_ids{ $_->dbID() } )
541 && ( $_->dbID() != $my_dbid )
542 } @{ $blocked_beekeepers };
543 while ( my $blocked_bk = shift @newly_blocked ) {
544 print
'Blocked beekeeper ' . $blocked_bk->dbID() .
': '
545 . $blocked_bk->toString() .
"\n";
548 # Next, kill all workers which are still alive.
549 # FIXME: double-check correct job status:
550 # - running ones should be marked as 'failed'
551 # - claimed but unstarted ones should get back to 'unclaimed'
552 my $queen = $self->{
'dba'}->get_Queen();
553 $queen->kill_all_workers( $valley );
560 my ($self, $pipeline, $max_loops, $loop_until, $valley, $list_of_analyses, $analyses_pattern, $run_job_id) = @_;
562 my $hive_dba = $pipeline->hive_dba;
563 my $queen = $hive_dba->get_Queen;
564 my $meadow_user = $self->{
'beekeeper'} && $self->{
'beekeeper'}->meadow_user;
566 my $pathless_resourceless_worker_cmd =
generate_worker_cmd($self, $analyses_pattern, $run_job_id);
571 BKLOOP:
while( ($iteration++ != $max_loops) or ($loop_until eq
'FOREVER') ) { # NB: the order of conditions is important!
573 print(
"\nBeekeeper : loop #$iteration ======================================================\n");
575 my $pipeline_name = $pipeline->hive_pipeline_name;
576 if ($pipeline_name) {
577 print
"\nPipeline name: $pipeline_name";
579 print
"\nPipeline URL: ".$self->{
'url'}.
"\n\n";
581 $queen->check_for_dead_workers($valley, 0) unless $self->{
'read_only'};
583 # this section is where the beekeeper decides whether or not to stop looping
584 $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{
'debug'});
585 my @job_fail_statuses = grep({$_->{
'exit_status'} eq
'JOB_FAILED'} @$reasons_to_exit);
586 my @analysis_fail_statuses = grep({$_->{
'exit_status'} eq
'ANALYSIS_FAILED'} @$reasons_to_exit);
587 my @no_work_statuses = grep({$_->{
'exit_status'} eq
'NO_WORK'} @$reasons_to_exit);
589 my $found_reason_to_exit = 0;
591 if (($loop_until eq
'JOB_FAILURE') &&
592 (scalar(@job_fail_statuses)) > 0) {
593 print
"Beekeeper : last loop because at least one Job failed and loop-until mode is '$loop_until'\n";
594 print
"Beekeeper : details from analyses with failed Jobs:\n";
595 print join(
"\n",
map {$_->{
'message'}} @job_fail_statuses) .
"\n";
596 $found_reason_to_exit = 1;
600 if (scalar(@analysis_fail_statuses > 0)) {
601 # at least one analysis has hit its fault tolerance
602 if (($loop_until eq
'FOREVER') ||
603 ($loop_until eq
'NO_WORK')) {
604 if (scalar(@no_work_statuses) == 0) {
605 print
"Beekeeper : detected the following exit condition(s), but staying alive because loop-until mode is set to '$loop_until' :\n" .
606 join(
", ",
map {$_->{
'message'}} @analysis_fail_statuses) .
"\n";
609 # loop_until_mode is either job_failure or analysis_failure, and both of these exit on analysis failure
610 unless ($found_reason_to_exit) {
611 print
"Beekeeper : last loop because at least one analysis failed and loop-until mode is '$loop_until'\n";
612 print
"Beekeeper : details from analyses with failed Jobs:\n";
613 print join(
"\n",
map {$_->{
'message'}} @analysis_fail_statuses) .
"\n";
614 $found_reason_to_exit = 1;
620 if ((scalar(@no_work_statuses) > 0) &&
621 ($loop_until ne
'FOREVER')) {
622 print
"Beekeeper : last loop because there is no more work and loop-until mode is '$loop_until'\n"
623 unless ($found_reason_to_exit);
627 # end of testing for loop end conditions
629 $hive_dba->get_RoleAdaptor->print_active_role_counts;
631 my $workers_to_submit_by_meadow_type_rc_name = $self->{
'read_only'} ? {} :
634 if( keys %$workers_to_submit_by_meadow_type_rc_name ) {
636 my $submit_log_subdir;
638 if( $self->{
'submit_log_dir'} ) {
639 $submit_log_subdir = $self->{
'submit_log_dir'}.
"/submit_bk".$self->{
'beekeeper_id'}.
"_iter${iteration}";
640 make_path( $submit_log_subdir );
643 # create an "index" over the freshly loaded RC/RD collections:
644 my %meadow_type_rc_name2resource_param_list = ();
645 foreach my $rd ( $pipeline->collection_of(
'ResourceDescription')->list ) {
646 my $rc_name = $rd->resource_class->name;
647 $meadow_type_rc_name2resource_param_list{ $rd->meadow_type }{ $rc_name } = [ $rd->submission_cmd_args, $rd->worker_cmd_args ];
650 foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
652 my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
654 foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
655 my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};
657 my $submission_message =
"submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature();
658 print
"\nBeekeeper : $submission_message\n";
659 # Log to DB unless mode is Forever - assume we want no DB entry when we loop forever
660 if ($loop_until ne
'FOREVER') {
661 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
662 "loop iteration $iteration, $submission_message",
666 my ($submission_cmd_args, $worker_cmd_args) = @{ $meadow_type_rc_name2resource_param_list{ $meadow_type }{ $rc_name } || [] };
668 my $specific_worker_cmd = $this_meadow->runWorker_path
669 . $pathless_resourceless_worker_cmd
670 . (defined($worker_cmd_args) ?
" $worker_cmd_args" :
'')
673 my $meadow_process_ids = [];
674 if ($this_meadow_rc_worker_count > 1 and !$this_meadow->config_get(
'CanSubmitJobArrays')) {
675 foreach my $i (1..$this_meadow_rc_worker_count) {
676 my $submitted_process_id = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, 1, $self->{
'beekeeper_id'}.
'_'.
"$iteration.$i", $rc_name, $submission_cmd_args ||
'', $submit_log_subdir);
677 push @$meadow_process_ids, $submitted_process_id->[0];
680 $meadow_process_ids = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, $this_meadow_rc_worker_count, $self->{
'beekeeper_id'}.
'_'.$iteration, $rc_name, $submission_cmd_args ||
'', $submit_log_subdir);
683 print
"Submitted the following process_ids to ".$this_meadow->signature.
": ".join(
', ', @$meadow_process_ids).
"\n";
685 my $resource_class = $pipeline->collection_of(
'ResourceClass')->find_one_by(
'name', $rc_name);
686 my $meadow_name = $this_meadow->cached_name;
688 my @pre_allocated_workers =
map {
690 'meadow_type' => $meadow_type, # non-unique key components
691 'meadow_name' => $meadow_name,
692 'meadow_user' => $meadow_user,
695 'resource_class' => $resource_class, # non-key, but known at the time of pre-allocation
696 'beekeeper_id' => $self->{
'beekeeper_id'},
698 'status' =>
'SUBMITTED',
700 } @$meadow_process_ids;
702 $queen->store( \@pre_allocated_workers );
706 } elsif ($self->{
'read_only'}) {
708 print STDERR
"*" x 70,
"\n";
709 print STDERR
"* beekeeper.pl is running in read-only mode, i.e. it only\n";
710 print STDERR
"* prints the current status of the pipeline.\n";
712 print STDERR
"*" x 70,
"\n";
716 # Log to STDOUT and DB unless mode is Forever - assume we want things quiet when we loop forever
717 if ($loop_until ne
'FOREVER') {
718 print
"\nBeekeeper : not submitting any workers this iteration\n";
719 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
720 "loop iteration $iteration, 0 workers submitted",
725 if( $iteration != $max_loops ) { # skip the last sleep
727 $hive_dba->dbc->disconnect_if_idle;
728 printf(
"Beekeeper : going to sleep for %.2f minute(s). Expect next iteration at %s\n", $self->{
'sleep_minutes'}, scalar localtime(time+$self->{
'sleep_minutes'}*60));
729 sleep($self->{
'sleep_minutes'}*60);
730 last
if $self->{
'read_only'};
731 # this is a good time to check up on other beekeepers as well:
732 $self->{
'beekeeper'}->adaptor->bury_other_beekeepers($self->{
'beekeeper'});
733 if ($self->{
'beekeeper'}->check_if_blocked()) {
734 print
"Beekeeper : We have been blocked !\n".
735 "This can happen if a Job has explicitly required the Beekeeper to stop (have a look at log_message).\n".
736 "It may also happen if someone has set is_blocked=1 in the beekeeper table for beekeeper_id=".$self->{
'beekeeper_id'}.
".\n";
742 # after waking up reload Resources and Analyses to stay current.
743 unless($run_job_id) {
744 # reset all the collections so that fresher data will be used at this iteration:
745 $pipeline->invalidate_collections();
746 $pipeline->invalidate_hive_current_load();
748 $list_of_analyses = $pipeline->collection_of(
'Analysis')->find_all_by_pattern( $analyses_pattern );
753 # in this section, the beekeeper determines why it exited, sets an appropriate cause of death,
754 # and prints/logs an appropriate message
755 my @stringified_reasons_builder;
756 my $beekeeper_cause_of_death;
757 my $cause_of_death_is_error;
758 my %exit_statuses; # keep a set of unique exit statuses seen
759 if ($reasons_to_exit) {
760 foreach my $reason_to_exit (@$reasons_to_exit) {
761 $exit_statuses{$reason_to_exit->{
'exit_status'}} = 1;
762 push(@stringified_reasons_builder, $reason_to_exit->{
'message'});
766 my $stringified_reasons = join(
", ", @stringified_reasons_builder);
768 if (($loop_until eq
'JOB_FAILURE') &&
769 (grep(/JOB_FAILED/, keys(%exit_statuses)))) {
770 $beekeeper_cause_of_death =
'JOB_FAILED';
771 $cause_of_death_is_error = 1;
774 if (($loop_until eq
'ANALYSIS_FAILURE') &&
775 (grep(/ANALYSIS_FAILED/, keys(%exit_statuses)))) {
776 $beekeeper_cause_of_death =
'ANALYSIS_FAILED';
777 $cause_of_death_is_error = 1;
780 if (!$beekeeper_cause_of_death) {
781 if (grep(/NO_WORK/, keys(%exit_statuses))) {
782 $beekeeper_cause_of_death =
'NO_WORK';
784 $beekeeper_cause_of_death =
'LOOP_LIMIT';
786 $cause_of_death_is_error = 0;
789 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
790 "stopped looping because of $stringified_reasons",
791 $cause_of_death_is_error ?
'PIPELINE_ERROR' :
'INFO',
792 $beekeeper_cause_of_death) unless $self->{
'read_only'};
794 if ($reasons_to_exit and $ENV{EHIVE_SLACK_WEBHOOK}) {
795 send_beekeeper_message_to_slack($ENV{EHIVE_SLACK_WEBHOOK}, $self->{
'pipeline'}, $cause_of_death_is_error, 1, $stringified_reasons, $loop_until);
798 $self->{
'beekeeper'}->set_cause_of_death($beekeeper_cause_of_death) unless $self->{
'read_only'};
799 printf(
"Beekeeper: dbc %d disconnect cycles\n", $hive_dba->dbc->disconnect_count);
800 return $cause_of_death_is_error;
810 beekeeper.pl [options]
814 The Beekeeper is in charge of interfacing between the eHive database and a compute resource or 'compute farm'.
815 Its job is to synchronise both, to assess the compute requirements of the pipeline
816 and to submit the requested number of Workers to available machines via the runWorker.pl script.
818 It is also responsible for identifying Workers which died
819 unexpectedly, so that dead Workers can be released and their unfinished Jobs reclaimed.
821 =head1 USAGE EXAMPLES
823 # Usually run after the pipeline has been created to calculate the internal statistics necessary for eHive functioning
824 beekeeper.pl -url mysql:
826 # Do not run any additional Workers, just check for the current status of the pipeline:
827 beekeeper.pl -url mysql:
829 # Run the pipeline in automatic mode (-loop), run all the workers locally (-meadow_type LOCAL) and allow for 3 parallel workers (-total_running_workers_max 3)
830 beekeeper.pl -url mysql:
832 # Run in automatic mode, but restrict it to blast-related analyses, excluding analyses 4..6
833 beekeeper.pl -url mysql:
835 # Restrict the normal execution to one iteration only - can be used for testing a newly set up pipeline
836 beekeeper.pl -url mysql:
838 # Reset failed 'buggy_analysis' Jobs to 'READY' state, so that they can be run again
839 beekeeper.pl -url mysql:
841 # Do a cleanup: find and bury dead workers, reclaim their Jobs
842 beekeeper.pl -url mysql:
846 =head2 Connection parameters
850 =item --reg_conf <path>
852 Path to a Registry configuration file
854 =item --reg_type <string>
856 Type of the registry entry (
"hive",
"core",
"compara", etc. - defaults to
"hive")
858 =item --reg_alias <string>
860 Species / alias name for the eHive DBAdaptor
862 =item --url <url string>
864 URL defining where eHive database is located
868 "No SQL Version Check" - set if you want to force working with a database created by a potentially schema-incompatible API
872 =head2 Configs overriding
876 =item --config_file <string>
878 JSON file (with absolute path) overriding the default configuration (the option can be given multiple times)
882 =head2 Looping control
888 run autonomously, looping and sleeping between iterations. Equivalent to -loop_until ANALYSIS_FAILURE
892 sets the level of event that will cause the Beekeeper to stop looping:
898 stop looping if any Job fails
900 =item ANALYSIS_FAILURE
902 stop looping if any Analysis has Job failures exceeding its fault tolerance
906 ignore Job and Analysis failures, keep looping until there is no work
910 ignore failures and no work, keep looping
916 (Deprecated) alias for -loop_until FOREVER
918 =item --max_loops <num>
920 perform at most this number of loops in autonomous mode. The Beekeeper will stop when
921 it has performed max_loops loops, even in FOREVER mode
923 =item --job_id <job_id>
925 run one iteration for this job_id
929 run one iteration of automation loop
933 when looping, sleep <num> minutes between iterations (default 1 min)
937 =head2 Current Meadow control
941 =item --meadow_type <string>
943 the desired Meadow class name, such as 'LSF' or 'LOCAL'
945 =item --total_running_workers_max <num>
947 maximum number of Workers allowed to run in parallel
949 =item --submit_workers_max <num>
951 maximum number of Workers to submit per loop iteration
953 =item --submission_options <string>
955 passes <string> to the Meadow submission command as <options> (formerly lsf_options)
957 =item --submit_log_dir <dir>
959 record submission output+error streams into files under the given directory (to see why some workers fail after submission)
963 =head2 Worker control
967 =item --analyses_pattern <string>
969 restrict the sync operation, printing of stats or looping of the Beekeeper to the specified subset of Analyses
971 =item --nocan_respecialize
973 prevent workers from re-specializing into another Analysis (within resource_class) after their previous Analysis is exhausted
977 run all workers with -force (see runWorker.pl)
979 =item --killworker <worker_id>
981 kill Worker by worker_id
983 =item --life_span <num>
985 number of minutes each Worker is allowed to run
987 =item --job_limit <num>
989 number of Jobs to run before the Worker can die naturally
991 =item --retry_throwing_jobs
993 if a Job dies *knowingly* (e.g. by encountering a die statement in the Runnable), should it be retried by default?
995 =item --hive_log_dir <path>
997 directory where the Workers' stdout/stderr are redirected
999 =item --worker_delay_startup_seconds <number>
1001 number of seconds each Worker has to wait before first talking to the database (0 by default, useful for debugging)
1003 =item --worker_crash_on_startup_prob <float>
1005 probability of each Worker failing at startup (0 by default, useful for debugging)
1007 =item --debug <debug_level>
1009 set the debug level of the Workers
1013 =head2 Other commands/options
1023 report both eHive code version and eHive database schema version
1027 detect all unaccounted-for dead Workers and reset their Jobs for resubmission
1031 re-synchronise the eHive database
1035 detect all Workers in UNKWN state and reset their Jobs for resubmission (careful, they *may* reincarnate!)
1039 shut everything down: block all beekeepers connected to the pipeline and terminate workers
1043 tell the database all Workers are dead (no checks are performed in this mode, so be very careful!)
1045 =item --balance_semaphores
1047 set all Semaphore counters to the numbers of unDONE fan Jobs (emergency use only)
1049 =item --worker_stats
1051 show status of each running Worker
1055 show all failed Jobs
1057 =item --job_output <job_id>
1059 print details for one Job
1061 =item --reset_job_id <num>
1063 reset a Job back to READY so it can be rerun
1065 =item --reset_failed_jobs
1067 reset FAILED Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1069 =item --reset_done_jobs
1071 reset DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1073 =item --reset_all_jobs
1075 reset FAILED, DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1077 =item --forgive_failed_jobs
1079 mark FAILED Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1081 =item --discard_ready_jobs
1083 mark READY Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1085 =item --unblock_semaphored_jobs
1087 set SEMAPHORED Jobs of analyses matching -analyses_pattern to READY so they can start
1093 See the NOTICE file distributed with
this work
for additional information
1094 regarding copyright ownership.
1096 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
1097 You may obtain a copy of the License at
1101 Unless required by applicable law or agreed to in writing, software distributed under the License
1102 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1103 See the License
for the specific language governing permissions and limitations under the License.
1107 Please subscribe to the eHive mailing list: http: