ensembl-hive  2.5
Process.pm
=pod

=head1 NAME

 Bio::EnsEMBL::Hive::Process

=head1 DESCRIPTION

 Abstract superclass. Each Process makes up the individual building blocks
 of the system. Instances of these processes are created in a hive workflow
 graph of Analysis entries that are linked together with dataflow and
 AnalysisCtrl rules.

 Instances of these Processes are created by the system as work is done.
 The newly created Process will have preset $self->db, $self->dbc, $self->input_id
 and several other variables.
 From this input and configuration data, each Process can then proceed to
 do something. The flow of execution within a Process is:

   pre_cleanup() if($retry_count>0);  # clean up databases/filesystem before subsequent attempts
   fetch_input();                     # fetch the data from databases/filesystems
   run();                             # perform the main computation
   write_output();                    # record the results in databases/filesystems
   post_healthcheck();                # check if we got the expected result (optional)
   post_cleanup();                    # destroy all non-trivial data structures after the job is done

 The developer can implement their own versions of
 pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what they need.

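 As an illustration only (not part of this module), a minimal Runnable
 subclass could look like the following; the package name and parameters
 are hypothetical:

   package MyProject::RunnableDB::WordCount;

   use strict;
   use warnings;
   use base ('Bio::EnsEMBL::Hive::Process');

   sub fetch_input {              # fetch: slurp the input file named by the job's parameters
       my $self = shift;
       my $filename = $self->param_required('filename');
       open(my $fh, '<', $filename) or die "Cannot open '$filename': $!";
       local $/ = undef;          # slurp mode
       $self->param('text', scalar <$fh>);
       close($fh);
   }

   sub run {                      # run: do the (trivial) computation in memory
       my $self = shift;
       my @words = split(/\s+/, $self->param('text'));
       $self->param('word_count', scalar(@words));
   }

   sub write_output {             # write: record the result by flowing it down branch #1
       my $self = shift;
       $self->dataflow_output_id( { 'word_count' => $self->param('word_count') }, 1 );
   }

   1;
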
 The entire system is based around the concept of a workflow graph which
 can split and loop back on itself. This is accomplished by dataflow
 rules (similar to Unix pipes) that connect one Process (or analysis) to others.
 Where a Unix command-line program can send output to its STDOUT and STDERR pipes,
 a hive Process has access to unlimited pipes referenced by numerical
 branch_codes. This is accomplished within the Process via

   $self->dataflow_output_id(...);

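 For example, a Process could flow one event down the default branch #1 and
 another down branch #2 (hypothetical here; it only has an effect if the
 workflow graph wires that branch to something):

   $self->dataflow_output_id( { 'gene_id'  => $gene_id }, 1 );   # default branch
   $self->dataflow_output_id( { 'log_line' => $message }, 2 );   # numbered side-branch
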
 The design philosophy is that each Process does its work and creates output,
 but it doesn't worry about where the input came from, or where its output
 goes. If the system has dataflow pipes connected, then the output jobs
 have purpose; if not, the output work is thrown away. The workflow graph
 'controls' the behaviour of the system, not the processes. The processes just
 need to do their job. The design of the workflow graph is based on the knowledge
 of what each Process does, so that the graph can be correctly constructed.
 The workflow graph can be constructed a priori, or can be constructed and
 modified by intelligent Processes as the system runs.


 The Hive is based on AI concepts and modeled on the social structure and
 behaviour of a honey bee hive. So where a worker honey bee's purpose is
 (go find pollen, bring it back to the hive, drop off the pollen, repeat), an ensembl-hive
 worker's purpose is (find a job, create a Process for that job, run it,
 drop off the output job(s), repeat). While most workflow systems are based
 on 'smart' central controllers and external control of 'dumb' processes,
 the Hive is based on 'dumb' workflow graphs and a job kiosk, and 'smart' workers
 (autonomous agents) who are self-configuring and figure out for themselves what
 needs to be done, and then do it. The workers are based around a set of
 emergent behaviour rules which allow a predictable system behaviour to emerge
 from what otherwise might appear at first glance to be a chaotic system. There
 is an inherent asynchronous disconnect between one worker and the next.
 Work (or jobs) is simply 'posted' on a blackboard or kiosk within the hive
 database where other workers can find it.

 The emergent behaviour rules of a worker are:
  1) If a job is posted, someone needs to do it.
  2) Don't grab something that someone else is working on.
  3) Don't grab more than you can handle.
  4) If you grab a job, it needs to be finished correctly.
  5) Keep busy doing work.
  6) If you fail, do the best you can to report back.

 For further reading on the AI principles employed in this design see:
  http://en.wikipedia.org/wiki/Autonomous_Agent
  http://en.wikipedia.org/wiki/Emergence

=head1 LICENSE

 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
 Copyright [2016-2022] EMBL-European Bioinformatics Institute

 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software distributed under the License
 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

 Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

 The rest of the documentation details each of the object methods.
 Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use File::Path qw(remove_tree);
use JSON;
use Scalar::Util qw(looks_like_number);
use Time::HiRes qw(time);

use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc', 'join_command_args', 'timeout');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;


sub new {
    my $class = shift @_;

    my $self = bless {}, $class;

    return $self;
}


sub life_cycle {
    my ($self) = @_;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->incomplete(1);    # reinforce, in case the life_cycle is not run by a Worker
    $job->autoflow(1);

    eval {
        # Catch all the "warn" calls
        #$SIG{__WARN__} = sub { $self->warning(@_) };

        if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
            $self->enter_status('PRE_CLEANUP');
            $self->pre_cleanup;
        }

        # PRE_HEALTHCHECK can come here

        $self->enter_status('FETCH_INPUT');
        $partial_stopwatch->restart();
        $self->fetch_input;
        $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->pause->get_elapsed;

        $self->enter_status('RUN');
        $partial_stopwatch->restart();
        $self->run;
        $job_partial_timing{'RUN'} = $partial_stopwatch->pause->get_elapsed;

        if($self->worker->execute_writes) {
            $self->enter_status('WRITE_OUTPUT');
            $partial_stopwatch->restart();
            $self->write_output;
            $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->pause->get_elapsed;

            if( $self->can('post_healthcheck') ) {
                $self->enter_status('POST_HEALTHCHECK');
                $self->post_healthcheck;
            }
        } else {
            $self->say_with_header( ": *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW" );
        }
    };
    # Restore the default handler
    #$SIG{__WARN__} = 'DEFAULT';

    if(my $life_cycle_msg = $@) {
        $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
        Bio::EnsEMBL::Hive::Process::warning($self, $life_cycle_msg, $job->incomplete?'WORKER_ERROR':'INFO');   # in case the Runnable has redefined warning()
    }

    if( $self->can('post_cleanup') ) {   # may be run to clean up memory even after partially failed attempts
        eval {
            $job->incomplete(1);    # it could have been reset by a previous call to complete_early
            $self->enter_status('POST_CLEANUP');
            $self->post_cleanup;
        };
        if(my $post_cleanup_msg = $@) {
            $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self, $post_cleanup_msg, $job->incomplete?'WORKER_ERROR':'INFO');   # in case the Runnable has redefined warning()
        }
    }

    unless( $job->died_somewhere ) {

        if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status, so will have whatever the previous state of the job was
            $self->say_with_header( ': AUTOFLOW input->output' );
            $job->dataflow_output_id();
        }

        my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
        if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
            $job->transient_error(0);
            die "The group of semaphored jobs is incomplete ! Some fan jobs (coming from dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") are missing a job on their funnel. Check the order of your dataflow_output_id() calls.";
        }

        $job->incomplete(0);

        return \%job_partial_timing;
    }
}


sub say_with_header {
    my ($self, $msg, $important) = @_;

    $important //= $self->debug();

    if($important) {
        if(my $worker = $self->worker) {
            $worker->worker_say( $msg );
        } else {
            print "StandaloneJob $msg\n";
        }
    }
}


sub enter_status {
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->set_and_update_status( $status );

    if(my $worker = $self->worker) {
        $worker->set_and_update_status( 'JOB_LIFECYCLE' );  # to ensure the when_checked_in TIMESTAMP is updated
    }

    $self->say_with_header( '-> '.$status );
}


sub warning {
    my ($self, $msg, $message_class) = @_;

    $message_class = 'WORKER_ERROR' if $message_class && looks_like_number($message_class);
    $message_class ||= 'INFO';
    chomp $msg;

    $self->say_with_header( "$message_class : $msg", 1 );

    my $job = $self->input_job;
    my $worker = $self->worker;

    if(my $job_adaptor = ($job && $job->adaptor)) {
        $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($job->dbID, $msg, $message_class);
    } elsif(my $worker_adaptor = ($worker && $worker->adaptor)) {
        $worker_adaptor->db->get_LogMessageAdaptor()->store_worker_message($worker, $msg, $message_class);
    }
}
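
# Typical usage from a subclass (the message text is illustrative):
#
#   $self->warning("Gene $gene_id has no transcripts, skipping");
#
# The message is printed and, when adaptors are available, also stored
# in the hive database via the LogMessageAdaptor.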


##########################################
#
# methods subclasses should override
# in order to give this process function
#
##########################################


=head2 param_defaults

 Title   : param_defaults
 Function: subclass can define defaults for all params used by the RunnableDB/Process

=cut
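
# An illustrative override (these parameter names are hypothetical):
#
#   sub param_defaults {
#       return {
#           'min_length' => 100,     # used unless overridden by the analysis or the job
#           'mode'       => 'fast',
#       };
#   }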

sub param_defaults {
    return {};
}


#
## Function: subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
#

# sub pre_cleanup {
#     my $self = shift;
#
#     return 1;
# }


=head2 fetch_input

 Title   : fetch_input
 Function: subclass can implement functions related to data fetching.
           Typical activities would be to parse $self->input_id .
           Subclasses may also want to fetch data from databases
           or from files within this function.

=cut

sub fetch_input {
    my $self = shift;

    return 1;
}


=head2 run

 Title   : run
 Function: subclass can implement functions related to process execution.
           Typical activities include running external programs or running
           algorithms by calling Perl methods. A Process may also choose to
           parse results into memory if an external program was used.

=cut

sub run {
    my $self = shift;

    return 1;
}


=head2 write_output

 Title   : write_output
 Function: subclass can implement functions related to storing results.
           Typical activities include writing results into database tables
           or into files on a shared filesystem.

=cut

sub write_output {
    my $self = shift;

    return 1;
}


#
## Function: subclass can implement functions related to cleaning up after running one job
# (destroying non-trivial data structures in memory).
#

#sub post_cleanup {
#    my $self = shift;
#
#    return 1;
#}


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 worker

 Title   : worker
 Usage   : my $worker = $self->worker;
 Function: returns the Worker object this Process is run by
 Returns : Bio::EnsEMBL::Hive::Worker object

=cut

sub worker {
    my $self = shift;

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
}


sub execute_writes {
    my $self = shift;

    return $self->worker->execute_writes(@_);
}


=head2 db

 Title   : db
 Usage   : my $hiveDBA = $self->db;
 Function: returns the DBAdaptor to the Hive database
 Returns : Bio::EnsEMBL::Hive::DBSQL::DBAdaptor object

=cut

sub db {
    my $self = shift;

    return $self->worker->adaptor && $self->worker->adaptor->db(@_);
}


=head2 dbc

 Title   : dbc
 Usage   : my $hiveDBConnection = $self->dbc;
 Function: returns the DBConnection to the Hive database
 Returns : Bio::EnsEMBL::Hive::DBSQL::DBConnection object

=cut

sub dbc {
    my $self = shift;

    return $self->db && $self->db->dbc;
}


=head2 data_dbc

 Title   : data_dbc
 Usage   : my $data_dbc = $self->data_dbc;
 Function: returns a Bio::EnsEMBL::Hive::DBSQL::DBConnection object (the "current" one by default, but can be set up otherwise)

=cut

sub data_dbc {
    my $self = shift @_;

    my $given_db_conn = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify ( $given_db_conn ) : "$given_db_conn";

    if (!$self->param_is_defined('db_conn') and !$self->db and !$self->dbc) {
        # go_figure_dbc won't be able to create a DBConnection, so let's
        # just print a nicer error message
        $self->input_job->transient_error(0);
        throw('In standaloneJob mode, $self->data_dbc requires the -db_conn parameter to be defined on the command-line');
    }

    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'} = go_figure_dbc( $given_db_conn );
    }

    return $self->{'_cached_data_dbc'};
}
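
# For illustration only (the URL and table name are hypothetical), a Runnable
# can point data_dbc() at an external database via the 'db_conn' parameter:
#
#   standaloneJob.pl MyRunnable -db_conn mysql://user:pass@host:3306/target_db
#
# and then query it from inside the Runnable:
#
#   my ($count) = $self->data_dbc->db_handle->selectrow_array('SELECT COUNT(*) FROM my_table');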


=head2 run_system_command

 Title   : run_system_command
 Arg[1]  : (string or arrayref) Command to be run
 Arg[2]  : (hashref, optional) Options, amongst:
            - use_bash_pipefail: when enabled, a command with pipes will require all sides to succeed
            - use_bash_errexit: when enabled, will stop at the first failure (otherwise commands such as "do_something_that_fails; do_something_that_succeeds" would return 0)
            - timeout: the maximum number of seconds the command can run for. Will return the exit code -2 if the command has to be aborted
            - die_on_failure: when enabled, dies if the command returns a non-zero exit code
 Usage   : my $return_code = $self->run_system_command('script.sh with many_arguments');  # command as a single string
           my $return_code = $self->run_system_command(['script.sh', 'arg1', 'arg2']);    # command as an array-ref
           my ($return_code, $stderr, $string_command) = $self->run_system_command(['script.sh', 'arg1', 'arg2']);  # same in list context; $string_command will be "script.sh arg1 arg2"
           my $return_code = $self->run_system_command('script1.sh with many_arguments | script2.sh', {'use_bash_pipefail' => 1});  # command with pipes evaluated in a bash "pipefail" environment
 Function: Runs a command given as a single string or an array-ref. The second argument is a hashref of options.
 Returns : the return code in scalar context, or a list (return code, standard error, flattened command string, standard output, elapsed time in milliseconds) in list context

=cut

sub run_system_command {
    my ($self, $cmd, $options) = @_;

    require Capture::Tiny;

    $options //= {};
    my ($join_needed, $flat_cmd) = join_command_args($cmd);
    my @cmd_to_run;

    my $need_bash = $options->{'use_bash_pipefail'} || $options->{'use_bash_errexit'};
    if ($need_bash) {
        @cmd_to_run = ('bash',
                       $options->{'use_bash_pipefail'} ? ('-o' => 'pipefail') : (),
                       $options->{'use_bash_errexit'}  ? ('-o' => 'errexit')  : (),
                       '-c' => $flat_cmd);
    } else {
        # Let's use the array if possible, it saves us from running a shell
        @cmd_to_run = $join_needed ? $flat_cmd : (ref($cmd) ? @$cmd : $cmd);
    }

    $self->say_with_header("Command given: " . stringify($cmd));
    $self->say_with_header("Command to run: " . stringify(\@cmd_to_run));

    $self->dbc and $self->dbc->disconnect_if_idle();    # release this connection for the duration of the system() call

    my $return_value;

    # Capture::Tiny has weird behavior if 'require'd instead of 'use'd
    # see, for example, http://www.perlmonks.org/?node_id=870439
    my $starttime = time() * 1000;
    my ($stdout, $stderr) = Capture::Tiny::tee(sub {
        $return_value = timeout( sub {system(@cmd_to_run)}, $options->{'timeout'} );
    });
    die sprintf("Could not run '%s', got %s\nSTDERR %s\n", $flat_cmd, $return_value, $stderr) if $return_value && $options->{die_on_failure};

    return ($return_value, $stderr, $flat_cmd, $stdout, time()*1000-$starttime) if wantarray;
    return $return_value;
}
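
# An illustrative use of the 'timeout' and 'die_on_failure' options
# (the command itself is hypothetical):
#
#   my $rc = $self->run_system_command(['long_running_script.sh', '--all'],
#                                      {'timeout' => 3600, 'die_on_failure' => 0});
#   if ($rc == -2) {
#       $self->warning('command was aborted after the 1-hour timeout');
#   }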


=head2 input_job

 Title   : input_job
 Function: Returns the AnalysisJob to be run by this process.
           Subclasses should treat this as a read-only object.
 Returns : Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
    my $self = shift @_;

    if(@_) {
        if(my $job = $self->{'_input_job'} = shift) {
            throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object") unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));
        }
    }
    return $self->{'_input_job'};
}


# ##################### subroutines that link through to Job's methods #########################

sub input_id {
    my $self = shift;

    return $self->input_job->input_id(@_);
}

sub param {
    my $self = shift @_;

    return $self->input_job->param(@_);
}

sub param_required {
    my $self = shift @_;

    my $prev_transient_error = $self->input_job->transient_error();    # make a note of the previously set transience status
    $self->input_job->transient_error(0);                              # make sure if we die in param_required it is not transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);          # restore the previous transience status
    return $value;
}

sub param_exists {
    my $self = shift @_;

    return $self->input_job->param_exists(@_);
}

sub param_is_defined {
    my $self = shift @_;

    return $self->input_job->param_is_defined(@_);
}

sub param_substitute {
    my $self = shift @_;

    return $self->input_job->param_substitute(@_);
}

sub dataflow_output_id {
    my $self = shift @_;

    # Let's not spend time stringifying a large object if it's not going to be printed anyway
    $self->say_with_header('Dataflow on branch #' . ($_[1] // 1) . (defined $_[0] ? ' of ' . stringify($_[0]) : ' (no parameters -> input parameters repeated)')) if $self->debug;
    return $self->input_job->dataflow_output_id(@_);
}


=head2 dataflow_output_ids_from_json

 Title   : dataflow_output_ids_from_json
 Arg[1]  : File name
 Arg[2]  : (optional) Branch number, defaults to 1 (see L<AnalysisJob::dataflow_output_id>)
 Function: Wrapper around L<dataflow_output_id> that takes the output_ids from a JSON file.
           Each line in the JSON file is expected to be a complete JSON structure, which
           may be prefixed with a branch number.

=cut

sub dataflow_output_ids_from_json {
    my ($self, $filename, $default_branch) = @_;

    my $json_formatter = JSON->new()->indent(0);
    my @output_job_ids;
    open(my $fh, '<', $filename) or die "Could not open '$filename' because: $!";
    while (my $l = $fh->getline()) {
        chomp $l;
        my $branch = $default_branch;
        my $json = $l;
        if ($l =~ /^(-?\d+)\s+(.*)$/) {
            $branch = $1;
            $json = $2;
        }
        my $hash = $json_formatter->decode($json);
        push @output_job_ids, @{ $self->dataflow_output_id($hash, $branch) };
    }
    close($fh);
    return \@output_job_ids;
}
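
# An input file could look like this (contents illustrative); the first two
# lines flow on the branch given as Arg[2], the third explicitly on branch #2:
#
#   {"gene_id": 17}
#   {"gene_id": 18}
#   2 {"status": "done"}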


sub throw {
    my $msg = pop @_;

    Bio::EnsEMBL::Hive::Utils::throw( $msg );   # this module doesn't import 'throw' to avoid a namespace clash
}


=head2 complete_early

 Arg[1]      : (string) message
 Arg[2]      : (integer, optional) branch number
 Description : Ends the job with the given message, whilst marking the job as complete.
               If a branch number is given, dataflows to that branch right beforehand,
               in which case the autoflow is disabled too.
 Returntype  : This function does not return

=cut

sub complete_early {
    my ($self, $msg, $branch_code) = @_;

    if (defined $branch_code) {
        $self->dataflow_output_id(undef, $branch_code);
        $self->input_job->autoflow(0);
    }
    $self->input_job->incomplete(0);
    $msg .= "\n" unless $msg =~ /\n$/;
    die $msg;
}
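
# Typical usage from within fetch_input() or run() (the condition and
# message are illustrative):
#
#   $self->complete_early('No input rows found; nothing to do') unless $row_count;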


=head2 debug

 Title   : debug
 Function: Gets/sets the flag for the debug level. Set through Worker/runWorker.pl.
           Subclasses should treat this as a read-only variable.
 Returns : integer

=cut

sub debug {
    my $self = shift;

    return $self->worker->debug(@_);
}


=head2 worker_temp_directory

 Title   : worker_temp_directory
 Function: Returns the path to a directory on the local /tmp disk
           which the subclass can use as temporary file space.
           This directory is made the first time the function is called.
           It persists for as long as the worker is alive, which allows
           multiple jobs run by the worker to share temporary data.
           For example, the worker (which runs jobs of a single Analysis) might need
           to dump a data file which is needed by all jobs run through
           this analysis. The process can first check the worker_temp_directory
           for the file, and dump it if it is missing. This way the first job
           run by the worker will do the dump, but subsequent jobs can reuse the
           file.
 Usage   : $tmp_dir = $self->worker_temp_directory;
 Returns : <string> path to a local (/tmp) directory

=cut

sub worker_temp_directory {
    my $self = shift @_;

    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        $self->{'_tmp_dir'} = $self->worker_temp_directory_name();
        mkdir($self->{'_tmp_dir'}, 0777);
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
    }
    return $self->{'_tmp_dir'};
}
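
# A sketch of the dump-once pattern described above (file and command names
# are hypothetical):
#
#   my $fasta_file = $self->worker_temp_directory . '/proteins.fasta';
#   unless (-e $fasta_file) {    # only the first job run by this worker pays the cost of dumping
#       $self->run_system_command("dump_proteins.sh > $fasta_file", {'die_on_failure' => 1});
#   }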

sub worker_temp_directory_name {
    my $self = shift @_;

    return $self->worker->temp_directory_name;
}


=head2 cleanup_worker_temp_directory

 Title   : cleanup_worker_temp_directory
 Function: Cleans up the directory on the local /tmp disk that is used by the
           worker. It can be used to remove files left there by previous jobs.
 Usage   : $self->cleanup_worker_temp_directory;

=cut

sub cleanup_worker_temp_directory {
    my $self = shift @_;

    my $tmp_dir = $self->worker_temp_directory_name();
    if(-e $tmp_dir) {
        remove_tree($tmp_dir, {error => undef});
    }
}


1;