my $self = shift @_;
my %flags = @_;
my ($preregistered, $resource_class_id, $resource_class_name, $beekeeper_id,
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize,
$worker_delay_startup_seconds, $worker_crash_on_startup_prob, $config_files)
= @flags{qw(-preregistered -resource_class_id -resource_class_name -beekeeper_id
-no_write -debug -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize
-worker_delay_startup_seconds -worker_crash_on_startup_prob -config_files)};
sleep( $worker_delay_startup_seconds
if( defined( $worker_crash_on_startup_prob ) ) {
if( rand(1) < $worker_crash_on_startup_prob ) {
die "This is a requested crash of the Worker (with probability=$worker_crash_on_startup_prob)";
}
}
die "Valley is not fully defined" unless ($meadow && $process_id && $meadow_host && $meadow_user);
my $meadow_type = $meadow->type;
my $meadow_name = $meadow->cached_name;
foreach my $prev_worker_incarnation (@{ $self->find_previous_worker_incarnations($meadow_type, $meadow_name, $process_id) }) {
# So far 'RELOCATED' events have been detected on LSF 9.0 in response to sending signal #99 or #100
# Since I don't know how to avoid them, I am trying to register them when they happen.
# The following snippet buries the previous incarnation of the Worker before starting a new one.
#
# FIXME: if GarbageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
# LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
# So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
$prev_worker_incarnation->cause_of_death( 'RELOCATED' );
$self->register_worker_death( $prev_worker_incarnation );
}
my $worker;
if($preregistered) {
my $max_registration_seconds = $meadow->config_get('MaxRegistrationSeconds');
my $seconds_waited = 0;
my $seconds_more = 5; # step increment
until( $worker = $self->fetch_preregistered_worker($meadow_type, $meadow_name, $process_id) ) {
my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
if( defined($max_registration_seconds) and ($seconds_waited > $max_registration_seconds) ) {
my $msg = "Preregistered Worker $meadow_type/$meadow_name:$process_id timed out waiting to occupy its entry, bailing out";
$log_message_adaptor->store_hive_message($msg, 'WORKER_ERROR' );
die $msg;
} else {
$log_message_adaptor->store_hive_message("Preregistered Worker $meadow_type/$meadow_name:$process_id waiting $seconds_more more seconds to fetch itself...", 'WORKER_CAUTION' );
sleep($seconds_more);
$seconds_waited += $seconds_more;
}
}
# only update the fields that were not available at the time of submission:
$worker->meadow_host( $meadow_host );
$worker->meadow_user( $meadow_user );
$worker->when_born( 'CURRENT_TIMESTAMP' );
$worker->status( 'READY' );
$self->update( $worker );
} else {
my $resource_class;
if( defined($resource_class_name) ) {
$resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('name' => $resource_class_name)
or die "resource_class with name='$resource_class_name' could not be fetched from the database";
} elsif( defined($resource_class_id) ) {
$resource_class = $self->db->hive_pipeline->collection_of('ResourceClass')->find_one_by('dbID', $resource_class_id)
or die "resource_class with dbID='$resource_class_id' could not be fetched from the database";
}
'meadow_type' => $meadow_type,
'meadow_name' => $meadow_name,
'process_id' => $process_id,
'resource_class' => $resource_class,
'beekeeper_id' => $beekeeper_id,
'meadow_host' => $meadow_host,
'meadow_user' => $meadow_user,
);
if (ref($self)) {
$self->store( $worker );
$worker->when_born( 'CURRENT_TIMESTAMP' );
$self->update_when_born( $worker );
$self->refresh( $worker );
}
}
$worker->set_log_directory_name($hive_log_dir, $worker_log_dir);
$worker->init;
if(defined($job_limit)) {
$worker->job_limiter($job_limit);
$worker->life_span(0);
}
$worker->life_span($life_span * 60) if($life_span); # $life_span min -> sec
$worker->execute_writes(0) if($no_write);
$worker->perform_cleanup(0) if($no_cleanup);
$worker->debug($debug) if($debug);
$worker->retry_throwing_jobs($retry_throwing_jobs) if(defined $retry_throwing_jobs);
$worker->can_respecialize($can_respecialize) if(defined $can_respecialize);
return $worker;
}