    Abstract superclass. Each Process makes up the individual building blocks
    of the system. Instances of these processes are created in a hive workflow
    graph of Analysis entries that are linked together with dataflow and
    AnalysisCtrl rules.

    Instances of these Processes are created by the system as work is done.
    The newly created Process will have preset $self->db, $self->dbc, $self->input_id
    and several other variables. From this input and configuration data, each
    Process can then proceed to do something. The flow of execution within a
    Process is:
        pre_cleanup() if($retry_count>0);   # clean up databases/filesystem before subsequent attempts
        fetch_input();                      # fetch the data from databases/filesystems
        run();                              # perform the main computation
        write_output();                     # record the results in databases/filesystems
        post_healthcheck();                 # check if we got the expected result (optional)
        post_cleanup();                     # destroy all non-trivial data structures after the job is done
    The developer can implement their own versions of
    pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what
    they need.
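    For illustration, here is a minimal sketch of such a subclass; the analysis
    logic and parameter names are hypothetical, not part of this module:

        package Hypothetical::LineCounter;

        use strict;
        use warnings;

        use base ('Bio::EnsEMBL::Hive::Process');

        sub fetch_input {               # fetch and validate the input data
            my $self = shift;
            my $filename = $self->param_required('filename');
            die "Cannot read '$filename'\n" unless -r $filename;
        }

        sub run {                       # perform the main computation
            my $self = shift;
            open(my $fh, '<', $self->param('filename')) or die "Could not open: $!";
            my $count = 0;
            $count++ while <$fh>;
            close($fh);
            $self->param('line_count', $count);     # stash the result for write_output()
        }

        sub write_output {              # record the results via dataflow
            my $self = shift;
            $self->dataflow_output_id( { 'line_count' => $self->param('line_count') }, 1 );
        }

        1;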
    The entire system is based around the concept of a workflow graph which
    can split and loop back on itself. This is accomplished by dataflow
    rules (similar to Unix pipes) that connect one Process (or analysis) to others.
    Where a Unix command-line program can send output on STDOUT and STDERR pipes,
    a hive Process has access to unlimited pipes referenced by numerical
    branch_codes. This is accomplished within the Process via

        $self->dataflow_output_id(...);
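    For example, a Runnable that fans its results out on branch #2 while sending
    a summary down branch #1 might end its write_output() like this (the
    parameter names are illustrative):

        foreach my $chunk (@{ $self->param('chunks') }) {
            $self->dataflow_output_id( { 'chunk_name' => $chunk }, 2 );     # one event per chunk on branch #2
        }
        $self->dataflow_output_id( { 'n_chunks' => scalar(@{ $self->param('chunks') }) }, 1 );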
    The design philosophy is that each Process does its work and creates output,
    but it doesn't worry about where the input came from, or where its output
    goes. If the system has dataflow pipes connected, then the output jobs
    have a purpose; if not, the output work is thrown away. The workflow graph
    'controls' the behaviour of the system, not the processes. The processes just
    need to do their job. The design of the workflow graph is based on the knowledge
    of what each Process does, so that the graph can be correctly constructed.
    The workflow graph can be constructed a priori, or it can be constructed and
    modified by intelligent Processes as the system runs.

    The Hive is based on AI concepts and modeled on the social structure and
    behaviour of a honey bee hive. So where a worker honey bee's purpose is
    (go find pollen, bring back to hive, drop off pollen, repeat), an ensembl-hive
    worker's purpose is (find a job, create a Process for that job, run it,
    drop off output job(s), repeat). While most workflow systems are based
    on 'smart' central controllers and external control of 'dumb' processes,
    the Hive is based on 'dumb' workflow graphs and a job kiosk, and 'smart' workers
    (autonomous agents) who are self-configuring, figure out for themselves what
    needs to be done, and then do it. The workers are based around a set of
    emergent behaviour rules which allow a predictable system behaviour to emerge
    from what otherwise might appear at first glance to be a chaotic system. There
    is an inherent asynchronous disconnect between one worker and the next.
    Work (or jobs) are simply 'posted' on a blackboard or kiosk within the hive
    database where other workers can find them.

    The emergent behaviour rules of a worker are:
        1) If a job is posted, someone needs to do it.
        2) Don't grab something that someone else is working on.
        3) Don't grab more than you can handle.
        4) If you grab a job, it needs to be finished correctly.
        5) Keep busy doing work.
        6) If you fail, do the best you can to report back.

    For further reading on the AI principles employed in this design see:
        http://en.wikipedia.org/wiki/Autonomous_Agent
        http://en.wikipedia.org/wiki/Emergence

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2022] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users
    to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use File::Path qw(remove_tree);
use Scalar::Util qw(looks_like_number);
use Time::HiRes qw(time);

use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc', 'join_command_args', 'timeout');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;


sub new {
    my $class = shift @_;

    my $self = bless {}, $class;

    return $self;
}


sub life_cycle {
    my ($self) = @_;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->incomplete(1);    # reinforce, in case the life_cycle is not run by a Worker

    eval {
        # Catch all the "warn" calls
        #$SIG{__WARN__} = sub { $self->warning(@_) };

        if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
            $self->enter_status('PRE_CLEANUP');
            $self->pre_cleanup;
        }

        # PRE_HEALTHCHECK can come here

        $self->enter_status('FETCH_INPUT');
        $partial_stopwatch->restart();
        $self->fetch_input;
        $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->pause->get_elapsed;

        $self->enter_status('RUN');
        $partial_stopwatch->restart();
        $self->run;
        $job_partial_timing{'RUN'} = $partial_stopwatch->pause->get_elapsed;

        if($self->worker->execute_writes) {
            $self->enter_status('WRITE_OUTPUT');
            $partial_stopwatch->restart();
            $self->write_output;
            $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->pause->get_elapsed;

            if( $self->can('post_healthcheck') ) {
                $self->enter_status('POST_HEALTHCHECK');
                $self->post_healthcheck;
            }
        } else {
            $self->say_with_header( ": *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW" );
        }

        # Restore the default handler
        #$SIG{__WARN__} = 'DEFAULT';
    };

    if(my $life_cycle_msg = $@) {
        $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
        Bio::EnsEMBL::Hive::Process::warning($self, $life_cycle_msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');   # In case the Runnable has redefined warning()
    }

    if( $self->can('post_cleanup') ) {  # may be run to clean up memory even after partially failed attempts
        eval {
            $job->incomplete(1);    # it could have been reset by a previous call to complete_early
            $self->enter_status('POST_CLEANUP');
            $self->post_cleanup;
        };
        if(my $post_cleanup_msg = $@) {
            $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self, $post_cleanup_msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');     # In case the Runnable has redefined warning()
        }
    }

    unless( $job->died_somewhere ) {

        if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status, so it will have whatever the previous state of the job was
            $self->say_with_header( ': AUTOFLOW input->output' );
            $job->dataflow_output_id();
        }

        my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
        if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
            $job->transient_error(0);
            die "The group of semaphored jobs is incomplete! Some fan jobs (coming from dataflow_rule_id(s) ".join(',', @zombie_funnel_dataflow_rule_ids).") are missing a job on their funnel. Check the order of your dataflow_output_id() calls.";
        }

        $job->incomplete(0);
    }

    return \%job_partial_timing;
}
sub say_with_header {
    my ($self, $msg, $important) = @_;

    $important //= $self->debug();

    if($important) {
        if(my $worker = $self->worker) {
            $worker->worker_say( $msg );
        } else {
            print "StandaloneJob $msg\n";
        }
    }
}
sub enter_status {
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->set_and_update_status( $status );

    if(my $worker = $self->worker) {
        $worker->set_and_update_status( 'JOB_LIFECYCLE' );  # to ensure the when_checked_in TIMESTAMP is updated
    }

    $self->say_with_header( '-> '.$status );
}
sub warning {
    my ($self, $msg, $message_class) = @_;

    $message_class = 'WORKER_ERROR' if $message_class && looks_like_number($message_class);
    $message_class ||= 'INFO';

    $self->say_with_header( "$message_class : $msg", 1 );

    my $job    = $self->input_job;
    my $worker = $self->worker;

    if(my $job_adaptor = ($job && $job->adaptor)) {
        $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($job->dbID, $msg, $message_class);
    } elsif(my $worker_adaptor = ($worker && $worker->adaptor)) {
        $worker_adaptor->db->get_LogMessageAdaptor()->store_worker_message($worker, $msg, $message_class);
    }
}
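# Example usage from within a Runnable (the message text is illustrative):
#
#     $self->warning('Fewer alignments than expected; continuing anyway');    # logged with class INFO
#     $self->warning('Reference file is truncated', 'WORKER_ERROR');          # logged as a worker error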
##########################################
#
# methods subclasses should override
# in order to give this process function
#
##########################################


=head2 param_defaults

    Title   :  param_defaults
    Function:  subclass can define defaults for all params used by the RunnableDB/Process

=cut

sub param_defaults {
    return {};
}
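# A subclass typically overrides param_defaults() to seed its own parameters;
# a minimal sketch with hypothetical parameter names:
#
#     sub param_defaults {
#         return {
#             'mode'       => 'fast',
#             'min_length' => 100,
#         };
#     }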
## Function: subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.


=head2 fetch_input

    Title   :  fetch_input
    Function:  subclass can implement functions related to data fetching.
               Typical activities would be to parse $self->input_id.
               Subclasses may also want to fetch data from databases
               or from files within this function.

=cut
=head2 run

    Title   :  run
    Function:  subclass can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods. The Process may also choose to
               parse results into memory if an external program was used.

=cut
=head2 write_output

    Title   :  write_output
    Function:  subclass can implement functions related to storing results.
               Typical activities include writing results into database tables
               or into files on a shared filesystem.

=cut
## Function: subclass can implement functions related to cleaning up after running one job
##           (destroying non-trivial data structures in memory).


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 worker

    Usage   :  my $worker = $self->worker;
    Returns :  Bio::EnsEMBL::Hive::Worker object

=cut

sub worker {
    my $self = shift;

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
}
=head2 db

    Usage   :  my $hiveDBA = $self->db;
    Function:  returns the DBAdaptor to the Hive database
    Returns :  Bio::EnsEMBL::Hive::DBSQL::DBAdaptor object

=cut

sub db {
    my $self = shift;

    return $self->worker->adaptor && $self->worker->adaptor->db(@_);
}
=head2 dbc

    Usage   :  my $hiveDBConnection = $self->dbc;
    Function:  returns the DBConnection to the Hive database

=cut

sub dbc {
    my $self = shift;

    return $self->db && $self->db->dbc;
}
=head2 data_dbc

    Usage   :  my $data_dbc = $self->data_dbc;

=cut

sub data_dbc {
    my $self = shift @_;

    my $given_db_conn   = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref       = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify( $given_db_conn ) : "$given_db_conn";

    if (!$self->param_is_defined('db_conn') and !$self->db and !$self->dbc) {
        # go_figure_dbc won't be able to create a DBConnection, so let's
        # just print a nicer error message
        $self->input_job->transient_error(0);
        throw('In standaloneJob mode, $self->data_dbc requires the -db_conn parameter to be defined on the command-line');
    }

    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'}     = go_figure_dbc( $given_db_conn );
    }

    return $self->{'_cached_data_dbc'};
}
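# Usage sketch: if the current job defines a 'db_conn' parameter (a URL string,
# hashref or arrayref), data_dbc() connects there instead of the hive database.
# The table name below is purely illustrative:
#
#     my $dbc = $self->data_dbc();    # honours $self->param('db_conn') if defined
#     my ($count) = $dbc->db_handle->selectrow_array('SELECT COUNT(*) FROM my_table');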
=head2 run_system_command

    Title   :  run_system_command
    Arg[1]  :  (string or arrayref) Command to be run
    Arg[2]  :  (hashref, optional) Options, amongst:
                 - use_bash_pipefail: when enabled, a command with pipes will require all sides to succeed
                 - use_bash_errexit: when enabled, will stop at the first failure (otherwise commands such as
                   "do_something_that_fails; do_something_that_succeeds" would return 0)
                 - timeout: the maximum number of seconds the command can run for. Will return the exit code -2
                   if the command has to be aborted
                 - die_on_failure: when enabled, dies (with a message built from the command and its standard
                   error) if the command returns a non-zero code
    Usage   :  my $return_code = $self->run_system_command('script.sh with many_arguments');    # Command as a single string
               my $return_code = $self->run_system_command(['script.sh', 'arg1', 'arg2']);      # Command as an array-ref
               my ($return_code, $stderr, $string_command) = $self->run_system_command(['script.sh', 'arg1', 'arg2']);  # Same in list context. $string_command will be "script.sh arg1 arg2"
               my $return_code = $self->run_system_command('script1.sh with many_arguments | script2.sh', {'use_bash_pipefail' => 1});  # Command with pipes evaluated in a bash "pipefail" environment
    Function:  Runs a command given as a single string or an array-ref. The second argument is a hashref of options
    Returns :  Returns the return-code in scalar context, or a triplet (return-code, standard-error, command) in list context

=cut
sub run_system_command {
    my ($self, $cmd, $options) = @_;

    require Capture::Tiny;

    $options //= {};
    my ($join_needed, $flat_cmd) = join_command_args($cmd);

    my @cmd_to_run;
    my $need_bash = $options->{'use_bash_pipefail'} || $options->{'use_bash_errexit'};
    if ($need_bash) {
        @cmd_to_run = ('bash',
                       $options->{'use_bash_pipefail'} ? ('-o' => 'pipefail') : (),
                       $options->{'use_bash_errexit'}  ? ('-o' => 'errexit')  : (),
                       '-c' => $flat_cmd);
    } else {
        # Let's use the array if possible, it saves us from running a shell
        @cmd_to_run = $join_needed ? $flat_cmd : (ref($cmd) ? @$cmd : $cmd);
    }

    $self->say_with_header("Command given: " . stringify($cmd));
    $self->say_with_header("Command to run: " . stringify(\@cmd_to_run));

    $self->dbc and $self->dbc->disconnect_if_idle();    # release this connection for the duration of the system() call

    my $return_value;

    # Capture::Tiny has weird behavior if 'require'd instead of 'use'd
    # see, for example, http://www.perlmonks.org/?node_id=870439
    my $starttime = time() * 1000;
    my ($stdout, $stderr) = Capture::Tiny::tee(sub {
        $return_value = timeout( sub {system(@cmd_to_run)}, $options->{'timeout'} );
    });
    die sprintf("Could not run '%s', got %s\nSTDERR %s\n", $flat_cmd, $return_value, $stderr) if $return_value && $options->{'die_on_failure'};

    return ($return_value, $stderr, $flat_cmd, $stdout, time()*1000 - $starttime) if wantarray;
    return $return_value;
}
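# A combined usage sketch (the command and parameter names are illustrative):
#
#     my ($return_code, $stderr, $flat_cmd) = $self->run_system_command(
#         ['gzip', '-t', $self->param_required('archive')],
#         { 'timeout' => 3600, 'die_on_failure' => 1 },
#     );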
=head2 input_job

    Title   :  input_job
    Function:  Returns the AnalysisJob to be run by this process.
               Subclasses should treat this as a read_only object.
    Returns :  Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
    my $self = shift @_;

    if(@_) {
        if(my $job = $self->{'_input_job'} = shift) {
            throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object") unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));
        }
    }
    return $self->{'_input_job'};
}
# ##################### subroutines that link through to Job's methods #########################

sub input_id {
    my $self = shift;
    return $self->input_job->input_id(@_);
}

sub param {
    my $self = shift @_;
    return $self->input_job->param(@_);
}

sub param_required {
    my $self = shift @_;

    my $prev_transient_error = $self->input_job->transient_error();    # make a note of the previously set transience status
    $self->input_job->transient_error(0);                               # make sure that if we die in param_required, it is not seen as transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);          # restore the previous transience status
    return $value;
}

sub param_exists {
    my $self = shift @_;
    return $self->input_job->param_exists(@_);
}

sub param_is_defined {
    my $self = shift @_;
    return $self->input_job->param_is_defined(@_);
}

sub param_substitute {
    my $self = shift @_;
    return $self->input_job->param_substitute(@_);
}

sub dataflow_output_id {
    my $self = shift @_;

    # Let's not spend time stringifying a large object if it's not going to be printed anyway
    $self->say_with_header('Dataflow on branch #'.($_[1] // 1).': '.stringify($_[0])) if $self->debug;

    return $self->input_job->dataflow_output_id(@_);
}
=head2 dataflow_output_ids_from_json

    Title   :  dataflow_output_ids_from_json
    Arg[1]  :  (string) Name of a file with one complete JSON structure per line
    Arg[2]  :  (optional) Branch number, defaults to 1 (see L<AnalysisJob::dataflow_output_id>)
    Function:  Wrapper around L<dataflow_output_id> that takes the output_ids from a JSON file.
               Each line in the JSON file is expected to be a complete JSON structure, which
               may be prefixed with a branch number.

=cut

sub dataflow_output_ids_from_json {
    my ($self, $filename, $default_branch) = @_;

    require JSON;
    my $json_formatter = JSON->new()->indent(0);
    my @output_job_ids;

    open(my $fh, '<', $filename) or die "Could not open '$filename' because: $!";
    while (my $l = $fh->getline()) {
        chomp $l;
        my $branch = $default_branch;
        my $json   = $l;
        if ($l =~ /^(-?\d+)\s+(.*)$/) {
            $branch = $1;
            $json   = $2;
        }
        my $hash = $json_formatter->decode($json);
        push @output_job_ids, @{ $self->dataflow_output_id($hash, $branch) };
    }
    close($fh);

    return \@output_job_ids;
}
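# For instance, given a file with the two (hypothetical) lines below, the first
# structure is flowed on branch #2 and the second on the default branch:
#
#     2 {"chunk_name": "part_01"}
#     {"summary": "all_parts"}
#
#     my $output_job_ids = $self->dataflow_output_ids_from_json('output_ids.json', 1);    # hypothetical file name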
sub throw {
    my $msg = pop @_;

    Bio::EnsEMBL::Hive::Utils::throw( $msg );   # this module doesn't import 'throw' to avoid a namespace clash
}


=head2 complete_early

    Arg[1]      : (string) message
    Arg[2]      : (integer, optional) branch number
    Description : Ends the job with the given message, whilst marking the job as complete.
                  If a branch number is given, dataflows to that branch right before,
                  in which case the autoflow is disabled too.
    Returntype  : This function does not return

=cut

sub complete_early {
    my ($self, $msg, $branch_code) = @_;

    if (defined $branch_code) {
        $self->dataflow_output_id(undef, $branch_code);
        $self->input_job->autoflow(0);
    }
    $self->input_job->incomplete(0);
    $msg .= "\n" unless $msg =~ /\n$/;
    die $msg;
}


=head2 debug

    Title   :  debug
    Function:  Gets/sets flag for debug level. Set through Worker/runWorker.pl.
               Subclasses should treat this as a read_only variable.

=cut

sub debug {
    my $self = shift;

    return $self->worker->debug(@_);
}


=head2 worker_temp_directory

    Title   :  worker_temp_directory
    Function:  Returns a path to a directory on the local /tmp disk
               which the subclass can use as temporary file space.
               This directory is made the first time the function is called.
               It persists for as long as the worker is alive. This allows
               multiple jobs run by the worker to potentially share temp data.
               For example the worker (which runs a single Analysis) might need
               to dump a datafile which is needed by all jobs run through
               this analysis. The process can first check the worker_temp_directory
               for the file, and dump it if it is missing. This way the first job
               run by the worker will do the dump, but subsequent jobs can reuse the
               file.
    Usage   :  $tmp_dir = $self->worker_temp_directory;
    Returns :  <string> path to a local (/tmp) directory

=cut

sub worker_temp_directory {
    my $self = shift @_;

    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        $self->{'_tmp_dir'} = $self->worker_temp_directory_name();
        mkdir($self->{'_tmp_dir'}, 0777);
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
    }
    return $self->{'_tmp_dir'};
}

sub worker_temp_directory_name {
    my $self = shift @_;

    return $self->worker->temp_directory_name;
}


=head2 cleanup_worker_temp_directory

    Title   :  cleanup_worker_temp_directory
    Function:  Cleans up the directory on the local /tmp disk that is used for the
               worker. It can be used to remove files left there by previous jobs.
    Usage   :  $self->cleanup_worker_temp_directory;

=cut

sub cleanup_worker_temp_directory {
    my $self = shift @_;

    my $tmp_dir = $self->worker_temp_directory_name();
    remove_tree($tmp_dir, {error => undef});
}
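# A sketch of the dump-once-per-worker pattern described above (the file and
# script names are hypothetical):
#
#     sub fetch_input {
#         my $self = shift;
#
#         my $ref_file = $self->worker_temp_directory . '/reference.fa';
#         unless (-e $ref_file) {     # only the first job run by this worker pays for the dump
#             $self->run_system_command(['dump_reference.sh', $ref_file]);
#         }
#         $self->param('ref_file', $ref_file);
#     }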