6 # Finding out own path in order to reference own components (including own modules):
10 $ENV{
'EHIVE_ROOT_DIR'} ||= File::Basename::dirname( File::Basename::dirname( Cwd::realpath($0) ) );
11 unshift @INC, $ENV{
'EHIVE_ROOT_DIR'}.
'/modules';
14 use File::Path
'make_path';
15 use Getopt::Long qw(:config no_auto_abbrev);
36 $|=1; # make STDOUT unbuffered (STDERR is unbuffered anyway)
38 # ok this is a hack, but I'm going to pretend I've got an object here
39 # by creating a hash ref and passing it around like an object
40 # this is to avoid using global variables in functions, and to consolidate
41 # the globals into a nice '$self' package
45 my $report_versions = 0;
49 my $show_failed_jobs = 0;
50 my $default_meadow_type = undef;
51 my $submit_workers_max = undef;
52 my $total_running_workers_max = undef;
53 my $submission_options = undef;
55 my $run_job_id = undef;
57 my $check_for_dead = 0;
58 my $bury_unkwn_workers = 0;
60 my $balance_semaphores = 0;
61 my $job_id_for_output = 0;
62 my $show_worker_stats = 0;
63 my $kill_worker_id = 0;
64 my $big_red_button = 0;
65 my $keep_alive = 0; # DEPRECATED
67 my $reset_all_jobs_for_analysis = 0; # DEPRECATED
68 my $reset_failed_jobs_for_analysis = 0; # DEPRECATED
69 my $reset_all_jobs = 0; # Mark DONE, PASSED_ON and FAILED jobs to READY
70 my $reset_failed_jobs = 0; # Mark FAILED jobs to READY
71 my $reset_done_jobs = 0; # Mark DONE and PASSED_ON jobs to READY
72 my $unblock_semaphored_jobs = 0; # Mark SEMAPHORED jobs to READY
73 my $forgive_failed_jobs = 0; # Mark FAILED jobs to DONE
74 my $discard_ready_jobs = 0; # Mark READY jobs to DONE
76 $self->{
'url'} = undef;
77 $self->{
'reg_conf'} = undef;
78 $self->{
'reg_type'} = undef;
79 $self->{
'reg_alias'} = undef;
80 $self->{
'nosqlvc'} = undef;
82 $self->{
'config_files'} = [];
84 $self->{
'sleep_minutes'} = 1;
85 $self->{
'max_loops'} = 0;
86 $self->{
'retry_throwing_jobs'} = undef;
87 $self->{
'loop_until'} = undef;
88 $self->{
'can_respecialize'} = 1;
89 $self->{
'hive_log_dir'} = undef;
90 $self->{
'submit_log_dir'} = undef;
91 $self->{
'worker_delay_startup_seconds'} = undef;
92 $self->{
'worker_crash_on_startup_prob'} = undef;
94 # store all the options passed on the command line for registration
95 # as GetOptions modifies @ARGV
96 my @original_argv = @ARGV;
99 # connection parameters
100 'url=s' => \$self->{
'url'},
101 'reg_conf|regfile|reg_file=s' => \$self->{
'reg_conf'},
102 'reg_type=s' => \$self->{
'reg_type'},
103 'reg_alias|regname|reg_name=s' => \$self->{
'reg_alias'},
104 'nosqlvc' => \$self->{
'nosqlvc'}, #
using "nosqlvc" instead of
"sqlvc!" for compatibility with modules where it is a propagated option
107 'config_file=s@' => $self->{
'config_files'},
112 'max_loops=i' => \$self->{
'max_loops'},
113 'loop_until=s' => \$self->{
'loop_until'},
114 'keep_alive' => \$keep_alive,
115 'job_id|run_job_id=i'=> \$run_job_id,
116 'force!' => \$self->{
'force'},
117 'sleep=f' => \$self->{
'sleep_minutes'},
121 'meadow_type=s' => \$default_meadow_type,
122 'total_running_workers_max=i' => \$total_running_workers_max,
123 'submit_workers_max=i' => \$submit_workers_max,
124 'submission_options=s' => \$submission_options,
127 'job_limit=i' => \$self->{
'job_limit'},
128 'life_span|lifespan=i' => \$self->{
'life_span'},
129 'logic_name=s' => \$self->{
'logic_name'},
130 'analyses_pattern=s' => \$self->{
'analyses_pattern'},
131 'hive_log_dir|hive_output_dir=s' => \$self->{
'hive_log_dir'},
132 'retry_throwing_jobs!' => \$self->{
'retry_throwing_jobs'},
133 'can_respecialize|can_respecialise!' => \$self->{
'can_respecialize'},
134 'debug=i' => \$self->{
'debug'},
135 'submit_log_dir=s' => \$self->{
'submit_log_dir'},
136 'worker_delay_startup_seconds=i' => \$self->{
'worker_delay_startup_seconds'},
137 'worker_crash_on_startup_prob=f' => \$self->{
'worker_crash_on_startup_prob'},
139 # other commands/options
141 'v|version|versions!' => \$report_versions,
143 'dead!' => \$check_for_dead,
144 'unkwn!' => \$bury_unkwn_workers,
145 'killworker=i' => \$kill_worker_id,
146 'big_red_button' => \$big_red_button,
147 'alldead!' => \$all_dead,
148 'balance_semaphores'=> \$balance_semaphores,
149 'worker_stats' => \$show_worker_stats,
150 'failed_jobs' => \$show_failed_jobs,
151 'reset_job_id=i' => \$reset_job_id,
152 'reset_failed_jobs_for_analysis=s' => \$reset_failed_jobs_for_analysis,
153 'reset_all_jobs_for_analysis=s' => \$reset_all_jobs_for_analysis,
154 'reset_failed_jobs' => \$reset_failed_jobs,
155 'reset_all_jobs' => \$reset_all_jobs,
156 'reset_done_jobs' => \$reset_done_jobs,
157 'discard_ready_jobs' => \$discard_ready_jobs,
158 'forgive_failed_jobs' => \$forgive_failed_jobs,
159 'unblock_semaphored_jobs' => \$unblock_semaphored_jobs,
160 'job_output=i' => \$job_id_for_output,
161 ) or die
"Error in command line arguments\n";
164 die
"ERROR: There are invalid arguments on the command-line: ". join(
" ", @ARGV).
"\n";
168 pod2usage({-exitvalue => 0, -verbose => 2});
171 if($report_versions) {
178 # if -keep_alive passed, ensure looping is on and loop_until is forever
180 $self->{
'loop_until'} =
'FOREVER';
184 # if user has specified -loop_until, ensure looping is turned on
185 if ($self->{
'loop_until'}) {
189 # if loop_until hasn't been set by the user, or defaulted by a flag,
190 # set it to ANALYSIS_FAILURE
191 unless ($self->{
'loop_until'}) {
192 $self->{
'loop_until'} =
'ANALYSIS_FAILURE';
195 if($run or $run_job_id) {
196 $self->{
'max_loops'} = 1;
198 unless($self->{
'max_loops'}) {
199 $self->{
'max_loops'} = -1; # unlimited
203 if($self->{
'url'} or $self->{
'reg_alias'}) {
206 -url => $self->{
'url'},
207 -reg_conf => $self->{
'reg_conf'},
208 -reg_type => $self->{
'reg_type'},
209 -reg_alias => $self->{
'reg_alias'},
210 -no_sql_schema_version_check => $self->{
'nosqlvc'},
213 $self->{
'dba'} = $self->{
'pipeline'}->hive_dba();
216 die
"\nERROR: Connection parameters (url or reg_conf+reg_alias) need to be specified\n";
219 $self->{
'options'} = join(
" ", @original_argv);
221 # make -loop_until case insensitive
222 $self->{
'loop_until'} = uc($self->{
'loop_until'});
224 my @allowed_loop_until_values = qw(ANALYSIS_FAILURE FOREVER JOB_FAILURE NO_WORK);
225 unless (grep {$_ eq $self->{
'loop_until'}} @allowed_loop_until_values) {
226 die sprintf(
'"%s" is not a recognized value for -loop_until. Use one of %s', $self->{
'loop_until'}, join(
'/', @allowed_loop_until_values));
229 my $pipeline_name = $self->{
'pipeline'}->hive_pipeline_name;
232 print
"Pipeline name: $pipeline_name\n";
234 print STDERR
"+---------------------------------------------------------------------+\n";
235 print STDERR
"! !\n";
236 print STDERR
"! WARNING: !\n";
237 print STDERR
"! !\n";
238 print STDERR
"! At the moment your pipeline doesn't have 'pipeline_name' defined. !\n";
239 print STDERR
"! This may seriously impair your beekeeping experience unless you are !\n";
240 print STDERR
"! the only farm user. The name should be set in your PipeConfig file, !\n";
241 print STDERR
"! or if you are running an old pipeline you can just set it by hand !\n";
242 print STDERR
"! in the 'meta' table. !\n";
243 print STDERR
"! !\n";
244 print STDERR
"+---------------------------------------------------------------------+\n";
247 unless ($self->{
'dba'}->dbc->has_write_access) {
248 my $dbc = $self->{
'dba'}->dbc;
250 print STDERR
"*" x 70,
"\n";
251 print STDERR sprintf(
"* It appears that %s doesn't have INSERT/UPDATE/DELETE privileges\n", $dbc->username);
252 print STDERR sprintf(
"* on this database (%s). Please check the credentials\n", $dbc->dbname);
254 print STDERR
"*" x 70,
"\n";
258 undef $reset_all_jobs;
259 undef $reset_failed_jobs;
260 undef $reset_done_jobs;
261 undef $unblock_semaphored_jobs;
262 undef $forgive_failed_jobs;
263 undef $discard_ready_jobs;
264 undef $kill_worker_id;
266 $self->{
'read_only'} = 1;
270 $submit_workers_max = 1;
273 $default_meadow_type =
'LOCAL' if($local);
275 $self->{
'available_meadow_list'} = $valley->get_available_meadow_list();
277 $valley->config_set(
'SubmitWorkersMax', $submit_workers_max)
if(defined $submit_workers_max);
279 my $default_meadow = $valley->get_default_meadow();
280 print
"Default meadow: ".$default_meadow->signature.
"\n\n";
282 $default_meadow->config_set(
'TotalRunningWorkersMax', $total_running_workers_max)
if(defined $total_running_workers_max);
283 $default_meadow->config_set(
'SubmissionOptions', $submission_options)
if(defined $submission_options);
285 my $queen = $self->{
'dba'}->get_Queen;
287 if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
289 if($job_id_for_output) {
290 printf(
"===== Job output\n");
291 my $job = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output);
292 print $job->toString.
"\n";
295 if($reset_all_jobs_for_analysis) {
296 die
"Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
298 if($reset_failed_jobs_for_analysis) {
299 die
"Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
302 if( $self->{
'logic_name'} ) { # FIXME:
for now, logic_name will
override analyses_pattern quietly
303 warn
"-logic_name is now deprecated, please use -analyses_pattern that extends the functionality of -logic_name .\n";
304 $self->{
'analyses_pattern'} = $self->{
'logic_name'};
307 # May die if running within a non-LOCAL meadow
308 unless ($self->{
'read_only'}) {
311 $self->{
'logmessage_adaptor'} = $self->{
'dba'}->get_LogMessageAdaptor();
313 # Check other beekeepers in our meadow to see if they are still alive
314 $self->{
'beekeeper'}->adaptor->bury_other_beekeepers($self->{
'beekeeper'}) unless $self->{
'read_only'};
316 if ($kill_worker_id) {
318 eval {$kill_worker = $queen->fetch_by_dbID($kill_worker_id) or die};
320 log_and_die($self,
"Could not fetch Worker with dbID='$kill_worker_id' to kill");
323 unless( $kill_worker->cause_of_death() ) {
324 if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {
326 if( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
327 printf(
"Killing worker: %10d %35s %15s : ",
328 $kill_worker->dbID, $kill_worker->meadow_host, $kill_worker->process_id);
330 $meadow->kill_worker($kill_worker);
331 $kill_worker->cause_of_death(
'KILLED_BY_USER');
332 $queen->register_worker_death($kill_worker);
333 # what about clean-up? Should we do it here or not?
335 log_and_die($self,
"According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
338 log_and_die($self,
"Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill");
341 log_and_die($self,
"According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
345 if ( $big_red_button ) {
351 eval {$run_job = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id ) or die};
353 log_and_die($self,
"Could not fetch Job with dbID=$run_job_id.\n");
357 my $list_of_analyses = $run_job
358 ? [ $run_job->analysis ]
359 : $self->{
'pipeline'}->collection_of(
'Analysis')->find_all_by_pattern( $self->{
'analyses_pattern'} );
361 if( $self->{
'analyses_pattern'} ) {
362 if( @$list_of_analyses ) {
363 print
"Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{
'analyses_pattern'}.
"' : "
364 . join(
', ',
map { $_->logic_name.
'('.$_->dbID.
')' } sort {$a->dbID <=> $b->dbID} @$list_of_analyses)
365 .
"\nBeekeeper : ", scalar($self->{
'pipeline'}->collection_of(
'Analysis')->list())-scalar(@$list_of_analyses),
" Analyses are not shown\n\n";
367 log_and_die($self,
"Beekeeper : the -analyses_pattern '".$self->{
'analyses_pattern'}.
"' did not match any Analyses.\n");
371 my $has_task = ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs || $unblock_semaphored_jobs || $forgive_failed_jobs || $discard_ready_jobs);
372 if($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs) {
373 if (($reset_all_jobs || $reset_done_jobs) and not $self->{
'analyses_pattern'}) {
374 log_and_die($self,
"Beekeeper : do you really want to reset *all* the Jobs ? If yes, add \"-analyses_pattern '%'\" to the command line\n");
376 my $statuses_to_reset = $reset_failed_jobs ? [
'FAILED' ] : ($reset_done_jobs ? [
'DONE',
'PASSED_ON' ] : [
'DONE',
'FAILED',
'PASSED_ON' ]);
377 $self->{
'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $statuses_to_reset );
380 if ($unblock_semaphored_jobs) {
381 $self->{
'dba'}->get_AnalysisJobAdaptor->unblock_jobs_for_analysis_id( $list_of_analyses );
384 if ($discard_ready_jobs) {
385 $self->{
'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses,
'READY' );
388 if ($forgive_failed_jobs) {
389 $self->{
'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses,
'FAILED' );
392 $queen->synchronize_hive( $list_of_analyses )
if $has_task;
394 if($all_dead) { $queen->register_all_workers_dead(); }
395 if($check_for_dead) { $queen->check_for_dead_workers($valley, 1); }
396 if($bury_unkwn_workers) { $queen->check_for_dead_workers($valley, 1, 1); }
397 if($balance_semaphores) { $self->{
'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }
400 if ($self->{
'max_loops'}) { # positive $max_loop means limited, negative means unlimited
402 $has_error =
run_autonomously($self, $self->{
'pipeline'}, $self->{
'max_loops'}, $self->{
'loop_until'}, $valley, $list_of_analyses, $self->{
'analyses_pattern'}, $run_job_id);
405 # the output of several methods will look different depending on $analysis being [un]defined
408 $queen->synchronize_hive( $list_of_analyses );
410 my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{
'debug'} );
412 if($show_worker_stats) {
413 print
"\n===== List of live Workers according to the Queen: ======\n";
414 foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
415 print $worker->toString(1).
"\n";
418 $self->{
'dba'}->get_RoleAdaptor->print_active_role_counts;
422 if($show_failed_jobs) {
423 print(
"===== failed Jobs\n");
424 my $failed_job_list = $self->{
'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $list_of_analyses ,
'FAILED');
426 foreach my $job (@{$failed_job_list}) {
427 print $job->toString.
"\n";
430 $self->{
'beekeeper'}->set_cause_of_death(
'LOOP_LIMIT') unless $self->{
'read_only'};
436 #######################
440 #######################
443 my ($self, $message) = @_;
445 my $beekeeper = $self->{
'beekeeper'};
447 $self->{
'logmessage_adaptor'}->store_beekeeper_message($beekeeper->dbID, $message,
'PIPELINE_ERROR',
'TASK_FAILED');
448 $beekeeper->set_cause_of_death(
'TASK_FAILED');
454 my ($self, $analyses_pattern, $run_job_id) = @_;
456 my $worker_cmd =
'runWorker.pl';
458 foreach my $worker_option (
'url',
'reg_conf',
'reg_type',
'reg_alias',
'job_limit',
'life_span',
459 'worker_delay_startup_seconds',
'worker_crash_on_startup_prob',
'hive_log_dir',
'debug') {
460 if(defined(my $value = $self->{$worker_option})) {
461 $worker_cmd .=
" -${worker_option} $value";
465 foreach my $worker_flag (
'retry_throwing_jobs',
'can_respecialize',
'force',
'nosqlvc') {
466 if(defined(my $value = $self->{$worker_flag})) {
468 $worker_cmd .=
" -no${worker_flag}";
470 $worker_cmd .=
" -${worker_flag}";
475 # This option can have multiple values
476 $worker_cmd .=
" -config_file $_" for @{$self->{
'config_files'}};
480 $worker_cmd .=
" -job_id $run_job_id";
481 } elsif ($analyses_pattern) {
482 $worker_cmd .=
" -analyses_pattern '".$analyses_pattern.
"'";
489 my ($valley, $self) = @_;
491 my $loop_limit = undef;
492 if ($self->{
'max_loops'} > -1) {
493 $loop_limit = $self->{
'max_loops'};
496 my $meadow_signatures = join(
",",
497 map {$_->signature} @{$self->{
'available_meadow_list'}});
499 # The new instance is partly initialised with the output of Valley::whereami()
501 'sleep_minutes' => $self->{
'sleep_minutes'},
502 'analyses_pattern' => $self->{
'analyses_pattern'},
503 'loop_limit' => $loop_limit,
504 'loop_until' => $self->{
'loop_until'},
505 'options' => $self->{
'options'},
506 'meadow_signatures' => $meadow_signatures,
509 $self->{
'dba'}->get_BeekeeperAdaptor->store($beekeeper);
510 unless ($self->{
'beekeeper_id'} = $beekeeper->dbID) {
511 die
"There was a problem registering this Beekeeper with the eHive database.";
518 my ( $self, $valley ) = @_;
520 my $bk_a = $self->{dba}->get_BeekeeperAdaptor();
521 my $blocked_beekeepers;
523 # Save a list of IDs of beekeepers which were blocked earlier so
524 # that we do not mention them while reporting the current blocking.
525 $blocked_beekeepers = $bk_a->fetch_all(
'is_blocked = 1' );
526 my %previously_blocked_ids;
527 while ( my $blocked_bk = shift @{ $blocked_beekeepers } ) {
528 $previously_blocked_ids{ $blocked_bk->dbID() } = 1;
531 # Begin the shutdown by blocking all registered beekeepers so that
532 # none of them start spawning new workers just as this one tries to
534 $bk_a->block_all_alive_beekeepers();
536 # Report which beekeepers, self excluded, we have just blocked
537 $blocked_beekeepers = $bk_a->fetch_all(
'is_blocked = 1' );
538 my $my_dbid = $self->{
'beekeeper'}->dbID();
539 my @newly_blocked = grep {
540 ( ! exists $previously_blocked_ids{ $_->dbID() } )
541 && ( $_->dbID() != $my_dbid )
542 } @{ $blocked_beekeepers };
543 while ( my $blocked_bk = shift @newly_blocked ) {
544 print
'Blocked beekeeper ' . $blocked_bk->dbID() .
': '
545 . $blocked_bk->toString() .
"\n";
548 # Next, kill all workers which are still alive.
549 # FIXME: double-check correct job status:
550 # - running ones should be marked as 'failed'
551 # - claimed but unstarted ones should get back to 'unclaimed'
552 my $queen = $self->{
'dba'}->get_Queen();
553 $queen->kill_all_workers( $valley );
560 my ($self, $pipeline, $max_loops, $loop_until, $valley, $list_of_analyses, $analyses_pattern, $run_job_id) = @_;
562 my $hive_dba = $pipeline->hive_dba;
563 my $queen = $hive_dba->get_Queen;
564 my $meadow_user = $self->{
'beekeeper'} && $self->{
'beekeeper'}->meadow_user;
566 my $pathless_resourceless_worker_cmd =
generate_worker_cmd($self, $analyses_pattern, $run_job_id);
571 BKLOOP:
while( ($iteration++ != $max_loops) or ($loop_until eq
'FOREVER') ) { # NB: the order of conditions is important!
573 print(
"\nBeekeeper : loop #$iteration ======================================================\n");
575 my $pipeline_name = $pipeline->hive_pipeline_name;
576 if ($pipeline_name) {
577 print
"\nPipeline name: $pipeline_name";
579 print
"\nPipeline URL: ".$self->{
'url'}.
"\n\n";
581 $queen->check_for_dead_workers($valley, 0) unless $self->{
'read_only'};
583 # this section is where the beekeeper decides whether or not to stop looping
584 $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{
'debug'});
585 my @job_fail_statuses = grep({$_->{
'exit_status'} eq
'JOB_FAILED'} @$reasons_to_exit);
586 my @analysis_fail_statuses = grep({$_->{
'exit_status'} eq
'ANALYSIS_FAILED'} @$reasons_to_exit);
587 my @no_work_statuses = grep({$_->{
'exit_status'} eq
'NO_WORK'} @$reasons_to_exit);
589 my $found_reason_to_exit = 0;
591 if (($loop_until eq
'JOB_FAILURE') &&
592 (scalar(@job_fail_statuses)) > 0) {
593 print
"Beekeeper : last loop because at least one Job failed and loop-until mode is '$loop_until'\n";
594 print
"Beekeeper : details from analyses with failed Jobs:\n";
595 print join(
"\n",
map {$_->{
'message'}} @job_fail_statuses) .
"\n";
596 $found_reason_to_exit = 1;
600 if (scalar(@analysis_fail_statuses > 0)) {
601 # at least one analysis has hit its fault tolerance
602 if (($loop_until eq
'FOREVER') ||
603 ($loop_until eq
'NO_WORK')) {
604 if (scalar(@no_work_statuses) == 0) {
605 print
"Beekeeper : detected the following exit condition(s), but staying alive because loop-until mode is set to '$loop_until' :\n" .
606 join(
", ",
map {$_->{
'message'}} @analysis_fail_statuses) .
"\n";
609 # loop_until_mode is either job_failure or analysis_failure, and both of these exit on analysis failure
610 unless ($found_reason_to_exit) {
611 print
"Beekeeper : last loop because at least one analysis failed and loop-until mode is '$loop_until'\n";
612 print
"Beekeeper : details from analyses with failed Jobs:\n";
613 print join(
"\n",
map {$_->{
'message'}} @analysis_fail_statuses) .
"\n";
614 $found_reason_to_exit = 1;
620 if ((scalar(@no_work_statuses) > 0) &&
621 ($loop_until ne
'FOREVER')) {
622 print
"Beekeeper : last loop because there is no more work and loop-until mode is '$loop_until'\n"
623 unless ($found_reason_to_exit);
627 # end of testing for loop end conditions
629 $hive_dba->get_RoleAdaptor->print_active_role_counts;
631 my $workers_to_submit_by_meadow_type_rc_name = $self->{
'read_only'} ? {} :
634 if( keys %$workers_to_submit_by_meadow_type_rc_name ) {
636 my $submit_log_subdir;
638 if( $self->{
'submit_log_dir'} ) {
639 $submit_log_subdir = $self->{
'submit_log_dir'}.
"/submit_bk".$self->{
'beekeeper_id'}.
"_iter${iteration}";
640 make_path( $submit_log_subdir );
643 # create an "index" over the freshly loaded RC/RD collections:
644 my %meadow_type_rc_name2resource_param_list = ();
645 foreach my $rd ( $pipeline->collection_of(
'ResourceDescription')->list ) {
646 my $rc_name = $rd->resource_class->name;
647 $meadow_type_rc_name2resource_param_list{ $rd->meadow_type }{ $rc_name } = [ $rd->submission_cmd_args, $rd->worker_cmd_args ];
650 foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
652 my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
654 foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
655 my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};
657 my $submission_message =
"submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature();
658 print
"\nBeekeeper : $submission_message\n";
659 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
660 "loop iteration $iteration, $submission_message",
663 my ($submission_cmd_args, $worker_cmd_args) = @{ $meadow_type_rc_name2resource_param_list{ $meadow_type }{ $rc_name } || [] };
665 my $specific_worker_cmd = $this_meadow->runWorker_path
666 . $pathless_resourceless_worker_cmd
667 . (defined($worker_cmd_args) ?
" $worker_cmd_args" :
'')
670 my $meadow_process_ids = [];
671 if ($this_meadow_rc_worker_count > 1 and !$this_meadow->config_get(
'CanSubmitJobArrays')) {
672 foreach my $i (1..$this_meadow_rc_worker_count) {
673 my $submitted_process_id = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, 1, $self->{
'beekeeper_id'}.
'_'.
"$iteration.$i", $rc_name, $submission_cmd_args ||
'', $submit_log_subdir);
674 push @$meadow_process_ids, $submitted_process_id->[0];
677 $meadow_process_ids = $this_meadow->submit_workers_return_meadow_pids($specific_worker_cmd, $this_meadow_rc_worker_count, $self->{
'beekeeper_id'}.
'_'.$iteration, $rc_name, $submission_cmd_args ||
'', $submit_log_subdir);
680 print
"Submitted the following process_ids to ".$this_meadow->signature.
": ".join(
', ', @$meadow_process_ids).
"\n";
682 my $resource_class = $pipeline->collection_of(
'ResourceClass')->find_one_by(
'name', $rc_name);
683 my $meadow_name = $this_meadow->cached_name;
685 my @pre_allocated_workers =
map {
687 'meadow_type' => $meadow_type, # non-unique key components
688 'meadow_name' => $meadow_name,
689 'meadow_user' => $meadow_user,
692 'resource_class' => $resource_class, # non-key, but known at the time of pre-allocation
693 'beekeeper_id' => $self->{
'beekeeper_id'},
695 'status' =>
'SUBMITTED',
697 } @$meadow_process_ids;
699 $queen->store( \@pre_allocated_workers );
703 } elsif ($self->{
'read_only'}) {
705 print STDERR
"*" x 70,
"\n";
706 print STDERR
"* beekeeper.pl is running in read-only mode, i.e. it only\n";
707 print STDERR
"* prints the current status of the pipeline.\n";
709 print STDERR
"*" x 70,
"\n";
713 print
"\nBeekeeper : not submitting any workers this iteration\n";
714 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
715 "loop iteration $iteration, 0 workers submitted",
719 if( $iteration != $max_loops ) { # skip the last sleep
721 $hive_dba->dbc->disconnect_if_idle;
722 printf(
"Beekeeper : going to sleep for %.2f minute(s). Expect next iteration at %s\n", $self->{
'sleep_minutes'}, scalar localtime(time+$self->{
'sleep_minutes'}*60));
723 sleep($self->{
'sleep_minutes'}*60);
724 last
if $self->{
'read_only'};
725 # this is a good time to check up on other beekeepers as well:
726 $self->{
'beekeeper'}->adaptor->bury_other_beekeepers($self->{
'beekeeper'});
727 if ($self->{
'beekeeper'}->check_if_blocked()) {
728 print
"Beekeeper : We have been blocked !\n".
729 "This can happen if a Job has explicitly required the Beekeeper to stop (have a look at log_message).\n".
730 "It may also happen if someone has set is_blocked=1 in the beekeeper table for beekeeper_id=".$self->{
'beekeeper_id'}.
".\n";
736 # after waking up reload Resources and Analyses to stay current.
737 unless($run_job_id) {
738 # reset all the collections so that fresher data will be used at this iteration:
739 $pipeline->invalidate_collections();
740 $pipeline->invalidate_hive_current_load();
742 $list_of_analyses = $pipeline->collection_of(
'Analysis')->find_all_by_pattern( $analyses_pattern );
747 # in this section, the beekeeper determines why it exited, sets an appropriate cause of death,
748 # and prints/logs an appropriate message
749 my @stringified_reasons_builder;
750 my $beekeeper_cause_of_death;
751 my $cause_of_death_is_error;
752 my %exit_statuses; # keep a set of unique exit statuses seen
753 if ($reasons_to_exit) {
754 foreach my $reason_to_exit (@$reasons_to_exit) {
755 $exit_statuses{$reason_to_exit->{
'exit_status'}} = 1;
756 push(@stringified_reasons_builder, $reason_to_exit->{
'message'});
760 my $stringified_reasons = join(
", ", @stringified_reasons_builder);
762 if (($loop_until eq
'JOB_FAILURE') &&
763 (grep(/JOB_FAILED/, keys(%exit_statuses)))) {
764 $beekeeper_cause_of_death =
'JOB_FAILED';
765 $cause_of_death_is_error = 1;
768 if (($loop_until eq
'ANALYSIS_FAILURE') &&
769 (grep(/ANALYSIS_FAILED/, keys(%exit_statuses)))) {
770 $beekeeper_cause_of_death =
'ANALYSIS_FAILED';
771 $cause_of_death_is_error = 1;
774 if (!$beekeeper_cause_of_death) {
775 if (grep(/NO_WORK/, keys(%exit_statuses))) {
776 $beekeeper_cause_of_death =
'NO_WORK';
778 $beekeeper_cause_of_death =
'LOOP_LIMIT';
780 $cause_of_death_is_error = 0;
783 $self->{
'logmessage_adaptor'}->store_beekeeper_message($self->{
'beekeeper_id'},
784 "stopped looping because of $stringified_reasons",
785 $cause_of_death_is_error ?
'PIPELINE_ERROR' :
'INFO',
786 $beekeeper_cause_of_death) unless $self->{
'read_only'};
788 if ($reasons_to_exit and $ENV{EHIVE_SLACK_WEBHOOK}) {
789 send_beekeeper_message_to_slack($ENV{EHIVE_SLACK_WEBHOOK}, $self->{
'pipeline'}, $cause_of_death_is_error, 1, $stringified_reasons, $loop_until);
792 $self->{
'beekeeper'}->set_cause_of_death($beekeeper_cause_of_death) unless $self->{
'read_only'};
793 printf(
"Beekeeper: dbc %d disconnect cycles\n", $hive_dba->dbc->disconnect_count);
794 return $cause_of_death_is_error;
804 beekeeper.pl [options]
808 The Beekeeper is in charge of interfacing between the eHive database and a compute resource or
'compute farm'.
809 Its Job is to synchronise both, to assess the compute requirements of the pipeline
810 and to send the requested number of workers to open machines via the runWorker.pl script.
812 It is also responsible
for identifying workers which died
813 unexpectedly so that dead workers can be released and unfinished Jobs reclaimed.
815 =head1 USAGE EXAMPLES
817 # Usually run after the pipeline has been created to calculate the internal statistics necessary for eHive functioning
818 beekeeper.pl -url mysql:
820 # Do not run any additional Workers, just check for the current status of the pipeline:
821 beekeeper.pl -url mysql:
823 # Run the pipeline in automatic mode (-loop), run all the workers locally (-meadow_type LOCAL) and allow for 3 parallel workers (-total_running_workers_max 3)
824 beekeeper.pl -url mysql:
826 # Run in automatic mode, but only restrict to running blast-related analyses with the exception of analyses 4..6
827 beekeeper.pl -url mysql:
829 # Restrict the normal execution to one iteration only - can be used for testing a newly set up pipeline
830 beekeeper.pl -url mysql:
832 # Reset failed 'buggy_analysis' Jobs to 'READY' state, so that they can be run again
833 beekeeper.pl -url mysql:
835 # Do a cleanup: find and bury dead workers, reclaim their Jobs
836 beekeeper.pl -url mysql:
840 =head2 Connection parameters
844 =item --reg_conf <path>
846 Path to a Registry configuration file
848 =item --reg_type <string>
850 Type of the registry entry (
"hive",
"core",
"compara", etc. - defaults to
"hive")
852 =item --reg_alias <string>
854 Species / alias name
for the eHive DBAdaptor
856 =item --url <url string>
858 URL defining where eHive database is located
862 "No SQL Version Check" - set
if you want to force working with a database created by a potentially schema-incompatible API
866 =head2 Configs overriding
870 =item --config_file <string>
872 JSON file (with absolute path) to
override the
default configurations (could be multiple)
876 =head2 Looping control
882 run autonomously, loops and sleeps. Equivalent to -loop_until ANALYSIS_FAILURE
886 sets the level of
event that will cause the Beekeeper to stop looping:
892 stop looping
if any Job fails
894 =item ANALYSIS_FAILURE
896 stop looping
if any Analysis has Job failures exceeding its fault tolerance
900 ignore Job and Analysis failures, keep looping until there is no work
904 ignore failures and no work, keep looping
910 (Deprecated) alias
for -loop_until FOREVER
912 =item --max_loops <num>
914 perform at most
this # of loops in autonomous mode. The Beekeeper will stop when
915 it has performed max_loops loops, even in FOREVER mode
917 =item --job_id <job_id>
919 run one iteration
for this job_id
923 run one iteration of automation loop
927 when looping, sleep <num> minutes (
default 1 min)
931 =head2 Current Meadow control
935 =item --meadow_type <string>
937 the desired Meadow
class name, such as 'LSF' or 'LOCAL'
939 =item --total_running_workers_max <num>
941 max # workers to be running in parallel
943 =item --submit_workers_max <num>
945 max # workers to create per loop iteration
947 =item --submission_options <string>
949 passes <string> to the Meadow submission command as <options> (formerly lsf_options)
951 =item --submit_log_dir <dir>
953 record submission output+error streams into files under the given directory (to see why some workers fail after submission)
957 =head2 Worker control
961 =item --analyses_pattern <string>
963 restrict the sync operation, printing of stats or looping of the Beekeeper to the specified subset of Analyses
965 =item --nocan_respecialize
967 prevent workers from re-specializing into another Analysis (within resource_class) after their previous Analysis is exhausted
971 run all workers with -force (see runWorker.pl)
973 =item --killworker <worker_id>
975 kill Worker by worker_id
977 =item --life_span <num>
979 number of minutes each Worker is allowed to
run
981 =item --job_limit <num>
983 Number of Jobs to
run before Worker can die naturally
985 =item --retry_throwing_jobs
987 if a Job dies *knowingly* (e.g. by encountering a die statement in the Runnable), should we retry it by
default?
989 =item --hive_log_dir <path>
991 directory where stdout/stderr of the eHive is redirected
993 =item --worker_delay_startup_seconds <number>
995 number of seconds each Worker has to wait before first talking to the database (0 by
default, useful
for debugging)
997 =item --worker_crash_on_startup_prob <float>
999 probability of each Worker failing at startup (0 by
default, useful
for debugging)
1001 =item --debug <debug_level>
1003 set
debug level of the workers
1007 =head2 Other commands/options
1017 report both eHive code version and eHive database schema version
1021 detect all unaccounted dead workers and reset their Jobs
for resubmission
1025 re-synchronise the eHive
1029 detect all workers in UNKWN state and reset their Jobs
for resubmission (careful, they *may* reincarnate!)
1033 shut everything down: block all beekeepers connected to the pipeline and terminate workers
1037 tell the database all workers are dead (no checks are performed in
this mode, so be very careful!)
1039 =item --balance_semaphores
1041 set all Semaphore counters to the numbers of unDONE fan Jobs (emergency use only)
1043 =item --worker_stats
1045 show status of each running Worker
1049 show all failed Jobs
1051 =item --job_output <job_id>
1053 print details
for one Job
1055 =item --reset_job_id <num>
1057 reset a Job back to READY so it can be rerun
1059 =item --reset_failed_jobs
1061 reset FAILED Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1063 =item --reset_done_jobs
1065 reset DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1067 =item --reset_all_jobs
1069 reset FAILED, DONE and PASSED_ON Jobs of analyses matching -analyses_pattern back to READY so they can be rerun
1071 =item --forgive_failed_jobs
1073 mark FAILED Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1075 =item --discard_ready_jobs
1077 mark READY Jobs of analyses matching -analyses_pattern as DONE, and update their Semaphores. NOTE: This does not make them dataflow
1079 =item --unblock_semaphored_jobs
1081 set SEMAPHORED Jobs of analyses matching -analyses_pattern to READY so they can start
1087 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
1088 Copyright [2016-2024] EMBL-European Bioinformatics Institute
1090 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
1091 You may obtain a copy of the License at
1095 Unless required by applicable law or agreed to in writing, software distributed under the License
1096 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1097 See the License
for the specific language governing permissions and limitations under the License.
1101 Please subscribe to the eHive mailing list: http: