ensembl-hive  2.6
GuestProcess.pm
Go to the documentation of this file.
1 =pod
2 
3 =head1 NAME
4 
5 Bio::EnsEMBL::Hive::GuestProcess
6 
7 =head1 SYNOPSIS
8 
9 This is a variant of Bio::EnsEMBL::Hive::Process that forks into a wrapper that can itself
10 run jobs (runnables) written in a different language
11 
12 =head1 DESCRIPTION
13 
14 Upon initialisation, GuestProcess forks, and the child process executes the wrapper that
15 will allow running Runnables of the other language. The communication is ensured by two
16 pipes and is schematically similar to running "| wrapper |", except that GuestProcess
17 uses non-standard file descriptors, thus allowing the Runnable to still use std{in,out,err}.
18 
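19 The wrapper receives the two file-descriptor numbers that it is meant to use (one for reading data
20 from GuestProcess, and one to send data to GuestProcess). All the messages are passed
21 around in single-line JSON structures. Acknowledgements ("OK") in either direction are wrapped as { "response": ... }, and the other messages sent by the child carry their payload in a "content" field; the schematics below mostly omit these envelopes. The protocol is described below using the convention: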
22  ---> represents a message sent to the child process,
23  <--- represents a message sent by the child process
24 
25 The initialisation (in the constructor) consists of checking that both sides speak the same
26 version of the protocol:
27  <--- { "version": "XXX" }
28  ---> "OK"
29 If the major number of the child's version does not match its own protocol version, GuestProcess replies "NO" instead and dies.
30 
31 Then, the child process (i.e. the runnable) will send its default parameters to GuestProcess.
32 This fills the usual param_defaults() section of the Runnable:
33  <--- { ... param_defaults ... }
34  ---> "OK"
35 
36 The child process then goes to sleep, waiting for jobs to be seeded. Meanwhile,
37 GuestProcess enters a number of life_cycle() executions (as triggered by Worker).
38 Each one first sends a JSON object to the child process to initialize the job parameters
39  ---> {
40  "input_job": {
41  "parameters": { ... the unsubstituted job parameters as compiled by Worker ... },
42  // followed by several attributes of the job
43  "input_id": { ... },
44  "dbID": XXX,
45  "retry_count": XXX
46  },
47  "execute_writes": [1|0],
48  "debug": XXX
49  }
50  <--- "OK"
51 
52 From this point, GuestProcess acts as a server, listening to events sent by the child.
53 Events are JSON objects composed of an "event" field (the name of the event) and a
54 "content" field (the payload). Events can be of the following kinds (with the expected
55 response from GuestProcess):
56 
57  <--- JOB_STATUS_UPDATE
58  // The content is one of "PRE_CLEANUP", "FETCH_INPUT", "RUN", "WRITE_OUTPUT", "POST_HEALTHCHECK", "POST_CLEANUP"
59  ---> "OK"
60 
61  <--- WARNING
62  // The content is a JSON object:
63  {
64  "message": "XXX",
65  "is_error": [true|false],
66  }
67  ---> "OK"
68 
69  <--- DATAFLOW
70  // The content is a JSON object:
71  {
72  "branch_name_or_code": XXX,
73  "output_ids": an array or a hash,
74  "params": {
75  "substituted": { ... the parameters that are currently substituted ... }
76  "unsubstituted": { ... the parameters that have not yet been substituted ... }
77  }
78  }
79  ---> dbIDs of the jobs that have been created
80 
81  <--- WORKER_TEMP_DIRECTORY
82  // No content needed (ignored)
83  ---> returns the temporary directory of the worker
84 
85  <--- JOB_END
86  // The content is a JSON object describing the final state of the job
87  {
88  "complete": [true|false],
89  "job": {
90  "autoflow": [true|false],
91  "lethal_for_worker": [true|false],
92  "transient_error": [true|false],
93  },
94  "params": {
95  "substituted": { ... the parameters that are currently substituted ... }
96  "unsubstituted": { ... the parameters that have not yet been substituted ... }
97  }
98  }
99  ---> "OK"
100 
101 
102 =head1 LICENSE
103 
104 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
105 Copyright [2016-2024] EMBL-European Bioinformatics Institute
106 
107 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
108 You may obtain a copy of the License at
109 
110  http://www.apache.org/licenses/LICENSE-2.0
111 
112 Unless required by applicable law or agreed to in writing, software distributed under the License
113 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
114 See the License for the specific language governing permissions and limitations under the License.
115 
116 =head1 CONTACT
117 
118 Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
119 
120 =cut
121 
122 
123 package Bio::EnsEMBL::Hive::GuestProcess;
124 
125 use strict;
126 use warnings;
127 
128 use JSON;
129 use IO::Handle;
130 
131 use Data::Dumper;
132 
133 use base ('Bio::EnsEMBL::Hive::Process');
134 
135 
# -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------

our $GUESTPROCESS_PROTOCOL_VERSION = '5';   # Make sure you change this number whenever an incompatible change is introduced


=head2 get_protocol_version

    Example     : my $version = Bio::EnsEMBL::Hive::GuestProcess->get_protocol_version();
    Description : Returns the version number of the GuestProcess communication
                  protocol, as stored in $GUESTPROCESS_PROTOCOL_VERSION
    Returntype  : String

=cut

sub get_protocol_version {
    my $version = $GUESTPROCESS_PROTOCOL_VERSION;
    return $version;
}
152 
=head2 check_version_compatibility

    Arg[1]      : $other_version: the version reported by the wrapper (may be undef)
    Example     : my $is_compatible = $process->check_version_compatibility('5.0');
    Description : Tells whether the wrapper's version is compatible with ours, i.e.
                  whether it starts with our protocol version followed by a dot
                  (only the major number has to match)
    Returntype  : Integer (1 if compatible, 0 otherwise)
    Exceptions  : none

=cut

sub check_version_compatibility {
    my ($self, $other_version) = @_;

    return 0 unless defined $other_version;

    my $gpv = $self->get_protocol_version();
#    warn "$self : GPV='$gpv', MV='$other_version'\n";

    # \Q...\E makes $gpv match literally: a '.' in our version must not match any character
    return ($other_version =~ /^\Q$gpv\E\./) ? 1 : 0;
}

# -------------------------------------- </versioning of the GuestProcess interface> ------------------------------------------------------
163 
164 
165 =head2 new
166 
167  Arg[1] : $language: the programming language the external runnable is in
168  Arg[2] : $module: the name of the runnable (usually a package name)
170  Description : Constructor
172  Exceptions : if $language or $module is not defined properly or if the pipes /
173  child process could not be created
174 
175 =cut
176 
177 sub new {
178 
179  my ($class, $debug, $language, $module) = @_;
180 
181  die "GuestProcess must be told which language to interface with" unless $language;
182 
183  my $wrapper = _get_wrapper_for_language($language);
184  die "GuestProcess must be told which module to run" unless $module;
185 
186  my ($PARENT_RDR, $PARENT_WTR, $CHILD_WTR,$CHILD_RDR);
187  pipe($PARENT_RDR, $CHILD_WTR) or die 'Could not create a pipe to send data to the child !';
188  pipe($CHILD_RDR, $PARENT_WTR) or die 'Could not create a pipe to get data from the child !';;
189 
190  my $protocol_debug = ($debug && ($debug > 1)); # Only advanced levels of debug will show the GuestProcess protocol messages
191  if ($protocol_debug) {
192  print "PARENT_RDR is ", fileno($PARENT_RDR), "\n";
193  print "PARENT_WTR is ", fileno($PARENT_WTR), "\n";
194  print "CHILD_RDR is ", fileno($CHILD_RDR), "\n";
195  print "CHILD_WTR is ", fileno($CHILD_WTR), "\n";
196  }
197 
198  my $pid;
199 
200  if ($pid = fork()) {
201  # In the parent
202  close $PARENT_RDR;
203  close $PARENT_WTR;
204  print "parent is PID $$\n" if $protocol_debug;
205  } else {
206  die "cannot fork: $!" unless defined $pid;
207  # In the child
208  close $CHILD_RDR;
209  close $CHILD_WTR;
210  print "child is PID $$\n" if $protocol_debug;
211 
212  # Do not close the non-standard file descriptors on exec(): the child process will need them !
213  use Fcntl;
214  my $flags = fcntl($PARENT_RDR, F_GETFD, 0);
215  fcntl($PARENT_RDR, F_SETFD, $flags & ~FD_CLOEXEC);
216  $flags = fcntl($PARENT_WTR, F_GETFD, 0);
217  fcntl($PARENT_WTR, F_SETFD, $flags & ~FD_CLOEXEC);
218 
219  exec($wrapper, 'run', $module, fileno($PARENT_RDR), fileno($PARENT_WTR), $debug//0);
220  }
221 
222 
223  $CHILD_WTR->autoflush(1);
224 
225  my $self = bless {}, $class;
226 
227  $self->child_out($CHILD_RDR);
228  $self->child_in($CHILD_WTR);
229  $self->child_pid($pid);
230  $self->json_formatter( JSON->new()->indent(0) );
231  $self->{'_protocol_debug'} = $protocol_debug; # controls the GuestProcess protocol, not the worker
232 
233  $self->print_debug('CHECK VERSION NUMBER');
234  my $other_version = $self->read_message()->{content};
235  if (!$self->check_version_compatibility($other_version)) {
236  $self->send_response('NO');
237  die "eHive's protocol version is '".$self->get_protocol_version."' but the wrapper's is '$other_version'\n";
238  } else {
239  $self->send_response('OK');
240  }
241 
242  $self->print_debug("BEFORE READ PARAM_DEFAULTS");
243  $self->param_defaults( $self->read_message()->{content} );
244  $self->send_response('OK');
245 
246  $self->print_debug("INIT DONE");
247 
248  return $self;
249 }
250 
251 
=head2 _get_wrapper_for_language

    Arg[1]      : $language: the name of the language (e.g. 'python3')
    Example     : my $wrapper = Bio::EnsEMBL::Hive::GuestProcess::_get_wrapper_for_language('python3');
    Description : Finds the wrapper that understands the given language. The
                  EHIVE_WRAPPER_<LANGUAGE> environment variable (language in upper
                  case) takes precedence over the embedded wrapper located at
                  $EHIVE_ROOT_DIR/wrappers/<language>/wrapper
    Returntype  : String
    Exceptions  : Dies if the wrapper cannot be located (no override and no
                  EHIVE_ROOT_DIR), doesn't exist, is a directory, is an empty file,
                  or is not executable

=cut

sub _get_wrapper_for_language {
    my ($language) = @_;

    my $env_variable = 'EHIVE_WRAPPER_'.(uc $language);
    my $wrapper = $ENV{$env_variable};      # User-overriden wrapper
    if (not $wrapper) {
        # Embedded wrapper. Without EHIVE_ROOT_DIR the path would silently become
        # "/wrappers/..." (with an "uninitialized value" warning), so fail clearly instead
        my $root_dir = $ENV{'EHIVE_ROOT_DIR'};
        if (not (defined $root_dir and length $root_dir)) {
            die "Cannot locate the '$language' wrapper: neither $env_variable nor EHIVE_ROOT_DIR is set !\n";
        }
        $wrapper = sprintf('%s/wrappers/%s/wrapper', $root_dir, $language);
    }

    if (not -e $wrapper) {
        die "The path '$wrapper' doesn't exist !\n";
    } elsif (-d $wrapper) {
        # A directory would pass the -s and -x tests below but cannot be executed
        die "The wrapper '$wrapper' is a directory, not a file !\n";
    } elsif (not -s $wrapper) {
        die "The wrapper '$wrapper' is an empty file !\n";
    } elsif (not -x $wrapper) {
        die "No permissions to execute the wrapper '$wrapper'\n";
    }
    return $wrapper;
}
275 
276 
=head2 _get_all_registered_wrappers

    Example     : my $all_languages = Bio::EnsEMBL::Hive::GuestProcess::_get_all_registered_wrappers()
    Description : Lists all the languages and wrappers that are registered (either
                  via an EHIVE_WRAPPER_<LANGUAGE> environment variable, or via a "wrapper"
                  file under $EHIVE_ROOT_DIR/wrappers/<language>/). As in
                  _get_wrapper_for_language, the environment variable takes
                  precedence over the embedded wrapper of the same language.
                  Note that those wrappers are not necessarily usable, as 1) _get_wrapper_for_language
                  performs additional checks and 2) the version numbers have to match
    Returntype  : Hashref { String => String } (language name => path of the wrapper)
    Exceptions  : None

=cut

sub _get_all_registered_wrappers {
    my %all_found;

    # Embedded wrappers: $EHIVE_ROOT_DIR/wrappers/<language>/wrapper
    # readdir() is used rather than glob() because glob() splits its pattern on
    # whitespace and would break on a root directory containing spaces
    my $root_dir = $ENV{'EHIVE_ROOT_DIR'};
    if (defined $root_dir and opendir(my $dh, "$root_dir/wrappers")) {
        my @entries = readdir $dh;
        closedir $dh;
        foreach my $language (sort @entries) {
            next if $language =~ /^\./;     # Like glob's '*', skip hidden entries (including . and ..)
            my $wrapper = "$root_dir/wrappers/$language/wrapper";
            $all_found{$language} = $wrapper if -e $wrapper;
        }
    }

    # User-overriden wrappers are registered last so that they win, as in _get_wrapper_for_language
    foreach my $variable (keys %ENV) {
        if ($variable =~ /^EHIVE_WRAPPER_(.*)$/) {
            $all_found{lc $1} = $ENV{$variable};
        }
    }
    return \%all_found;
}
303 
304 
=head2 get_wrapper_version

    Arg[1]      : $language: the name of the language (e.g. 'python3')
    Example     : my $version = Bio::EnsEMBL::Hive::GuestProcess::get_wrapper_version('python3');
    Description : Ask the wrapper what version it is on, by running "<wrapper> version"
                  (its standard error is discarded). The major number is expected
                  to match $GUESTPROCESS_PROTOCOL_VERSION
    Returntype  : String (the output of the wrapper without its trailing newline)
    Exceptions  : Die if there is no wrapper

=cut

sub get_wrapper_version {
    my $language = shift;
    my $wrapper = _get_wrapper_for_language($language);
    # The command goes through the shell (for the stderr redirection), so the path is
    # single-quoted, each embedded single quote being written as '\''. This keeps
    # paths containing spaces or shell metacharacters intact.
    (my $quoted_wrapper = $wrapper) =~ s/'/'\\''/g;
    my $version = `'$quoted_wrapper' version 2> /dev/null`;
    chomp $version if defined $version;
    return $version;
}
321 
322 
=head2 assert_runnable_exists

    Arg[1]      : $language: the name of the language (e.g. 'python3')
    Arg[2]      : $runnable_module_name: the name of the runnable to check
    Example     : Bio::EnsEMBL::Hive::GuestProcess::assert_runnable_exists('python3', 'eHive.examples.TestRunnable');
    Description : Runs "<wrapper> check_exists <runnable>" to ask the wrapper
                  whether the runnable exists (can be loaded)
    Returntype  : None
    Exceptions  : Die if there is no wrapper, or if the check command exits with
                  a non-zero status (or cannot be run)

=cut

sub assert_runnable_exists {
    my ($language, $runnable_module_name) = @_;
    my $check_status = system(_get_wrapper_for_language($language), 'check_exists', $runnable_module_name);
    die "The runnable module '$runnable_module_name' cannot be loaded or compiled\n" if $check_status;
}
339 
340 
=head2 build_wrapper_for_language

    Arg[1]      : $language: the name of the language (e.g. 'java')
    Example     : Bio::EnsEMBL::Hive::GuestProcess::build_wrapper_for_language('java');
    Description : Runs "<wrapper> build" to ask the wrapper to build all the
                  necessary code to run Runnables of this language
    Returntype  : None
    Exceptions  : Die if there is no wrapper, or if the build command exits with
                  a non-zero status (or cannot be run)

=cut

sub build_wrapper_for_language {
    my ($language) = @_;
    my $build_status = system(_get_wrapper_for_language($language), 'build');
    die "The $language wrapper cannot be built\n" if $build_status;
}
357 
358 
=head2 DESTROY

    Description : Destructor: tells the child to exit by sending it an empty JSON
                  object on its input pipe. Does nothing if that pipe handle is
                  not set, and survives a child that has already exited
    Returntype  : none

=cut

sub DESTROY {
    my $self = shift;
    $self->print_debug("DESTROY");

    # The handle may be undefined (never set, or possibly already freed during global destruction)
    my $child_in = $self->child_in or return;

    # If the child has already exited, writing to the pipe raises SIGPIPE, whose
    # default action would kill this (worker) process. Ignore it locally: print()
    # then simply fails. $! is localized so the failure does not leak to the caller.
    local $SIG{PIPE} = 'IGNORE';
    local $!;
    $child_in->print("{}\n");
    #kill('KILL', $self->child_pid);
}
372 
373 
=head2 print_debug

    Example     : $process->print_debug("debug message");
    Description : When $self->{'_protocol_debug'} is set, prints the message as
                  "PERL <child PID>: <message>" to the currently selected output
                  handle (normally STDOUT). Otherwise does nothing
    Returntype  : none

=cut

sub print_debug {
    my ($self, $msg) = @_;
    return unless $self->{'_protocol_debug'};
    my $debug_line = sprintf("PERL %d: %s\n", $self->child_pid, $msg);
    print $debug_line;
}
386 
387 ##############
388 # Attributes #
389 ##############
390 
391 
=head2 child_in

    Example     : my $child_in = $process->child_in();
    Example     : $process->child_in(*CHILD_WTR);
    Description : Getter/Setter for the file handle used to send data to the
                  child process (stored in $self->{'_child_in'}). Any argument,
                  even undef, replaces the current value
    Returntype  : IO::Handle
    Exceptions  : none

=cut

sub child_in {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_child_in'} = $new_value[0];
    }
    return $self->{'_child_in'};
}
408 
=head2 child_out

    Example     : my $child_out = $process->child_out();
    Example     : $process->child_out(*CHILD_RDR);
    Description : Getter/Setter for the file handle used to receive data from
                  the child process (stored in $self->{'_child_out'}). Any
                  argument, even undef, replaces the current value
    Returntype  : IO::Handle
    Exceptions  : none

=cut

sub child_out {
    my $self = $_[0];
    $self->{'_child_out'} = $_[1] if scalar(@_) > 1;
    return $self->{'_child_out'};
}
425 
=head2 child_pid

    Example     : my $child_pid = $process->child_pid();
    Example     : $process->child_pid($child_pid);
    Description : Getter/Setter for the process ID of the child (stored in
                  $self->{'_child_pid'}). Any argument, even undef, replaces the
                  current value
    Returntype  : integer
    Exceptions  : none

=cut

sub child_pid {
    my ($self, @args) = @_;
    ($self->{'_child_pid'}) = @args if @args;
    return $self->{'_child_pid'};
}
441 
442 
=head2 json_formatter

    Example     : my $json_formatter = $object_name->json_formatter();
    Example     : $object_name->json_formatter($json_formatter);
    Description : Getter/Setter for the JSON formatter used to encode and decode
                  the messages (stored in $self->{'_json_formatter'}). Any
                  argument, even undef, replaces the current value
    Returntype  : instance of JSON
    Exceptions  : none

=cut

sub json_formatter {
    my $self = shift;
    if (scalar @_) {
        my ($formatter) = @_;
        $self->{'_json_formatter'} = $formatter;
    }
    return $self->{'_json_formatter'};
}
458 
459 
460 ################################
461 # Communication with the child #
462 ################################
463 
=head2 send_message

    Example     : $process->send_message($perl_structure);
    Description : Serialises the Perl structure with the JSON formatter and writes
                  it, followed by a newline, to the pipe going to the child process
    Returntype  : the return value of the handle's print()
    Exceptions  : raised by JSON / IO::Handle

=cut

sub send_message {
    my ($self, $struct) = @_;
    my $serialised = $self->json_formatter->encode($struct);
    $self->print_debug("send_message $serialised");
    return $self->child_in->print("$serialised\n");
}
480 
481 
=head2 send_response

    Example     : $process->send_response('OK');
    Description : Sends a response to the child, wrapped as { "response": $response },
                  through send_message
    Returntype  : whatever send_message returns
    Exceptions  : raised by JSON / IO::Handle

=cut

sub send_response {
    my ($self, $response) = @_;
    my %envelope = ('response' => $response);
    return $self->send_message(\%envelope);
}
495 
496 
=head2 read_message

    Example     : my $msg = $process->read_message();
    Description : Waits for the next line coming from the child process and
                  decodes it with the JSON formatter (each message is one line
                  of JSON)
    Returntype  : Perl structure
    Exceptions  : dies with "Did not receive any messages" when the pipe is at
                  end-of-file; anything raised by JSON / IO::Handle

=cut

sub read_message {
    my ($self) = @_;
    my $line = $self->child_out->getline();
    if (!defined $line) {
        die "Did not receive any messages";
    }
    chomp($line);
    $self->print_debug("read_message: $line");
    my $decoded = $self->json_formatter->decode($line);
    return $decoded;
}
516 
517 
=head2 wait_for_OK

    Example     : $process->wait_for_OK();
    Description : Waits for the child process to send the OK signal, i.e. the
                  message { "response": "OK" }
    Returntype  : none
    Exceptions  : dies if the message is not a response object, if the response
                  is not the string "OK", or anything raised by L<read_message()>

=cut

sub wait_for_OK {
    my $self = shift;
    my $s = $self->read_message();
    # A message that is not a JSON object (e.g. a bare "OK" string) would otherwise
    # trigger a cryptic "Can't use string as a HASH ref" error under strict refs
    die "Response message does not look like a response" if (ref($s) ne 'HASH') or (not exists $s->{'response'});
    die "Received response is not OK" if ref($s->{'response'}) or (not defined $s->{'response'}) or ($s->{'response'} ne 'OK');
}
533 
534 
535 ###########################
536 # Hive::Process interface #
537 ###########################
538 
539 
=head2 param_defaults

    Example     : my $param_defaults = $runnable->param_defaults();
    Example     : $runnable->param_defaults($param_defaults);
    Description : Getter/Setter for the default parameters of this runnable
                  (stored in $self->{'_param_defaults'}).
                  Hive only uses it as a getter, but here, we need a setter to
                  define the parameters at the Perl layer once they've been
                  retrieved from the child process.
    Returntype  : Hashref
    Exceptions  : none

=cut

sub param_defaults {
    my ($self, @new_defaults) = @_;
    $self->{'_param_defaults'} = $new_defaults[0] if @new_defaults;
    return $self->{'_param_defaults'};
}
558 
559 
=head2 life_cycle

    Example     : my $partial_timings = $runnable->life_cycle();
    Description : Runs the life-cycle of the input job and returns the timings
                  of each Runnable method (fetch_input, run, etc).
                  It sends the job parameters to the child, then serves the
                  child's events (JOB_STATUS_UPDATE, WARNING, DATAFLOW,
                  WORKER_TEMP_DIRECTORY) until it receives JOB_END.
                  See the description of this module for details about the protocol
    Returntype  : Hashref (job status => time elapsed in that status, as given by
                  the stopwatch's get_elapsed())
    Exceptions  : dies if the child does not acknowledge the job parameters with
                  "OK", on an unknown event, or when semaphored fans have been left
                  without their funnel job

=cut

sub life_cycle {
    my $self = shift;

    $self->print_debug("LIFE_CYCLE");

    my $job = $self->input_job();
    # NOTE(review): Bio::EnsEMBL::Hive::Utils::Stopwatch is not loaded by this file;
    # presumably the base class loads it - confirm.
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    # "+ 0" presumably forces the values to be encoded as JSON numbers rather than strings
    my %struct = (
        input_job => {
            parameters  => $job->{_unsubstituted_param_hash},
            input_id    => $job->input_id,
            dbID        => defined $job->dbID ? $job->dbID + 0 : 0,
            retry_count => $job->retry_count + 0,
        },
        execute_writes => $self->execute_writes || 0,
        debug          => $self->debug || 0,
    );
    $self->print_debug("SEND JOB PARAM");
    $self->send_message(\%struct);
    $self->wait_for_OK();

    # A simple event loop: serve the child's requests until it sends JOB_END
    while (1) {
        $self->print_debug("WAITING IN LOOP");

        my $msg = $self->read_message;
        # A message without an "event" field is reported as an unknown event
        # (instead of also raising "uninitialized value" warnings)
        my $event = $msg->{event} // '';
        my $content = $msg->{content};
        $self->print_debug("processing event '$event'");

        if ($event eq 'JOB_STATUS_UPDATE') {
            # Record the time spent in the previous status (READY and CLAIMED are not timed)
            $job_partial_timing{$job->status} = $partial_stopwatch->get_elapsed() if ($job->status ne 'READY') and ($job->status ne 'CLAIMED');
            $self->enter_status(uc $content);
            $partial_stopwatch->restart();
            $self->send_response('OK');

        } elsif ($event eq 'WARNING') {
            $self->warning($content->{message}, $content->{is_error}?'WORKER_ERROR':'INFO');
            $self->send_response('OK');

        } elsif ($event eq 'DATAFLOW') {
            # Bring the Perl-side parameters in sync with the child's before dataflowing
            $job->{_param_hash} = $content->{params}->{substituted};
            $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};
            my $d = $self->dataflow_output_id($content->{output_ids}, $content->{branch_name_or_code});
            $self->send_response($d);

        } elsif ($event eq 'WORKER_TEMP_DIRECTORY') {
            my $wtd = $self->worker_temp_directory;
            $self->send_response($wtd);

        } elsif ($event eq 'JOB_END') {
            # Especially here we need to be careful about boolean values
            # They are coded as JSON::true and JSON::false which have
            # different meanings in text / number contexts, so all three
            # flags (autoflow included) are normalised to plain 1 / 0
            $job->autoflow(($job->autoflow and $content->{job}->{autoflow}) ? 1 : 0);
            $job->lethal_for_worker($content->{job}->{lethal_for_worker}?1:0);
            $job->transient_error($content->{job}->{transient_error}?1:0);
            $job->{_param_hash} = $content->{params}->{substituted};
            $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};

            # This piece of code is duplicated from Process
            if ($content->{complete}) {
                if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
                    $self->say_with_header( ': AUTOFLOW input->output' );
                    $job->dataflow_output_id();
                }

                my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
                if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
                    $job->transient_error(0);
                    die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
                }
            } else {
                $job->died_somewhere(1);
            }
            $self->send_response('OK');
            return \%job_partial_timing;
        } else {
            die "Unknown event '$event' coming from the child";
        }
    }
}
655 
656 
657 ### Summary of Process methods ###
658 
659 ## Have to be redefined
660 # life_cycle
661 # param_defaults
662 
663 ## Needed, can be reused from the base class
664 # worker_temp_directory
665 # input_job
666 # execute_writes
667 # debug
668 # dataflow_output_id
669 # enter_status -> worker / say_with_header
670 # warning
671 # cleanup_worker_temp_directory
672 
673 ## Invalid in this context
674 # strict_hash_format
675 # fetch_input
676 # run
677 # write_output
678 # db
679 # dbc
680 # data_dbc
681 # input_id
682 # complete_early
683 # throw
684 
685 
686 1;
Bio::EnsEMBL::Hive::GuestProcess
Definition: GuestProcess.pm:105
Bio::EnsEMBL::Hive::GuestProcess::_get_wrapper_for_language
protected String _get_wrapper_for_language()
Bio::EnsEMBL::Hive::Version
Definition: Version.pm:19
Bio::EnsEMBL::Hive::GuestProcess::get_protocol_version
public String get_protocol_version()
Bio::EnsEMBL::Hive::Process
Definition: Process.pm:77
debug
public debug()
run
public run()
Bio::EnsEMBL::Hive::GuestProcess::new
public Bio::EnsEMBL::Hive::GuestProcess new()
Bio::EnsEMBL::Hive
Definition: Hive.pm:38
Bio::EnsEMBL::Hive::GuestProcess::print_debug
public void print_debug()