# Body of the Worker's main run method (the enclosing "sub" line is above this
# chunk). Overall flow: redirect output, specialize into an analysis, then loop
# claiming and running batches of jobs until a cause_of_death is set; finally
# register the worker's death and restore the output streams.
my ($self, $specialization_arghash) = @_;

# If a per-worker log directory is configured, tee STDOUT/STDERR into
# worker.out / worker.err for the duration of the run (popped at the bottom).
if( my $worker_log_dir = $self->log_dir ) {
$self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
}
my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
print "\n"; # to clear beekeeper's prompt in case output is not logged
$self->worker_say( $self->toString() );
# Initial specialization: picks an analysis (honouring $specialization_arghash,
# e.g. '-analyses_pattern') and compiles its Runnable.
$self->specialize_and_compile_wrapper( $specialization_arghash );
while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies for any reason)
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
# Per-iteration accumulator of FETCH_INPUT/RUN/WRITE_OUTPUT timings,
# flushed into the stats table below (interval_update_work_done).
$self->{'_interval_partial_timing'} = {};
if( my $special_batch = $self->special_batch() ) {
# A pre-assigned, fixed list of jobs (e.g. from -job_id): run it once,
# then die with JOB_LIMIT on full success or CONTAMINATED otherwise.
my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
$jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
$self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ? 'JOB_LIMIT' : 'CONTAMINATED');
} else { # a proper "BATCHES" loop
# NOTE(review): $batches_stopwatch and $min_batch_time are used here (and at
# the interval_update_work_done call below) but are never declared anywhere
# in this chunk -- under 'use strict' this would not compile as shown.
# Presumably their declarations (a restarted stopwatch and a minimum
# batch-time threshold) were dropped from this excerpt; confirm against the
# full file before editing.
while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
my $current_role = $self->current_role;
# Sanity check: if jobs claimed by this role are still marked incomplete
# at the top of the loop, the Runnable escaped its control flow.
if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
$self->worker_say( $msg );
$self->cause_of_death('CONTAMINATED');
$job_adaptor->release_undone_jobs_from_role($current_role, $msg);
} elsif( $self->job_limiter->reached()) {
# Configured maximum number of jobs for this worker has been done.
$self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
# Configured wall-clock lifetime exceeded.
$self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
$self->cause_of_death('LIFESPAN');
} else {
# No need to refresh the stats or the hive_current_load # since it's all been refreshed in
# specialize_and_compile_wrapper()
my $stats = $current_role->analysis->stats;
# Ask the stats for a batch size, then let the job limiter shrink it
# so we never claim more jobs than the worker is allowed to run.
my $desired_batch_size = $stats->get_or_estimate_batch_size();
my $hit_the_limit; # dummy at the moment
($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
if($self->debug) {
# Record the claiming decision in the hive's log_message table.
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
"Claiming: ready_job_count=".$stats->ready_job_count
.", num_running_workers=".$stats->num_running_workers
.", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
'INFO' );
}
if(scalar(@$actual_batch)) {
my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
$jobs_done_by_batches_loop += $jobs_done_by_this_batch;
# Tell the limiter how many were actually done, so reached() stays accurate.
$self->job_limiter->final_decision( $jobs_done_by_this_batch );
} else {
# Nothing left to claim for this analysis.
$self->cause_of_death('NO_WORK');
}
}
}
}
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
#
if($jobs_done_by_batches_loop) {
# Fold this iteration's job counts and per-phase timings into AnalysisStats.
$self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->current_role->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'_interval_partial_timing'}{'FETCH_INPUT'} || 0,
$self->{'_interval_partial_timing'}{'RUN'} || 0,
$self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
);
}
# A mechanism whereby workers can be caused to exit even if they were doing fine:
if (!$self->cause_of_death) {
# We're here after having run a batch, so we need to refresh the stats
my $analysis = $self->current_role->analysis;
my $stats = $analysis->stats;
if ( $stats->refresh($self->refresh_tolerance_seconds) ) { # if we DID refresh
$self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
$stats->hive_pipeline->invalidate_hive_current_load;
# Die with HIVE_OVERLOAD if either capacity constraint is now violated:
# hive_capacity set and overall load >= 110%, or analysis_capacity set
# and exceeded by the current number of running workers.
if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
}
}
my $cod = $self->cause_of_death() || '';
if( $cod eq 'NO_WORK') {
# Mark the analysis so other workers don't keep polling an empty queue.
$self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
}
# Respecialize (pick a new analysis and loop again instead of dying) if:
# 1) there is no work left here (NO_WORK) or the hive is overloaded (HIVE_OVERLOAD)
# 2) allowed to by the command-line option (can_respecialize)
# 3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components). This doesn't guarantee that the pattern will resolve to multiple analyses !
if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{'-analyses_pattern'} or $specialization_arghash->{'-analyses_pattern'}!~/^\w+$/) ) {
my $old_role = $self->current_role;
$self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
# Clear role and cause_of_death so the lifespan loop runs another iteration.
$self->current_role( undef );
$self->cause_of_death(undef);
$self->specialize_and_compile_wrapper( $specialization_arghash );
}
} # /Worker's lifespan loop
# The second argument ("update_when_checked_in") is set to force an
# update of the "when_checked_in" timestamp in the worker table
$self->adaptor->register_worker_death($self, 1);
if($self->debug) {
$self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString ) if( $self->current_role );
$self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
}
$self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );
# Undo the stream redirection set up at the top of this sub.
if( $self->log_dir ) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
}
}