ensembl-hive  2.6
Worker.pm
=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

    Object which encapsulates the details of how to find jobs, how to run those
    jobs, and then check the rules to create the next jobs in the chain.
    Essentially it knows where to find data, how to process data, and where to
    put it when it is done (put it in the next person's INBOX) so the next Worker
    in the chain can find data to work on.

    Hive-based processing is a concept based on a more controlled version
    of an autonomous-agent type system. Each worker is not told what to do
    (as it would be in a centralized control system such as the current pipeline
    system) but rather queries a central database for jobs ("give me jobs").

    Each worker is linked to an analysis_id, registers itself on creation
    into the Hive, creates a RunnableDB instance of the Analysis->module,
    gets relevant configuration information from the database, does its
    work, and creates the next layer of job entries by interfacing with
    the DataflowRuleAdaptor to determine the analyses it needs to pass its
    output data to, creating jobs in the database for the next analysis.
    It repeats this cycle until it has lived its lifetime or until there are no
    more jobs left to process.
    The lifetime limit is a safety limit to prevent these workers from 'infecting'
    a system and sitting on a compute node for longer than is socially acceptable.
    This is primarily needed on compute resources like an LSF system where jobs
    are not preempted and run until they are done.

    The Queen's primary job is to create Workers to get the work done.
    As part of this, she is also responsible for summarizing the status of the
    analyses by querying the jobs, summarizing, and updating the
    analysis_stats table. From this she is also responsible for monitoring and
    'unblocking' analyses via the analysis_ctrl_rules.
    The Queen is also responsible for freeing up jobs that were claimed by Workers
    that died unexpectedly, so that other workers can take over the work.

    The Beekeeper is in charge of interfacing between the Queen and a compute resource
    or 'compute farm'. Its job is to query Queens to see whether they need any workers and to
    send the requested number of workers to open machines via the runWorker.pl script.
    It is also responsible for interfacing with the Queen to identify workers which died
    unexpectedly, so that she can free the dead workers' unfinished jobs.

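    A minimal sketch of how a Worker is typically driven (normally runWorker.pl
    does this for you; the exact setup call shown here is an assumption of this
    sketch rather than a fixed API):

        my $worker = $queen->create_new_worker( %options );    # hypothetical: obtain a registered Worker from the Queen
        $worker->run( { -analyses_pattern => 'my_analysis%' } );
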
=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2024] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::Worker;

use strict;
use warnings;
use POSIX;
use File::Path 'make_path';
use File::Spec;

# (the listing skipped a few 'use' lines here; the set below is reconstructed
#  from the classes actually referenced in this file)
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash', 'stringify', 'throw');
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;

use base ( 'Bio::EnsEMBL::Hive::Storable' );


## How often we should refresh the AnalysisStats objects:
sub refresh_tolerance_seconds {
    return 20;
}

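## Number of WORKER_ERROR log-messages recorded against one analysis beyond
## which the analysis gets excluded (see check_analysis_for_exclusion() below):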
sub worker_error_threshold {
    return 2;
}


=head1 AUTOLOADED

    resource_class_id / resource_class

=cut


sub init {
    my $self = shift;

    my $lifespan_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $lifespan_stopwatch->_unit(1);  # count in seconds (default is milliseconds)
    $lifespan_stopwatch->restart;
    $self->lifespan_stopwatch( $lifespan_stopwatch );

    return $self;
}

## Storable object's getters/setters:


sub meadow_type {
    my $self = shift;
    $self->{'_meadow_type'} = shift if(@_);
    return $self->{'_meadow_type'};
}


sub meadow_name {
    my $self = shift;
    $self->{'_meadow_name'} = shift if(@_);
    return $self->{'_meadow_name'};
}


sub meadow_host {
    my $self = shift;
    $self->{'_meadow_host'} = shift if(@_);
    return $self->{'_meadow_host'};
}


sub meadow_user {
    my $self = shift;
    $self->{'_meadow_user'} = shift if(@_);
    return $self->{'_meadow_user'};
}


sub process_id {
    my $self = shift;
    $self->{'_process_id'} = shift if(@_);
    return $self->{'_process_id'};
}


sub work_done {
    my $self = shift;
    $self->{'_work_done'} = shift if(@_);
    return $self->{'_work_done'} || 0;
}


sub status {
    my $self = shift;
    $self->{'_status'} = shift if(@_);
    return $self->{'_status'};
}


sub beekeeper_id {
    my $self = shift;
    $self->{'_beekeeper_id'} = shift if(@_);
    return $self->{'_beekeeper_id'} || undef;
}


sub when_submitted {
    my $self = shift;
    $self->{'_when_submitted'} = shift if(@_);
    return $self->{'_when_submitted'};
}


sub seconds_since_when_submitted {
    my $self = shift;
    $self->{'_seconds_since_when_submitted'} = shift if(@_);
    return $self->{'_seconds_since_when_submitted'};
}


sub when_born {
    my $self = shift;
    $self->{'_when_born'} = shift if(@_);
    return $self->{'_when_born'};
}


sub when_checked_in {
    my $self = shift;
    $self->{'_when_checked_in'} = shift if(@_);
    return $self->{'_when_checked_in'};
}


sub when_seen {
    my $self = shift;
    $self->{'_when_seen'} = shift if(@_);
    return $self->{'_when_seen'};
}


sub when_died {
    my $self = shift;
    $self->{'_when_died'} = shift if(@_);
    return $self->{'_when_died'};
}


sub cause_of_death {
    my $self = shift;
    $self->{'_cause_of_death'} = shift if(@_);
    return $self->{'_cause_of_death'};
}


=head2 log_dir

    Arg [1]    : (optional) string directory path
    Title      : log_dir
    Usage      : $worker_log_dir = $self->log_dir;
                 $self->log_dir($worker_log_dir);
    Description: Storable getter/setter attribute for the directory where the worker's STDOUT and STDERR will be redirected.
                 In this directory each job will have its own .out and .err files.
    Returntype : string

=cut
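
# Within log_dir each job attempt gets its own files, named like
#   job_id_<job_dbID>_<retry_count>.out / .err
# (see start_job_output_redirection() below), while the worker's own streams
# go to worker.out / worker.err (see run() below).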

sub log_dir {
    my $self = shift;
    $self->{'_log_dir'} = shift if(@_);
    return $self->{'_log_dir'};
}


=head2 temp_directory_name

    Arg [1]    : (optional) string directory path
    Title      : temp_directory_name
    Usage      : $worker_tmp_dir = $self->temp_directory_name;
                 $self->temp_directory_name($worker_tmp_dir);
    Description: Storable getter/setter attribute for the directory where jobs can store temporary data.
    Returntype : string

=cut

sub temp_directory_name {
    my $self = shift;
    $self->{'_tmp_dir'} = shift if(@_);
    return $self->{'_tmp_dir'};
}


## Non-Storable attributes:

sub current_role {
    my $self = shift;

    if( @_ ) {
        if( my $from_analysis = $self->{'_current_role'} && $self->{'_current_role'}->analysis ) {
            $self->worker_say( "unspecializing from ".$from_analysis->logic_name.'('.$from_analysis->dbID.')' );
        }
        my $new_role = shift @_;
        if( my $to_analysis = $new_role && $new_role->analysis ) {
            $self->worker_say( "specializing to " . $to_analysis->logic_name . '('.($to_analysis->dbID // 'unstored') . ')' );
        }
        $self->{'_current_role'} = $new_role;
    }
    return $self->{'_current_role'};
}


sub debug {
    my $self = shift;
    $self->{'_debug'} = shift if(@_);
    $self->{'_debug'} = 0 unless(defined($self->{'_debug'}));
    return $self->{'_debug'};
}


sub execute_writes {
    my $self = shift;
    $self->{'_execute_writes'} = shift if(@_);
    $self->{'_execute_writes'} = 1 unless(defined($self->{'_execute_writes'}));
    return $self->{'_execute_writes'};
}


sub special_batch {
    my $self = shift;
    $self->{'_special_batch'} = shift if(@_);
    return $self->{'_special_batch'};
}


sub perform_cleanup {
    my $self = shift;
    $self->{'_perform_cleanup'} = shift if(@_);
    $self->{'_perform_cleanup'} = 1 unless(defined($self->{'_perform_cleanup'}));
    return $self->{'_perform_cleanup'};
}


# This is a setter/getter that defines the default behaviour when a job throws: should it be retried or not?

sub retry_throwing_jobs {
    my $self = shift @_;

    $self->{'_retry_throwing_jobs'} = shift @_ if(@_);
    return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}


sub can_respecialize {
    my $self = shift;
    $self->{'_can_respecialize'} = shift if(@_);
    return $self->{'_can_respecialize'};
}


=head2 life_span

    Arg [1]      : (optional) integer $value (in seconds)
    Title        : life_span
    Usage        : $value = $self->life_span;
                   $self->life_span($new_value);
    Description  : Defines the maximum time a worker can live for. Workers are always
                   allowed to complete the jobs they get, but whether they can
                   do multiple rounds of work is limited by their life_span.
    DefaultValue : 3600 (60 minutes)
    Returntype   : integer scalar

=cut
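
# Note: setting life_span to 0 switches the time limit off entirely;
# life_span_limit_reached() below then always returns 0.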

sub life_span {     # default life_span = 60 minutes
    my ($self, $value) = @_;

    if(defined($value)) {               # you can still set it to 0 and avoid having the limit on lifespan
        $self->{'_life_span'} = $value;
    } elsif(!defined($self->{'_life_span'})) {
        $self->{'_life_span'} = 60*60;
    }
    return $self->{'_life_span'};
}

sub lifespan_stopwatch {
    my $self = shift @_;

    if(@_) {
        $self->{'_lifespan_stopwatch'} = shift @_;
    }
    return $self->{'_lifespan_stopwatch'};
}

sub life_span_limit_reached {
    my $self = shift @_;

    if( $self->life_span() ) {
        my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
        if($alive_for_secs > $self->life_span() ) {
            return $alive_for_secs;
        }
    }
    return 0;
}


=head2 job_limiter

    Title        : job_limiter
    Arg [1]      : (optional) integer $value
    Usage        : $limiter_obj = $self->job_limiter;
                   $self->job_limiter($new_value);
    Description  : The maximum number of jobs to be done by the Worker can be limited by the given number.
                   A worker 'dies' when either the 'life_span' or the 'job_limit' is exceeded.
    DefaultValue : undef (relies on life_span to limit the life of the worker)
    Returntype   : Hive::Limiter object

=cut

sub job_limiter {
    my $self = shift;
    if( scalar(@_) or !defined($self->{'_job_limiter'}) ) {
        $self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new("Total number of jobs this Worker is allowed to take", shift @_);
    }
    return $self->{'_job_limiter'};
}

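# Typical handshake with the limiter, as used in run() below:
#   my ($batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
#   ... grab and run the batch ...
#   $self->job_limiter->final_decision( $jobs_done_by_this_batch );
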

sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
        $self->{'_interval_partial_timing'}{$state} += $partial_timing_in_state;
    }
}


# By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it.
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
# but at the moment we assume all errors are identical. If the worker failed two jobs in a row, let it die.

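# In other words: if one job dies, prev_job_error is recorded; if the very next
# job dies too, run_one_batch() below declares the Worker CONTAMINATED and it
# stops taking further jobs.
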
sub prev_job_error {
    my $self = shift @_;

    $self->{'_prev_job_error'} = shift if(@_);
    return $self->{'_prev_job_error'};
}

sub runnable_object {
    my $self = shift @_;

    $self->{'_runnable_object'} = shift @_ if(@_);
    return $self->{'_runnable_object'};
}


sub get_stdout_redirector {
    my $self = shift;

    return $self->{_stdout_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
}

sub get_stderr_redirector {
    my $self = shift;

    return $self->{_stderr_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
}


sub worker_say {
    my ($self, $msg) = @_;

    unless ($self->adaptor) {
        print "Standalone worker $$ : $msg\n";
        return;
    }

    my $worker_id       = $self->dbID();
    my $current_role    = $self->current_role;
    my $job_id          = $self->runnable_object && $self->runnable_object->input_job && $self->runnable_object->input_job->dbID;
    print "Worker $worker_id [ ". ( $current_role
                                    ? ('Role '.$current_role->dbID.' , '.$current_role->analysis->logic_name.'('.$current_role->analysis_id.')'
                                        . ($job_id ? ", Job $job_id" : '')
                                      )
                                    : 'UNSPECIALIZED'
                                  )." ] $msg\n";
}
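
# Example worker_say() output (hypothetical values):
#   Worker 42 [ Role 7 , my_analysis(3), Job 1001 ] job_limit reached (10 jobs completed)
#   Worker 42 [ UNSPECIALIZED ] specialization failed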


sub toString {
    my ($self, $include_analysis) = @_;

    my $current_role = $self->current_role;

    return join(', ',
            $include_analysis ? ( 'analysis='.($current_role ? $current_role->analysis->logic_name.'('.$current_role->analysis_id.')' : 'UNSPECIALIZED') ) : (),
            'resource_class_id='.($self->resource_class_id // 'NULL'),
            'meadow='.$self->meadow_type.'/'.$self->meadow_name,
            'process='.$self->meadow_user.'@'.$self->meadow_host.'#'.$self->process_id,
            'when_checked_in='.($self->when_checked_in // 'NEVER'),
            'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED'),
            'job_limit='.($self->job_limiter->available_capacity() // 'NONE'),
            'life_span='.($self->life_span // 'UNLIM'),
            'worker_log_dir='.($self->log_dir // 'STDOUT/STDERR'),
    );
}


###############################
#
# WORK section
#
###############################


=head2 run

    Title      : run
    Usage      : $worker->run;
    Description:
        This is a self-looping autonomous function to process jobs.
        First all STDOUT/STDERR is redirected, then looping commences.
        Looping consists of
            1) claiming jobs,
            2) processing those jobs through an instance of the 'module class' of
               the analysis assigned to this worker,
            3) updating the job, analysis_stats, and hive tables to track the
               progress of the job, the analysis and this worker.
        Looping stops when any one of these conditions is met:
            1) there are no more jobs to process,
            2) job_limit is reached,
            3) life_span has been reached.
    Returntype : none

=cut
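
# Typical invocation (normally issued by runWorker.pl; the '-analyses_pattern'
# key is the one this method itself inspects when deciding to respecialize):
#   $worker->run( { -analyses_pattern => 'my_analysis%' } );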

sub run {
    my ($self, $specialization_arghash) = @_;

    if( my $worker_log_dir = $self->log_dir ) {
        $self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
        $self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
    }

    my $min_batch_time  = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();  # definition skipped by the listing, reconstructed from its use in the batches loop below
    my $job_adaptor     = $self->adaptor->db->get_AnalysisJobAdaptor;

    print "\n";  # to clear beekeeper's prompt in case output is not logged
    $self->worker_say( $self->toString() );
    $self->specialize_and_compile_wrapper( $specialization_arghash );

    while (!$self->cause_of_death) {    # Worker's lifespan loop (ends only when the worker dies for any reason)

        my $batches_stopwatch           = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
        my $jobs_done_by_batches_loop   = 0;    # by all iterations of internal loop
        $self->{'_interval_partial_timing'} = {};

        if( my $special_batch = $self->special_batch() ) {
            my $special_batch_length = scalar(@$special_batch);     # has to be recorded because the list is gradually destroyed
            $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
            $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ? 'JOB_LIMIT' : 'CONTAMINATED');
        } else {    # a proper "BATCHES" loop

            while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
                my $current_role = $self->current_role;

                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
                    my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                    $self->worker_say( $msg );
                    $self->cause_of_death('CONTAMINATED');
                    $job_adaptor->release_undone_jobs_from_role($current_role, $msg);

                } elsif( $self->job_limiter->reached()) {
                    $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
                    $self->cause_of_death('JOB_LIMIT');

                } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
                    $self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
                    $self->cause_of_death('LIFESPAN');

                } else {
                    # No need to refresh the stats or the hive_current_load,
                    # since it has all been refreshed in specialize_and_compile_wrapper()
                    my $stats = $current_role->analysis->stats;
                    my $desired_batch_size = $stats->get_or_estimate_batch_size();
                    my $hit_the_limit;  # dummy at the moment
                    ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );

                    my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );

                    if($self->debug) {
                        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                            "Claiming: ready_job_count=".$stats->ready_job_count
                            .", num_running_workers=".$stats->num_running_workers
                            .", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
                            'INFO' );
                    }

                    if(scalar(@$actual_batch)) {
                        my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                        $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
                        $self->job_limiter->final_decision( $jobs_done_by_this_batch );
                    } else {
                        $self->cause_of_death('NO_WORK');
                    }
                }
            }
        }

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
        #
        if($jobs_done_by_batches_loop) {

            $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
                $self->current_role->analysis->dbID,
                $jobs_done_by_batches_loop,
                $batches_stopwatch->get_elapsed,
                $self->{'_interval_partial_timing'}{'FETCH_INPUT'}  || 0,
                $self->{'_interval_partial_timing'}{'RUN'}          || 0,
                $self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
            );
        }

        # A mechanism whereby workers can be caused to exit even if they were doing fine:
        if (!$self->cause_of_death) {
            # We're here after having run a batch, so we need to refresh the stats
            my $analysis = $self->current_role->analysis;
            my $stats = $analysis->stats;
            if ( $stats->refresh($self->refresh_tolerance_seconds) ) {  # if we DID refresh
                $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
                $stats->hive_pipeline->invalidate_hive_current_load;
                if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
                 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
                ) {
                    $self->cause_of_death('HIVE_OVERLOAD');
                }
            }
        }

        my $cod = $self->cause_of_death() || '';

        if( $cod eq 'NO_WORK') {
            $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
        }

        # Respecialize if:
        #   1) there is no work left to do (NO_WORK) or the hive is overloaded (HIVE_OVERLOAD),
        #   2) respecialization is allowed by the command-line option,
        #   3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components).
        #      This doesn't guarantee that the pattern will resolve to multiple analyses!
        if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{'-analyses_pattern'} or $specialization_arghash->{'-analyses_pattern'}!~/^\w+$/) ) {
            my $old_role = $self->current_role;
            $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
            $self->current_role( undef );
            $self->cause_of_death(undef);
            $self->specialize_and_compile_wrapper( $specialization_arghash );
        }

    }   # /Worker's lifespan loop

    # The second argument ("update_when_checked_in") is set to force an
    # update of the "when_checked_in" timestamp in the worker table
    $self->adaptor->register_worker_death($self, 1);

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString ) if( $self->current_role );
        $self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
    }

    $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );

    if( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();
    }
}


sub specialize_and_compile_wrapper {
    my ($self, $specialization_arghash) = @_;

    eval {
        $self->enter_status('SPECIALIZATION');
        $self->adaptor->specialize_worker( $self, $specialization_arghash );
        1;
    } or do {
        my $msg = $@;
        chomp $msg;
        $self->worker_say( "specialization failed:\t$msg" );

        $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";

        my $message_class;
        if ($self->cause_of_death() eq "NO_ROLE") {
            $message_class = 'INFO';
        } else {
            $message_class = 'WORKER_ERROR';
        }

        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class );
    };

    if( !$self->cause_of_death() ) {
        $self->compile_runnable;
    }
}

sub compile_runnable {
    my $self = shift;
    eval {
        $self->enter_status('COMPILATION');

        my $current_analysis = $self->current_role->analysis;
        my $runnable_object = $current_analysis->get_compiled_module_name->new($self->debug, $current_analysis->language, $current_analysis->module)    # Only GuestProcess will read the arguments
                                or die "Unknown compilation error";

        $runnable_object->worker( $self );

        $self->runnable_object( $runnable_object );
        $self->enter_status('READY');

        1;
    } or do {
        my $last_err = $@;
        $self->handle_compilation_failure($last_err);
    };
}

sub handle_compilation_failure {
    my ($self, $msg) = @_;
    $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" );
    $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR' );

    $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";

    $self->check_analysis_for_exclusion();
}

sub run_one_batch {
    my ($self, $jobs, $is_special_batch) = @_;

    my $jobs_done_here = 0;

    my $current_role    = $self->current_role;
    my $max_retry_count = $current_role->analysis->max_retry_count();   # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
    my $stats           = $current_role->analysis->stats;               # cache it to avoid reloading

    $self->adaptor->check_in_worker( $self );
    $self->adaptor->safe_synchronize_AnalysisStats( $stats );

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : ' . $stats->toString );
        $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
    }

    my $job_partial_timing;

    ONE_BATCH: while(my $job = shift @$jobs) {      # to make sure jobs go out of scope without undue delay

        my $job_id = $job->dbID();
        $self->worker_say( $job->toString ) if($self->debug);

        my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
        $job_partial_timing = {};

        $self->start_job_output_redirection($job);  # switch logging into job's STDERR
        eval {                                      # capture any throw/die
            my $runnable_object = $self->runnable_object();
            $runnable_object->input_job( $job );    # "take" the job

            $job->incomplete(1);
            $self->adaptor->db->dbc->query_count(0);
            $job_stopwatch->restart();

            $job->load_parameters( $runnable_object );

            $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());

            $job_partial_timing = $runnable_object->life_cycle();
        };
        if(my $msg = $@) {
            $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');   # In case the Runnable has redefined warning()
        }

        # whether the job completed successfully or not:
        $self->runnable_object->input_job( undef );     # release an extra reference to the job
        $job->runtime_msec( $job_stopwatch->get_elapsed );
        $job->query_count( $self->adaptor->db->dbc->query_count );

        my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );

        print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere));   # one copy goes to the job's STDERR
        $self->stop_job_output_redirection($job);       # and then we switch back to worker's STDERR
        $self->worker_say( $job_completion_line );      # one copy goes to the worker's STDERR

        $self->current_role->register_attempt( ! $job->died_somewhere );

        if($job->died_somewhere) {
            # Both flags default to 1, meaning that jobs would by default be retried.
            # If the job specifically said not to retry, or if the worker is configured
            # not to retry jobs, follow their wish.
            my $may_retry = $job->transient_error && $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

            if( $self->prev_job_error               # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
             or $job->lethal_for_worker ) {         # trust the job's expert knowledge
                my $reason = $self->prev_job_error ? 'two failed jobs in a row'
                                                   : 'suggested by job itself';
                $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        } else {    # job successfully completed:
            $self->more_work_done( $job_partial_timing );
            $jobs_done_here++;
            $job->set_and_update_status('DONE');

            if( my $controlled_semaphore = $job->controlled_semaphore ) {
                $controlled_semaphore->decrease_by( [ $job ] );
            }

            if($job->lethal_for_worker) {
                $self->worker_say( "The Job, although complete, wants the Worker to die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }

        $self->prev_job_error( $job->died_somewhere );
        $self->enter_status('READY');

        # UNCLAIM THE SURPLUS:
        my $remaining_jobs_in_batch = scalar(@$jobs);
        if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) {     # if we DID refresh
            my $ready_job_count = $stats->ready_job_count;
            $stats->hive_pipeline->invalidate_hive_current_load;
            my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch );
            my $jobs_to_unclaim = $remaining_jobs_in_batch - $optimal_batch_now;
            if($self->debug) {
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                    "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch, "
                    . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim",
                    'INFO' );
            }
            if( $jobs_to_unclaim > 0 ) {
                # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) );     # unclaim the last $jobs_to_unclaim elements
                # currently we just dump all the remaining jobs and prepare to take a fresh batch:
                $job->adaptor->release_claimed_jobs_from_role( $current_role );
                $jobs = [];
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)", 'INFO' );
            }
        }

    }   # /while(my $job = shift @$jobs)

    return $jobs_done_here;
}


sub set_and_update_status {
    my ($self, $status ) = @_;

    $self->status($status);

    if(my $adaptor = $self->adaptor) {
        $adaptor->check_in_worker( $self );
    }
}


sub enter_status {
    my ($self, $status) = @_;

    if($self->debug) {
        $self->worker_say( '-> '.$status );
    }

    $self->set_and_update_status( $status );
}


sub start_job_output_redirection {
    my ($self, $job) = @_;

    if(my $worker_log_dir = $self->log_dir) {
        $self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
        $self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}


sub stop_job_output_redirection {
    my ($self, $job) = @_;

    if($self->log_dir) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();

        my $force_cleanup = !($self->debug || $job->died_somewhere);

        if($force_cleanup or -z $job->stdout_file) {
            $self->worker_say( "Deleting '".$job->stdout_file."' file" );
            unlink $job->stdout_file;
            $job->stdout_file(undef);
        }
        if($force_cleanup or -z $job->stderr_file) {
            $self->worker_say( "Deleting '".$job->stderr_file."' file" );
            unlink $job->stderr_file;
            $job->stderr_file(undef);
        }

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}

sub check_analysis_for_exclusion {
    my $self = shift(@_);
    my $worker_errors_this_analysis =
        $self->adaptor->db->get_LogMessageAdaptor()->count_analysis_events(
            $self->current_role->analysis_id,
            'WORKER_ERROR');
#    warn "There are $worker_errors_this_analysis worker errors for this analysis\n";
    if ($worker_errors_this_analysis > $self->worker_error_threshold) {
        my $current_logic_name = $self->current_role->analysis->logic_name;
        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "setting analysis '$current_logic_name' to excluded", 'INFO' );
        $self->current_role->analysis->stats->is_excluded(1);
        $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($self->current_role->analysis->stats);
    }
}

sub set_log_directory_name {
    my ($self, $hive_log_dir, $worker_log_dir) = @_;

    return unless ($hive_log_dir or $worker_log_dir);

    my $dir_revhash = dir_revhash($self->dbID // '');   # Database-less workers are not hashed
    $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') . ($self->adaptor ? 'worker_id_' . $self->dbID : 'standalone/worker_pid_' . $self->process_id);

    eval {
        make_path( $worker_log_dir );
        1;
    } or die "Could not create '$worker_log_dir' directory : $@";

    $self->log_dir( $worker_log_dir );
    $self->adaptor->update_log_dir( $self ) if $self->adaptor;  # autoloaded
}
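
# For example, with $hive_log_dir='/hive/logs', a database-backed worker with
# dbID 1234 logs under '/hive/logs/<dir_revhash(1234)>/worker_id_1234'
# (dir_revhash spreads workers across nested subdirectories so that no single
# directory grows too large), while a standalone worker logs under
# 'standalone/worker_pid_<pid>'.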


=head2 set_temp_directory_name

    Title       : set_temp_directory_name
    Description : Generates and sets the name of a temporary directory suitable for this worker.
                  It will be under the base directory requested by $base_temp_dir, or the standard
                  location otherwise (as advised by File::Spec), and includes worker attributes
                  to make the path unique.

=cut

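# Examples of the generated names (hypothetical values):
#   /tmp/worker_alice_long_mult.42/         (database-backed: user, pipeline name, worker dbID)
#   /tmp/worker_alice.standalone.12345/     (standalone: user and process id)
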
sub set_temp_directory_name {
    my ($self, $base_temp_dir) = @_;

    $base_temp_dir //= File::Spec->tmpdir();

    my $temp_directory_name;
    if ($self->adaptor) {
        $temp_directory_name = sprintf('%s/worker_%s_%s.%s/', $base_temp_dir, $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID);
    } else {
        $temp_directory_name = sprintf('%s/worker_%s.standalone.%s/', $base_temp_dir, $self->meadow_user, $self->process_id);
    }

    $self->temp_directory_name( $temp_directory_name );
    $self->adaptor->update_temp_directory_name( $self ) if $self->adaptor;  # autoloaded
}


1;