Abstract superclass. Each Process makes up the individual building blocks
of the system. Instances of these processes are created in a hive workflow
graph of Analysis entries that are linked together with dataflow and
control rules.

Instances of these Processes are created by the system as work is done.
The newly created Process will have preset $self->db, $self->dbc, $self->input_id
and several other variables.
From this input and configuration data, each Process can then proceed to
do something. The flow of execution within a Process is:

    pre_cleanup() if($retry_count>0);   # clean up databases/filesystem before subsequent attempts
    fetch_input();                      # fetch the data from databases/filesystems
    run();                              # perform the main computation
    write_output();                     # record the results in databases/filesystems
    post_healthcheck();                 # check if we got the expected result (optional)
    post_cleanup();                     # destroy all non-trivial data structures after the job is done

The developer can implement their own versions of
pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what they need.
The entire system is based around the concept of a workflow graph which
can split and loop back on itself. This is accomplished by dataflow
rules (similar to Unix pipes) that connect one Process (or analysis) to others.
Where a Unix command-line program can send output on the STDOUT and STDERR pipes,
a hive Process has access to unlimited pipes referenced by numerical
branch_codes. This is accomplished within the Process via

    $self->dataflow_output_id(...);
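As a sketch of the life cycle described above (the package, parameter, and branch
names are hypothetical, chosen for illustration only), a minimal Process subclass
that implements fetch_input/run/write_output and dataflows its result on a
numbered branch might look like:

```perl
package MyPipeline::RunnableDB::Greet;      # hypothetical package name

use strict;
use warnings;

use base ('Bio::EnsEMBL::Hive::Process');

sub fetch_input {                   # fetch the data from databases/filesystems
    my $self = shift;

    my $name = $self->param_required('name');   # dies (and fails the job) if 'name' is missing
    $self->param('greeting', "Hello, $name!");
}

sub run {                           # perform the main computation
    my $self = shift;
    # nothing heavy to compute in this toy example
}

sub write_output {                  # record the results: flow one output job on branch #1
    my $self = shift;

    $self->dataflow_output_id( { 'greeting' => $self->param('greeting') }, 1 );
}

1;
```

Whether the job flown on branch #1 seeds further analyses, or is thrown away,
is decided entirely by the workflow graph, not by this code.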
The design philosophy is that each Process does its work and creates output,
but it doesn't worry about where the input came from, or where its output
goes. If the system has dataflow pipes connected, then the output jobs
have purpose; if not, the output work is thrown away. The workflow graph
'controls' the behaviour of the system, not the processes. The processes just
need to do their job. The design of the workflow graph is based on the knowledge
of what each Process does so that the graph can be correctly constructed.
The workflow graph can be constructed a priori or can be constructed and
modified by intelligent Processes as the system runs.
The Hive is based on AI concepts and modeled on the social structure and
behaviour of a honey bee hive. So where a worker honey bee's purpose is
(go find pollen, bring it back to the hive, drop off the pollen, repeat), an ensembl-hive
worker's purpose is (find a job, create a Process for that job, run it,
drop off the output job(s), repeat). While most workflow systems are based
on 'smart' central controllers and external control of 'dumb' processes,
the Hive is based on 'dumb' workflow graphs and a job kiosk, and 'smart' workers
(autonomous agents) who are self-configuring and figure out for themselves what
needs to be done, and then do it. The workers are based around a set of
emergent-behaviour rules which allow a predictable system behaviour to emerge
from what otherwise might appear at first glance to be a chaotic system. There
is an inherent asynchronous disconnect between one worker and the next.
Work (or jobs) are simply 'posted' on a blackboard or kiosk within the hive
database where other workers can find them.
The emergent behaviour rules of a worker are:
   1) If a job is posted, someone needs to do it.
   2) Don't grab something that someone else is working on.
   3) Don't grab more than you can handle.
   4) If you grab a job, it needs to be finished correctly.
   5) Keep busy doing work.
   6) If you fail, do the best you can to report back.
For further reading on the AI principles employed in this design see:
    http://en.wikipedia.org/wiki/Autonomous_Agent
    http://en.wikipedia.org/wiki/Emergence
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2024] EMBL-European Bioinformatics Institute

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
The rest of the documentation details each of the object methods.
Internal methods are usually prefixed with an underscore (_).
package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use File::Path qw(make_path remove_tree);
use JSON;
use Scalar::Util qw(looks_like_number);
use Time::HiRes qw(time);

use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc', 'join_command_args', 'timeout');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
    my $class = shift @_;

    my $self = bless {}, $class;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->incomplete(1);    # reinforce, in case the life_cycle is not run by a Worker

    # Catch all the "warn" calls
    #$SIG{__WARN__} = sub { $self->warning(@_) };
    if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
        $self->enter_status('PRE_CLEANUP');

    # PRE_HEALTHCHECK can come here

        $self->enter_status('FETCH_INPUT');
        $partial_stopwatch->restart();
        $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->pause->get_elapsed;

        $self->enter_status('RUN');
        $partial_stopwatch->restart();
        $job_partial_timing{'RUN'} = $partial_stopwatch->pause->get_elapsed;

        if($self->worker->execute_writes) {
            $self->enter_status('WRITE_OUTPUT');
            $partial_stopwatch->restart();
            $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->pause->get_elapsed;

            if( $self->can('post_healthcheck') ) {
                $self->enter_status('POST_HEALTHCHECK');
                $self->post_healthcheck;

            $self->say_with_header( ": *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW" );

    # Restore the default handler
    #$SIG{__WARN__} = 'DEFAULT';
    if(my $life_cycle_msg = $@) {
        $job->died_somewhere( $job->incomplete );   # it will be OR'd inside

    if( $self->can('post_cleanup') ) {  # may be run to clean up memory even after partially failed attempts
        $job->incomplete(1);    # it could have been reset by a previous call to complete_early
        $self->enter_status('POST_CLEANUP');

        if(my $post_cleanup_msg = $@) {
            $job->died_somewhere( $job->incomplete );   # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self, $post_cleanup_msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');   # In case the Runnable has redefined warning()
    unless( $job->died_somewhere ) {

        if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
            $self->say_with_header( ': AUTOFLOW input->output' );
            $job->dataflow_output_id();

        my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
        if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
            $job->transient_error(0);
            die "The group of semaphored jobs is incomplete! Some fan jobs (coming from dataflow_rule_id(s) ".join(',', @zombie_funnel_dataflow_rule_ids).") are missing a job on their funnel. Check the order of your dataflow_output_id() calls.";

    return \%job_partial_timing;
sub say_with_header {
    my ($self, $msg, $important) = @_;

    if(my $worker = $self->worker) {
        $worker->worker_say( $msg );
    } else {
        print "StandaloneJob $msg\n";
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->set_and_update_status( $status );

    if(my $worker = $self->worker) {
        $worker->set_and_update_status( 'JOB_LIFECYCLE' );  # to ensure when_checked_in TIMESTAMP is updated

    $self->say_with_header( '-> '.$status );
    my ($self, $msg, $message_class) = @_;

    $message_class = 'WORKER_ERROR' if $message_class && looks_like_number($message_class);
    $message_class ||= 'INFO';

    $self->say_with_header( "$message_class : $msg", 1 );

    my $job = $self->input_job;
    my $worker = $self->worker;

    if(my $job_adaptor = ($job && $job->adaptor)) {
        $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($job->dbID, $msg, $message_class);
    } elsif(my $worker_adaptor = ($worker && $worker->adaptor)) {
        $worker_adaptor->db->get_LogMessageAdaptor()->store_worker_message($worker, $msg, $message_class);
##########################################
#
# methods subclasses should override
# in order to give this process function
#
##########################################
=head2 param_defaults

    Title   :  param_defaults
    Function:  subclass can define defaults for all params used by the RunnableDB/Process

## Function: subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
    Function:  subclass can implement functions related to data fetching.
               Typical activities would be to parse $self->input_id.
               Subclasses may also want to fetch data from databases
               or from files within this function.
    Function:  subclass can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods. A Process may also choose to
               parse results into memory if an external program was used.
    Function:  subclass can implement functions related to storing results.
               Typical activities include writing results into database tables
               or into files on a shared filesystem.
## Function: subclass can implement functions related to cleaning up after running one job
#            (destroying non-trivial data structures in memory).
######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################
    Usage   :  my $worker = $self->worker;
    Function:  returns the Worker object this Process is run by

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
    Usage   :  my $hiveDBA = $self->db;
    Function:  returns DBAdaptor to the Hive database

    return $self->worker->adaptor && $self->worker->adaptor->db(@_);
    Usage   :  my $hiveDBConnection = $self->dbc;
    Function:  returns DBConnection to the Hive database

    return $self->db && $self->db->dbc;
    Usage   :  my $data_dbc = $self->data_dbc;

    my $given_db_conn = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify( $given_db_conn ) : "$given_db_conn";

    if (!$self->param_is_defined('db_conn') and !$self->db and !$self->dbc) {
        # go_figure_dbc won't be able to create a DBConnection, so let's
        # just print a nicer error message
        $self->input_job->transient_error(0);
        throw('In standaloneJob mode, $self->data_dbc requires the -db_conn parameter to be defined on the command-line');

    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'} = go_figure_dbc( $given_db_conn );

    return $self->{'_cached_data_dbc'};
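As an illustration of how a runnable might use data_dbc (the table name below is
hypothetical), the connection comes from the 'db_conn' parameter when it is
defined, and falls back to the hive database itself otherwise:

```perl
sub fetch_input {
    my $self = shift;

    my $dbc = $self->data_dbc;      # honours the 'db_conn' parameter if defined

    my $sth = $dbc->prepare('SELECT COUNT(*) FROM my_table');   # 'my_table' is hypothetical
    $sth->execute();
    my ($count) = $sth->fetchrow_array();
    $sth->finish();

    $self->param('row_count', $count);
}
```

In standaloneJob mode, -db_conn must be supplied on the command-line for this
to work, as the error message above explains.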
=head2 run_system_command

    Title   : run_system_command
    Arg[1]  : (string or arrayref) Command to be run
    Arg[2]  : (hashref, optional) Options, amongst:
                - use_bash_pipefail: when enabled, a command with pipes will require all sides to succeed
                - use_bash_errexit: when enabled, will stop at the first failure (otherwise commands such as "do_something_that_fails; do_something_that_succeeds" would return 0)
                - timeout: the maximum number of seconds the command can run for. Will return the exit code -2 if the command has to be aborted
    Usage   : my $return_code = $self->run_system_command('script.sh with many_arguments');    # Command as a single string
              my $return_code = $self->run_system_command(['script.sh', 'arg1', 'arg2']);      # Command as an array-ref
              my ($return_code, $stderr, $string_command) = $self->run_system_command(['script.sh', 'arg1', 'arg2']);    # Same in list-context. $string_command will be "script.sh arg1 arg2"
              my $return_code = $self->run_system_command('script1.sh with many_arguments | script2.sh', {'use_bash_pipefail' => 1});    # Command with pipes evaluated in a bash "pipefail" environment
    Function: Runs a command given as a single string or an array-ref. The second argument is a hashref of options.
    Returns : Returns the return-code in scalar context, or a triplet (return-code, standard-error, command) in list context
sub run_system_command {
    my ($self, $cmd, $options) = @_;

    require Capture::Tiny;

    my ($join_needed, $flat_cmd) = join_command_args($cmd);

    my $need_bash = $options->{'use_bash_pipefail'} || $options->{'use_bash_errexit'};

        @cmd_to_run = ('bash',
                       $options->{'use_bash_pipefail'} ? ('-o' => 'pipefail') : (),
                       $options->{'use_bash_errexit'}  ? ('-o' => 'errexit')  : (),

        # Let's use the array if possible, it saves us from running a shell
        @cmd_to_run = $join_needed ? $flat_cmd : (ref($cmd) ? @$cmd : $cmd);

    $self->say_with_header("Command given: " . stringify($cmd));
    $self->say_with_header("Command to run: " . stringify(\@cmd_to_run));

    $self->dbc and $self->dbc->disconnect_if_idle();    # release this connection for the duration of the system() call

    # Capture::Tiny has weird behavior if 'require'd instead of 'use'd
    # see, for example, http://www.perlmonks.org/?node_id=870439
    my $starttime = time() * 1000;
    my ($stdout, $stderr) = Capture::Tiny::tee(sub {
        $return_value = timeout( sub {system(@cmd_to_run)}, $options->{'timeout'} );

    # FIXME: on LSF we could perhaps wait a little bit for the MEM/RUNLIMIT
    # to really kick in, so that we don't return the wrong diagnostic
    die sprintf("Could not run '%s', got %s\nSTDERR %s\n", $flat_cmd, $return_value, $stderr) if $return_value && $options->{'die_on_failure'};

    return ($return_value, $stderr, $flat_cmd, $stdout, time()*1000 - $starttime) if wantarray;
    return $return_value;
    Function: Returns the AnalysisJob to be run by this process.
              Subclasses should treat this as a read-only object.
    if(my $job = $self->{'_input_job'} = shift) {
        throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object") unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));

    return $self->{'_input_job'};
# ##################### subroutines that link through to Job's methods #########################

    return $self->input_job->input_id(@_);

    return $self->input_job->param(@_);
    my $prev_transient_error = $self->input_job->transient_error();     # make a note of the previously set transience status
    $self->input_job->transient_error(0);       # make sure if we die in param_required it is not transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);   # restore the previous transience status

    return $self->input_job->param_exists(@_);

sub param_is_defined {
    return $self->input_job->param_is_defined(@_);

sub param_substitute {
    return $self->input_job->param_substitute(@_);
sub dataflow_output_id {
    # Let's not spend time stringifying a large object if it's not going to be printed anyway
    $self->say_with_header( 'Dataflow on branch #' . ($_[1] // 1) );
    return $self->input_job->dataflow_output_id(@_);
=head2 dataflow_output_ids_from_json

    Title   : dataflow_output_ids_from_json
    Arg[2]  : (optional) Branch number, defaults to 1 (see L<AnalysisJob::dataflow_output_id>)
    Function: Wrapper around L<dataflow_output_id> that takes the output_ids from a JSON file.
              Each line in the JSON file is expected to be a complete JSON structure, which
              may be prefixed with a branch number.
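For example (the file name and payloads below are hypothetical), a file where
the first line flows to the default branch and the second line explicitly
targets branch 3 could be dataflown in one call:

```perl
# Hypothetical contents of 'output_ids.json', one JSON structure per line,
# optionally prefixed with a branch number:
#
#   {"alpha" : 2, "beta" : 5}
#   3 {"alpha" : 6, "beta" : 7}

my $output_job_ids = $self->dataflow_output_ids_from_json('output_ids.json', 1);
```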
sub dataflow_output_ids_from_json {
    my ($self, $filename, $default_branch) = @_;

    my $json_formatter = JSON->new()->indent(0);

    open(my $fh, '<', $filename) or die "Could not open '$filename' because: $!";
    while (my $l = $fh->getline()) {

        my $branch = $default_branch;

        if ($l =~ /^(-?\d+)\s+(.*)$/) {

        my $hash = $json_formatter->decode($json);
        push @output_job_ids, @{ $self->dataflow_output_id($hash, $branch) };

    return \@output_job_ids;
    Bio::EnsEMBL::Hive::Utils::throw( $msg );   # this module doesn't import 'throw' to avoid namespace clash
=head2 complete_early

    Arg[1]      : (string) message
    Arg[2]      : (integer, optional) branch number
    Description : Ends the job with the given message, whilst marking the job as complete.
                  Dataflows to the given branch right beforehand if a branch number is given,
                  in which case the autoflow is disabled too.
    Returntype  : This function does not return
    my ($self, $msg, $branch_code) = @_;

    if (defined $branch_code) {
        $self->dataflow_output_id(undef, $branch_code);
        $self->input_job->autoflow(0);

    $self->input_job->incomplete(0);
    $msg .= "\n" unless $msg =~ /\n$/;
    Function: Gets/sets flag for debug level. Set through Worker/runWorker.pl.
              Subclasses should treat this as a read-only variable.

    return $self->worker->debug(@_);
=head2 worker_temp_directory

    Title   : worker_temp_directory
    Function: Returns a path to a directory on the local /tmp disk
              which the subclass can use as temporary file space.
              This directory is created the first time the function is called.
              It persists for as long as the worker is alive. This allows
              multiple jobs run by the worker to potentially share temp data.
              For example, the worker (which runs a single Analysis) might need
              to dump a datafile which is needed by all jobs run through
              this analysis. The process can first check the worker_temp_directory
              for the file and dump it if it is missing. This way the first job
              run by the worker will do the dump, but subsequent jobs can reuse the file.
    Usage   : $tmp_dir = $self->worker_temp_directory;
    Returns : <string> path to a local (/tmp) directory
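The dump-once, reuse-many pattern described above might be sketched like this
(the file name and the dumping helper are hypothetical, for illustration only):

```perl
sub fetch_input {
    my $self = shift;

    my $datafile = $self->worker_temp_directory . '/lookup.dat';    # hypothetical shared file

    unless (-e $datafile) {
        # only the first job run by this worker pays the cost of dumping
        $self->_dump_lookup_table($datafile);   # hypothetical helper method
    }

    $self->param('lookup_file', $datafile);
}
```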
sub worker_temp_directory {
    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        $self->{'_tmp_dir'} = $self->worker->temp_directory_name();
        make_path( $self->{'_tmp_dir'}, { mode => 0777 } );
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});

    return $self->{'_tmp_dir'};
=head2 cleanup_worker_temp_directory

    Title   : cleanup_worker_temp_directory
    Function: Cleans up the directory on the local /tmp disk that is used for the
              worker. It can be used to remove files left there by previous jobs.
    Usage   : $self->cleanup_worker_temp_directory;
sub cleanup_worker_temp_directory {
    if(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        remove_tree($self->{'_tmp_dir'}, {error => undef});