run jobs (runnables) written in a different language

Upon initialisation, GuestProcess forks, and the child process executes the wrapper that
will allow running Runnables of the other language. The communication is ensured by two
pipes and is schematically similar to running "| wrapper |", except that GuestProcess
uses non-standard file descriptors, thus allowing the Runnable to still use std{in,out,err}.
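
The fork-and-pipes setup described above can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the GuestProcess code: the toy wrapper (a Perl one-liner that upper-cases one line) and all variable names are hypothetical, and a real wrapper would speak the JSON protocol instead.

```perl
use strict;
use warnings;
use Fcntl qw(F_GETFD F_SETFD FD_CLOEXEC);
use IO::Handle;

# Hypothetical stand-in for a real wrapper: it reads one line from the file
# descriptor given as first argument and echoes it upper-cased on the second.
my $toy_wrapper = <<'EOC';
open(my $in,  '<&=', $ARGV[0]) or die "cannot open read fd: $!";
open(my $out, '>&=', $ARGV[1]) or die "cannot open write fd: $!";
my $line = <$in>;
print $out uc($line);
close($out);
EOC

# One pipe per direction, as in "| wrapper |"
pipe(my $child_in_rdr,  my $child_in_wtr)  or die "pipe failed: $!";   # parent -> child
pipe(my $child_out_rdr, my $child_out_wtr) or die "pipe failed: $!";   # child -> parent

my $pid = fork();
die "cannot fork: $!" unless defined $pid;

if ($pid == 0) {
    # Child: keep only its own ends of the pipes
    close($child_in_wtr);
    close($child_out_rdr);
    # Perl sets close-on-exec on descriptors above 2, so clear the flag
    # to let the two pipe ends survive exec()
    foreach my $fh ($child_in_rdr, $child_out_wtr) {
        my $flags = fcntl($fh, F_GETFD, 0) or die "fcntl failed: $!";
        fcntl($fh, F_SETFD, $flags & ~FD_CLOEXEC) or die "fcntl failed: $!";
    }
    # The descriptor numbers are passed on the command line; std{in,out,err} stay untouched
    exec($^X, '-e', $toy_wrapper, fileno($child_in_rdr), fileno($child_out_wtr))
        or die "cannot exec: $!";
}

# Parent: keep the opposite ends and exchange one line with the child
close($child_in_rdr);
close($child_out_wtr);
$child_in_wtr->autoflush(1);
print $child_in_wtr "ping\n";
my $reply = <$child_out_rdr>;
waitpid($pid, 0);
print "child replied: $reply";
```

Passing the descriptor numbers as arguments is what leaves the standard streams free for the Runnable's own use.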

The wrapper receives the two file-numbers that it is meant to use (one for reading data
from GuestProcess, and one to send data to GuestProcess). All the messages are passed
around in single-line JSON structures. The protocol is described below using the convention:
    ---> represents a message sent to the child process,
    <--- represents a message sent by the child process
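
As a rough illustration of the single-line JSON framing, the sketch below encodes one message per line and decodes it again. It assumes the core JSON::PP module rather than the JSON module used by GuestProcess, and the message itself (event name and payload) is made up for the example.

```perl
use strict;
use warnings;
use JSON::PP;

# indent(0) keeps every message on one line; canonical(1) only makes the key order reproducible
my $json = JSON::PP->new->indent(0)->canonical(1);

# Hypothetical message: the event name and payload are illustrative only
my $message = {
    event   => 'EXAMPLE_EVENT',
    content => { note => "two\nlines" },
};

# Writing side: one message per line. The newline inside the payload is
# escaped by the encoder, so it cannot break the framing.
my $line = $json->encode($message) . "\n";
print $line;

# Reading side: strip the line terminator and decode
chomp(my $received = $line);
my $decoded = $json->decode($received);
print "event: $decoded->{event}\n";
```

Because the encoder escapes newlines inside strings, the reader can rely on a plain line read to delimit messages.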

The initialisation (in the constructor) consists of checking that both sides speak the same
version of the protocol:
    <--- { "version": "XXX" }
GuestProcess will bail out if the response is not "OK"

Then, the child process (i.e. the runnable) will send its default parameters to GuestProcess.
This fills the usual param_defaults() section of the Runnable:
    <--- { ... param_defaults ... }
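
The version check that opens this handshake can be sketched on its own. The snippet mirrors the rule used by check_version_compatibility() (the wrapper's version must start with the protocol version followed by a dot); the function name and the candidate version strings are hypothetical, and \Q...\E is added here only to quote the version literally.

```perl
use strict;
use warnings;

# Major protocol version, mirroring $GUESTPROCESS_PROTOCOL_VERSION
my $protocol_version = '3';

# Returns 1 if the version announced by the wrapper starts with "<major>.", 0 otherwise
sub is_compatible_version {
    my ($wrapper_version) = @_;
    return (defined $wrapper_version and $wrapper_version =~ /^\Q$protocol_version\E\./) ? 1 : 0;
}

# Hypothetical version strings that a wrapper might announce
foreach my $candidate ('3.0', '3.12', '30.1', '4.0', '3') {
    printf "%-5s => %s\n", $candidate, is_compatible_version($candidate) ? 'OK' : 'NO';
}
```

Anchoring the match on the trailing dot is what keeps a hypothetical "30.1" from being accepted by protocol version 3.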

The child process then goes to sleep, waiting for jobs to be seeded. Meanwhile,
GuestProcess enters a number of life_cycle() executions (as triggered by Worker).
Each one first sends a JSON object to the child process to initialize the job parameters
        "parameters": { ... the unsubstituted job parameters as compiled by Worker ... },
        "execute_writes": [1|0],

From this point, GuestProcess acts as a server, listening to events sent by the child.
Events are JSON objects composed of an "event" field (the name of the event) and a
"content" field (the payload). Events can be of the following kinds (with the expected
response from GuestProcess):
    <--- JOB_STATUS_UPDATE

        "is_error": [true|false],

        "branch_name_or_code": XXX,
        "output_ids": an array or a hash,
        "substituted": { ... the parameters that are currently substituted ... }
        "unsubstituted": { ... the parameters that have not yet been substituted ... }
    ---> dbIDs of the jobs that have been created

    <--- WORKER_TEMP_DIRECTORY
    ---> returns the temporary directory of the worker

        "complete": [true|false],
        "autoflow": [true|false],
        "lethal_for_worker": [true|false],
        "transient_error": [true|false],
        "substituted": { ... the parameters that are currently substituted ... }
        "unsubstituted": { ... the parameters that have not yet been substituted ... }
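
The server side of this event loop can be pictured as a dispatch on the "event" field. The sketch below is an assumption-laden illustration, not the GuestProcess implementation: the handler table, the function name, the temporary directory path and the "OK" reply to JOB_STATUS_UPDATE are all hypothetical, and JSON::PP stands in for the JSON module.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->indent(0);

# Hypothetical handlers, keyed by event name. Each receives the "content"
# payload and returns the value to be sent back to the child.
my %handlers = (
    WORKER_TEMP_DIRECTORY => sub { return '/tmp/worker_example' },   # made-up path
    JOB_STATUS_UPDATE     => sub { return 'OK' },                    # assumed reply
);

# Decodes one single-line JSON event and returns the response for it
sub handle_event_line {
    my ($line) = @_;
    my $event   = $json->decode($line);
    my $handler = $handlers{ $event->{event} }
        or die "Unknown event '$event->{event}'\n";
    return $handler->( $event->{content} );
}

# Usage: the payload shown here is illustrative only
my $line = '{"event":"WORKER_TEMP_DIRECTORY","content":null}';
print handle_event_line($line), "\n";
```

Keeping the handlers in a table makes an unknown event name fail loudly instead of being silently ignored.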

Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2022] EMBL-European Bioinformatics Institute

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

Please subscribe to the Hive mailing list: http:

package Bio::EnsEMBL::Hive::GuestProcess;

use base ('Bio::EnsEMBL::Hive::Process');

# -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------

our $GUESTPROCESS_PROTOCOL_VERSION = '3';       # Make sure you change this number whenever an incompatible change is introduced

=head2 get_protocol_version

    Description : Returns the version number of the communication protocol

=cut

sub get_protocol_version {
    return $GUESTPROCESS_PROTOCOL_VERSION;
}

# Returns 1 if $other_version starts with the protocol version followed by a dot, 0 otherwise
sub check_version_compatibility {
    my ($self, $other_version) = @_;

    my $gpv = $self->get_protocol_version();
#    warn "$self : GPV='$gpv', MV='$other_version'\n";

    return ((defined $other_version) and ($other_version=~/^$gpv\./)) ? 1 : 0;
}

# -------------------------------------- </versioning of the GuestProcess interface> ------------------------------------------------------

    Arg[1]      : $debug: the debug level (levels above 1 also print the GuestProcess protocol messages)
    Arg[2]      : $language: the programming language the external runnable is in
    Arg[3]      : $module: the name of the runnable (usually a package name)
    Description : Constructor
    Exceptions  : if $language or $module is not defined properly or if the pipes /
                  child process could not be created

    my ($class, $debug, $language, $module) = @_;

    die "GuestProcess must be told which language to interface with" unless $language;

    my $wrapper = _get_wrapper_for_language($language);
    die "GuestProcess must be told which module to run" unless $module;

    # Two pipes, one per direction: the child process reads from $PARENT_RDR and writes to $PARENT_WTR
    my ($PARENT_RDR, $PARENT_WTR, $CHILD_WTR,$CHILD_RDR);
    pipe($PARENT_RDR, $CHILD_WTR) or die 'Could not create a pipe to send data to the child !';
    pipe($CHILD_RDR,  $PARENT_WTR) or die 'Could not create a pipe to get data from the child !';

    my $protocol_debug = ($debug && ($debug > 1)); # Only advanced levels of debug will show the GuestProcess protocol messages
    if ($protocol_debug) {
        print "PARENT_RDR is ", fileno($PARENT_RDR), "\n";
        print "PARENT_WTR is ", fileno($PARENT_WTR), "\n";
        print "CHILD_RDR is ", fileno($CHILD_RDR), "\n";
        print "CHILD_WTR is ", fileno($CHILD_WTR), "\n";
    }

    print "parent is PID $$\n" if $protocol_debug;

    die "cannot fork: $!" unless defined $pid;

        print "child is PID $$\n" if $protocol_debug;

        # Do not close the non-standard file descriptors on exec(): the child process will need them !

        my $flags = fcntl($PARENT_RDR, F_GETFD, 0);
        fcntl($PARENT_RDR, F_SETFD, $flags & ~FD_CLOEXEC);
        $flags = fcntl($PARENT_WTR, F_GETFD, 0);
        fcntl($PARENT_WTR, F_SETFD, $flags & ~FD_CLOEXEC);

        exec($wrapper, 'run', $module, fileno($PARENT_RDR), fileno($PARENT_WTR), $debug

    $CHILD_WTR->autoflush(1);

    my $self = bless {}, $class;

    $self->child_out($CHILD_RDR);
    $self->child_in($CHILD_WTR);
    $self->child_pid($pid);
    $self->json_formatter( JSON->new()->indent(0) );
    $self->{'_protocol_debug'} = $protocol_debug; # controls the GuestProcess protocol, not the worker

    my $other_version = $self->read_message()->{content};
    if (!$self->check_version_compatibility($other_version)) {
        $self->send_response('NO');
        die "eHive's protocol version is '".$self->get_protocol_version."' but the wrapper's is '$other_version'\n";
    }
    $self->send_response('OK');

    $self->print_debug("BEFORE READ PARAM_DEFAULTS");
    $self->param_defaults( $self->read_message()->{content} );
    $self->send_response('OK');

    $self->print_debug("INIT DONE");

=head2 _get_wrapper_for_language

    Description : Finds the wrapper that understands the given language
    Exceptions  : Can die if the wrapper doesn't exist

sub _get_wrapper_for_language {
    my $wrapper = $ENV{'EHIVE_WRAPPER_'.(uc $language)} # User-overridden wrapper
            || sprintf('%s/wrappers/%s/wrapper', $ENV{'EHIVE_ROOT_DIR'}, $language);  # Embedded wrapper
    if (not -e $wrapper) {
        die "$language is currently not supported\n";
    } elsif (not -s $wrapper) {
        die "The wrapper '$wrapper' is an empty file !\n";
    } elsif (not -x $wrapper) {
        die "No permissions to execute the wrapper '$wrapper'\n";

=head2 _get_all_registered_wrappers

    Example     : my $all_languages = Bio::EnsEMBL::Hive::GuestProcess::_get_all_registered_wrappers()
    Description : Lists all the languages and wrappers that are registered (either
                  via an EHIVE_WRAPPER environment variable, or via a "wrapper"
                  file under $EHIVE_ROOT_DIR/wrappers/).
    Returntype  : Hashref { String => String }

sub _get_all_registered_wrappers {
    foreach my $variable (keys %ENV) {
        if ($variable =~ /^EHIVE_WRAPPER_(.*)$/) {
            $all_found{lc $1} = $ENV{$variable};
    foreach my $wrapper (glob $ENV{'EHIVE_ROOT_DIR'}.'/wrappers