# Entry point of the beekeeper script.
#
# Parses the command line into a hash-ref "pseudo-object" ($self), connects to
# the pipeline given by -url or -reg_conf/-reg_alias, performs any requested
# one-off actions (job resets, worker kills, job output, ...), and then either
# runs run_autonomously() (when max_loops is non-zero) or prints a one-shot
# status report.
#
# Ends with exit($has_error), where $has_error comes from run_autonomously()
# and is 0 otherwise. Exceptions:
#   - -help and -version exit early;
#   - -big_red_button returns the result of big_red_button() to the caller;
#   - invalid input dies.
sub main {
    $| = 1;    # make STDOUT unbuffered (STDERR is unbuffered anyway)

    # ok this is a hack, but I'm going to pretend I've got an object here
    # by creating a hash ref and passing it around like an object
    # this is to avoid using global variables in functions, and to consolidate
    # the globals into a nice '$self' package
    my $self = {};

    my $help                      = 0;
    my $report_versions           = 0;
    my $loopit                    = 0;
    my $sync                      = 0;
    my $local                     = 0;
    my $show_failed_jobs          = 0;
    my $default_meadow_type       = undef;
    my $submit_workers_max        = undef;
    my $total_running_workers_max = undef;
    my $submission_options        = undef;
    my $run                       = 0;
    my $run_job_id                = undef;
    my $check_for_dead            = 0;
    my $bury_unkwn_workers        = 0;
    my $all_dead                  = 0;
    my $balance_semaphores        = 0;
    my $job_id_for_output         = 0;
    my $show_worker_stats         = 0;
    my $kill_worker_id            = 0;
    my $big_red_button            = 0;
    my $keep_alive                = 0;    # DEPRECATED

    my $reset_job_id                   = 0;
    my $reset_all_jobs_for_analysis    = 0;    # DEPRECATED
    my $reset_failed_jobs_for_analysis = 0;    # DEPRECATED
    my $reset_all_jobs                 = 0;    # Mark DONE, PASSED_ON and FAILED jobs to READY
    my $reset_failed_jobs              = 0;    # Mark FAILED jobs to READY
    my $reset_done_jobs                = 0;    # Mark DONE and PASSED_ON jobs to READY
    my $unblock_semaphored_jobs        = 0;    # Mark SEMAPHORED jobs to READY
    my $forgive_failed_jobs            = 0;    # Mark FAILED jobs to DONE
    my $discard_ready_jobs             = 0;    # Mark READY jobs to DONE

    # Settings kept in $self because they are handed to other subs with it.
    # (-force is also stored here, in $self->{'force'}, by GetOptions below.)
    $self->{'url'}                          = undef;
    $self->{'reg_conf'}                     = undef;
    $self->{'reg_type'}                     = undef;
    $self->{'reg_alias'}                    = undef;
    $self->{'nosqlvc'}                      = undef;
    $self->{'config_files'}                 = [];
    $self->{'sleep_minutes'}                = 1;
    $self->{'max_loops'}                    = 0;
    $self->{'retry_throwing_jobs'}          = undef;
    $self->{'loop_until'}                   = undef;
    $self->{'can_respecialize'}             = 1;
    $self->{'hive_log_dir'}                 = undef;
    $self->{'submit_log_dir'}               = undef;
    $self->{'worker_delay_startup_seconds'} = undef;
    $self->{'worker_crash_on_startup_prob'} = undef;

    # store all the options passed on the command line for registration
    # as GetOptions modifies @ARGV
    my @original_argv = @ARGV;

    GetOptions(
        # connection parameters
        'url=s'                         => \$self->{'url'},
        'reg_conf|regfile|reg_file=s'   => \$self->{'reg_conf'},
        'reg_type=s'                    => \$self->{'reg_type'},
        'reg_alias|regname|reg_name=s'  => \$self->{'reg_alias'},
        'nosqlvc'                       => \$self->{'nosqlvc'},    # using "nosqlvc" instead of "sqlvc!" for compatibility with modules where it is a propagated option

        # json config files
        'config_file=s@'                => $self->{'config_files'},

        # loop control
        'run'                           => \$run,
        'loop'                          => \$loopit,
        'max_loops=i'                   => \$self->{'max_loops'},
        'loop_until=s'                  => \$self->{'loop_until'},
        'keep_alive'                    => \$keep_alive,
        'job_id|run_job_id=i'           => \$run_job_id,
        'force!'                        => \$self->{'force'},
        'sleep=f'                       => \$self->{'sleep_minutes'},

        # meadow control
        'local!'                        => \$local,
        'meadow_type=s'                 => \$default_meadow_type,
        'total_running_workers_max=i'   => \$total_running_workers_max,
        'submit_workers_max=i'          => \$submit_workers_max,
        'submission_options=s'          => \$submission_options,

        # worker control
        'job_limit=i'                       => \$self->{'job_limit'},
        'life_span|lifespan=i'              => \$self->{'life_span'},
        'logic_name=s'                      => \$self->{'logic_name'},
        'analyses_pattern=s'                => \$self->{'analyses_pattern'},
        'hive_log_dir|hive_output_dir=s'    => \$self->{'hive_log_dir'},
        'retry_throwing_jobs!'              => \$self->{'retry_throwing_jobs'},
        'can_respecialize|can_respecialise!' => \$self->{'can_respecialize'},
        'debug=i'                           => \$self->{'debug'},
        'submit_log_dir=s'                  => \$self->{'submit_log_dir'},
        'worker_delay_startup_seconds=i'    => \$self->{'worker_delay_startup_seconds'},
        'worker_crash_on_startup_prob=f'    => \$self->{'worker_crash_on_startup_prob'},

        # other commands/options
        'h|help!'                           => \$help,
        'v|version|versions!'               => \$report_versions,
        'sync!'                             => \$sync,
        'dead!'                             => \$check_for_dead,
        'unkwn!'                            => \$bury_unkwn_workers,
        'killworker=i'                      => \$kill_worker_id,
        'big_red_button'                    => \$big_red_button,
        'alldead!'                          => \$all_dead,
        'balance_semaphores'                => \$balance_semaphores,
        'worker_stats'                      => \$show_worker_stats,
        'failed_jobs'                       => \$show_failed_jobs,
        'reset_job_id=i'                    => \$reset_job_id,
        'reset_failed_jobs_for_analysis=s'  => \$reset_failed_jobs_for_analysis,
        'reset_all_jobs_for_analysis=s'     => \$reset_all_jobs_for_analysis,
        'reset_failed_jobs'                 => \$reset_failed_jobs,
        'reset_all_jobs'                    => \$reset_all_jobs,
        'reset_done_jobs'                   => \$reset_done_jobs,
        'discard_ready_jobs'                => \$discard_ready_jobs,
        'forgive_failed_jobs'               => \$forgive_failed_jobs,
        'unblock_semaphored_jobs'           => \$unblock_semaphored_jobs,
        'job_output=i'                      => \$job_id_for_output,
    ) or die "Error in command line arguments\n";

    # Anything GetOptions did not consume is a stray positional argument.
    if (@ARGV) {
        die "ERROR: There are invalid arguments on the command-line: ". join(" ", @ARGV). "\n";
    }

    if ($help) {
        pod2usage({-exitvalue => 0, -verbose => 2});
    }

    if ($report_versions) {
        report_versions();
        exit(0);
    }

    # if -keep_alive passed, ensure looping is on and loop_until is forever
    if ($keep_alive) {
        $self->{'loop_until'} = 'FOREVER';
        $loopit = 1;
    }

    # if user has specified -loop_until, ensure looping is turned on
    if ($self->{'loop_until'}) {
        $loopit = 1;
    }

    # if loop_until hasn't been set by the user, or defaulted by a flag,
    # set it to ANALYSIS_FAILURE
    unless ($self->{'loop_until'}) {
        $self->{'loop_until'} = 'ANALYSIS_FAILURE';
    }

    # max_loops: 1 for -run/-job_id, -1 (unlimited) for -loop unless the user
    # set a limit, otherwise left as given (0 means "status report only").
    if ($run or $run_job_id) {
        $self->{'max_loops'} = 1;
    } elsif ($loopit) {
        unless ($self->{'max_loops'}) {
            $self->{'max_loops'} = -1;    # unlimited
        }
    }

    if ($self->{'url'} or $self->{'reg_alias'}) {
        # Runtime load so this sub does not depend on a top-of-file 'use'
        # line; it is a no-op when the module is already loaded.
        require Bio::EnsEMBL::Hive::HivePipeline;

        $self->{'pipeline'} = Bio::EnsEMBL::Hive::HivePipeline->new(
            -url                         => $self->{'url'},
            -reg_conf                    => $self->{'reg_conf'},
            -reg_type                    => $self->{'reg_type'},
            -reg_alias                   => $self->{'reg_alias'},
            -no_sql_schema_version_check => $self->{'nosqlvc'},
        );

        $self->{'dba'} = $self->{'pipeline'}->hive_dba();
    } else {
        die "\nERROR: Connection parameters (url or reg_conf+reg_alias) need to be specified\n";
    }

    $self->{'options'} = join(" ", @original_argv);

    # make -loop_until case insensitive
    $self->{'loop_until'} = uc($self->{'loop_until'});

    my @allowed_loop_until_values = qw(ANALYSIS_FAILURE FOREVER JOB_FAILURE NO_WORK);
    unless (grep { $_ eq $self->{'loop_until'} } @allowed_loop_until_values) {
        die sprintf('"%s" is not a recognized value for -loop_until. Use one of %s', $self->{'loop_until'}, join('/', @allowed_loop_until_values));
    }

    my $pipeline_name = $self->{'pipeline'}->hive_pipeline_name;

    if ($pipeline_name) {
        print "Pipeline name: $pipeline_name\n";
    } else {
        print STDERR "+---------------------------------------------------------------------+\n";
        print STDERR "! !\n";
        print STDERR "! WARNING: !\n";
        print STDERR "! !\n";
        print STDERR "! At the moment your pipeline doesn't have 'pipeline_name' defined. !\n";
        print STDERR "! This may seriously impair your beekeeping experience unless you are !\n";
        print STDERR "! the only farm user. The name should be set in your PipeConfig file, !\n";
        print STDERR "! or if you are running an old pipeline you can just set it by hand !\n";
        print STDERR "! in the 'meta' table. !\n";
        print STDERR "! !\n";
        print STDERR "+---------------------------------------------------------------------+\n";
    }

    # Without write privileges: warn, switch off the modifying options listed
    # below, and flag read-only mode (which skips beekeeper registration,
    # worker scheduling and the cause-of-death update further down).
    unless ($self->{'dba'}->dbc->has_write_access) {
        my $dbc = $self->{'dba'}->dbc;
        print STDERR "\n";
        print STDERR "*" x 70, "\n";
        print STDERR sprintf("* It appears that %s doesn't have INSERT/UPDATE/DELETE privileges\n", $dbc->username);
        print STDERR sprintf("* on this database (%s). Please check the credentials\n", $dbc->dbname);
        print STDERR "*\n";
        print STDERR "*" x 70, "\n";
        print STDERR "\n";
        undef $run_job_id;
        undef $reset_job_id;
        undef $reset_all_jobs;
        undef $reset_failed_jobs;
        undef $reset_done_jobs;
        undef $unblock_semaphored_jobs;
        undef $forgive_failed_jobs;
        undef $discard_ready_jobs;
        undef $kill_worker_id;
        undef $sync;
        $self->{'read_only'} = 1;
    }

    # Running a single job only ever needs a single worker.
    if ($run_job_id) {
        $submit_workers_max = 1;
    }

    $default_meadow_type = 'LOCAL' if ($local);

    # Meadow configuration built from the -config_file arguments collected into
    # $self->{'config_files'}; without this, $config would be undeclared here.
    require Bio::EnsEMBL::Hive::Utils::Config;
    my $config = Bio::EnsEMBL::Hive::Utils::Config->new( @{ $self->{'config_files'} } );

    my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
    $self->{'available_meadow_list'} = $valley->get_available_meadow_list();

    $valley->config_set('SubmitWorkersMax', $submit_workers_max) if (defined $submit_workers_max);

    my $default_meadow = $valley->get_default_meadow();
    print "Default meadow: ".$default_meadow->signature."\n\n";

    $default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if (defined $total_running_workers_max);
    $default_meadow->config_set('SubmissionOptions', $submission_options) if (defined $submission_options);

    my $queen = $self->{'dba'}->get_Queen;

    if ($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }

    if ($job_id_for_output) {
        printf("===== Job output\n");
        my $job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output);
        # Fail with a clear message instead of "Can't call method on undefined value".
        die "Could not fetch Job with dbID=$job_id_for_output\n" unless $job;
        print $job->toString. "\n";
    }

    if ($reset_all_jobs_for_analysis) {
        die "Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
    }
    if ($reset_failed_jobs_for_analysis) {
        die "Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
    }

    if ( $self->{'logic_name'} ) {    # FIXME: for now, logic_name will override analyses_pattern quietly
        warn "-logic_name is now deprecated, please use -analyses_pattern that extends the functionality of -logic_name .\n";
        $self->{'analyses_pattern'} = $self->{'logic_name'};
    }

    # May die if running within a non-LOCAL meadow
    unless ($self->{'read_only'}) {
        $self->{'beekeeper'} = register_beekeeper($valley, $self);
    }
    $self->{'logmessage_adaptor'} = $self->{'dba'}->get_LogMessageAdaptor();

    # Check other beekeepers in our meadow to see if they are still alive
    $self->{'beekeeper'}->adaptor->bury_other_beekeepers($self->{'beekeeper'}) unless $self->{'read_only'};

    if ($kill_worker_id) {
        my $kill_worker;
        # Turn a failed lookup (undef or exception) into one error path.
        eval { $kill_worker = $queen->fetch_by_dbID($kill_worker_id) or die };
        if ($@) {
            log_and_die($self, "Could not fetch Worker with dbID='$kill_worker_id' to kill");
        }

        unless ( $kill_worker->cause_of_death() ) {
            if ( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {

                if ( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
                    printf("Killing worker: %10d %35s %15s : ",
                        $kill_worker->dbID, $kill_worker->meadow_host, $kill_worker->process_id);

                    $meadow->kill_worker($kill_worker);
                    $kill_worker->cause_of_death('KILLED_BY_USER');
                    $queen->register_worker_death($kill_worker);
                    # what about clean-up? Should we do it here or not?
                } else {
                    log_and_die($self, "According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
                }
            } else {
                log_and_die($self, "Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill");
            }
        } else {
            log_and_die($self, "According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill");
        }
    }

    if ( $big_red_button ) {
        return big_red_button( $self, $valley );
    }

    my $run_job;
    if ($run_job_id) {
        eval { $run_job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id ) or die };
        if ($@) {
            log_and_die($self, "Could not fetch Job with dbID=$run_job_id.\n");
        }
    }

    # Analyses to act on: the single job's analysis for -job_id, otherwise
    # whatever -analyses_pattern matches.
    my $list_of_analyses = $run_job
        ? [ $run_job->analysis ]
        : $self->{'pipeline'}->collection_of('Analysis')->find_all_by_pattern( $self->{'analyses_pattern'} );

    if ( $self->{'analyses_pattern'} ) {
        if ( @$list_of_analyses ) {
            print "Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{'analyses_pattern'}."' : "
                . join(', ', map { $_->logic_name.'('.$_->dbID.')' } sort { $a->dbID <=> $b->dbID } @$list_of_analyses)
                . "\nBeekeeper : ", scalar($self->{'pipeline'}->collection_of('Analysis')->list())-scalar(@$list_of_analyses), " Analyses are not shown\n\n";
        } else {
            log_and_die($self, "Beekeeper : the -analyses_pattern '".$self->{'analyses_pattern'}."' did not match any Analyses.\n");
        }
    }

    # Any of these bulk job operations requires a hive re-sync afterwards.
    my $has_task = ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs || $unblock_semaphored_jobs || $forgive_failed_jobs || $discard_ready_jobs);

    if ($reset_all_jobs || $reset_failed_jobs || $reset_done_jobs) {
        if (($reset_all_jobs || $reset_done_jobs) and not $self->{'analyses_pattern'}) {
            log_and_die($self, "Beekeeper : do you really want to reset *all* the Jobs ? If yes, add \"-analyses_pattern '%'\" to the command line\n");
        }
        # Precedence when several flags are given: failed > done > all.
        my $statuses_to_reset = $reset_failed_jobs ? [ 'FAILED' ] : ($reset_done_jobs ? [ 'DONE', 'PASSED_ON' ] : [ 'DONE', 'FAILED', 'PASSED_ON' ]);
        $self->{'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $statuses_to_reset );
    }

    if ($unblock_semaphored_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->unblock_jobs_for_analysis_id( $list_of_analyses );
    }

    if ($discard_ready_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'READY' );
    }

    if ($forgive_failed_jobs) {
        $self->{'dba'}->get_AnalysisJobAdaptor->discard_jobs_for_analysis_id( $list_of_analyses, 'FAILED' );
    }

    $queen->synchronize_hive( $list_of_analyses ) if $has_task;

    if ($all_dead)           { $queen->register_all_workers_dead(); }
    if ($check_for_dead)     { $queen->check_for_dead_workers($valley, 1); }
    if ($bury_unkwn_workers) { $queen->check_for_dead_workers($valley, 1, 1); }
    if ($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }

    my $has_error = 0;
    if ($self->{'max_loops'}) {    # positive $max_loop means limited, negative means unlimited

        $has_error = run_autonomously($self, $self->{'pipeline'}, $self->{'max_loops'}, $self->{'loop_until'}, $valley, $list_of_analyses, $self->{'analyses_pattern'}, $run_job_id);

    } else {
        # the output of several methods will look differently depending on $analysis being [un]defined

        if ($sync) {
            $queen->synchronize_hive( $list_of_analyses );
        }
        my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, $self->{'debug'} );

        if ($show_worker_stats) {
            print "\n===== List of live Workers according to the Queen: ======\n";
            foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
                print $worker->toString(1)."\n";
            }
        }
        $self->{'dba'}->get_RoleAdaptor->print_active_role_counts;

        Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses) unless $self->{'read_only'};    # show what would be submitted, but do not actually submit

        if ($show_failed_jobs) {
            print("===== failed Jobs\n");
            my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $list_of_analyses , 'FAILED');

            foreach my $job (@{$failed_job_list}) {
                print $job->toString. "\n";
            }
        }
        $self->{'beekeeper'}->set_cause_of_death('LOOP_LIMIT') unless $self->{'read_only'};
    }

    exit($has_error);
}