ensembl-hive  2.8.1
Worker.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5  Bio::EnsEMBL::Hive::Worker
6 
7 =head1 DESCRIPTION
8 
9  Object which encapsulates the details of how to find jobs, how to run those
10  jobs, and then check the rules to create the next jobs in the chain.
11  Essentially knows where to find data, how to process data, and where to
12  put it when it is done (put in the next person's INBOX) so the next Worker
13  in the chain can find data to work on.
14 
15  Hive based processing is a concept based on a more controlled version
16  of an autonomous agent type system. Each worker is not told what to do
17  (like a centralized control system - like the current pipeline system)
18  but rather queries a central database for jobs (give me jobs).
19 
20  Each worker is linked to an analysis_id, registers itself on creation
21  into the Hive, creates a RunnableDB instance of the Analysis->module,
22  gets relevant configuration information from the database, does its
23  work, creates the next layer of job entries by interfacing to
24  the DataflowRuleAdaptor to determine the analyses it needs to pass its
25  output data to and creates jobs on the database of the next analysis.
26  It repeats this cycle until it has lived its lifetime or until there are no
27  more jobs left to process.
28  The lifetime limit is a safety limit to prevent these from 'infecting'
29  a system and sitting on a compute node for longer than is socially acceptable.
30  This is primarily needed on compute resources like an LSF system where jobs
31  are not preempted and run until they are done.
32 
33  The Queen's primary job is to create Workers to get the work done.
34  As part of this, she is also responsible for summarizing the status of the
35  analyses by querying the jobs, summarizing, and updating the
36  analysis_stats table. From this she is also responsible for monitoring and
37  'unblocking' analyses via the analysis_ctrl_rules.
38  The Queen is also responsible for freeing up jobs that were claimed by Workers
39  that died unexpectedly so that other workers can take over the work.
40 
41  The Beekeeper is in charge of interfacing between the Queen and a compute resource
42  or 'compute farm'. Its job is to query Queens if they need any workers and to
43  send the requested number of workers to open machines via the runWorker.pl script.
44  It is also responsible for interfacing with the Queen to identify workers which died
45  unexpectedly so that she can free the dead workers' unfinished jobs.
46 
47 =head1 LICENSE
48 
49  See the NOTICE file distributed with this work for additional information
50  regarding copyright ownership.
51 
52  Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
53  You may obtain a copy of the License at
54 
55  http://www.apache.org/licenses/LICENSE-2.0
56 
57  Unless required by applicable law or agreed to in writing, software distributed under the License
58  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
59  See the License for the specific language governing permissions and limitations under the License.
60 
61 =head1 CONTACT
62 
63  Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
64 
65 =head1 APPENDIX
66 
67  The rest of the documentation details each of the object methods.
68  Internal methods are usually prefixed with an underscore (_).
69 
70 =cut
71 
72 package Bio::EnsEMBL::Hive::Worker;
73 
74 
use strict;
use warnings;
use POSIX;
use File::Path 'make_path';
use File::Spec;

use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash', 'stringify', 'throw');
86 
87 use base ( 'Bio::EnsEMBL::Hive::Storable' );
88 
# Turn on autoflush for the currently selected output handle (STDOUT when this
# module is loaded), so Worker messages are written out immediately instead of
# sitting in a buffer.
$| = 1;
90 
# How often (in seconds) the cached AnalysisStats objects should be refreshed.
# run() and run_one_batch() pass this value to $stats->refresh().
sub refresh_tolerance_seconds {
    my $tolerance_in_seconds = 20;
    return $tolerance_in_seconds;
}
95 
# Number of WORKER_ERROR events an analysis may accumulate before
# check_analysis_for_exclusion() excludes it (the comparison used there is ">").
sub worker_error_threshold {
    my $max_tolerated_worker_errors = 2;
    return $max_tolerated_worker_errors;
}
99 
100 
101 
102 =head1 AUTOLOADED
103 
104  resource_class_id / resource_class
105 
106 =cut
107 
108 
# Object initialiser: creates a Stopwatch counting in seconds, starts it, and
# stores it as the Worker's lifespan stopwatch. life_span_limit_reached()
# later reads the elapsed time from it. Returns the Worker itself.
sub init {
    my $self = shift;

    my $stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $stopwatch->_unit(1);     # seconds rather than the Stopwatch's default milliseconds
    $stopwatch->restart;

    $self->lifespan_stopwatch($stopwatch);

    return $self;
}
119 
120 
121 ## Storable object's getters/setters:
122 
123 
# Storable getter/setter for the '_meadow_type' attribute.
# With an argument, stores it; always returns the current value.
sub meadow_type {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_meadow_type'} = $new_value[0];
    }
    return $self->{'_meadow_type'};
}
129 
130 
# Storable getter/setter for the '_meadow_name' attribute.
# With an argument, stores it; always returns the current value.
sub meadow_name {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_meadow_name'} = $new_value[0];
    }
    return $self->{'_meadow_name'};
}
136 
137 
# Storable getter/setter for the '_meadow_host' attribute.
# With an argument, stores it; always returns the current value.
sub meadow_host {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_meadow_host'} = $new_value[0];
    }
    return $self->{'_meadow_host'};
}
143 
144 
# Storable getter/setter for the '_meadow_user' attribute.
# With an argument, stores it; always returns the current value.
sub meadow_user {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_meadow_user'} = $new_value[0];
    }
    return $self->{'_meadow_user'};
}
150 
151 
# Storable getter/setter for the '_process_id' attribute.
# With an argument, stores it; always returns the current value.
sub process_id {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_process_id'} = $new_value[0];
    }
    return $self->{'_process_id'};
}
157 
158 
# Storable getter/setter for the number of jobs this Worker has completed.
# A missing or false value is reported as 0.
sub work_done {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_work_done'} = $new_value[0];
    }
    my $count = $self->{'_work_done'};
    return $count ? $count : 0;
}
164 
165 
# Storable getter/setter for the Worker's status (written by
# set_and_update_status()). With an argument, stores it; always returns the current value.
sub status {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_status'} = $new_value[0];
    }
    return $self->{'_status'};
}
171 
172 
# Storable getter/setter for the '_beekeeper_id' attribute.
# Any false stored value (e.g. 0 or '') is reported as undef.
sub beekeeper_id {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_beekeeper_id'} = $new_value[0];
    }
    my $id = $self->{'_beekeeper_id'};
    return $id ? $id : undef;
}
178 
179 
# Storable getter/setter for the '_when_submitted' attribute.
# With an argument, stores it; always returns the current value.
sub when_submitted {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_when_submitted'} = $new_value[0];
    }
    return $self->{'_when_submitted'};
}
185 
186 
# Storable getter/setter for the '_seconds_since_when_submitted' attribute.
# With an argument, stores it; always returns the current value.
sub seconds_since_when_submitted {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_seconds_since_when_submitted'} = $new_value[0];
    }
    return $self->{'_seconds_since_when_submitted'};
}
192 
193 
# Storable getter/setter for the '_when_born' attribute.
# With an argument, stores it; always returns the current value.
sub when_born {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_when_born'} = $new_value[0];
    }
    return $self->{'_when_born'};
}
199 
200 
# Storable getter/setter for the '_when_checked_in' attribute.
# With an argument, stores it; always returns the current value.
sub when_checked_in {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_when_checked_in'} = $new_value[0];
    }
    return $self->{'_when_checked_in'};
}
206 
207 
# Storable getter/setter for the '_when_seen' attribute.
# With an argument, stores it; always returns the current value.
sub when_seen {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_when_seen'} = $new_value[0];
    }
    return $self->{'_when_seen'};
}
213 
214 
# Storable getter/setter for the '_when_died' attribute.
# With an argument, stores it; always returns the current value.
sub when_died {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_when_died'} = $new_value[0];
    }
    return $self->{'_when_died'};
}
220 
221 
# Storable getter/setter for the Worker's cause of death (e.g. 'NO_WORK',
# 'JOB_LIMIT', 'LIFESPAN', 'CONTAMINATED' as set in run()). Passing undef clears it.
sub cause_of_death {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_cause_of_death'} = $new_value[0];
    }
    return $self->{'_cause_of_death'};
}
227 
228 
=head2 log_dir

    Arg [1]     : (optional) string directory path
    Title       : log_dir
    Usage       : $worker_log_dir = $self->log_dir;
                  $self->log_dir($worker_log_dir);
    Description : Storable getter/setter attribute for the directory where STDOUT and STDERR of the worker will be redirected to.
                  In this directory each job will have its own .out and .err files.
    Returntype  : string

=cut

sub log_dir {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_log_dir'} = $new_value[0];
    }
    return $self->{'_log_dir'};
}
246 
247 
=head2 temp_directory_name

    Arg [1]     : (optional) string directory path
    Title       : temp_directory_name
    Usage       : $worker_tmp_dir = $self->temp_directory_name;
                  $self->temp_directory_name($worker_tmp_dir);
    Description : Storable getter/setter attribute for the directory where jobs can store temporary data.
                  The value is kept under the '_tmp_dir' key.
    Returntype  : string

=cut

sub temp_directory_name {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_tmp_dir'} = $new_value[0];
    }
    return $self->{'_tmp_dir'};
}
264 
265 
266 ## Non-Storable attributes:
267 
# Non-Storable getter/setter for the Role currently held by the Worker.
# When called as a setter, it reports via worker_say() the analysis being left
# (if any) and the analysis being entered (if any), then stores the new Role.
# The new Role may be undef, which leaves the Worker unspecialized.
# Analyses that have no dbID yet are reported as 'unstored' in both messages.
sub current_role {
    my $self = shift;

    if( @_ ) {
        my $new_role = shift @_;

        if( my $from_analysis = $self->{'_current_role'} && $self->{'_current_role'}->analysis ) {
            $self->worker_say( "unspecializing from " . $from_analysis->logic_name . '(' . ($from_analysis->dbID // 'unstored') . ')' );
        }
        if( my $to_analysis = $new_role && $new_role->analysis ) {
            $self->worker_say( "specializing to " . $to_analysis->logic_name . '(' . ($to_analysis->dbID // 'unstored') . ')' );
        }
        $self->{'_current_role'} = $new_role;
    }
    return $self->{'_current_role'};
}
283 
284 
# Getter/setter for the debug level. An undefined value is normalised (and
# stored) as 0.
sub debug {
    my $self = shift;
    if (@_) {
        $self->{'_debug'} = $_[0];
    }
    $self->{'_debug'} //= 0;
    return $self->{'_debug'};
}
291 
292 
# Getter/setter for the execute_writes flag. An undefined value is normalised
# (and stored) as 1.
sub execute_writes {
    my $self = shift;
    if (@_) {
        $self->{'_execute_writes'} = $_[0];
    }
    $self->{'_execute_writes'} //= 1;
    return $self->{'_execute_writes'};
}
299 
300 
# Getter/setter for an explicit list of jobs to run. run() treats it as an
# arrayref and empties it while processing.
sub special_batch {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_special_batch'} = $new_value[0];
    }
    return $self->{'_special_batch'};
}
306 
307 
# Getter/setter for the perform_cleanup flag. An undefined value is normalised
# (and stored) as 1.
sub perform_cleanup {
    my $self = shift;
    if (@_) {
        $self->{'_perform_cleanup'} = $_[0];
    }
    $self->{'_perform_cleanup'} //= 1;
    return $self->{'_perform_cleanup'};
}
314 
315 
# Setter/getter that defines the default behaviour when a job throws: should it
# be retried or not? Reports 1 (retry) while nothing defined has been stored;
# the default itself is not written back to the object.
sub retry_throwing_jobs {
    my $self = shift @_;

    if (@_) {
        $self->{'_retry_throwing_jobs'} = $_[0];
    }
    return $self->{'_retry_throwing_jobs'} // 1;
}
324 
325 
# Getter/setter for whether run() may give the Worker a new Role once the
# current one ran out of work or became overloaded.
sub can_respecialize {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_can_respecialize'} = $new_value[0];
    }
    return $self->{'_can_respecialize'};
}
331 
332 
=head2 life_span

    Arg [1]      : (optional) integer $value (in seconds)
    Title        : life_span
    Usage        : $value = $self->life_span;
                   $self->life_span($new_value);
    Description  : Defines the maximum time a worker can live for. Workers are always
                   allowed to complete the jobs they get, but whether they can
                   do multiple rounds of work is limited by their life_span.
                   A value of 0 disables the limit.
    DefaultValue : 3600 (60 minutes)
    Returntype   : integer scalar

=cut

sub life_span {
    my ($self, $value) = @_;

    if (defined $value) {
        # 0 is accepted on purpose: it removes the limit on the lifespan
        $self->{'_life_span'} = $value;
    }
    else {
        $self->{'_life_span'} //= 60 * 60;    # default life_span = 60 minutes
    }
    return $self->{'_life_span'};
}
357 
# Getter/setter for the Stopwatch (created in init()) that measures how long
# the Worker has been alive.
sub lifespan_stopwatch {
    my ($self, @new_value) = @_;

    $self->{'_lifespan_stopwatch'} = $new_value[0] if @new_value;

    return $self->{'_lifespan_stopwatch'};
}
366 
# Returns the number of seconds the Worker has been alive if that exceeds
# life_span(), and 0 otherwise. A life_span of 0 means "no limit", so in that
# case the answer is always 0.
sub life_span_limit_reached {
    my $self = shift @_;

    my $limit = $self->life_span();
    return 0 unless $limit;

    my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
    return ($alive_for_secs > $limit) ? $alive_for_secs : 0;
}
378 
379 
=head2 job_limiter

    Title        : job_limiter
    Arg [1]      : (optional) integer $value
    Usage        : $limiter_obj = $self->job_limiter;
                   $self->job_limiter($new_value);
    Description  : The maximum number of jobs to be done by the Worker can be limited by the given number.
                   A worker 'dies' when either the 'life_span' or 'job_limit' is exceeded.
                   Calling it with an argument (or for the first time) builds a fresh Limiter object.
    DefaultValue : undef (relies on life_span to limit life of worker)
    Returntype   : Hive::Limiter object

=cut

sub job_limiter {
    my $self = shift;

    my $must_rebuild = @_ || !defined $self->{'_job_limiter'};
    if ($must_rebuild) {
        my $max_jobs = shift @_;
        $self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new("Total number of jobs this Worker is allowed to take", $max_jobs);
    }
    return $self->{'_job_limiter'};
}
400 
401 
# Book-keeping after a job completed successfully. It increments the work_done
# counter and adds the job's per-state timings (a hashref such as
# { FETCH_INPUT => ..., RUN => ..., WRITE_OUTPUT => ... }, the keys run() reads
# back) to the running totals in $self->{'_interval_partial_timing'}.
#
# Arg [1] : hashref of state => elapsed time; undef is treated as "no timings".
# Returns : nothing
sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    # Iterate over a key list rather than with each(): each() uses the hash's
    # shared internal iterator, so a hash that had already been partly iterated
    # elsewhere would have entries silently skipped here.
    my $timing = $job_partial_timing || {};
    for my $state (keys %$timing) {
        $self->{'_interval_partial_timing'}{$state} += $timing->{$state};
    }
    return;
}
411 
412 
# By maintaining this information we attempt to detect worker contamination
# without the user specifically telling us about it.
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some
# numerical IDs to differ), but at the moment we assume all errors identical.
# If the worker failed two jobs in a row - let him die.
#
# run_one_batch() stores each finished job's died_somewhere flag here and
# consults it when the following job dies.
sub prev_job_error {
    my ($self, @new_value) = @_;

    $self->{'_prev_job_error'} = $new_value[0] if @new_value;
    return $self->{'_prev_job_error'};
}
424 
# Getter/setter for the compiled Runnable instance (set in compile_runnable())
# that executes the Worker's jobs.
sub runnable_object {
    my ($self, @new_value) = @_;

    $self->{'_runnable_object'} = $new_value[0] if @new_value;
    return $self->{'_runnable_object'};
}
431 
432 
# Returns the RedirectStack wrapping STDOUT, creating it on first use.
sub get_stdout_redirector {
    my $self = shift;

    unless ($self->{_stdout_redirector}) {
        $self->{_stdout_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
    }
    return $self->{_stdout_redirector};
}
438 
# Returns the RedirectStack wrapping STDERR, creating it on first use.
sub get_stderr_redirector {
    my $self = shift;

    unless ($self->{_stderr_redirector}) {
        $self->{_stderr_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
    }
    return $self->{_stderr_redirector};
}
444 
445 
# Prints a message prefixed with the Worker's identity to the currently
# selected output handle.
# Workers without an adaptor are labelled "Standalone worker <pid>".
# Otherwise the prefix names the Worker id, plus either the Role/analysis (and
# the current job, if any) or 'UNSPECIALIZED'.
sub worker_say {
    my ($self, $msg) = @_;

    if (!$self->adaptor) {
        print "Standalone worker $$ : $msg\n";
        return;
    }

    my $worker_id    = $self->dbID();
    my $current_role = $self->current_role;
    my $runnable     = $self->runnable_object;
    my $job_id       = $runnable && $runnable->input_job && $runnable->input_job->dbID;

    my $context;
    if ($current_role) {
        $context = 'Role ' . $current_role->dbID . ' , '
                 . $current_role->analysis->logic_name . '(' . $current_role->analysis_id . ')';
        $context .= ", Job $job_id" if $job_id;
    }
    else {
        $context = 'UNSPECIALIZED';
    }

    print "Worker $worker_id [ $context ] $msg\n";
}
464 
465 
# Returns a one-line, comma-separated summary of the Worker's configuration.
# Arg [1] : (optional) boolean; if true, the current analysis is included first.
# Returns : string
#
# Notes:
#   - life_span() defaults to 3600 and uses 0 to mean "no limit" (see
#     life_span_limit_reached()), so 0 is shown as 'UNLIM'. It is never undef.
#   - The process is always shown as user@host#pid; a missing host is shown
#     as '@UNALLOCATED'.
sub toString {
    my ($self, $include_analysis) = @_;

    my $current_role = $self->current_role;

    return join(', ',
        $include_analysis ? ( 'analysis='.($current_role ? $current_role->analysis->logic_name.'('.$current_role->analysis_id.')' : 'UNSPECIALIZED') ) : (),
        'resource_class_id='.($self->resource_class_id // 'NULL'),
        'meadow='.$self->meadow_type.'/'.$self->meadow_name,
        'process='.$self->meadow_user.'@'.($self->meadow_host || 'UNALLOCATED').'#'.$self->process_id,
        'when_checked_in='.($self->when_checked_in // 'NEVER'),
        'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED'),
        'job_limit='.($self->job_limiter->available_capacity() // 'NONE'),
        'life_span='.($self->life_span || 'UNLIM'),
        'worker_log_dir='.($self->log_dir // 'STDOUT/STDERR'),
    );
}
483 
484 
485 ###############################
486 #
487 # WORK section
488 #
489 ###############################
490 
491 
=head2 run

    Title       :   run
    Usage       :   $worker->run;
    Description :
        This is a self-looping autonomous function to process jobs.
        First all STDOUT/STDERR is redirected, then looping commences.
        Looping consists of
            1) claiming jobs,
            2) processing those jobs through an instance of the 'module class' of
               the analysis assigned to this worker,
            3) updating the job, analysis_stats, and hive tables to track the
               progress of the job, the analysis and this worker.
        Looping stops when any one of these is met:
            1) there are no more jobs to process
            2) job_limit is reached
            3) life_span has been reached
            4) the Worker is found to be contaminated, or the hive/analysis is
               overloaded.
        In cases 1) and 4)-overload the Worker may respecialize to another
        analysis instead of stopping, if can_respecialize() allows it.
    Returntype  :   none

=cut

sub run {
    my ($self, $specialization_arghash) = @_;

    if( my $worker_log_dir = $self->log_dir ) {
        $self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
        $self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
    }

    # Minimum time, as measured by the batches stopwatch below, that the inner
    # "BATCHES" loop keeps claiming and running jobs before the (expensive)
    # statistics update is written back to the database.
    my $min_batch_time = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
    my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;

    print "\n"; # to clear beekeeper's prompt in case output is not logged
    $self->worker_say( $self->toString() );
    $self->specialize_and_compile_wrapper( $specialization_arghash );

    while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies for any reason)

        my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
        my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
        $self->{'_interval_partial_timing'} = {};

        if( my $special_batch = $self->special_batch() ) {
            my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
            $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch, $special_batch_length );
            $self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ? 'JOB_LIMIT' : 'CONTAMINATED');
        } else { # a proper "BATCHES" loop

            while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
                my $current_role = $self->current_role;

                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
                    my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                    $self->worker_say( $msg );
                    $self->cause_of_death('CONTAMINATED');
                    $job_adaptor->release_undone_jobs_from_role($current_role, $msg);

                } elsif( $self->job_limiter->reached()) {
                    $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
                    $self->cause_of_death('JOB_LIMIT');

                } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
                    $self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
                    $self->cause_of_death('LIFESPAN');

                } else {
                    # No need to refresh the stats or the hive_current_load,
                    # since it's all been refreshed in specialize_and_compile_wrapper()
                    my $stats = $current_role->analysis->stats;
                    my $desired_batch_size = $stats->get_or_estimate_batch_size();
                    my $hit_the_limit; # dummy at the moment
                    ($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );

                    my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );

                    if($self->debug) {
                        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                              "Claiming: ready_job_count=".$stats->ready_job_count
                            .", num_running_workers=".$stats->num_running_workers
                            .", desired_batch_size=$desired_batch_size, actual_batch_size=".scalar(@$actual_batch),
                        'INFO' );
                    }

                    if(scalar(@$actual_batch)) {
                        my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                        $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
                        $self->job_limiter->final_decision( $jobs_done_by_this_batch );
                    } else {
                        $self->cause_of_death('NO_WORK');
                    }
                }
            }
        }

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
        #
        if($jobs_done_by_batches_loop) {

            $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
                $self->current_role->analysis->dbID,
                $jobs_done_by_batches_loop,
                $batches_stopwatch->get_elapsed,
                $self->{'_interval_partial_timing'}{'FETCH_INPUT'}  || 0,
                $self->{'_interval_partial_timing'}{'RUN'}          || 0,
                $self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
            );
        }

        # A mechanism whereby workers can be caused to exit even if they were doing fine:
        if (!$self->cause_of_death) {
            # We're here after having run a batch, so we need to refresh the stats
            my $analysis = $self->current_role->analysis;
            my $stats = $analysis->stats;
            if ( $stats->refresh($self->refresh_tolerance_seconds) ) { # if we DID refresh
                $self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
                $stats->hive_pipeline->invalidate_hive_current_load;
                if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
                 or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
                ) {
                    $self->cause_of_death('HIVE_OVERLOAD');
                }
            }
        }

        my $cod = $self->cause_of_death() || '';

        if( $cod eq 'NO_WORK') {
            $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
        }

        # Respecialize if:
        #   1) the Worker stopped because it ran out of work (NO_WORK) or because
        #      the hive/analysis is overloaded (HIVE_OVERLOAD)
        #   2) allowed to by the command-line option
        #   3) [heuristic] there are some possible candidates for the next analysis (i.e. no pattern set or the pattern has multiple components). This doesn't guarantee that the pattern will resolve to multiple analyses !
        if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{'-analyses_pattern'} or $specialization_arghash->{'-analyses_pattern'}!~/^\w+$/) ) {
            my $old_role = $self->current_role;
            $self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 0 );
            $self->current_role( undef );
            $self->cause_of_death(undef);
            $self->specialize_and_compile_wrapper( $specialization_arghash );
        }

    } # /Worker's lifespan loop

    # The second argument ("update_when_checked_in") is set to force an
    # update of the "when_checked_in" timestamp in the worker table
    $self->adaptor->register_worker_death($self, 1);

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString ) if( $self->current_role );
        $self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
    }

    $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );

    if( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();
    }
}
653 
654 
# Asks the adaptor to specialize the Worker (give it a Role) and, on success,
# compiles the Runnable for that Role.
# If specialization dies, the error is reported. cause_of_death is set to
# 'SEE_MSG' unless a more specific cause was already set. The message is
# logged as INFO when the cause is 'NO_ROLE', and as WORKER_ERROR otherwise.
# Compilation is skipped whenever a cause of death is set.
sub specialize_and_compile_wrapper {
    my ($self, $specialization_arghash) = @_;

    my $specialized_ok = eval {
        $self->enter_status('SPECIALIZATION');
        $self->adaptor->specialize_worker( $self, $specialization_arghash );
        1;
    };

    unless ($specialized_ok) {
        my $msg = $@;
        chomp $msg;
        $self->worker_say( "specialization failed:\t$msg" );

        # some specific causes could have been set prior to die "...";
        $self->cause_of_death('SEE_MSG') unless $self->cause_of_death();

        my $message_class = ($self->cause_of_death() eq "NO_ROLE") ? 'INFO' : 'WORKER_ERROR';

        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $message_class );
    }

    $self->compile_runnable unless $self->cause_of_death();
}
683 
# Instantiates the Runnable module of the current analysis and attaches it to
# this Worker. On success the Worker goes through COMPILATION to READY status.
# Any die() inside is passed to handle_compilation_failure().
sub compile_runnable {
    my $self = shift;

    my $compiled_ok = eval {
        $self->enter_status('COMPILATION');

        my $analysis     = $self->current_role->analysis;
        my $module_class = $analysis->get_compiled_module_name;

        # Only GuestProcess will read the arguments
        my $runnable_object = $module_class->new($self->debug, $analysis->language, $analysis->module)
            or die "Unknown compilation error";

        $runnable_object->worker( $self );
        $self->runnable_object( $runnable_object );

        $self->enter_status('READY');
        1;
    };

    unless ($compiled_ok) {
        my $compile_error = $@;
        $self->handle_compilation_failure($compile_error);
    }
}
704 
# Reports a Runnable compilation error, both on the Worker's output and as a
# WORKER_ERROR log message. It then sets cause_of_death to 'SEE_MSG' (unless a
# cause is already set) and checks whether the analysis should now be excluded.
sub handle_compilation_failure {
    my ($self, $msg) = @_;

    my $module_name = $self->current_role->analysis->module;
    $self->worker_say( "runnable '$module_name' compilation failed :\t$msg" );
    $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 'WORKER_ERROR' );

    # some specific causes could have been set prior to die "...";
    if (!$self->cause_of_death()) {
        $self->cause_of_death('SEE_MSG');
    }

    $self->check_analysis_for_exclusion();
}
714 
# Runs a list of already-claimed jobs through the Worker's Runnable, one at a time.
#
# Arg [1] : arrayref of job objects. It is consumed (shift-ed) as the jobs are
#           processed, and may be emptied early when surplus jobs are unclaimed.
# Arg [2] : (optional) flag. When true, the batch is treated as a "special"
#           batch and surplus jobs are never unclaimed. run() passes the special
#           batch's length here.
# Returns : number of jobs that completed successfully in this call.
#
# Per job:
#   - output is redirected to per-job files (when log_dir is set);
#   - the job's life cycle runs inside eval;
#   - runtime and query count are recorded;
#   - the attempt is registered on the Role.
# Failed jobs are released/aged via the job adaptor. Two failures in a row, or
# a job flagged lethal_for_worker, mark the Worker 'CONTAMINATED' and stop the
# batch. After each job the stats may be refreshed; if the optimal batch size
# has shrunk, all remaining claimed jobs are released.
sub run_one_batch {
    my ($self, $jobs, $is_special_batch) = @_;

    my $jobs_done_here = 0;

    my $current_role = $self->current_role;
    my $max_retry_count = $current_role->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
    my $stats = $current_role->analysis->stats; # cache it to avoid reloading

    $self->adaptor->check_in_worker( $self );
    $self->adaptor->safe_synchronize_AnalysisStats( $stats );

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : ' . $stats->toString );
        $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
    }

    my $job_partial_timing;

    ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay

        my $job_id = $job->dbID();
        $self->worker_say( $job->toString ) if($self->debug);

        my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
        $job_partial_timing = {};

        $self->start_job_output_redirection($job); # switch STDOUT/STDERR into the job's own .out/.err files
        eval { # capture any throw/die
            my $runnable_object = $self->runnable_object();
            $runnable_object->input_job( $job ); # "take" the job

            $job->incomplete(1);
            $self->adaptor->db->dbc->query_count(0);
            $job_stopwatch->restart();

            $job->load_parameters( $runnable_object );

            $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());

            # per-state timings; merged into the Worker's totals by more_work_done() on success
            $job_partial_timing = $runnable_object->life_cycle();
        };
        if(my $msg = $@) {
            $job->died_somewhere( $job->incomplete ); # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self->runnable_object, $msg, $job->incomplete?'WORKER_ERROR':'INFO'); # In case the Runnable has redefined warning()
        }

        # whether the job completed successfully or not:
        $self->runnable_object->input_job( undef ); # release an extra reference to the job
        $job->runtime_msec( $job_stopwatch->get_elapsed );
        $job->query_count( $self->adaptor->db->dbc->query_count );

        my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );

        # print writes to the selected handle (STDOUT), which is still redirected,
        # so one copy goes into the job's .out file
        print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere));
        $self->stop_job_output_redirection($job); # and then we switch back to the worker's own output
        $self->worker_say( $job_completion_line ); # one copy goes to the worker's output

        $self->current_role->register_attempt( ! $job->died_somewhere );

        if($job->died_somewhere) {
            # Both flags default to 1, meaning that jobs would by default be retried.
            # If the job specifically said not to retry, or if the worker is configured
            # not to retry jobs, follow their wish.
            my $may_retry = $job->transient_error && $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

            if( $self->prev_job_error                # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
             or $job->lethal_for_worker ) {          # trust the job's expert knowledge
                my $reason = $self->prev_job_error ? 'two failed jobs in a row'
                                                   : 'suggested by job itself';
                $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        } else {    # job successfully completed:
            $self->more_work_done( $job_partial_timing );
            $jobs_done_here++;
            $job->set_and_update_status('DONE');

            if( my $controlled_semaphore = $job->controlled_semaphore ) {
                $controlled_semaphore->decrease_by( [ $job ] );
            }

            if($job->lethal_for_worker) {
                $self->worker_say( "The Job, although complete, wants the Worker to die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }

        # remembered so that the next failing job can detect "two failed jobs in a row"
        $self->prev_job_error( $job->died_somewhere );
        $self->enter_status('READY');

        # UNCLAIM THE SURPLUS:
        # only done for normal batches, when jobs remain, and when the stats were actually refreshed
        my $remaining_jobs_in_batch = scalar(@$jobs);
        if( !$is_special_batch and $remaining_jobs_in_batch and $stats->refresh( $self->refresh_tolerance_seconds ) ) { # if we DID refresh
            my $ready_job_count = $stats->ready_job_count;
            $stats->hive_pipeline->invalidate_hive_current_load;
            my $optimal_batch_now = $stats->get_or_estimate_batch_size( $remaining_jobs_in_batch );
            my $jobs_to_unclaim = $remaining_jobs_in_batch - $optimal_batch_now;
            if($self->debug) {
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self,
                    "Check-point: rdy=$ready_job_count, rem=$remaining_jobs_in_batch, "
                    . "opt=$optimal_batch_now, 2unc=$jobs_to_unclaim",
                    'INFO' );
            }
            if( $jobs_to_unclaim > 0 ) {
                # FIXME: a faster way would be to unclaim( splice(@$jobs, -$jobs_to_unclaim) );  # unclaim the last $jobs_to_unclaim elements
                # currently we just dump all the remaining jobs and prepare to take a fresh batch:
                $job->adaptor->release_claimed_jobs_from_role( $current_role );
                $jobs = [];
                $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, "Unclaimed $jobs_to_unclaim jobs (trimming the tail)", 'INFO' );
            }
        }

    } # /while(my $job = shift @$jobs)

    return $jobs_done_here;
}
836 
837 
# Sets the Worker's status and, when the Worker is stored (has an adaptor),
# checks it in so that the new status is recorded.
sub set_and_update_status {
    my ($self, $status) = @_;

    $self->status($status);

    my $adaptor = $self->adaptor or return;
    $adaptor->check_in_worker( $self );
}
847 
848 
# Moves the Worker into a new status. In debug mode the transition is
# announced as "-> STATUS".
sub enter_status {
    my ($self, $status) = @_;

    $self->worker_say( '-> ' . $status ) if $self->debug;

    $self->set_and_update_status( $status );
}
858 
859 
# When the Worker has a log_dir, this redirects STDOUT and STDERR into
# per-job files named job_id_<dbID>_<retry_count>.out/.err inside that
# directory. The file names are recorded on the job, and stored via the
# job's adaptor if it has one. Without a log_dir it does nothing.
sub start_job_output_redirection {
    my ($self, $job) = @_;

    my $worker_log_dir = $self->log_dir or return;

    my $path_prefix = $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count;

    $self->get_stdout_redirector->push( $job->stdout_file( $path_prefix . '.out' ) );
    $self->get_stderr_redirector->push( $job->stderr_file( $path_prefix . '.err' ) );

    if (my $job_adaptor = $job->adaptor) {
        $job_adaptor->store_out_files($job);
    }
}
872 
873 
# Undoes start_job_output_redirection(); it only acts when the Worker has a
# log_dir. It restores STDOUT/STDERR, then deletes each per-job output file
# that is empty, or every file when neither debug mode nor a job failure calls
# for keeping them. Deleted files are cleared on the job, and the remaining
# names are stored via the job's adaptor if it has one.
sub stop_job_output_redirection {
    my ($self, $job) = @_;

    return unless $self->log_dir;

    $self->get_stdout_redirector->pop();
    $self->get_stderr_redirector->pop();

    my $force_cleanup = !($self->debug || $job->died_somewhere);

    foreach my $file_accessor ('stdout_file', 'stderr_file') {
        my $file_name = $job->$file_accessor;
        next unless $force_cleanup or -z $file_name;

        $self->worker_say( "Deleting '" . $file_name . "' file" );
        unlink $file_name;
        $job->$file_accessor(undef);
    }

    if (my $job_adaptor = $job->adaptor) {
        $job_adaptor->store_out_files($job);
    }
}
899 
# Counts the WORKER_ERROR events logged for the current analysis. If there are
# more than worker_error_threshold(), this logs an INFO message and marks the
# analysis's stats as excluded, both in memory and in the database.
sub check_analysis_for_exclusion {
    my $self = shift(@_);

    my $log_adaptor = $self->adaptor->db->get_LogMessageAdaptor();
    my $worker_errors_this_analysis =
        $log_adaptor->count_analysis_events( $self->current_role->analysis_id, 'WORKER_ERROR' );

    return if $worker_errors_this_analysis <= $self->worker_error_threshold;

    my $analysis = $self->current_role->analysis;
    my $current_logic_name = $analysis->logic_name;
    $log_adaptor->store_worker_message($self, "setting analysis '$current_logic_name' to excluded", 'INFO' );

    $analysis->stats->is_excluded(1);
    $self->adaptor->db->get_AnalysisStatsAdaptor->update_is_excluded($analysis->stats);
}
914 
# Chooses, creates and records the Worker's log directory.
# If neither a hive-wide log directory nor an explicit worker log directory is
# given, it does nothing. Otherwise an explicit $worker_log_dir wins. Failing
# that, the path is built under $hive_log_dir from dir_revhash() of the
# worker's dbID, followed by worker_id_<dbID> (stored workers) or
# standalone/worker_pid_<pid>. It dies if the directory cannot be created.
sub set_log_directory_name {
    my ($self, $hive_log_dir, $worker_log_dir) = @_;

    return if !$hive_log_dir && !$worker_log_dir;

    my $dir_revhash = dir_revhash($self->dbID // '');    # Database-less workers are not hashed

    if (!$worker_log_dir) {
        my $hashed_subdir = $dir_revhash ? "$dir_revhash/" : '';
        my $leaf = $self->adaptor
            ? 'worker_id_' . $self->dbID
            : 'standalone/worker_pid_' . $self->process_id;
        $worker_log_dir = $hive_log_dir . '/' . $hashed_subdir . $leaf;
    }

    my $created = eval {
        make_path( $worker_log_dir );
        1;
    };
    die "Could not create '$worker_log_dir' directory : $@" unless $created;

    $self->log_dir( $worker_log_dir );
    $self->adaptor->update_log_dir( $self ) if $self->adaptor;    # autoloaded
}
931 
932 
=head2 set_temp_directory_name

    Title       : set_temp_directory_name
    Description : Generates and sets the name of a temporary directory suitable for this worker.
                  It will be under the base directory requested by $base_temp_dir, or the standard
                  location otherwise (as advised by File::Spec), and includes worker attributes
                  to make the path unique: the meadow user, then either the pipeline name and
                  worker dbID (stored workers) or the process id (standalone workers).
                  The directory itself is not created here.

=cut

sub set_temp_directory_name {
    my ($self, $base_temp_dir) = @_;

    $base_temp_dir //= File::Spec->tmpdir();

    my $temp_directory_name = $self->adaptor
        ? sprintf('%s/worker_%s_%s.%s/', $base_temp_dir, $self->meadow_user, $self->hive_pipeline->hive_pipeline_name, $self->dbID)
        : sprintf('%s/worker_%s.standalone.%s/', $base_temp_dir, $self->meadow_user, $self->process_id);

    $self->temp_directory_name( $temp_directory_name );
    $self->adaptor->update_temp_directory_name( $self ) if $self->adaptor;    # autoloaded
}
958 
959 
960 1;
Bio::EnsEMBL::Hive::Utils
Definition: Collection.pm:4
Bio::EnsEMBL::Hive::Limiter::new
public new()
Bio::EnsEMBL::Hive::Utils::RedirectStack::new
public new()
Bio::EnsEMBL::Hive::Limiter
Definition: Limiter.pm:10
Bio::EnsEMBL::Hive::AnalysisStats
Definition: AnalysisStats.pm:12
Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time
public min_batch_time()
Bio::EnsEMBL::Hive::Utils::RedirectStack
Definition: RedirectStack.pm:14
Bio::EnsEMBL::Hive::Worker
Definition: Worker.pm:53
debug
public debug()
run
public run()
Bio::EnsEMBL::Hive::Utils::Stopwatch
Definition: Stopwatch.pm:33
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::Utils::Stopwatch::new
public new()