9 Object which encapsulates the details of how to find jobs, how to
run those
10 jobs, and then check the rules to create the next jobs in the chain.
11 Essentially knows where to find data, how to process data, and where to
12 put it when it is done (put in next persons INBOX) so the next Worker
13 in the chain can find data to work on.
15 Hive based processing is a concept based on a more controlled version
16 of an autonomous agent type system. Each worker is not told what to
do
17 (like a centralized control system - like the current pipeline system)
18 but rather queries a central database
for jobs (give me jobs).
20 Each worker is linked to an analysis_id, registers its
self on creation
21 into the Hive, creates a RunnableDB instance of the Analysis->module,
22 gets relevant configuration information from the database, does its
23 work, creates the next layer of job entries by interfacing to
24 the DataflowRuleAdaptor to determine the analyses it needs to pass its
25 output data to and creates jobs on the database of the next analysis.
26 It repeats
this cycle until it has lived its lifetime or until there are no
27 more jobs left to process.
28 The lifetime limit is a safety limit to prevent these from
'infecting'
29 a system and sitting on a compute node
for longer than is socially exceptable.
30 This is primarily needed on compute resources like an LSF system where jobs
31 are not preempted and
run until they are done.
33 The Queens primary job is to create Workers to get the work down.
34 As part of
this, she is also responsible
for summarizing the status of the
35 analyses by querying the jobs, summarizing, and updating the
36 analysis_stats table. From
this she is also responsible
for monitoring and
37 'unblocking' analyses via the analysis_ctrl_rules.
38 The Queen is also responsible
for freeing up jobs that were claimed by Workers
39 that died unexpectantly so that other workers can take over the work.
41 The Beekeeper is in charge of interfacing between the Queen and a compute resource
42 or
'compute farm'. Its job is to query Queens
if they need any workers and to
43 send the requested number of workers to open machines via the runWorker.pl script.
44 It is also responsible
for interfacing with the Queen to identify workers which died
45 unexpectantly so that she can free the dead workers unfinished jobs.
49 See the NOTICE file distributed with
this work
for additional information
50 regarding copyright ownership.
52 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
53 You may obtain a copy of the License at
57 Unless required by applicable law or agreed to in writing, software distributed under the License
58 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
59 See the License
for the specific language governing permissions and limitations under the License.
63 Please subscribe to the
Hive mailing list: http:
67 The rest of the documentation details each of the
object methods.
68 Internal methods are usually preceded with a _
72 package Bio::EnsEMBL::Hive::Worker;
78 use File::Path
'make_path';
87 use base (
'Bio::EnsEMBL::Hive::Storable' );
91 ## How often we should refresh the AnalysisStats objects
92 sub refresh_tolerance_seconds {
96 sub worker_error_threshold {
104 resource_class_id / resource_class
113 $lifespan_stopwatch->_unit(1); # count in seconds (
default is milliseconds)
114 $lifespan_stopwatch->restart;
115 $self->lifespan_stopwatch( $lifespan_stopwatch );
121 ## Storable object's getters/setters:
126 $self->{
'_meadow_type'} = shift
if(@_);
127 return $self->{
'_meadow_type'};
133 $self->{
'_meadow_name'} = shift
if(@_);
134 return $self->{
'_meadow_name'};
140 $self->{
'_meadow_host'} = shift
if(@_);
141 return $self->{
'_meadow_host'};
147 $self->{
'_meadow_user'} = shift
if(@_);
148 return $self->{
'_meadow_user'};
154 $self->{
'_process_id'} = shift
if(@_);
155 return $self->{
'_process_id'};
161 $self->{
'_work_done'} = shift
if(@_);
162 return $self->{
'_work_done'} || 0;
168 $self->{
'_status'} = shift
if(@_);
169 return $self->{
'_status'};
175 $self->{
'_beekeeper_id'} = shift
if(@_);
176 return $self->{
'_beekeeper_id'} || undef;
182 $self->{
'_when_submitted'} = shift
if(@_);
183 return $self->{
'_when_submitted'};
187 sub seconds_since_when_submitted {
189 $self->{
'_seconds_since_when_submitted'} = shift
if(@_);
190 return $self->{
'_seconds_since_when_submitted'};
196 $self->{
'_when_born'} = shift
if(@_);
197 return $self->{
'_when_born'};
201 sub when_checked_in {
203 $self->{
'_when_checked_in'} = shift
if(@_);
204 return $self->{
'_when_checked_in'};
210 $self->{
'_when_seen'} = shift
if(@_);
211 return $self->{
'_when_seen'};
217 $self->{
'_when_died'} = shift
if(@_);
218 return $self->{
'_when_died'};
224 $self->{
'_cause_of_death'} = shift
if(@_);
225 return $self->{
'_cause_of_death'};
231 Arg [1] : (optional)
string directory path
233 Usage : $worker_log_dir = $self->log_dir;
234 $self->log_dir($worker_log_dir);
235 Description: Storable getter/setter attribute
for the directory where STDOUT and STRERR of the worker will be redirected to.
236 In
this directory each job will have its own .out and .err files.
243 $self->{
'_log_dir'} = shift
if(@_);
244 return $self->{
'_log_dir'};
248 =head2 temp_directory_name
250 Arg [1] : (optional)
string directory path
251 Title : temp_directory_name
252 Usage : $worker_tmp_dir = $self->temp_directory_name;
253 $self->temp_directory_name($worker_tmp_dir);
254 Description: Storable getter/setter attribute
for the directory where jobs can store temporary data.
259 sub temp_directory_name {
261 $self->{
'_tmp_dir'} = shift
if(@_);
262 return $self->{
'_tmp_dir'};
266 ## Non-Storable attributes:
272 if( my $from_analysis = $self->{
'_current_role'} && $self->{
'_current_role'}->analysis ) {
273 $self->worker_say(
"unspecializing from ".$from_analysis->logic_name.
'('.$from_analysis->dbID.
')' );
275 my $new_role = shift @_;
276 if( my $to_analysis = $new_role && $new_role->analysis ) {
277 $self->worker_say(
"specializing to " . $to_analysis->logic_name .
'('.($to_analysis->dbID
279 $self->{
'_current_role'} = $new_role;
281 return $self->{
'_current_role'};
287 $self->{
'_debug'} = shift
if(@_);
288 $self->{
'_debug'}=0 unless(defined($self->{
'_debug'}));
289 return $self->{
'_debug'};
295 $self->{
'_execute_writes'} = shift
if(@_);
296 $self->{
'_execute_writes'}=1 unless(defined($self->{
'_execute_writes'}));
297 return $self->{
'_execute_writes'};
303 $self->{
'_special_batch'} = shift
if(@_);
304 return $self->{
'_special_batch'};
308 sub perform_cleanup {
310 $self->{
'_perform_cleanup'} = shift
if(@_);
311 $self->{
'_perform_cleanup'} = 1 unless(defined($self->{
'_perform_cleanup'}));
312 return $self->{
'_perform_cleanup'};
316 # this is a setter/getter that defines default behaviour when a job throws: should it be retried or not?
318 sub retry_throwing_jobs {
321 $self->{
'_retry_throwing_jobs'} = shift @_
if(@_);
322 return defined($self->{
'_retry_throwing_jobs'}) ? $self->{
'_retry_throwing_jobs'} : 1;
326 sub can_respecialize {
328 $self->{
'_can_respecialize'} = shift
if(@_);
329 return $self->{
'_can_respecialize'};
335 Arg [1] : (optional) integer $value (in seconds)
337 Usage : $value = $self->life_span;
338 $self->life_span($new_value);
339 Description: Defines the maximum time a worker can live
for. Workers are always
340 allowed to complete the jobs they get, but whether they can
341 do multiple rounds of work is limited by their life_span
342 DefaultValue : 3600 (60 minutes)
343 Returntype : integer scalar
347 sub life_span { #
default life_span = 60minutes
348 my ($self, $value) = @_;
350 if(defined($value)) { # you can still set it to 0 and avoid having the limit on lifespan
351 $self->{
'_life_span'} = $value;
352 } elsif(!defined($self->{
'_life_span'})) {
353 $self->{
'_life_span'} = 60*60;
355 return $self->{
'_life_span'};
358 sub lifespan_stopwatch {
362 $self->{
'_lifespan_stopwatch'} = shift @_;
364 return $self->{
'_lifespan_stopwatch'};
367 sub life_span_limit_reached {
370 if( $self->life_span() ) {
371 my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
372 if($alive_for_secs > $self->life_span() ) {
373 return $alive_for_secs;
383 Arg [1] : (optional) integer $value
384 Usage : $limiter_obj = $self->job_limiter;
385 $self->job_limiter($new_value);
386 Description: The maximum number of jobs to be done by the Worker can be limited by the given number.
387 A worker
'dies' when either the
'life_span' or
'job_limit' is exceeded.
388 DefaultValue : undef (relies on life_span to limit life of worker)
389 Returntype : Hive::Limiter
object
395 if( scalar(@_) or !defined($self->{
'_job_limiter'}) ) {
398 return $self->{
'_job_limiter'};
403 my ($self, $job_partial_timing) = @_;
405 $self->{
'_work_done'}++;
407 while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
408 $self->{
'_interval_partial_timing'}{$state} += $partial_timing_in_state;
413 # By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it
415 # Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
416 # but at the moment we assume all errors identical. If the worker failed two jobs in a row - let him die.
421 $self->{
'_prev_job_error'} = shift
if(@_);
422 return $self->{
'_prev_job_error'};
425 sub runnable_object {
428 $self->{
'_runnable_object'} = shift @_
if(@_);
429 return $self->{
'_runnable_object'};
433 sub get_stdout_redirector {
439 sub get_stderr_redirector {
447 my ($self, $msg) = @_;
449 unless ($self->adaptor) {
450 print
"Standalone worker $$ : $msg\n";
454 my $worker_id = $self->dbID();
455 my $current_role = $self->current_role;
456 my $job_id = $self->runnable_object && $self->runnable_object->input_job && $self->runnable_object->input_job->dbID;
457 print
"Worker $worker_id [ ". ( $current_role
458 ? (
'Role '.$current_role->dbID.
' , '.$current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')'
459 . ($job_id ?
", Job $job_id" :
'')
467 my ($self, $include_analysis) = @_;
469 my $current_role = $self->current_role;
472 $include_analysis ? (
'analysis='.($current_role ? $current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')' :
'UNSPECIALIZED') ) : (),
473 'resource_class_id='.($self->resource_class_id
474 'meadow='.$self->meadow_type.
'/'.$self->meadow_name,
475 'process='.$self->meadow_user.($self->meadow_host ?
'@'.$self->meadow_host :
'UNALLOCATED').
'#'.$self->process_id,
476 'when_checked_in='.($self->when_checked_in
477 'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() :
'UNSPECIALIZED'),
478 'job_limit='.($self->job_limiter->available_capacity()
479 'life_span='.($self->life_span
480 'worker_log_dir='.($self->log_dir
485 ###############################
489 ###############################
495 Usage : $worker->run;
497 This is a
self looping autonomous
function to process jobs.
498 First all STDOUT/STDERR is rediected, then looping commences.
501 2) processing those jobs through an instance of the
'module class' of
502 the analysis asigned to
this worker,
503 3) updating the job, analysis_stats, and hive tables to track the
504 progress of the job, the analysis and
this worker.
505 Looping stops when any one of these are met:
506 1) there is no more jobs to process
507 2) job_limit is reached
508 3) life_span has been reached.
514 my ($self, $specialization_arghash) = @_;
516 if( my $worker_log_dir = $self->log_dir ) {
517 $self->get_stdout_redirector->push( $worker_log_dir.
'/worker.out' );
518 $self->get_stderr_redirector->push( $worker_log_dir.
'/worker.err' );
522 my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
524 print
"\n"; # to clear beekeeper
's prompt in case output is not logged
525 $self->worker_say( $self->toString() );
526 $self->specialize_and_compile_wrapper( $specialization_arghash );
528 while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies
for any reason)
531 my $jobs_done_by_batches_loop = 0; # by all iterations of
internal loop
532 $self->{
'_interval_partial_timing'} = {};
534 if( my $special_batch = $self->special_batch() ) {
535 my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
536 $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
537 $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ?
'JOB_LIMIT' :
'CONTAMINATED');
538 }
else { # a proper
"BATCHES" loop
540 while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
541 my $current_role = $self->current_role;
543 if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
544 my $msg =
"Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
545 $self->worker_say( $msg );
546 $self->cause_of_death(
'CONTAMINATED');
547 $job_adaptor->release_undone_jobs_from_role($current_role, $msg);
549 } elsif( $self->job_limiter->reached()) {
550 $self->worker_say(
"job_limit reached (".$self->work_done.
" jobs completed)" );
551 $self->cause_of_death(
'JOB_LIMIT');
553 } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
554 $self->worker_say(
"life_span limit reached (alive for $alive_for_secs secs)" );
555 $self->cause_of_death(
'LIFESPAN');
558 # No need to refresh the stats or the hive_current_load # since it's all been refreshed in
559 # specialize_and_compile_wrapper()
560 my $stats = $current_role->analysis->stats;
561 my $desired_batch_size = $stats->get_or_estimate_batch_size();
562 my $hit_the_limit; # dummy at the moment
563 ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
565 my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
568 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
569 "Claiming: ready_job_count=".$stats->ready_job_count
570 .
", num_running_workers=".$stats->num_running_workers
571 .
", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
575 if(scalar(@$actual_batch)) {
576 my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
577 $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
578 $self->job_limiter->final_decision( $jobs_done_by_this_batch );
580 $self->cause_of_death(
'NO_WORK');
586 # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
587 # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
589 if($jobs_done_by_batches_loop) {
591 $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
592 $self->current_role->analysis->dbID,
593 $jobs_done_by_batches_loop,
594 $batches_stopwatch->get_elapsed,
595 $self->{
'_interval_partial_timing'}{
'FETCH_INPUT'} || 0,
596 $self->{
'_interval_partial_timing'}{
'RUN'} || 0,
597 $self->{
'_interval_partial_timing'}{
'WRITE_OUTPUT'} || 0,
601 # A mechanism whereby workers can be caused to exit even if they were doing fine:
602 if (!$self->cause_of_death) {
603 # We're here after having run a batch, so we need to refresh the stats
604 my $analysis = $self->current_role->analysis;
605 my $stats = $analysis->stats;
606 if ( $stats->refresh($self->refresh_tolerance_seconds) ) { #
if we DID refresh
607 $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
608 $stats->hive_pipeline->invalidate_hive_current_load;
609 if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
610 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
612 $self->cause_of_death(
'HIVE_OVERLOAD');
617 my $cod = $self->cause_of_death() ||
'';
619 if( $cod eq
'NO_WORK') {
620 $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id,
'ALL_CLAIMED' );
624 # 1) No work to do (computed across all
625 # 2) allowed to by the command-line option
626 # 3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components). This doesn't guarantee that the pattern will resolve to multiple analyses !
627 if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{
'-analyses_pattern'} or $specialization_arghash->{
'-analyses_pattern'}!~/^\w+$/) ) {
628 my $old_role = $self->current_role;
629 $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
630 $self->current_role( undef );
631 $self->cause_of_death(undef);
632 $self->specialize_and_compile_wrapper( $specialization_arghash );
635 } # /Worker
's lifespan loop
637 # The second argument ("update_when_checked_in") is set to force an
638 # update of the "when_checked_in" timestamp in the worker table
639 $self->adaptor->register_worker_death($self, 1);
642 $self->worker_say( 'AnalysisStats :
'.$self->current_role->analysis->stats->toString ) if( $self->current_role );
643 $self->worker_say( 'dbc
'.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles
' );
646 $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );
648 if( $self->log_dir ) {
649 $self->get_stdout_redirector->pop();
650 $self->get_stderr_redirector->pop();
655 sub specialize_and_compile_wrapper {
656 my ($self, $specialization_arghash) = @_;
659 $self->enter_status('SPECIALIZATION
');
660 $self->adaptor->specialize_worker( $self, $specialization_arghash );
665 $self->worker_say( "specialization failed:\t$msg" );
667 $self->cause_of_death('SEE_MSG
') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
670 if ($self->cause_of_death() eq "NO_ROLE") {
671 $message_class = 'INFO
';
673 $message_class = 'WORKER_ERROR
'
676 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class );
679 if( !$self->cause_of_death() ) {
680 $self->compile_runnable;
684 sub compile_runnable {
687 $self->enter_status('COMPILATION
');
689 my $current_analysis = $self->current_role->analysis;
690 my $runnable_object = $current_analysis->get_compiled_module_name->new($self->debug, $current_analysis->language, $current_analysis->module) # Only GuestProcess will read the arguments
691 or die "Unknown compilation error";
693 $runnable_object->worker( $self );
695 $self->runnable_object( $runnable_object );
696 $self->enter_status('READY
');
701 $self->handle_compilation_failure($last_err);
705 sub handle_compilation_failure {
706 my ($self, $msg) = @_;
707 $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" );
708 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR
' );
710 $self->cause_of_death('SEE_MSG
') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
712 $self->check_analysis_for_exclusion();
716 my ($self, $jobs, $is_special_batch) = @_;
718 my $jobs_done_here = 0;
720 my $current_role = $self->current_role;
721 my $max_retry_count = $current_role->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
722 my $stats = $current_role->analysis->stats; # cache it to avoid reloading
724 $self->adaptor->check_in_worker( $self );
725 $self->adaptor->safe_synchronize_AnalysisStats( $stats );
728 $self->worker_say( 'AnalysisStats :
' . $stats->toString );
729 $self->worker_say( 'claimed
'.scalar(@{$jobs}).' jobs to process
' );
732 my $job_partial_timing;
734 ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay
736 my $job_id = $job->dbID();
737 $self->worker_say( $job->toString ) if($self->debug);
739 my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
740 $job_partial_timing = {};
742 $self->start_job_output_redirection($job); # switch logging into job's STDERR
743 eval { # capture any
throw/die
744 my $runnable_object = $self->runnable_object();
745 $runnable_object->input_job( $job ); #
"take" the job
748 $self->adaptor->db->dbc->query_count(0);
749 $job_stopwatch->restart();
751 $job->load_parameters( $runnable_object );
753 $self->worker_say(
"Job $job_id unsubstituted_params= ".stringify($job->{
'_unsubstituted_param_hash'}) )
if($self->debug());
755 $job_partial_timing = $runnable_object->life_cycle();
758 $job->died_somewhere( $job->incomplete ); # it will be OR
'd inside
759 Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete?'WORKER_ERROR
':'INFO
'); # In case the Runnable has redefined warning()
762 # whether the job completed successfully or not:
763 $self->runnable_object->input_job( undef ); # release an extra reference to the job
764 $job->runtime_msec( $job_stopwatch->get_elapsed );
765 $job->query_count( $self->adaptor->db->dbc->query_count );
767 my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died
' : 'complete
' );
769 print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
770 $self->stop_job_output_redirection($job); # and then we
switch back to worker
's STDERR
771 $self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR
773 $self->current_role->register_attempt( ! $job->died_somewhere );
775 if($job->died_somewhere) {
776 # Both flags default to 1, meaning that jobs would by default be retried.
777 # If the job specifically said not to retry, or if the worker is configured
778 # not to retry jobs, follow their wish.
779 my $may_retry = $job->transient_error && $self->retry_throwing_jobs;
781 $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );
783 if( $self->prev_job_error # a bit of AI:
if the previous job failed as well, it is LIKELY that we have contamination
784 or $job->lethal_for_worker ) { # trust the job
's expert knowledge
785 my $reason = $self->prev_job_error ? 'two failed jobs in a row
'
786 : 'suggested by job itself
';
787 $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die
" );
788 $self->cause_of_death('CONTAMINATED');
791 } else { # job successfully completed:
792 $self->more_work_done( $job_partial_timing );
794 $job->set_and_update_status('DONE');
796 if( my $controlled_semaphore = $job->controlled_semaphore ) {
797 $controlled_semaphore->decrease_by( [ $job ] );
800 if($job->lethal_for_worker) {
801 $self->worker_say( "The Job, although complete, wants the Worker to die
" );
802 $self->cause_of_death('CONTAMINATED');
807 $self->prev_job_error( $job->died_somewhere );
808 $self->enter_status('READY');
810 # UNCLAIM THE SURPLUS:
811 my $remaining_jobs_in_batch = scalar(@$jobs);
812 if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) { # if we DID refresh
813 my $ready_job_count = $stats->ready_job_count;
814 $stats->hive_pipeline->invalidate_hive_current_load;
815 my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch );
816 my $jobs_to_unclaim = $remaining_jobs_in_batch - $optimal_batch_now;
818 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
819 "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch,
"
820 . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim
",
823 if( $jobs_to_unclaim > 0 ) {
824 # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) ); # unclaim the last $jobs_to_unclaim elements
825 # currently we just dump all the remaining jobs and prepare to take a fresh batch:
826 $job->adaptor->release_claimed_jobs_from_role( $current_role );
828 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)
", 'INFO' );
832 } # /while(my $job = shift @$jobs)
834 return $jobs_done_here;
838 sub set_and_update_status {
839 my ($self, $status ) = @_;
841 $self->status($status);
843 if(my $adaptor = $self->adaptor) {
844 $adaptor->check_in_worker( $self );
850 my ($self, $status) = @_;
853 $self->worker_say( '-> '.$status );
856 $self->set_and_update_status( $status );
860 sub start_job_output_redirection {
861 my ($self, $job) = @_;
863 if(my $worker_log_dir = $self->log_dir) {
864 $self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
865 $self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );
867 if(my $job_adaptor = $job->adaptor) {
868 $job_adaptor->store_out_files($job);
874 sub stop_job_output_redirection {
875 my ($self, $job) = @_;
878 $self->get_stdout_redirector->pop();
879 $self->get_stderr_redirector->pop();
881 my $force_cleanup = !($self->debug || $job->died_somewhere);
883 if($force_cleanup or -z $job->stdout_file) {
884 $self->worker_say( "Deleting
'".$job->stdout_file."' file
" );
885 unlink $job->stdout_file;
886 $job->stdout_file(undef);
888 if($force_cleanup or -z $job->stderr_file) {
889 $self->worker_say( "Deleting
'".$job->stderr_file."' file
" );
890 unlink $job->stderr_file;
891 $job->stderr_file(undef);
894 if(my $job_adaptor = $job->adaptor) {
895 $job_adaptor->store_out_files($job);
900 sub check_analysis_for_exclusion {
901 my $self = shift(@_);
902 my $worker_errors_this_analysis =
903 $self->adaptor->db->get_LogMessageAdaptor()->count_analysis_events(
904 $self->current_role->analysis_id,
906 # warn "There are $worker_errors_this_analysis worker errors
for this analysis\n
";
907 if ($worker_errors_this_analysis > $self->worker_error_threshold) {
908 my $current_logic_name = $self->current_role->analysis->logic_name;
909 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "setting analysis
'$current_logic_name' to excluded
", 'INFO' );
910 $self->current_role->analysis->stats->is_excluded(1);
911 $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($self->current_role->analysis->stats);
915 sub set_log_directory_name {
916 my ($self, $hive_log_dir, $worker_log_dir) = @_;
918 return unless ($hive_log_dir or $worker_log_dir);
920 my $dir_revhash = dir_revhash($self->dbID // ''); # Database-less workers are not hashed
921 $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/
" : '') . ($self->adaptor ? 'worker_id_' . $self->dbID : 'standalone/worker_pid_' . $self->process_id);
924 make_path( $worker_log_dir );
926 } or die "Could not create
'$worker_log_dir' directory : $
@";
928 $self->log_dir( $worker_log_dir );
929 $self->adaptor->update_log_dir( $self ) if $self->adaptor; # autoloaded
933 =head2 set_temp_directory_name
935 Title : set_temp_directory_name
936 Description : Generates and sets the name of a temporary directory suitable for this worker.
937 It will be under the base directory requested by $base_temp_dir, or the standard
938 location otherwise (as advised by File::Spec), and includes worker attributes
939 to make the path unique.
943 sub set_temp_directory_name {
944 my ($self, $base_temp_dir) = @_;
946 $base_temp_dir //= File::Spec->tmpdir();
948 my $temp_directory_name;
949 if ($self->adaptor) {
950 $temp_directory_name = sprintf('%s/worker_%s_%s.%s/', $base_temp_dir, $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID);
952 $temp_directory_name = sprintf('%s/worker_%s.standalone.%s/', $base_temp_dir, $self->meadow_user, $self->process_id);
955 $self->temp_directory_name( $temp_directory_name );
956 $self->adaptor->update_temp_directory_name( $self ) if $self->adaptor; # autoloaded