10 run jobs (runnables) written in a different language
14 Upon initialisation, GuestProcess forks, and the child process executes the wrapper that
15 will allow running Runnables of the other language. The communication is ensured by two
16 pipes and is schematically similar to running "| wrapper |", except that GuestProcess
17 uses non-standard file descriptors, thus allowing the Runnable to still use std{in,out,err}.
19 The wrapper receives the two file-numbers that it is meant to use (one for reading data
20 from GuestProcess, and one to send data to GuestProcess). All the messages are passed
21 around in single-line JSON structures. The protocol is described below using the convention:
22 ---> represents a message sent to the child process,
23 <--- represents a message sent by the child process
25 The initialisation (in the constructor) consists of checking that both sides speak the same
26 version of the protocol:
27 <--- { "version": "XXX" }
29 GuestProcess replies "OK" if the wrapper's version is compatible; otherwise it replies "NO" and bails out
31 Then, the child process (i.e. the runnable) will send its default parameters to GuestProcess.
32 This fills the usual param_defaults() section of the Runnable:
33 <--- { ... param_defaults ... }
36 The child process then goes to sleep, waiting for jobs to be seeded. Meanwhile,
37 GuestProcess enters a number of life_cycle() executions (as triggered by Worker).
38 Each one first sends a JSON object to the child process to initialize the job parameters:
41 "parameters": { ... the unsubstituted job parameters as compiled by Worker ... },
47 "execute_writes": [1|0],
52 From this point, GuestProcess acts as a server, listening to events sent by the child.
53 Events are JSON objects composed of an "event" field (the name of the event) and a
54 "content" field (the payload). Events can be of the following kinds (with the expected
55 response from GuestProcess):
57 <--- JOB_STATUS_UPDATE
65 "is_error": [true|false],
72 "branch_name_or_code": XXX,
73 "output_ids": an array or a hash,
75 "substituted": { ... the parameters that are currently substituted ... }
76 "unsubstituted": { ... the parameters that have not yet been substituted ... }
79 ---> dbIDs of the jobs that have been created
81 <--- WORKER_TEMP_DIRECTORY
83 ---> returns the temporary directory of the worker
88 "complete": [true|false],
90 "autoflow": [true|false],
91 "lethal_for_worker": [true|false],
92 "transient_error": [true|false],
95 "substituted": { ... the parameters that are currently substituted ... }
96 "unsubstituted": { ... the parameters that have not yet been substituted ... }
104 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
105 Copyright [2016-2024] EMBL-European Bioinformatics Institute
107 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
108 You may obtain a copy of the License at
112 Unless required by applicable law or agreed to in writing, software distributed under the License
113 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
114 See the License for the specific language governing permissions and limitations under the License.
118 Please subscribe to the
Hive mailing list: http:
123 package Bio::EnsEMBL::Hive::GuestProcess;
133 use base (
'Bio::EnsEMBL::Hive::Process');
136 # -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------
# Major version of the GuestProcess protocol spoken by this (Perl) side. A wrapper is
# accepted only if the version it announces starts with this value followed by a dot.
138 our $GUESTPROCESS_PROTOCOL_VERSION =
'5'; # Make sure you change
this number whenever an incompatible change is introduced
141 =head2 get_protocol_version
144 Description : Returns the version number of the communication protocol
149 sub get_protocol_version {
# Ignores its arguments, so it can be called as a method or as a plain function.
150 return $GUESTPROCESS_PROTOCOL_VERSION;
153 sub check_version_compatibility {
154 my ($self, $other_version) = @_;
# Returns 1 if $other_version is defined and looks like "<our version>.<anything>"
# (e.g. "5.0" is compatible with protocol "5", "55.0" is not); returns 0 otherwise.
# NOTE(review): $gpv is interpolated into the regex without \Q...\E; harmless for a
# purely numeric version, but worth quoting if the version format ever changes.
156 my $gpv = $self->get_protocol_version();
157 # warn "$self : GPV='$gpv', MV='$other_version'\n";
159 return ((defined $other_version) and ($other_version=~/^$gpv\./)) ? 1 : 0;
162 # -------------------------------------- </versioning of the GuestProcess interface> ------------------------------------------------------
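167 Arg[1] : $debug: the debug level; levels above 1 also print the GuestProcess protocol messages
Arg[2] : $language: the programming language the external runnable is in
168 Arg[3] : $module: the name of the runnable (usually a package name)
170 Description : Constructor
172 Exceptions : Dies if $language or $module is not defined properly, if the pipes /
173 child process could not be created, or if the wrapper speaks an incompatible protocol version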
# Constructor: forks a child that exec()s the language wrapper, then performs the
# version handshake and reads the runnable's param_defaults over the pipes.
179 my ($class, $debug, $language, $module) = @_;
181 die
"GuestProcess must be told which language to interface with" unless $language;
183 my $wrapper = _get_wrapper_for_language($language);
184 die
"GuestProcess must be told which module to run" unless $module;
# Two pipes. The PARENT_* ends are handed to the child (it reads PARENT_RDR and writes
# PARENT_WTR, see the exec() arguments); the CHILD_* ends stay in this process and
# become child_in (CHILD_WTR) and child_out (CHILD_RDR).
186 my ($PARENT_RDR, $PARENT_WTR, $CHILD_WTR,$CHILD_RDR);
187 pipe($PARENT_RDR, $CHILD_WTR) or die
'Could not create a pipe to send data to the child !';
188 pipe($CHILD_RDR, $PARENT_WTR) or die
'Could not create a pipe to get data from the child !';;
190 my $protocol_debug = ($debug && ($debug > 1)); # Only advanced levels of
debug will show the
GuestProcess protocol messages
191 if ($protocol_debug) {
192 print
"PARENT_RDR is ", fileno($PARENT_RDR),
"\n";
193 print
"PARENT_WTR is ", fileno($PARENT_WTR),
"\n";
194 print
"CHILD_RDR is ", fileno($CHILD_RDR),
"\n";
195 print
"CHILD_WTR is ", fileno($CHILD_WTR),
"\n";
204 print
"parent is PID $$\n" if $protocol_debug;
# $pid is undefined when fork() failed.
206 die
"cannot fork: $!" unless defined $pid;
# Child side: report, keep the two pipe ends open across exec(), then become the wrapper.
210 print
"child is PID $$\n" if $protocol_debug;
212 # Do not close the non-standard file descriptors on exec(): the child process will need them !
214 my $flags = fcntl($PARENT_RDR, F_GETFD, 0);
215 fcntl($PARENT_RDR, F_SETFD, $flags & ~FD_CLOEXEC);
216 $flags = fcntl($PARENT_WTR, F_GETFD, 0);
217 fcntl($PARENT_WTR, F_SETFD, $flags & ~FD_CLOEXEC);
# The wrapper is started as: <wrapper> run <module> <fd to read> <fd to write> <debug>
219 exec($wrapper,
'run', $module, fileno($PARENT_RDR), fileno($PARENT_WTR), $debug
# Parent side: flush every write so each message reaches the child immediately.
223 $CHILD_WTR->autoflush(1);
225 my $self = bless {}, $class;
227 $self->child_out($CHILD_RDR);
228 $self->child_in($CHILD_WTR);
229 $self->child_pid($pid);
230 $self->json_formatter( JSON->new()->indent(0) );
231 $self->{
'_protocol_debug'} = $protocol_debug; # controls the
GuestProcess protocol, not the worker
# Handshake: the wrapper announces its protocol version first. Reply 'NO' and die if it
# is incompatible, 'OK' otherwise.
234 my $other_version = $self->read_message()->{content};
235 if (!$self->check_version_compatibility($other_version)) {
236 $self->send_response(
'NO');
237 die
"eHive's protocol version is '".$self->get_protocol_version.
"' but the wrapper's is '$other_version'\n";
239 $self->send_response(
'OK');
# The next message carries the runnable's param_defaults; acknowledge it with 'OK'.
242 $self->print_debug(
"BEFORE READ PARAM_DEFAULTS");
243 $self->param_defaults( $self->read_message()->{content} );
244 $self->send_response(
'OK');
246 $self->print_debug(
"INIT DONE");
252 =head2 _get_wrapper_for_language
255 Description : Finds the wrapper that understands the given language
257 Exceptions : Can die if the wrapper doesn't exist, is an empty file, or is not executable
261 sub _get_wrapper_for_language {
# Lookup order: the EHIVE_WRAPPER_<LANGUAGE> environment variable (language upper-cased)
# if set, otherwise the embedded $EHIVE_ROOT_DIR/wrappers/<language>/wrapper.
# NOTE(review): the string literals 'EHIVE_WRAPPER_', '%s/wrappers/%s/wrapper' and
# 'EHIVE_ROOT_DIR' (and the die messages below) are split across lines here, which
# would embed newlines in them -- compare with the upstream source.
264 my $wrapper = $ENV{'EHIVE_WRAPPER_
'.(uc $language)} # User-overridden wrapper
265 || sprintf('%s/wrappers/%s/wrapper
', $ENV{'EHIVE_ROOT_DIR
'}, $language); # Embedded wrapper
# Reject a path that is missing, empty, or not executable, each with its own message.
266 if (not -e $wrapper) {
267 die "The path '$wrapper
' doesn't exist !\n
";
268 } elsif (not -s $wrapper) {
269 die "The wrapper
'$wrapper' is an empty file !\n
";
270 } elsif (not -x $wrapper) {
271 die "No permissions to execute the wrapper
'$wrapper'\n
";
277 =head2 _get_all_registered_wrappers
279 Example : my $all_languages = Bio::EnsEMBL::Hive::GuestProcess::_get_all_registered_wrappers()
280 Description : Lists all the languages and wrappers that are registered (either
281 via an EHIVE_WRAPPER_<LANGUAGE> environment variable, or via a "wrapper"
282 file under $EHIVE_ROOT_DIR/wrappers/<language>/).
283 Note that those wrappers are not necessarily usable, as 1) _get_wrapper_for_language
284 performs additional checks and 2) the version numbers have to match
285 Returntype : Hashref { String => String }
290 sub _get_all_registered_wrappers {
# Environment overrides: EHIVE_WRAPPER_FOO=/some/path registers language 'foo'.
292 foreach my $variable (keys %ENV) {
293 if ($variable =~ /^EHIVE_WRAPPER_(.*)$/) {
294 $all_found{lc $1} = $ENV{$variable};
# Embedded wrappers: one sub-directory per language under $EHIVE_ROOT_DIR/wrappers/.
# NOTE(review): as written, an embedded wrapper found here overwrites an EHIVE_WRAPPER_*
# entry for the same language, whereas _get_wrapper_for_language gives the environment
# variable precedence -- confirm which behaviour is intended.
297 foreach my $wrapper (glob $ENV{'EHIVE_ROOT_DIR'}.'/wrappers/*/wrapper' ) {
298 $wrapper =~ /\/wrappers\/(.*)\/wrapper$/;
299 $all_found{$1} = $wrapper;
305 =head2 get_wrapper_version
307 Example : Bio::EnsEMBL::Hive::GuestProcess::get_wrapper_version();
308 Description : Ask the wrapper what version it is on. The major number is expected to match $GUESTPROCESS_PROTOCOL_VERSION
310 Exceptions : Die if there is no wrapper
314 sub get_wrapper_version {
315 my $language = shift;
316 my $wrapper = _get_wrapper_for_language($language);
# Runs "<wrapper> version" through the shell, discarding stderr.
# NOTE(review): $wrapper is interpolated unquoted into the command line, so a path
# containing spaces or shell metacharacters (e.g. from an EHIVE_WRAPPER_* variable)
# would be split or interpreted by the shell.
317 my $version = `$wrapper version 2> /dev/null`;
323 =head2 assert_runnable_exists
325 Example : Bio::EnsEMBL::Hive::GuestProcess::assert_runnable_exists('python3', 'eHive.examples.TestRunnable');
326 Description : Ask the wrapper to check whether the runnable exists (can be loaded)
328 Exceptions : Die if there is no wrapper or the runnable can't be loaded
332 sub assert_runnable_exists {
333 my ($language, $runnable_module_name) = @_;
334 my $wrapper = _get_wrapper_for_language($language);
# List-form system() bypasses the shell. Any non-zero status (a failure reported by the
# wrapper, or -1 if it could not be started) is treated as "cannot be loaded".
335 if (system($wrapper, 'check_exists', $runnable_module_name)) {
336 die "The runnable module
'$runnable_module_name' cannot be loaded or compiled\n
";
341 =head2 build_wrapper_for_language
343 Example : Bio::EnsEMBL::Hive::GuestProcess::build_wrapper_for_language('java');
344 Description : Ask the wrapper to build all the necessary code to run Runnables of this language
346 Exceptions : Die if there is no wrapper or the build fails
350 sub build_wrapper_for_language {
351 my $language = shift;
352 my $wrapper = _get_wrapper_for_language($language);
# Delegates to "<wrapper> build" (list-form system(), no shell); a non-zero status means failure.
353 if (system($wrapper, 'build')) {
354 die "The $language wrapper cannot be built\n
";
361 Description : Destructor: tells the child to exit by sending an empty JSON object
368 $self->print_debug("DESTROY
");
# An empty JSON object on the command pipe tells the child to exit.
369 $self->child_in->print("{}\n
");
# Forcibly killing the child is currently disabled (commented out).
370 #kill('KILL', $self->child_pid);
376 Example : $process->print_debug("debug message");
377 Description : Prints a message if $self->{'_protocol_debug'} is set
383 my ($self, $msg) = @_;
# Tagged with the child's PID; printed only when the protocol debug flag is set.
384 print sprintf("PERL %d: %s\n
", $self->child_pid, $msg) if $self->{'_protocol_debug'};
394 Example : my $child_in = $process->child_in();
395 Example : $process->child_in(*CHILD_WTR);
396 Description : Getter/Setter for the file handle that allows talking to the child process.
398 Returntype : IO::Handle
# Stores the argument when one is given; always returns the handle used to write to the child.
405 $self->{'_child_in'} = shift if @_;
406 return $self->{'_child_in'};
411 Example : my $child_out = $process->child_out();
412 Example : $process->child_out(*CHILD_RDR);
413 Description : Getter/Setter for the file handle that allows receiving data
414 from the child process.
415 Returntype : IO::Handle
# Stores the argument when one is given; always returns the handle used to read from the child.
422 $self->{'_child_out'} = shift if @_;
423 return $self->{'_child_out'};
428 Example : my $child_pid = $process->child_pid();
429 Example : $process->child_pid($child_pid);
430 Description : Getter/Setter for the process ID of the child
# Stores the PID when one is given; always returns the stored PID.
438 $self->{'_child_pid'} = shift if @_;
439 return $self->{'_child_pid'};
443 =head2 json_formatter
445 Example : my $json_formatter = $object_name->json_formatter();
446 Example : $object_name->json_formatter($json_formatter);
447 Description : Getter/Setter for the JSON formatter.
448 Returntype : instance of JSON
# Stores the formatter when one is given; always returns the formatter used to encode/decode messages.
455 $self->{'_json_formatter'} = shift if @_;
456 return $self->{'_json_formatter'};
460 ################################
461 # Communication with the child #
462 ################################
466 Example : $process->send_message($perl_structure);
467 Description : Send the Perl structure to the child process via the pipe (serialized as JSON, one message per line).
470 Exceptions : raised by JSON / IO::Handle
475 my ($self, $struct) = @_;
# The constructor configures the formatter with indent(0), so the encoding is a single line.
476 my $j = $self->json_formatter->encode($struct);
477 $self->print_debug("send_message $j
");
# Messages are newline-delimited: one JSON document per line.
478 $self->child_in->print($j."\n
");
484 Example : $process->send_response('OK');
485 Description : Wrapper around send_message to send a response to the child.
487 Exceptions : raised by JSON / IO::Handle
492 my ($self, $response) = @_;
# The payload is usually 'OK'/'NO', but life_cycle() also returns dataflow results and the
# worker temp directory this way.
493 return $self->send_message({'response' => $response});
499 Example : my $msg = $process->read_message();
500 Description : Wait for and read the next message coming from the child.
501 Again, the message itself is serialized and transmitted as a single line of JSON.
503 Returntype : Perl structure
504 Exceptions : raised by JSON / IO::Handle
# Blocks until a full line arrives; undef means EOF (or a read error) on the pipe from the child.
510 my $s = $self->child_out->getline();
511 die "Did not receive any messages
" unless defined $s;
513 $self->print_debug("read_message: $s
");
514 return $self->json_formatter->decode($s);
520 Example : $process->wait_for_OK();
521 Description : Wait for the child process to send the OK signal
523 Exceptions : dies if the response is not OK, or anything raised by L<read_message()>
529 my $s = $self->read_message();
530 die "Response message does not look like a response
" if not exists $s->{'response'};
# Reject non-scalar responses before the string comparison, so only the literal 'OK' passes.
531 die "Received response is not OK
" if ref($s->{'response'}) or $s->{'response'} ne 'OK';
535 ###########################
536 # Hive::Process interface #
537 ###########################
540 =head2 param_defaults
542 Example : my $param_defaults = $runnable->param_defaults();
543 Example : $runnable->param_defaults($param_defaults);
544 Description : Getter/Setter for the default parameters of this runnable.
545 Hive only uses it as a getter, but here, we need a setter to
546 define the parameters at the Perl layer once they've been
547 retrieved from the child process.
# Stored value rather than a hard-coded hash: the constructor fills it with the content of
# the wrapper's second message, sent right after the version handshake.
555 $self->{'_param_defaults'} = shift if @_;
556 return $self->{'_param_defaults'};
562 Example : my $partial_timings = $runnable->life_cycle();
563 Description : Runs the life-cycle of the input job and returns the timings
564 of each Runnable method (fetch_input, run, etc).
565 See the description of this module for details about the protocol
574 $self->print_debug("LIFE_CYCLE
");
# Sends the job description to the child, then serves the child's events until JOB_END,
# returning a hash of the time spent in each job status.
576 my $job = $self->input_job();
577 my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
578 my %job_partial_timing = ();
582 parameters => $job->{_unsubstituted_param_hash},
583 input_id => $job->input_id,
584 dbID => defined $job->dbID ? $job->dbID + 0 : 0,
585 retry_count => $job->retry_count + 0,
587 execute_writes => $self->execute_writes || 0,
588 debug => $self->debug || 0,
590 $self->print_debug("SEND JOB PARAM
");
# Hand the job to the child and wait for its 'OK'.
591 $self->send_message(\%struct);
592 $self->wait_for_OK();
594 # A simple event loop
596 $self->print_debug("WAITING IN LOOP
");
598 my $msg = $self->read_message;
599 my $event = $msg->{event};
600 my $content = $msg->{content};
601 $self->print_debug("processing
event '$event'");
# Dispatch on the event name; each handled branch replies with send_response().
603 if ($event eq 'JOB_STATUS_UPDATE') {
# Record the time spent in the previous status (unless READY or CLAIMED), then switch
# to the new status and restart the stopwatch.
604 $job_partial_timing{$job->status} = $partial_stopwatch->get_elapsed() if ($job->status ne 'READY') and ($job->status ne 'CLAIMED');
605 $self->enter_status(uc $content);
606 $partial_stopwatch->restart();
607 $self->send_response('OK');
609 } elsif ($event eq 'WARNING') {
610 $self->warning($content->{message}, $content->{is_error}?'WORKER_ERROR':'INFO');
611 $self->send_response('OK');
613 } elsif ($event eq 'DATAFLOW') {
# Adopt the child's current parameter hashes before dataflowing; the reply is whatever
# dataflow_output_id() returned.
614 $job->{_param_hash} = $content->{params}->{substituted};
615 $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};
616 my $d = $self->dataflow_output_id($content->{output_ids}, $content->{branch_name_or_code});
617 $self->send_response($d);
619 } elsif ($event eq 'WORKER_TEMP_DIRECTORY') {
620 my $wtd = $self->worker_temp_directory;
621 $self->send_response($wtd);
623 } elsif ($event eq 'JOB_END') {
624 # Especially here we need to be careful about boolean values
625 # They are coded as JSON::true and JSON::false which have
626 # different meanings in text / number contexts
627 $job->autoflow($job->autoflow and $content->{job}->{autoflow});
628 $job->lethal_for_worker($content->{job}->{lethal_for_worker}?1:0);
629 $job->transient_error($content->{job}->{transient_error}?1:0);
630 $job->{_param_hash} = $content->{params}->{substituted};
631 $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};
633 # This piece of code is duplicated from Process
634 if ($content->{complete}) {
635 if( $self->execute_writes and $job->autoflow ) { # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
636 $self->say_with_header( ': AUTOFLOW input->output' );
637 $job->dataflow_output_id();
# Leftover entries in fan_cache mean fan jobs were created without their funnel:
# fail the job, and mark the failure as non-transient.
640 my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
641 if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
642 $job->transient_error(0);
643 die "There are cached semaphored fans
for which a funnel job (dataflow_rule_id(s)
".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown
";
646 $job->died_somewhere(1);
# Acknowledge JOB_END and leave the event loop with the collected timings.
648 $self->send_response('OK');
649 return \%job_partial_timing;
# Any other event name is a protocol error.
651 die "Unknown
event '$event' coming from the child
";
657 ### Summary of Process methods ###
659 ## Have to be redefined
663 ## Needed, can be reused from the base class
664 # worker_temp_directory
669 # enter_status -> worker / say_with_header
671 # cleanup_worker_temp_directory
673 ## Invalid in this context