9 Object which encapsulates the details of how to find jobs, how to
run those
10 jobs, and then check the rules to create the next jobs in the chain.
11 Essentially knows where to find data, how to process data, and where to
12 put it when it is done (put in next persons INBOX) so the next Worker
13 in the chain can find data to work on.
15 Hive based processing is a concept based on a more controlled version
16 of an autonomous agent type system. Each worker is not told what to
do
17 (like a centralized control system - like the current pipeline system)
18 but rather queries a central database
for jobs (give me jobs).
20 Each worker is linked to an analysis_id, registers itself on creation
21 into the Hive, creates a RunnableDB instance of the Analysis->module,
22 gets relevant configuration information from the database, does its
23 work, creates the next layer of job entries by interfacing to
24 the DataflowRuleAdaptor to determine the analyses it needs to pass its
25 output data to and creates jobs on the database of the next analysis.
26 It repeats
this cycle until it has lived its lifetime or until there are no
27 more jobs left to process.
28 The lifetime limit is a safety limit to prevent these from 'infecting'
29 a system and sitting on a compute node for longer than is socially acceptable.
30 This is primarily needed on compute resources like an LSF system where jobs
31 are not preempted and
run until they are done.
33 The Queen's primary job is to create Workers to get the work done.
34 As part of
this, she is also responsible
for summarizing the status of the
35 analyses by querying the jobs, summarizing, and updating the
36 analysis_stats table. From
this she is also responsible
for monitoring and
37 'unblocking' analyses via the analysis_ctrl_rules.
38 The Queen is also responsible
for freeing up jobs that were claimed by Workers
39 that died unexpectedly so that other workers can take over the work.
41 The Beekeeper is in charge of interfacing between the Queen and a compute resource
42 or
'compute farm'. Its job is to query Queens
if they need any workers and to
43 send the requested number of workers to open machines via the runWorker.pl script.
44 It is also responsible
for interfacing with the Queen to identify workers which died
45 unexpectedly so that she can free the dead workers' unfinished jobs.
49 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
50 Copyright [2016-2024] EMBL-European Bioinformatics Institute
52 Licensed under the Apache License, Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
53 You may obtain a copy of the License at
57 Unless required by applicable law or agreed to in writing, software distributed under the License
58 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
59 See the License
for the specific language governing permissions and limitations under the License.
63 Please subscribe to the eHive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users
67 The rest of the documentation details each of the
object methods.
68 Internal methods are usually preceded with a _
73 package Bio::EnsEMBL::Hive::Worker;
78 use File::Path
'make_path';
87 use base (
'Bio::EnsEMBL::Hive::Storable' );
90 ## How often we should refresh the AnalysisStats objects
91 sub refresh_tolerance_seconds {
95 sub worker_error_threshold {
103 resource_class_id / resource_class
112 $lifespan_stopwatch->_unit(1); # count in seconds (
default is milliseconds)
113 $lifespan_stopwatch->restart;
114 $self->lifespan_stopwatch( $lifespan_stopwatch );
120 ## Storable object's getters/setters:
# Storable getter/setter for the '_meadow_type' attribute.
sub meadow_type {
    my $self = shift @_;

    $self->{'_meadow_type'} = shift @_ if(@_);
    return $self->{'_meadow_type'};
}
# Storable getter/setter for the '_meadow_name' attribute.
sub meadow_name {
    my $self = shift @_;

    $self->{'_meadow_name'} = shift @_ if(@_);
    return $self->{'_meadow_name'};
}
# Storable getter/setter for the '_meadow_host' attribute.
sub meadow_host {
    my $self = shift @_;

    $self->{'_meadow_host'} = shift @_ if(@_);
    return $self->{'_meadow_host'};
}
# Storable getter/setter for the '_meadow_user' attribute.
sub meadow_user {
    my $self = shift @_;

    $self->{'_meadow_user'} = shift @_ if(@_);
    return $self->{'_meadow_user'};
}
# Storable getter/setter for the '_process_id' attribute.
sub process_id {
    my $self = shift @_;

    $self->{'_process_id'} = shift @_ if(@_);
    return $self->{'_process_id'};
}
# Storable getter/setter for the number of jobs this worker has completed.
# Defaults to 0 when never set.
sub work_done {
    my $self = shift @_;

    $self->{'_work_done'} = shift @_ if(@_);
    return $self->{'_work_done'} || 0;
}
# Storable getter/setter for the worker's current status string.
sub status {
    my $self = shift @_;

    $self->{'_status'} = shift @_ if(@_);
    return $self->{'_status'};
}
# Storable getter/setter for the id of the beekeeper that submitted this worker.
# Normalizes any false value to undef on read.
sub beekeeper_id {
    my $self = shift @_;

    $self->{'_beekeeper_id'} = shift @_ if(@_);
    return $self->{'_beekeeper_id'} || undef;
}
# Storable getter/setter for the submission timestamp of this worker.
sub when_submitted {
    my $self = shift @_;

    $self->{'_when_submitted'} = shift @_ if(@_);
    return $self->{'_when_submitted'};
}
# Storable getter/setter for the age (in seconds) of this worker since submission.
sub seconds_since_when_submitted {
    my $self = shift @_;

    $self->{'_seconds_since_when_submitted'} = shift @_ if(@_);
    return $self->{'_seconds_since_when_submitted'};
}
# Storable getter/setter for the timestamp at which this worker was born.
sub when_born {
    my $self = shift @_;

    $self->{'_when_born'} = shift @_ if(@_);
    return $self->{'_when_born'};
}
# Storable getter/setter for the timestamp of the last check-in with the Queen.
sub when_checked_in {
    my $self = shift @_;

    $self->{'_when_checked_in'} = shift @_ if(@_);
    return $self->{'_when_checked_in'};
}
# Storable getter/setter for the timestamp at which this worker was last seen.
sub when_seen {
    my $self = shift @_;

    $self->{'_when_seen'} = shift @_ if(@_);
    return $self->{'_when_seen'};
}
# Storable getter/setter for the timestamp at which this worker died.
sub when_died {
    my $self = shift @_;

    $self->{'_when_died'} = shift @_ if(@_);
    return $self->{'_when_died'};
}
# Storable getter/setter for the cause-of-death code (e.g. 'JOB_LIMIT', 'NO_WORK').
sub cause_of_death {
    my $self = shift @_;

    $self->{'_cause_of_death'} = shift @_ if(@_);
    return $self->{'_cause_of_death'};
}
230 Arg [1] : (optional)
string directory path
232 Usage : $worker_log_dir = $self->log_dir;
233 $self->log_dir($worker_log_dir);
234 Description: Storable getter/setter attribute
for the directory where STDOUT and STDERR of the worker will be redirected to.
235 In
this directory each job will have its own .out and .err files.
# Storable getter/setter for the directory where the worker's STDOUT/STDERR
# (and per-job .out/.err files) are redirected.
sub log_dir {
    my $self = shift @_;

    $self->{'_log_dir'} = shift @_ if(@_);
    return $self->{'_log_dir'};
}
247 =head2 temp_directory_name
249 Arg [1] : (optional)
string directory path
250 Title : temp_directory_name
251 Usage : $worker_tmp_dir = $self->temp_directory_name;
252 $self->temp_directory_name($worker_tmp_dir);
253 Description: Storable getter/setter attribute
for the directory where jobs can store temporary data.
# Storable getter/setter for the directory where jobs may store temporary data.
sub temp_directory_name {
    my $self = shift @_;

    $self->{'_tmp_dir'} = shift @_ if(@_);
    return $self->{'_tmp_dir'};
}
265 ## Non-Storable attributes:
271 if( my $from_analysis = $self->{
'_current_role'} && $self->{
'_current_role'}->analysis ) {
272 $self->worker_say(
"unspecializing from ".$from_analysis->logic_name.
'('.$from_analysis->dbID.
')' );
274 my $new_role = shift @_;
275 if( my $to_analysis = $new_role && $new_role->analysis ) {
276 $self->worker_say(
"specializing to " . $to_analysis->logic_name .
'('.($to_analysis->dbID
278 $self->{
'_current_role'} = $new_role;
280 return $self->{
'_current_role'};
# Getter/setter for the debug level; lazily defaults to 0 (off).
sub debug {
    my $self = shift @_;

    $self->{'_debug'} = shift @_ if(@_);
    $self->{'_debug'} = 0 unless(defined($self->{'_debug'}));
    return $self->{'_debug'};
}
# Getter/setter for the flag controlling whether database writes are performed;
# lazily defaults to 1 (writes enabled).
sub execute_writes {
    my $self = shift @_;

    $self->{'_execute_writes'} = shift @_ if(@_);
    $self->{'_execute_writes'} = 1 unless(defined($self->{'_execute_writes'}));
    return $self->{'_execute_writes'};
}
# Getter/setter for an explicitly assigned batch of jobs (bypasses normal claiming).
sub special_batch {
    my $self = shift @_;

    $self->{'_special_batch'} = shift @_ if(@_);
    return $self->{'_special_batch'};
}
# Getter/setter for the cleanup flag; lazily defaults to 1 (cleanup enabled).
sub perform_cleanup {
    my $self = shift @_;

    $self->{'_perform_cleanup'} = shift @_ if(@_);
    $self->{'_perform_cleanup'} = 1 unless(defined($self->{'_perform_cleanup'}));
    return $self->{'_perform_cleanup'};
}
315 # this is a setter/getter that defines default behaviour when a job throws: should it be retried or not?
# Getter/setter deciding whether a job that throws should be retried.
# Defaults to 1 (retry) when never set; 0 is a valid explicit setting.
sub retry_throwing_jobs {
    my $self = shift @_;

    $self->{'_retry_throwing_jobs'} = shift @_ if(@_);
    return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}
# Getter/setter for the flag allowing this worker to respecialize to another analysis.
sub can_respecialize {
    my $self = shift @_;

    $self->{'_can_respecialize'} = shift @_ if(@_);
    return $self->{'_can_respecialize'};
}
334 Arg [1] : (optional) integer $value (in seconds)
336 Usage : $value = $self->life_span;
337 $self->life_span($new_value);
338 Description: Defines the maximum time a worker can live
for. Workers are always
339 allowed to complete the jobs they get, but whether they can
340 do multiple rounds of work is limited by their life_span
341 DefaultValue : 3600 (60 minutes)
342 Returntype : integer scalar
# Getter/setter for the maximum lifetime of the worker in seconds.
# Lazily defaults to 3600 (60 minutes); an explicit 0 disables the limit.
sub life_span {
    my ($self, $value) = @_;

    if(defined($value)) {                   # 0 is a valid value (no lifespan limit)
        $self->{'_life_span'} = $value;
    } elsif(!defined($self->{'_life_span'})) {
        $self->{'_life_span'} = 60*60;      # lazy initialization of the default
    }
    return $self->{'_life_span'};
}
# Getter/setter for the Stopwatch object measuring this worker's lifetime.
# NOTE(review): the setter guard was reconstructed from a damaged span — an
# unconditional assignment would clear the value on every read; confirm upstream.
sub lifespan_stopwatch {
    my $self = shift @_;

    $self->{'_lifespan_stopwatch'} = shift @_ if(@_);
    return $self->{'_lifespan_stopwatch'};
}
# Returns the number of seconds this worker has been alive if that exceeds its
# life_span limit, and a false value otherwise (or when the limit is disabled).
sub life_span_limit_reached {
    my $self = shift @_;

    if( $self->life_span() ) {      # a life_span of 0 disables the check entirely
        my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
        if( $alive_for_secs > $self->life_span() ) {
            return $alive_for_secs;
        }
    }
    return 0;   # NOTE(review): falsy tail reconstructed from a damaged span — confirm upstream
}
382 Arg [1] : (optional) integer $value
383 Usage : $limiter_obj = $self->job_limiter;
384 $self->job_limiter($new_value);
385 Description: The maximum number of jobs to be done by the Worker can be limited by the given number.
386 A worker
'dies' when either the
'life_span' or
'job_limit' is exceeded.
387 DefaultValue : undef (relies on life_span to limit life of worker)
388 Returntype : Hive::Limiter
object
394 if( scalar(@_) or !defined($self->{
'_job_limiter'}) ) {
397 return $self->{
'_job_limiter'};
# Record the completion of one more job and accumulate its per-state partial
# timings (e.g. FETCH_INPUT/RUN/WRITE_OUTPUT) into the current interval totals.
sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
        $self->{'_interval_partial_timing'}{$state} += $partial_timing_in_state;
    }
}
412 # By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it
414 # Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
415 # but at the moment we assume all errors identical. If the worker failed two jobs in a row - let him die.
# Getter/setter remembering whether the previous job failed; two consecutive
# failures are taken as a sign of worker contamination.
sub prev_job_error {
    my $self = shift @_;

    $self->{'_prev_job_error'} = shift @_ if(@_);
    return $self->{'_prev_job_error'};
}
# Getter/setter for the compiled RunnableDB instance this worker executes.
sub runnable_object {
    my $self = shift @_;

    $self->{'_runnable_object'} = shift @_ if(@_);
    return $self->{'_runnable_object'};
}
432 sub get_stdout_redirector {
438 sub get_stderr_redirector {
446 my ($self, $msg) = @_;
448 unless ($self->adaptor) {
449 print
"Standalone worker $$ : $msg\n";
453 my $worker_id = $self->dbID();
454 my $current_role = $self->current_role;
455 my $job_id = $self->runnable_object && $self->runnable_object->input_job && $self->runnable_object->input_job->dbID;
456 print
"Worker $worker_id [ ". ( $current_role
457 ? (
'Role '.$current_role->dbID.
' , '.$current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')'
458 . ($job_id ?
", Job $job_id" :
'')
466 my ($self, $include_analysis) = @_;
468 my $current_role = $self->current_role;
471 $include_analysis ? (
'analysis='.($current_role ? $current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')' :
'UNSPECIALIZED') ) : (),
472 'resource_class_id='.($self->resource_class_id
473 'meadow='.$self->meadow_type.
'/'.$self->meadow_name,
474 'process='.$self->meadow_user.
'@'.$self->meadow_host.
'#'.$self->process_id,
475 'when_checked_in='.($self->when_checked_in
476 'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() :
'UNSPECIALIZED'),
477 'job_limit='.($self->job_limiter->available_capacity()
478 'life_span='.($self->life_span
479 'worker_log_dir='.($self->log_dir
484 ###############################
488 ###############################
494 Usage : $worker->run;
496 This is a
self looping autonomous
function to process jobs.
497 First all STDOUT/STDERR is redirected, then looping commences.
500 2) processing those jobs through an instance of the
'module class' of
501 the analysis assigned to
this worker,
502 3) updating the job, analysis_stats, and hive tables to track the
503 progress of the job, the analysis and
this worker.
504 Looping stops when any one of these are met:
505 1) there is no more jobs to process
506 2) job_limit is reached
507 3) life_span has been reached.
513 my ($self, $specialization_arghash) = @_;
515 if( my $worker_log_dir = $self->log_dir ) {
516 $self->get_stdout_redirector->push( $worker_log_dir.
'/worker.out' );
517 $self->get_stderr_redirector->push( $worker_log_dir.
'/worker.err' );
521 my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
523 print
"\n"; # to clear beekeeper
's prompt in case output is not logged
524 $self->worker_say( $self->toString() );
525 $self->specialize_and_compile_wrapper( $specialization_arghash );
527 while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies
for any reason)
530 my $jobs_done_by_batches_loop = 0; # by all iterations of
internal loop
531 $self->{
'_interval_partial_timing'} = {};
533 if( my $special_batch = $self->special_batch() ) {
534 my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
535 $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
536 $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ?
'JOB_LIMIT' :
'CONTAMINATED');
537 }
else { # a proper
"BATCHES" loop
539 while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
540 my $current_role = $self->current_role;
542 if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
543 my $msg =
"Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
544 $self->worker_say( $msg );
545 $self->cause_of_death(
'CONTAMINATED');
546 $job_adaptor->release_undone_jobs_from_role($current_role, $msg);
548 } elsif( $self->job_limiter->reached()) {
549 $self->worker_say(
"job_limit reached (".$self->work_done.
" jobs completed)" );
550 $self->cause_of_death(
'JOB_LIMIT');
552 } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
553 $self->worker_say(
"life_span limit reached (alive for $alive_for_secs secs)" );
554 $self->cause_of_death(
'LIFESPAN');
557 # No need to refresh the stats or the hive_current_load # since it's all been refreshed in
558 # specialize_and_compile_wrapper()
559 my $stats = $current_role->analysis->stats;
560 my $desired_batch_size = $stats->get_or_estimate_batch_size();
561 my $hit_the_limit; # dummy at the moment
562 ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
564 my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
567 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
568 "Claiming: ready_job_count=".$stats->ready_job_count
569 .
", num_running_workers=".$stats->num_running_workers
570 .
", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
574 if(scalar(@$actual_batch)) {
575 my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
576 $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
577 $self->job_limiter->final_decision( $jobs_done_by_this_batch );
579 $self->cause_of_death(
'NO_WORK');
585 # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
586 # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
588 if($jobs_done_by_batches_loop) {
590 $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
591 $self->current_role->analysis->dbID,
592 $jobs_done_by_batches_loop,
593 $batches_stopwatch->get_elapsed,
594 $self->{
'_interval_partial_timing'}{
'FETCH_INPUT'} || 0,
595 $self->{
'_interval_partial_timing'}{
'RUN'} || 0,
596 $self->{
'_interval_partial_timing'}{
'WRITE_OUTPUT'} || 0,
600 # A mechanism whereby workers can be caused to exit even if they were doing fine:
601 if (!$self->cause_of_death) {
602 # We're here after having run a batch, so we need to refresh the stats
603 my $analysis = $self->current_role->analysis;
604 my $stats = $analysis->stats;
605 if ( $stats->refresh($self->refresh_tolerance_seconds) ) { #
if we DID refresh
606 $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
607 $stats->hive_pipeline->invalidate_hive_current_load;
608 if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
609 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
611 $self->cause_of_death(
'HIVE_OVERLOAD');
616 my $cod = $self->cause_of_death() ||
'';
618 if( $cod eq
'NO_WORK') {
619 $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id,
'ALL_CLAIMED' );
623 # 1) No work to do (computed across all
624 # 2) allowed to by the command-line option
625 # 3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components). This doesn't guarantee that the pattern will resolve to multiple analyses !
626 if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{
'-analyses_pattern'} or $specialization_arghash->{
'-analyses_pattern'}!~/^\w+$/) ) {
627 my $old_role = $self->current_role;
628 $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
629 $self->current_role( undef );
630 $self->cause_of_death(undef);
631 $self->specialize_and_compile_wrapper( $specialization_arghash );
634 } # /Worker
's lifespan loop
636 # The second argument ("update_when_checked_in") is set to force an
637 # update of the "when_checked_in" timestamp in the worker table
638 $self->adaptor->register_worker_death($self, 1);
641 $self->worker_say( 'AnalysisStats :
'.$self->current_role->analysis->stats->toString ) if( $self->current_role );
642 $self->worker_say( 'dbc
'.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles
' );
645 $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );
647 if( $self->log_dir ) {
648 $self->get_stdout_redirector->pop();
649 $self->get_stderr_redirector->pop();
654 sub specialize_and_compile_wrapper {
655 my ($self, $specialization_arghash) = @_;
658 $self->enter_status('SPECIALIZATION
');
659 $self->adaptor->specialize_worker( $self, $specialization_arghash );
664 $self->worker_say( "specialization failed:\t$msg" );
666 $self->cause_of_death('SEE_MSG
') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
669 if ($self->cause_of_death() eq "NO_ROLE") {
670 $message_class = 'INFO
';
672 $message_class = 'WORKER_ERROR
'
675 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class );
678 if( !$self->cause_of_death() ) {
679 $self->compile_runnable;
683 sub compile_runnable {
686 $self->enter_status('COMPILATION
');
688 my $current_analysis = $self->current_role->analysis;
689 my $runnable_object = $current_analysis->get_compiled_module_name->new($self->debug, $current_analysis->language, $current_analysis->module) # Only GuestProcess will read the arguments
690 or die "Unknown compilation error";
692 $runnable_object->worker( $self );
694 $self->runnable_object( $runnable_object );
695 $self->enter_status('READY
');
700 $self->handle_compilation_failure($last_err);
# React to a failure to compile the analysis' Runnable module: report the error
# to the worker log and the database, mark the worker as dying with SEE_MSG
# (unless a more specific cause was already set), and possibly exclude the analysis.
sub handle_compilation_failure {
    my ($self, $msg) = @_;

    $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" );
    $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR' );

    # some specific causes could have been set prior to die "...":
    $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());

    $self->check_analysis_for_exclusion();
}
715 my ($self, $jobs, $is_special_batch) = @_;
717 my $jobs_done_here = 0;
719 my $current_role = $self->current_role;
720 my $max_retry_count = $current_role->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
721 my $stats = $current_role->analysis->stats; # cache it to avoid reloading
723 $self->adaptor->check_in_worker( $self );
724 $self->adaptor->safe_synchronize_AnalysisStats( $stats );
727 $self->worker_say( 'AnalysisStats :
' . $stats->toString );
728 $self->worker_say( 'claimed
'.scalar(@{$jobs}).' jobs to process
' );
731 my $job_partial_timing;
733 ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay
735 my $job_id = $job->dbID();
736 $self->worker_say( $job->toString ) if($self->debug);
738 my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
739 $job_partial_timing = {};
741 $self->start_job_output_redirection($job); # switch logging into job's STDERR
742 eval { # capture any
throw/die
743 my $runnable_object = $self->runnable_object();
744 $runnable_object->input_job( $job ); #
"take" the job
747 $self->adaptor->db->dbc->query_count(0);
748 $job_stopwatch->restart();
750 $job->load_parameters( $runnable_object );
752 $self->worker_say(
"Job $job_id unsubstituted_params= ".stringify($job->{
'_unsubstituted_param_hash'}) )
if($self->debug());
754 $job_partial_timing = $runnable_object->life_cycle();
757 $job->died_somewhere( $job->incomplete ); # it will be OR
'd inside
758 Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete?'WORKER_ERROR
':'INFO
'); # In case the Runnable has redefined warning()
761 # whether the job completed successfully or not:
762 $self->runnable_object->input_job( undef ); # release an extra reference to the job
763 $job->runtime_msec( $job_stopwatch->get_elapsed );
764 $job->query_count( $self->adaptor->db->dbc->query_count );
766 my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died
' : 'complete
' );
768 print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
769 $self->stop_job_output_redirection($job); # and then we
switch back to worker
's STDERR
770 $self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR
772 $self->current_role->register_attempt( ! $job->died_somewhere );
774 if($job->died_somewhere) {
775 # Both flags default to 1, meaning that jobs would by default be retried.
776 # If the job specifically said not to retry, or if the worker is configured
777 # not to retry jobs, follow their wish.
778 my $may_retry = $job->transient_error && $self->retry_throwing_jobs;
780 $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );
782 if( $self->prev_job_error # a bit of AI:
if the previous job failed as well, it is LIKELY that we have contamination
783 or $job->lethal_for_worker ) { # trust the job
's expert knowledge
784 my $reason = $self->prev_job_error ? 'two failed jobs in a row
'
785 : 'suggested by job itself
';
786 $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die
" );
787 $self->cause_of_death('CONTAMINATED');
790 } else { # job successfully completed:
791 $self->more_work_done( $job_partial_timing );
793 $job->set_and_update_status('DONE');
795 if( my $controlled_semaphore = $job->controlled_semaphore ) {
796 $controlled_semaphore->decrease_by( [ $job ] );
799 if($job->lethal_for_worker) {
800 $self->worker_say( "The Job, although complete, wants the Worker to die
" );
801 $self->cause_of_death('CONTAMINATED');
806 $self->prev_job_error( $job->died_somewhere );
807 $self->enter_status('READY');
809 # UNCLAIM THE SURPLUS:
810 my $remaining_jobs_in_batch = scalar(@$jobs);
811 if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) { # if we DID refresh
812 my $ready_job_count = $stats->ready_job_count;
813 $stats->hive_pipeline->invalidate_hive_current_load;
814 my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch );
815 my $jobs_to_unclaim = $remaining_jobs_in_batch - $optimal_batch_now;
817 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
818 "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch,
"
819 . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim
",
822 if( $jobs_to_unclaim > 0 ) {
823 # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) ); # unclaim the last $jobs_to_unclaim elements
824 # currently we just dump all the remaining jobs and prepare to take a fresh batch:
825 $job->adaptor->release_claimed_jobs_from_role( $current_role );
827 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)
", 'INFO' );
831 } # /while(my $job = shift @$jobs)
833 return $jobs_done_here;
# Set the worker's status locally and, when attached to a database,
# check in with the Queen so the hive table reflects the new status.
sub set_and_update_status {
    my ($self, $status) = @_;

    $self->status($status);

    if(my $adaptor = $self->adaptor) {
        $adaptor->check_in_worker( $self );
    }
}
849 my ($self, $status) = @_;
852 $self->worker_say( '-> '.$status );
855 $self->set_and_update_status( $status );
# Redirect STDOUT/STDERR into per-job files (named after job dbID and retry
# count) under the worker's log directory, and record the file paths in the
# database when a job adaptor is available. No-op without a log directory.
sub start_job_output_redirection {
    my ($self, $job) = @_;

    if(my $worker_log_dir = $self->log_dir) {
        $self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
        $self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}
# Undo the per-job STDOUT/STDERR redirection. The job's .out/.err files are
# deleted when they are empty, or when neither debugging nor a job failure
# gives a reason to keep them; the (possibly cleared) paths are then re-stored.
sub stop_job_output_redirection {
    my ($self, $job) = @_;

    $self->get_stdout_redirector->pop();
    $self->get_stderr_redirector->pop();

    # keep the files only for debugging or post-mortem of a failed job:
    my $force_cleanup = !($self->debug || $job->died_somewhere);

    if($force_cleanup or -z $job->stdout_file) {
        $self->worker_say( "Deleting '".$job->stdout_file."' file" );
        unlink $job->stdout_file;
        $job->stdout_file(undef);
    }
    if($force_cleanup or -z $job->stderr_file) {
        $self->worker_say( "Deleting '".$job->stderr_file."' file" );
        unlink $job->stderr_file;
        $job->stderr_file(undef);
    }

    if(my $job_adaptor = $job->adaptor) {
        $job_adaptor->store_out_files($job);
    }
}
# Count recent worker errors recorded against the current analysis and, when
# they exceed the worker_error_threshold, mark the analysis as excluded so no
# further workers specialize into it until an operator intervenes.
sub check_analysis_for_exclusion {
    my $self = shift(@_);

    my $worker_errors_this_analysis =
        $self->adaptor->db->get_LogMessageAdaptor()->count_analysis_events(
            $self->current_role->analysis_id,
            'WORKER_ERROR' );   # NOTE(review): this argument was lost in the damaged span — confirm upstream

    # warn "There are $worker_errors_this_analysis worker errors for this analysis\n";
    if ($worker_errors_this_analysis > $self->worker_error_threshold) {
        my $current_logic_name = $self->current_role->analysis->logic_name;
        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "setting analysis '$current_logic_name' to excluded", 'INFO' );
        $self->current_role->analysis->stats->is_excluded(1);
        $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($self->current_role->analysis->stats);
    }
}
# Derive (if not given explicitly) and create this worker's log directory,
# then record it on the object and in the database. Database-backed workers
# get a reverse-hashed path keyed on their dbID; standalone workers use their
# process id. No-op when neither a hive nor a worker log dir is requested.
sub set_log_directory_name {
    my ($self, $hive_log_dir, $worker_log_dir) = @_;

    return unless ($hive_log_dir or $worker_log_dir);

    my $dir_revhash = dir_revhash($self->dbID // '');   # Database-less workers are not hashed
    $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') . ($self->adaptor ? 'worker_id_' . $self->dbID : 'standalone/worker_pid_' . $self->process_id);

    eval {
        make_path( $worker_log_dir );
        1;
    } or die "Could not create '$worker_log_dir' directory : $@";

    $self->log_dir( $worker_log_dir );
    $self->adaptor->update_log_dir( $self ) if $self->adaptor;  # autoloaded
}
932 =head2 set_temp_directory_name
934 Title : set_temp_directory_name
935 Description : Generates and sets the name of a temporary directory suitable for this worker.
936 It will be under the base directory requested by $base_temp_dir, or the standard
937 location otherwise (as advised by File::Spec), and includes worker attributes
938 to make the path unique.
# Generate and set the name of a temporary directory unique to this worker,
# under $base_temp_dir (or File::Spec's standard location when not given).
# Database-backed workers embed the pipeline name and dbID; standalone
# workers embed their process id instead.
sub set_temp_directory_name {
    my ($self, $base_temp_dir) = @_;

    $base_temp_dir //= File::Spec->tmpdir();

    my $temp_directory_name;
    if ($self->adaptor) {
        $temp_directory_name = sprintf('%s/worker_%s_%s.%s/', $base_temp_dir, $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID);
    } else {
        $temp_directory_name = sprintf('%s/worker_%s.standalone.%s/', $base_temp_dir, $self->meadow_user, $self->process_id);
    }

    $self->temp_directory_name( $temp_directory_name );
    $self->adaptor->update_temp_directory_name( $self ) if $self->adaptor;  # autoloaded
}