=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

    Object which encapsulates the details of how to find jobs, how to run those
    jobs, and then check the rules to create the next jobs in the chain.
    Essentially knows where to find data, how to process data, and where to
    put it when it is done (put in the next person's INBOX) so the next Worker
    in the chain can find data to work on.

    Hive based processing is a concept based on a more controlled version
    of an autonomous agent type system. Each worker is not told what to do
    (as in a centralized control system, like the current pipeline system)
    but rather queries a central database for jobs (give me jobs).

    Each worker is linked to an analysis_id, registers itself on creation
    into the Hive, creates a RunnableDB instance of the Analysis->module,
    gets relevant configuration information from the database, does its
    work, creates the next layer of job entries by interfacing to
    the DataflowRuleAdaptor to determine the analyses it needs to pass its
    output data to and creates jobs on the database of the next analysis.
    It repeats this cycle until it has lived its lifetime or until there are no
    more jobs left to process.
    The lifetime limit is a safety limit to prevent these from 'infecting'
    a system and sitting on a compute node for longer than is socially acceptable.
    This is primarily needed on compute resources like an LSF system where jobs
    are not preempted and run until they are done.

    The Queen's primary job is to create Workers to get the work done.
    As part of this, she is also responsible for summarizing the status of the
    analyses by querying the jobs and updating the analysis_stats table, and
    for monitoring and 'unblocking' analyses via the analysis_ctrl_rules.
    The Queen is also responsible for freeing up jobs that were claimed by Workers
    that died unexpectedly, so that other workers can take over the work.

    The Beekeeper is in charge of interfacing between the Queen and a compute resource
    or 'compute farm'. Its job is to query Queens to see whether they need any workers
    and to send the requested number of workers to open machines via the runWorker.pl script.
    It is also responsible for interfacing with the Queen to identify workers which died
    unexpectedly, so that she can free the dead workers' unfinished jobs.

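    As an illustration only (in practice Workers are submitted by beekeeper.pl
    or runWorker.pl rather than created by hand), the life of a Worker at the
    API level looks roughly like the sketch below. It assumes the standard
    HivePipeline/Queen API; the -can_respecialize parameter is shown purely
    as an example:

        my $pipeline = Bio::EnsEMBL::Hive::HivePipeline->new( -url => $url );
        my $queen    = $pipeline->hive_dba->get_Queen;

        # the Queen creates and registers a Worker...
        my $worker = $queen->create_new_worker( -can_respecialize => 1 );

        # ...which then claims, runs and dataflows batches of jobs in a loop
        # until its life_span/job_limit is exhausted or no work is left:
        $worker->run();
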
=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2022] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::Worker;

use strict;
use warnings;
use POSIX;
use File::Path 'make_path';

use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash', 'stringify', 'throw');
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;

use base ( 'Bio::EnsEMBL::Hive::Storable' );


## How often we should refresh the AnalysisStats objects:
sub refresh_tolerance_seconds {
    return 20;
}

## How many WORKER_ERROR messages an analysis may accumulate before this Worker excludes it:
sub worker_error_threshold {
    return 2;
}
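
## Both values above are plain class methods, so a hypothetical Worker
## subclass could retune them without touching this module, e.g.:
#
#     package My::PatientWorker;                     # hypothetical subclass
#     use base ('Bio::EnsEMBL::Hive::Worker');
#
#     sub refresh_tolerance_seconds { return 60 }    # refresh the stats less often
#     sub worker_error_threshold    { return 5 }     # tolerate more errors before excluding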



=head1 AUTOLOADED

    resource_class_id / resource_class

=cut


sub init {
    my $self = shift;

    my $lifespan_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $lifespan_stopwatch->_unit(1); # count in seconds (default is milliseconds)
    $lifespan_stopwatch->restart;
    $self->lifespan_stopwatch( $lifespan_stopwatch );

    return $self;
}


## Storable object's getters/setters:


sub meadow_type {
    my $self = shift;
    $self->{'_meadow_type'} = shift if(@_);
    return $self->{'_meadow_type'};
}


sub meadow_name {
    my $self = shift;
    $self->{'_meadow_name'} = shift if(@_);
    return $self->{'_meadow_name'};
}


sub meadow_host {
    my $self = shift;
    $self->{'_meadow_host'} = shift if(@_);
    return $self->{'_meadow_host'};
}


sub meadow_user {
    my $self = shift;
    $self->{'_meadow_user'} = shift if(@_);
    return $self->{'_meadow_user'};
}


sub process_id {
    my $self = shift;
    $self->{'_process_id'} = shift if(@_);
    return $self->{'_process_id'};
}


sub work_done {
    my $self = shift;
    $self->{'_work_done'} = shift if(@_);
    return $self->{'_work_done'} || 0;
}


sub status {
    my $self = shift;
    $self->{'_status'} = shift if(@_);
    return $self->{'_status'};
}


sub beekeeper_id {
    my $self = shift;
    $self->{'_beekeeper_id'} = shift if(@_);
    return $self->{'_beekeeper_id'} || undef;
}


sub when_submitted {
    my $self = shift;
    $self->{'_when_submitted'} = shift if(@_);
    return $self->{'_when_submitted'};
}


sub seconds_since_when_submitted {
    my $self = shift;
    $self->{'_seconds_since_when_submitted'} = shift if(@_);
    return $self->{'_seconds_since_when_submitted'};
}


sub when_born {
    my $self = shift;
    $self->{'_when_born'} = shift if(@_);
    return $self->{'_when_born'};
}


sub when_checked_in {
    my $self = shift;
    $self->{'_when_checked_in'} = shift if(@_);
    return $self->{'_when_checked_in'};
}


sub when_seen {
    my $self = shift;
    $self->{'_when_seen'} = shift if(@_);
    return $self->{'_when_seen'};
}


sub when_died {
    my $self = shift;
    $self->{'_when_died'} = shift if(@_);
    return $self->{'_when_died'};
}


sub cause_of_death {
    my $self = shift;
    $self->{'_cause_of_death'} = shift if(@_);
    return $self->{'_cause_of_death'};
}


=head2 log_dir

    Arg [1] : (optional) string directory path
    Title   : log_dir
    Usage   : $worker_log_dir = $self->log_dir;
              $self->log_dir($worker_log_dir);
    Description: Storable getter/setter attribute for the directory to which the worker's STDOUT and STDERR are redirected.
                 In this directory each job will have its own .out and .err files.
    Returntype : string

=cut

sub log_dir {
    my $self = shift;
    $self->{'_log_dir'} = shift if(@_);
    return $self->{'_log_dir'};
}



## Non-Storable attributes:

sub current_role {
    my $self = shift;

    if( @_ ) {
        if( my $from_analysis = $self->{'_current_role'} && $self->{'_current_role'}->analysis ) {
            $self->worker_say( "unspecializing from ".$from_analysis->logic_name.'('.$from_analysis->dbID.')' );
        }
        my $new_role = shift @_;
        if( my $to_analysis = $new_role && $new_role->analysis ) {
            $self->worker_say( "specializing to " . $to_analysis->logic_name . '('.($to_analysis->dbID // 'unstored') . ')' );
        }
        $self->{'_current_role'} = $new_role;
    }
    return $self->{'_current_role'};
}


sub debug {
    my $self = shift;
    $self->{'_debug'} = shift if(@_);
    $self->{'_debug'} = 0 unless(defined($self->{'_debug'}));
    return $self->{'_debug'};
}


sub execute_writes {
    my $self = shift;
    $self->{'_execute_writes'} = shift if(@_);
    $self->{'_execute_writes'} = 1 unless(defined($self->{'_execute_writes'}));
    return $self->{'_execute_writes'};
}


sub special_batch {
    my $self = shift;
    $self->{'_special_batch'} = shift if(@_);
    return $self->{'_special_batch'};
}


sub perform_cleanup {
    my $self = shift;
    $self->{'_perform_cleanup'} = shift if(@_);
    $self->{'_perform_cleanup'} = 1 unless(defined($self->{'_perform_cleanup'}));
    return $self->{'_perform_cleanup'};
}


# a setter/getter that defines the default behaviour when a job throws: should it be retried or not?

sub retry_throwing_jobs {
    my $self = shift @_;

    $self->{'_retry_throwing_jobs'} = shift @_ if(@_);
    return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}


sub can_respecialize {
    my $self = shift;
    $self->{'_can_respecialize'} = shift if(@_);
    return $self->{'_can_respecialize'};
}


=head2 life_span

    Arg [1] : (optional) integer $value (in seconds)
    Title   : life_span
    Usage   : $value = $self->life_span;
              $self->life_span($new_value);
    Description: Defines the maximum time a worker can live for. Workers are always
                 allowed to complete the jobs they get, but whether they can
                 do multiple rounds of work is limited by their life_span
    DefaultValue : 3600 (60 minutes)
    Returntype : integer scalar

=cut

sub life_span {     # default life_span = 60 minutes
    my ($self, $value) = @_;

    if(defined($value)) {               # you can still set it to 0 and avoid having the limit on lifespan
        $self->{'_life_span'} = $value;
    } elsif(!defined($self->{'_life_span'})) {
        $self->{'_life_span'} = 60*60;
    }
    return $self->{'_life_span'};
}

sub lifespan_stopwatch {
    my $self = shift @_;

    if(@_) {
        $self->{'_lifespan_stopwatch'} = shift @_;
    }
    return $self->{'_lifespan_stopwatch'};
}

sub life_span_limit_reached {
    my $self = shift @_;

    if( $self->life_span() ) {
        my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
        if($alive_for_secs > $self->life_span() ) {
            return $alive_for_secs;
        }
    }
    return 0;
}


=head2 job_limiter

    Title   : job_limiter
    Arg [1] : (optional) integer $value
    Usage   : $limiter_obj = $self->job_limiter;
              $self->job_limiter($new_value);
    Description: Limits the total number of jobs this Worker may take. A worker
                 'dies' when either the 'life_span' or the 'job_limit' is exceeded.
    DefaultValue : undef (relies on life_span to limit the life of the worker)
    Returntype : Hive::Limiter object

=cut

sub job_limiter {
    my $self = shift;
    if( scalar(@_) or !defined($self->{'_job_limiter'}) ) {
        $self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new("Total number of jobs this Worker is allowed to take", shift @_);
    }
    return $self->{'_job_limiter'};
}
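
# Illustration only: a sketch of how run() below consumes this limiter
# (these are the Limiter calls actually used in this module, with
# illustrative variable names):
#
#     my ($batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
#     ... grab and run up to $batch_size jobs ...
#     $self->job_limiter->final_decision( $jobs_actually_done );   # draws down the remaining capacity
#     # ...and the worker dies with 'JOB_LIMIT' once $self->job_limiter->reached() becomes true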


sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
        $self->{'_interval_partial_timing'}{$state} += $partial_timing_in_state;
    }
}


# By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it.
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
# but at the moment we assume all errors are identical. If the worker has failed two jobs in a row, let it die.

sub prev_job_error {
    my $self = shift @_;

    $self->{'_prev_job_error'} = shift if(@_);
    return $self->{'_prev_job_error'};
}

sub runnable_object {
    my $self = shift @_;

    $self->{'_runnable_object'} = shift @_ if(@_);
    return $self->{'_runnable_object'};
}


sub get_stdout_redirector {
    my $self = shift;

    return $self->{_stdout_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
}

sub get_stderr_redirector {
    my $self = shift;

    return $self->{_stderr_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
}


sub worker_say {
    my ($self, $msg) = @_;

    unless ($self->adaptor) {
        print "Standalone worker $$ : $msg\n";
        return;
    }

    my $worker_id    = $self->dbID();
    my $current_role = $self->current_role;
    my $job_id       = $self->runnable_object && $self->runnable_object->input_job && $self->runnable_object->input_job->dbID;
    print "Worker $worker_id [ ". ( $current_role
            ? ('Role '.$current_role->dbID.' , '.$current_role->analysis->logic_name.'('.$current_role->analysis_id.')'
                . ($job_id ? ", Job $job_id" : '')
              )
            : 'UNSPECIALIZED'
        )." ] $msg\n";
}
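
# Example output (illustrative values only):
#
#   Worker 1027 [ Role 88 , align_chunks(12), Job 34567 ] job_limit reached (10 jobs completed)
#   Worker 1027 [ UNSPECIALIZED ] specializing to align_chunks(12)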


sub toString {
    my ($self, $include_analysis) = @_;

    my $current_role = $self->current_role;

    return join(', ',
        $include_analysis ? ( 'analysis='.($current_role ? $current_role->analysis->logic_name.'('.$current_role->analysis_id.')' : 'UNSPECIALIZED') ) : (),
        'resource_class_id='.($self->resource_class_id // 'NULL'),
        'meadow='.$self->meadow_type.'/'.$self->meadow_name,
        'process='.$self->meadow_user.'@'.$self->meadow_host.'#'.$self->process_id,
        'when_checked_in='.($self->when_checked_in // 'NEVER'),
        'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED'),
        'job_limit='.($self->job_limiter->available_capacity() // 'NONE'),
        'life_span='.($self->life_span // 'UNLIM'),
        'worker_log_dir='.($self->log_dir // 'STDOUT/STDERR'),
    );
}
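
# Example return value of toString(1) (illustrative values only):
#
#   analysis=align_chunks(12), resource_class_id=2, meadow=LSF/EBI, process=jsmith@bc-25-1-08#4254229,
#   when_checked_in=NEVER, batch_size=5, job_limit=NONE, life_span=3600, worker_log_dir=STDOUT/STDERR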


###############################
#
# WORK section
#
###############################


=head2 run

    Title   : run
    Usage   : $worker->run;
    Description:
        This is a self-looping autonomous function to process jobs.
        First all STDOUT/STDERR is redirected, then looping commences.
        Looping consists of
            1) claiming jobs,
            2) processing those jobs through an instance of the 'module class' of
               the analysis assigned to this worker,
            3) updating the job, analysis_stats, and hive tables to track the
               progress of the job, the analysis and this worker.
        Looping stops when any one of these conditions is met:
            1) there are no more jobs to process,
            2) job_limit has been reached,
            3) life_span has been reached.
    Returntype : none

=cut

sub run {
    my ($self, $specialization_arghash) = @_;

    if( my $worker_log_dir = $self->log_dir ) {
        $self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
        $self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
    }

    my $min_batch_time = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
    my $job_adaptor    = $self->adaptor->db->get_AnalysisJobAdaptor;

    print "\n"; # to clear beekeeper's prompt in case output is not logged
    $self->worker_say( $self->toString() );
    $self->specialize_and_compile_wrapper( $specialization_arghash );

    while (!$self->cause_of_death) {    # Worker's lifespan loop (ends only when the worker dies for any reason)

        my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
        my $jobs_done_by_batches_loop = 0;  # by all iterations of internal loop
        $self->{'_interval_partial_timing'} = {};

        if( my $special_batch = $self->special_batch() ) {
            my $special_batch_length = scalar(@$special_batch);     # has to be recorded because the list is gradually destroyed
            $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
            $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ? 'JOB_LIMIT' : 'CONTAMINATED');
        } else {    # a proper "BATCHES" loop

            while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
                my $current_role = $self->current_role;

                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
                    my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                    $self->worker_say( $msg );
                    $self->cause_of_death('CONTAMINATED');
                    $job_adaptor->release_undone_jobs_from_role($current_role, $msg);

                } elsif( $self->job_limiter->reached()) {
                    $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
                    $self->cause_of_death('JOB_LIMIT');

                } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
                    $self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
                    $self->cause_of_death('LIFESPAN');

                } else {
                    # No need to refresh the stats or the hive_current_load,
                    # since it has all been refreshed in specialize_and_compile_wrapper():
                    my $stats = $current_role->analysis->stats;
                    my $desired_batch_size = $stats->get_or_estimate_batch_size();
                    my $hit_the_limit;  # dummy at the moment
                    ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );

                    my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );

                    if($self->debug) {
                        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                            "Claiming: ready_job_count=".$stats->ready_job_count
                            .", num_running_workers=".$stats->num_running_workers
                            .", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
                            'INFO' );
                    }

                    if(scalar(@$actual_batch)) {
                        my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                        $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
                        $self->job_limiter->final_decision( $jobs_done_by_this_batch );
                    } else {
                        $self->cause_of_death('NO_WORK');
                    }
                }
            }
        }

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
        #
        if($jobs_done_by_batches_loop) {

            $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
                $self->current_role->analysis->dbID,
                $jobs_done_by_batches_loop,
                $batches_stopwatch->get_elapsed,
                $self->{'_interval_partial_timing'}{'FETCH_INPUT'}  || 0,
                $self->{'_interval_partial_timing'}{'RUN'}          || 0,
                $self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
            );
        }

        # A mechanism whereby workers can be caused to exit even if they were doing fine:
        if (!$self->cause_of_death) {
            # We're here after having run a batch, so we need to refresh the stats
            my $analysis = $self->current_role->analysis;
            my $stats    = $analysis->stats;
            if ( $stats->refresh($self->refresh_tolerance_seconds) ) {  # if we DID refresh
                $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
                $stats->hive_pipeline->invalidate_hive_current_load;
                if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
                 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
                ) {
                    $self->cause_of_death('HIVE_OVERLOAD');
                }
            }
        }

        my $cod = $self->cause_of_death() || '';

        if( $cod eq 'NO_WORK') {
            $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
        }

        # Respecialize if:
        #   1) the Worker ran out of work or the hive is overloaded (NO_WORK or HIVE_OVERLOAD),
        #   2) respecialization is allowed by the command-line option,
        #   3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set,
        #      or the pattern is not a single plain analysis name). This doesn't guarantee that the
        #      pattern will resolve to multiple analyses!
        if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{'-analyses_pattern'} or $specialization_arghash->{'-analyses_pattern'}!~/^\w+$/) ) {
            my $old_role = $self->current_role;
            $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
            $self->current_role( undef );
            $self->cause_of_death(undef);
            $self->specialize_and_compile_wrapper( $specialization_arghash );
        }

    } # /Worker's lifespan loop

    # The second argument ("update_when_checked_in") is set to force an
    # update of the "when_checked_in" timestamp in the worker table
    $self->adaptor->register_worker_death($self, 1);

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString ) if( $self->current_role );
        $self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count.' disconnect cycles' );
    }

    $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );

    if( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();
    }
}


sub specialize_and_compile_wrapper {
    my ($self, $specialization_arghash) = @_;

    eval {
        $self->enter_status('SPECIALIZATION');
        $self->adaptor->specialize_worker( $self, $specialization_arghash );
        1;
    } or do {
        my $msg = $@;
        chomp $msg;
        $self->worker_say( "specialization failed:\t$msg" );

        $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";

        my $message_class;
        if ($self->cause_of_death() eq "NO_ROLE") {
            $message_class = 'INFO';
        } else {
            $message_class = 'WORKER_ERROR';
        }

        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class );
    };

    if( !$self->cause_of_death() ) {
        $self->compile_runnable;
    }
}

sub compile_runnable {
    my $self = shift;
    eval {
        $self->enter_status('COMPILATION');

        my $current_analysis = $self->current_role->analysis;
        my $runnable_object  = $current_analysis->get_compiled_module_name->new($self->debug, $current_analysis->language, $current_analysis->module)   # Only GuestProcess will read the arguments
            or die "Unknown compilation error";

        $runnable_object->worker( $self );

        $self->runnable_object( $runnable_object );
        $self->enter_status('READY');

        1;
    } or do {
        my $last_err = $@;
        $self->handle_compilation_failure($last_err);
    };
}

sub handle_compilation_failure {
    my ($self, $msg) = @_;
    $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" );
    $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR' );

    $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";

    $self->check_analysis_for_exclusion();
}

sub run_one_batch {
    my ($self, $jobs, $is_special_batch) = @_;

    my $jobs_done_here = 0;

    my $current_role    = $self->current_role;
    my $max_retry_count = $current_role->analysis->max_retry_count();   # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
    my $stats           = $current_role->analysis->stats;               # cache it to avoid reloading

    $self->adaptor->check_in_worker( $self );
    $self->adaptor->safe_synchronize_AnalysisStats( $stats );

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : ' . $stats->toString );
        $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
    }

    my $job_partial_timing;

    ONE_BATCH: while(my $job = shift @$jobs) {   # to make sure jobs go out of scope without undue delay

        my $job_id = $job->dbID();
        $self->worker_say( $job->toString ) if($self->debug);

        my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
        $job_partial_timing = {};

        $self->start_job_output_redirection($job);  # switch logging into job's STDERR
        eval {  # capture any throw/die
            my $runnable_object = $self->runnable_object();
            $runnable_object->input_job( $job );    # "take" the job

            $job->incomplete(1);
            $self->adaptor->db->dbc->query_count(0);
            $job_stopwatch->restart();

            $job->load_parameters( $runnable_object );

            $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());

            $job_partial_timing = $runnable_object->life_cycle();
        };
        if(my $msg = $@) {
            $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');   # In case the Runnable has redefined warning()
        }

        # whether the job completed successfully or not:
        $self->runnable_object->input_job( undef );     # release an extra reference to the job
        $job->runtime_msec( $job_stopwatch->get_elapsed );
        $job->query_count( $self->adaptor->db->dbc->query_count );

        my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );

        print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere));   # one copy goes to the job's STDERR
        $self->stop_job_output_redirection($job);   # and then we switch back to worker's STDERR
        $self->worker_say( $job_completion_line );  # one copy goes to the worker's STDERR

        $self->current_role->register_attempt( ! $job->died_somewhere );

        if($job->died_somewhere) {
            # Both flags default to 1, meaning that jobs would by default be retried.
            # If the job specifically said not to retry, or if the worker is configured
            # not to retry jobs, follow their wish.
            my $may_retry = $job->transient_error && $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

            if( $self->prev_job_error           # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
             or $job->lethal_for_worker ) {     # trust the job's expert knowledge
                my $reason = $self->prev_job_error ? 'two failed jobs in a row'
                                                   : 'suggested by job itself';
                $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        } else {    # job successfully completed:
            $self->more_work_done( $job_partial_timing );
            $jobs_done_here++;
            $job->set_and_update_status('DONE');

            if( my $controlled_semaphore = $job->controlled_semaphore ) {
                $controlled_semaphore->decrease_by( [ $job ] );
            }

            if($job->lethal_for_worker) {
                $self->worker_say( "The Job, although complete, wants the Worker to die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }

        $self->prev_job_error( $job->died_somewhere );
        $self->enter_status('READY');

        # UNCLAIM THE SURPLUS:
        my $remaining_jobs_in_batch = scalar(@$jobs);
        if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) {   # if we DID refresh
            my $ready_job_count = $stats->ready_job_count;
            $stats->hive_pipeline->invalidate_hive_current_load;
            my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch );
            my $jobs_to_unclaim   = $remaining_jobs_in_batch - $optimal_batch_now;
            if($self->debug) {
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                    "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch, "
                    . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim",
                    'INFO' );
            }
            if( $jobs_to_unclaim > 0 ) {
                # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) );  # unclaim the last $jobs_to_unclaim elements
                # currently we just dump all the remaining jobs and prepare to take a fresh batch:
                $job->adaptor->release_claimed_jobs_from_role( $current_role );
                $jobs = [];
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)", 'INFO' );
            }
        }

    } # /while(my $job = shift @$jobs)

    return $jobs_done_here;
}


sub set_and_update_status {
    my ($self, $status ) = @_;

    $self->status($status);

    if(my $adaptor = $self->adaptor) {
        $adaptor->check_in_worker( $self );
    }
}


sub enter_status {
    my ($self, $status) = @_;

    if($self->debug) {
        $self->worker_say( '-> '.$status );
    }

    $self->set_and_update_status( $status );
}


sub start_job_output_redirection {
    my ($self, $job) = @_;

    if(my $worker_log_dir = $self->log_dir) {
        $self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
        $self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}
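
# For example (illustrative paths only), with log_dir '/hive/logs/worker_id_1027',
# job 34567 at retry_count 0 gets redirected (per the naming scheme above) to:
#   /hive/logs/worker_id_1027/job_id_34567_0.out
#   /hive/logs/worker_id_1027/job_id_34567_0.err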


sub stop_job_output_redirection {
    my ($self, $job) = @_;

    if($self->log_dir) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();

        my $force_cleanup = !($self->debug || $job->died_somewhere);

        if($force_cleanup or -z $job->stdout_file) {
            $self->worker_say( "Deleting '".$job->stdout_file."' file" );
            unlink $job->stdout_file;
            $job->stdout_file(undef);
        }
        if($force_cleanup or -z $job->stderr_file) {
            $self->worker_say( "Deleting '".$job->stderr_file."' file" );
            unlink $job->stderr_file;
            $job->stderr_file(undef);
        }

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}

sub check_analysis_for_exclusion {
    my $self = shift(@_);

    my $worker_errors_this_analysis = $self->adaptor->db->get_LogMessageAdaptor()->count_analysis_events(
        $self->current_role->analysis_id,
        'WORKER_ERROR' );
    # warn "There are $worker_errors_this_analysis worker errors for this analysis\n";

    if ($worker_errors_this_analysis > $self->worker_error_threshold) {
        my $current_logic_name = $self->current_role->analysis->logic_name;
        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "setting analysis '$current_logic_name' to excluded", 'INFO' );
        $self->current_role->analysis->stats->is_excluded(1);
        $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($self->current_role->analysis->stats);
    }
}

sub set_log_directory_name {
    my ($self, $hive_log_dir, $worker_log_dir) = @_;

    return unless ($hive_log_dir or $worker_log_dir);

    my $dir_revhash = dir_revhash($self->dbID // '');   # Database-less workers are not hashed
    $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') . ($self->adaptor ? 'worker_id_' . $self->dbID : 'standalone/worker_pid_' . $self->process_id);

    eval {
        make_path( $worker_log_dir );
        1;
    } or die "Could not create '$worker_log_dir' directory : $@";

    $self->log_dir( $worker_log_dir );
    $self->adaptor->update_log_dir( $self ) if $self->adaptor;  # autoloaded
}
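
# For example (illustrative values only), calling
#     $worker->set_log_directory_name( '/hive/logs' );
# makes a database-registered worker log under
# '/hive/logs/<dir_revhash of its dbID>/worker_id_<dbID>', while a
# database-less worker logs under '/hive/logs/standalone/worker_pid_<pid>'.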


sub temp_directory_name {
    my $self = shift @_;

    if ($self->adaptor) {
        return sprintf('/tmp/worker_%s_%s.%s/', $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID);
    } else {
        return sprintf('/tmp/worker_%s.standalone.%s/', $self->meadow_user, $self->process_id);
    }
}
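
# Example return values (illustrative user/pipeline/ids only):
#   '/tmp/worker_jsmith_long_mult.27/'           # registered worker: user, pipeline name, worker dbID
#   '/tmp/worker_jsmith.standalone.4254229/'     # standalone worker: user, process id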


1;