9 Object which encapsulates the details of how to find jobs, how to run those
10 jobs, and then check the rules to create the next jobs in the chain.
11 Essentially knows where to find data, how to process data, and where to
12 put it when it is done (put in next person's INBOX) so the next Worker
13 in the chain can find data to work on.
15 Hive based processing is a concept based on a more controlled version
16 of an autonomous agent type system. Each worker is not told what to
do 17 (like a centralized control system - like the current pipeline system)
18 but rather queries a central database
for jobs (give me jobs).
20 Each worker is linked to an analysis_id, registers itself
on creation
21 into the Hive, creates a RunnableDB instance of the Analysis->module,
22 gets relevant configuration information from the database, does its
23 work, creates the next layer of job entries by interfacing to
24 the DataflowRuleAdaptor to determine the analyses it needs to pass its
25 output data to and creates jobs on the database of the next analysis.
26 It repeats
this cycle until it has lived its lifetime or until there are no
27 more jobs left to process.
28 The lifetime limit is a safety limit to prevent these from
'infecting' 29 a system and sitting on a compute node
for longer than is socially acceptable.
30 This is primarily needed on compute resources like an LSF system where jobs
31 are not preempted and run until they are done.
33 The Queen's primary job is to create Workers to
get the work done.
34 As part of
this, she is also responsible
for summarizing the status of the
35 analyses by querying the jobs, summarizing, and updating the
36 analysis_stats table. From
this she is also responsible
for monitoring and
37 'unblocking' analyses via the analysis_ctrl_rules.
38 The Queen is also responsible
for freeing up jobs that were claimed by Workers
39 that died unexpectedly so that other workers can take over the work.
41 The Beekeeper is in charge of interfacing between the Queen and a compute resource
42 or
'compute farm'. Its job is to query Queens
if they need any workers and to
43 send the requested number of workers to open machines via the runWorker.pl script.
44 It is also responsible
for interfacing with the Queen to identify workers which died
45 unexpectedly so that she can free the dead workers' unfinished jobs.
49 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
50 Copyright [2016-2022] EMBL-European Bioinformatics Institute
52 Licensed under the Apache License,
Version 2.0 (the
"License"); you may not use
this file except in compliance with the License.
53 You may obtain a copy of the License at
57 Unless required by applicable law or agreed to in writing, software distributed under the License
58 is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
59 See the License
for the specific language governing permissions and limitations under the License.
63 Please subscribe to the Hive mailing list: http:
67 The rest of the documentation details each of the
object methods.
68 Internal methods are usually preceded with a _
73 package Bio::EnsEMBL::Hive::Worker;
78 use File::Path
'make_path';
86 use base (
'Bio::EnsEMBL::Hive::Storable' );
89 ## How often we should refresh the AnalysisStats objects 90 sub refresh_tolerance_seconds {
94 sub worker_error_threshold {
102 resource_class_id / resource_class
110 my $lifespan_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
111 $lifespan_stopwatch->_unit(1); # count in seconds (
default is milliseconds)
112 $lifespan_stopwatch->restart;
113 $self->lifespan_stopwatch( $lifespan_stopwatch );
119 ## Storable object's getters/setters: 124 $self->{
'_meadow_type'} = shift
if(@_);
125 return $self->{
'_meadow_type'};
131 $self->{
'_meadow_name'} = shift
if(@_);
132 return $self->{
'_meadow_name'};
138 $self->{
'_meadow_host'} = shift
if(@_);
139 return $self->{
'_meadow_host'};
145 $self->{
'_meadow_user'} = shift
if(@_);
146 return $self->{
'_meadow_user'};
152 $self->{
'_process_id'} = shift
if(@_);
153 return $self->{
'_process_id'};
159 $self->{
'_work_done'} = shift
if(@_);
160 return $self->{
'_work_done'} || 0;
166 $self->{
'_status'} = shift
if(@_);
167 return $self->{
'_status'};
173 $self->{
'_beekeeper_id'} = shift
if(@_);
174 return $self->{
'_beekeeper_id'} || undef;
180 $self->{
'_when_submitted'} = shift
if(@_);
181 return $self->{
'_when_submitted'};
185 sub seconds_since_when_submitted {
187 $self->{
'_seconds_since_when_submitted'} = shift
if(@_);
188 return $self->{
'_seconds_since_when_submitted'};
194 $self->{
'_when_born'} = shift
if(@_);
195 return $self->{
'_when_born'};
199 sub when_checked_in {
201 $self->{
'_when_checked_in'} = shift
if(@_);
202 return $self->{
'_when_checked_in'};
208 $self->{
'_when_seen'} = shift
if(@_);
209 return $self->{
'_when_seen'};
215 $self->{
'_when_died'} = shift
if(@_);
216 return $self->{
'_when_died'};
222 $self->{
'_cause_of_death'} = shift
if(@_);
223 return $self->{
'_cause_of_death'};
229 Arg [1] : (optional)
string directory path
231 Usage : $worker_log_dir = $self->log_dir;
232 $self->log_dir($worker_log_dir);
233 Description:
Storable getter/setter attribute
for the directory where STDOUT and STRERR of the worker will be redirected to.
234 In
this directory each job will have its own .out and .err files.
241 $self->{
'_log_dir'} = shift
if(@_);
242 return $self->{
'_log_dir'};
247 ## Non-Storable attributes: 253 if( my $from_analysis = $self->{
'_current_role'} && $self->{
'_current_role'}->analysis ) {
254 $self->worker_say(
"unspecializing from ".$from_analysis->logic_name.
'('.$from_analysis->dbID.
')' );
256 my $new_role = shift @_;
257 if( my $to_analysis = $new_role && $new_role->analysis ) {
258 $self->worker_say(
"specializing to " . $to_analysis->logic_name .
'('.($to_analysis->dbID
260 $self->{
'_current_role'} = $new_role;
262 return $self->{
'_current_role'};
268 $self->{
'_debug'} = shift
if(@_);
269 $self->{
'_debug'}=0 unless(defined($self->{
'_debug'}));
270 return $self->{
'_debug'};
276 $self->{
'_execute_writes'} = shift
if(@_);
277 $self->{
'_execute_writes'}=1 unless(defined($self->{
'_execute_writes'}));
278 return $self->{
'_execute_writes'};
284 $self->{
'_special_batch'} = shift
if(@_);
285 return $self->{
'_special_batch'};
289 sub perform_cleanup {
291 $self->{
'_perform_cleanup'} = shift
if(@_);
292 $self->{
'_perform_cleanup'} = 1 unless(defined($self->{
'_perform_cleanup'}));
293 return $self->{
'_perform_cleanup'};
297 # this is a setter/getter that defines default behaviour when a job throws: should it be retried or not? 299 sub retry_throwing_jobs {
302 $self->{
'_retry_throwing_jobs'} = shift @_
if(@_);
303 return defined($self->{
'_retry_throwing_jobs'}) ? $self->{
'_retry_throwing_jobs'} : 1;
307 sub can_respecialize {
309 $self->{
'_can_respecialize'} = shift
if(@_);
310 return $self->{
'_can_respecialize'};
316 Arg [1] : (optional) integer $value (in seconds)
318 Usage : $value = $self->life_span;
319 $self->life_span($new_value);
320 Description: Defines the maximum time a worker can live
for. Workers are always
321 allowed to complete the jobs they
get, but whether they can
322 do multiple rounds of work is limited by their life_span
323 DefaultValue : 3600 (60 minutes)
324 Returntype : integer scalar
328 sub life_span { #
default life_span = 60minutes
329 my ($self, $value) = @_;
331 if(defined($value)) { # you can still
set it to 0 and avoid having the limit on lifespan
332 $self->{
'_life_span'} = $value;
333 } elsif(!defined($self->{
'_life_span'})) {
334 $self->{
'_life_span'} = 60*60;
336 return $self->{
'_life_span'};
339 sub lifespan_stopwatch {
343 $self->{
'_lifespan_stopwatch'} = shift @_;
345 return $self->{
'_lifespan_stopwatch'};
348 sub life_span_limit_reached {
351 if( $self->life_span() ) {
352 my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
353 if($alive_for_secs > $self->life_span() ) {
354 return $alive_for_secs;
364 Arg [1] : (optional) integer $value
365 Usage : $limiter_obj = $self->job_limiter;
366 $self->job_limiter($new_value);
367 Description: The maximum number of jobs to be done by the
Worker can be limited by the given number.
368 A worker
'dies' when either the
'life_span' or
'job_limit' is exceeded.
369 DefaultValue : undef (relies on life_span to limit life of worker)
370 Returntype : Hive::Limiter
object 376 if( scalar(@_) or !defined($self->{
'_job_limiter'}) ) {
379 return $self->{
'_job_limiter'};
384 my ($self, $job_partial_timing) = @_;
386 $self->{
'_work_done'}++;
388 while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
389 $self->{
'_interval_partial_timing'}{$state} += $partial_timing_in_state;
394 # By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it 396 # Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ), 397 # but at the moment we assume all errors identical. If the worker failed two jobs in a row - let him die. 402 $self->{
'_prev_job_error'} = shift
if(@_);
403 return $self->{
'_prev_job_error'};
406 sub runnable_object {
409 $self->{
'_runnable_object'} = shift @_
if(@_);
410 return $self->{
'_runnable_object'};
414 sub get_stdout_redirector {
420 sub get_stderr_redirector {
428 my ($self, $msg) = @_;
430 unless ($self->adaptor) {
431 print
"Standalone worker $$ : $msg\n";
435 my $worker_id = $self->dbID();
436 my $current_role = $self->current_role;
437 my $job_id = $self->runnable_object && $self->runnable_object->input_job && $self->runnable_object->input_job->dbID;
438 print
"Worker $worker_id [ ". ( $current_role
439 ? (
'Role '.$current_role->dbID.
' , '.$current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')' 440 . ($job_id ?
", Job $job_id" :
'')
448 my ($self, $include_analysis) = @_;
450 my $current_role = $self->current_role;
453 $include_analysis ? (
'analysis='.($current_role ? $current_role->analysis->logic_name.
'('.$current_role->analysis_id.
')' :
'UNSPECIALIZED') ) : (),
454 'resource_class_id='.($self->resource_class_id
455 'meadow='.$self->meadow_type.
'/'.$self->meadow_name,
456 'process='.$self->meadow_user.
'@'.$self->meadow_host.
'#'.$self->process_id,
457 'when_checked_in='.($self->when_checked_in
458 'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() :
'UNSPECIALIZED'),
459 'job_limit='.($self->job_limiter->available_capacity()
460 'life_span='.($self->life_span
461 'worker_log_dir='.($self->log_dir
466 ############################### 470 ############################### 476 Usage : $worker->run;
478 This is a
self looping autonomous
function to process jobs.
479 First all STDOUT/STDERR is redirected, then looping commences.
482 2) processing those jobs through an instance of the
'module class' of
483 the analysis assigned to
this worker,
484 3) updating the job, analysis_stats, and hive tables to track the
485 progress of the job, the analysis and
this worker.
486 Looping stops when any one of these are met:
487 1) there are no more jobs to process
488 2) job_limit is reached
489 3) life_span has been reached.
495 my ($self, $specialization_arghash) = @_;
497 if( my $worker_log_dir = $self->log_dir ) {
498 $self->get_stdout_redirector->push( $worker_log_dir.
'/worker.out' );
499 $self->get_stderr_redirector->push( $worker_log_dir.
'/worker.err' );
503 my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
505 print
"\n"; # to clear beekeeper
's prompt in case output is not logged 506 $self->worker_say( $self->toString() ); 507 $self->specialize_and_compile_wrapper( $specialization_arghash ); 509 while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies
for any reason)
512 my $jobs_done_by_batches_loop = 0; # by all iterations of
internal loop
513 $self->{
'_interval_partial_timing'} = {};
515 if( my $special_batch = $self->special_batch() ) {
516 my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
517 $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
518 $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ?
'JOB_LIMIT' :
'CONTAMINATED');
519 }
else { # a proper
"BATCHES" loop
521 while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
522 my $current_role = $self->current_role;
524 if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
525 my $msg =
"Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
526 $self->worker_say( $msg );
527 $self->cause_of_death(
'CONTAMINATED');
528 $job_adaptor->release_undone_jobs_from_role($current_role, $msg);
530 } elsif( $self->job_limiter->reached()) {
531 $self->worker_say(
"job_limit reached (".$self->work_done.
" jobs completed)" );
532 $self->cause_of_death(
'JOB_LIMIT');
534 } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
535 $self->worker_say(
"life_span limit reached (alive for $alive_for_secs secs)" );
536 $self->cause_of_death(
'LIFESPAN');
539 # No need to refresh the stats or the hive_current_load # since it's all been refreshed in 540 # specialize_and_compile_wrapper() 541 my $stats = $current_role->analysis->stats;
542 my $desired_batch_size = $stats->get_or_estimate_batch_size();
543 my $hit_the_limit; # dummy at the moment
544 ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
546 my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
549 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
550 "Claiming: ready_job_count=".$stats->ready_job_count
551 .
", num_running_workers=".$stats->num_running_workers
552 .
", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
556 if(scalar(@$actual_batch)) {
557 my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
558 $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
559 $self->job_limiter->final_decision( $jobs_done_by_this_batch );
561 $self->cause_of_death(
'NO_WORK');
567 # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables), 568 # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done) 570 if($jobs_done_by_batches_loop) {
572 $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
573 $self->current_role->analysis->dbID,
574 $jobs_done_by_batches_loop,
575 $batches_stopwatch->get_elapsed,
576 $self->{
'_interval_partial_timing'}{
'FETCH_INPUT'} || 0,
577 $self->{
'_interval_partial_timing'}{
'RUN'} || 0,
578 $self->{
'_interval_partial_timing'}{
'WRITE_OUTPUT'} || 0,
582 # A mechanism whereby workers can be caused to exit even if they were doing fine: 583 if (!$self->cause_of_death) {
584 # We're here after having run a batch, so we need to refresh the stats 585 my $analysis = $self->current_role->analysis;
586 my $stats = $analysis->stats;
587 if ( $stats->refresh($self->refresh_tolerance_seconds) ) { #
if we DID refresh
588 $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
589 $stats->hive_pipeline->invalidate_hive_current_load;
590 if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
591 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
593 $self->cause_of_death(
'HIVE_OVERLOAD');
598 my $cod = $self->cause_of_death() ||
'';
600 if( $cod eq
'NO_WORK') {
601 $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id,
'ALL_CLAIMED' );
605 # 1) No work to do (computed across all 606 # 2) allowed to by the command-line option 607 # 3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components). This doesn't guarantee that the pattern will resolve to multiple analyses ! 608 if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{
'-analyses_pattern'} or $specialization_arghash->{
'-analyses_pattern'}!~/^\w+$/) ) {
609 my $old_role = $self->current_role;
610 $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
611 $self->current_role( undef );
612 $self->cause_of_death(undef);
613 $self->specialize_and_compile_wrapper( $specialization_arghash );
616 } # /
Worker's lifespan loop 618 # The second argument ("update_when_checked_in") is set to force an 619 # update of the "when_checked_in" timestamp in the worker table 620 $self->adaptor->register_worker_death($self, 1); 623 $self->worker_say( 'AnalysisStats :
'.$self->current_role->analysis->stats->toString ) if( $self->current_role ); 624 $self->worker_say( 'dbc
'.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles
' ); 627 $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death ); 629 if( $self->log_dir ) { 630 $self->get_stdout_redirector->pop(); 631 $self->get_stderr_redirector->pop(); 636 sub specialize_and_compile_wrapper { 637 my ($self, $specialization_arghash) = @_; 640 $self->enter_status('SPECIALIZATION
'); 641 $self->adaptor->specialize_worker( $self, $specialization_arghash ); 646 $self->worker_say( "specialization failed:\t$msg" ); 648 $self->cause_of_death('SEE_MSG
') unless($self->cause_of_death()); # some specific causes could have been set prior to die "..."; 651 if ($self->cause_of_death() eq "NO_ROLE") { 652 $message_class = 'INFO
'; 654 $message_class = 'WORKER_ERROR
' 657 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class ); 660 if( !$self->cause_of_death() ) { 661 $self->compile_runnable; 665 sub compile_runnable { 668 $self->enter_status('COMPILATION
'); 670 my $current_analysis = $self->current_role->analysis; 671 my $runnable_object = $current_analysis->get_compiled_module_name->new($self->debug, $current_analysis->language, $current_analysis->module) # Only GuestProcess will read the arguments 672 or die "Unknown compilation error"; 674 $runnable_object->worker( $self ); 676 $self->runnable_object( $runnable_object ); 677 $self->enter_status('READY
'); 682 $self->handle_compilation_failure($last_err); 686 sub handle_compilation_failure { 687 my ($self, $msg) = @_; 688 $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" ); 689 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR
' ); 691 $self->cause_of_death('SEE_MSG
') unless($self->cause_of_death()); # some specific causes could have been set prior to die "..."; 693 $self->check_analysis_for_exclusion(); 697 my ($self, $jobs, $is_special_batch) = @_; 699 my $jobs_done_here = 0; 701 my $current_role = $self->current_role; 702 my $max_retry_count = $current_role->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs 703 my $stats = $current_role->analysis->stats; # cache it to avoid reloading 705 $self->adaptor->check_in_worker( $self ); 706 $self->adaptor->safe_synchronize_AnalysisStats( $stats ); 710 $self->worker_say( 'claimed
'.scalar(@{$jobs}).' jobs to process
' ); 713 my $job_partial_timing; 715 ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay 717 my $job_id = $job->dbID(); 718 $self->worker_say( $job->toString ) if($self->debug); 720 my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new(); 721 $job_partial_timing = {}; 723 $self->start_job_output_redirection($job); # switch logging into job's STDERR
724 eval { # capture any
throw/die
725 my $runnable_object = $self->runnable_object();
726 $runnable_object->input_job( $job ); #
"take" the job
730 $job_stopwatch->restart();
732 $job->load_parameters( $runnable_object );
734 $self->worker_say(
"Job $job_id unsubstituted_params= ".stringify($job->{
'_unsubstituted_param_hash'}) )
if($self->debug());
736 $job_partial_timing = $runnable_object->life_cycle();
739 $job->died_somewhere( $job->incomplete ); # it will be OR
'd inside 740 Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete?'WORKER_ERROR
':'INFO
'); # In case the Runnable has redefined warning() 743 # whether the job completed successfully or not: 744 $self->runnable_object->input_job( undef ); # release an extra reference to the job 745 $job->runtime_msec( $job_stopwatch->get_elapsed ); 746 $job->query_count( $self->adaptor->db->dbc->query_count ); 748 my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died
' : 'complete
' ); 750 print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
751 $self->stop_job_output_redirection($job); # and then we
switch back to worker
's STDERR 752 $self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR
754 $self->current_role->register_attempt( ! $job->died_somewhere );
756 if($job->died_somewhere) {
757 # Both flags default to 1, meaning that jobs would by default be retried. 758 # If the job specifically said not to retry, or if the worker is configured 759 # not to retry jobs, follow their wish. 760 my $may_retry = $job->transient_error && $self->retry_throwing_jobs;
762 $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );
764 if( $self->prev_job_error # a bit of AI:
if the previous job failed as well, it is LIKELY that we have contamination
765 or $job->lethal_for_worker ) { # trust the job
's expert knowledge 766 my $reason = $self->prev_job_error ? 'two failed jobs in a row
' 767 : 'suggested by job itself
'; 768 $self->worker_say( "Job's error has contaminated the
Worker ($reason), so the
Worker will now die
" ); 769 $self->cause_of_death('CONTAMINATED'); 772 } else { # job successfully completed: 773 $self->more_work_done( $job_partial_timing ); 775 $job->set_and_update_status('DONE'); 777 if( my $controlled_semaphore = $job->controlled_semaphore ) { 778 $controlled_semaphore->decrease_by( [ $job ] ); 781 if($job->lethal_for_worker) { 782 $self->worker_say( "The Job, although complete, wants the
Worker to die
" ); 783 $self->cause_of_death('CONTAMINATED'); 788 $self->prev_job_error( $job->died_somewhere ); 789 $self->enter_status('READY'); 791 # UNCLAIM THE SURPLUS: 792 my $remaining_jobs_in_batch = scalar(@$jobs); 793 if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) { # if we DID refresh 794 my $ready_job_count = $stats->ready_job_count; 795 $stats->hive_pipeline->invalidate_hive_current_load; 796 my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch ); 797 my $jobs_to_unclaim = $remaining_jobs_in_batch - $optimal_batch_now; 799 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, 800 "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch,
" 801 . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim
", 804 if( $jobs_to_unclaim > 0 ) { 805 # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) ); # unclaim the last $jobs_to_unclaim elements 806 # currently we just dump all the remaining jobs and prepare to take a fresh batch: 807 $job->adaptor->release_claimed_jobs_from_role( $current_role ); 809 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)
", 'INFO' ); 813 } # /while(my $job = shift @$jobs) 815 return $jobs_done_here; 819 sub set_and_update_status { 820 my ($self, $status ) = @_; 822 $self->status($status); 824 if(my $adaptor = $self->adaptor) { 825 $adaptor->check_in_worker( $self ); 831 my ($self, $status) = @_; 834 $self->worker_say( '-> '.$status ); 837 $self->set_and_update_status( $status ); 841 sub start_job_output_redirection { 842 my ($self, $job) = @_; 844 if(my $worker_log_dir = $self->log_dir) { 845 $self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) ); 846 $self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) ); 848 if(my $job_adaptor = $job->adaptor) { 849 $job_adaptor->store_out_files($job); 855 sub stop_job_output_redirection { 856 my ($self, $job) = @_; 859 $self->get_stdout_redirector->pop(); 860 $self->get_stderr_redirector->pop(); 862 my $force_cleanup = !($self->debug || $job->died_somewhere); 864 if($force_cleanup or -z $job->stdout_file) { 865 $self->worker_say( "Deleting
'".$job->stdout_file."' file
" ); 866 unlink $job->stdout_file; 867 $job->stdout_file(undef); 869 if($force_cleanup or -z $job->stderr_file) { 870 $self->worker_say( "Deleting
'".$job->stderr_file."' file
" ); 871 unlink $job->stderr_file; 872 $job->stderr_file(undef); 875 if(my $job_adaptor = $job->adaptor) { 876 $job_adaptor->store_out_files($job); 881 sub check_analysis_for_exclusion { 882 my $self = shift(@_); 883 my $worker_errors_this_analysis = 884 $self->adaptor->db->get_LogMessageAdaptor()->count_analysis_events( 885 $self->current_role->analysis_id, 887 # warn "There are $worker_errors_this_analysis worker errors
for this analysis\n
"; 888 if ($worker_errors_this_analysis > $self->worker_error_threshold) { 889 my $current_logic_name = $self->current_role->analysis->logic_name; 890 $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "setting analysis
'$current_logic_name' to excluded
", 'INFO' ); 891 $self->current_role->analysis->stats->is_excluded(1); 892 $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($self->current_role->analysis->stats); 896 sub set_log_directory_name { 897 my ($self, $hive_log_dir, $worker_log_dir) = @_; 899 return unless ($hive_log_dir or $worker_log_dir); 901 my $dir_revhash = dir_revhash($self->dbID // ''); # Database-less workers are not hashed 902 $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/
" : '') . ($self->adaptor ? 'worker_id_' . $self->dbID : 'standalone/worker_pid_' . $self->process_id); 905 make_path( $worker_log_dir ); 907 } or die "Could not create
'$worker_log_dir' directory : $
@"; 909 $self->log_dir( $worker_log_dir ); 910 $self->adaptor->update_log_dir( $self ) if $self->adaptor; # autoloaded 914 sub temp_directory_name { 917 if ($self->adaptor) { 918 return sprintf('/tmp/worker_%s_%s.%s/', $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID); 920 return sprintf('/tmp/worker_%s.standalone.%s/', $self->meadow_user, $self->process_id);
public Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor adaptor()