ensembl-hive  2.5
GuestProcess.pm
Go to the documentation of this file.
1 =pod
2 
=head1 NAME

Bio::EnsEMBL::Hive::GuestProcess

7 =head1 SYNOPSIS
8 
This is a variant of Bio::EnsEMBL::Hive::Process that forks into a wrapper that can itself
run jobs (runnables) written in a different language.
11 
12 =head1 DESCRIPTION
13 
14 Upon initialisation, GuestProcess forks, and the child process executes the wrapper that
15 will allow running Runnables of the other language. The communication is ensured by two
16 pipes and is schematically similar to running "| wrapper |", except that GuestProcess
17 uses non-standard file descriptors, thus allowing the Runnable to still use std{in,out,err}.
18 
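The wrapper receives the two file numbers that it is meant to use (one for reading data
from GuestProcess, and one to send data to GuestProcess). All the messages are passed
around as single-line JSON structures. The protocol is described below using the convention:
 ---> represents a message sent to the child process,
 <--- represents a message sent by the child process
The messages are shown schematically: the payload of a message sent by the child is
carried in the "content" field of a JSON object, and every response (in either
direction) is wrapped as { "response": ... }.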
24 
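The initialisation (in the constructor) consists of checking that both sides speak the same
version of the protocol:
 <--- "XXX"   // the version of the protocol spoken by the wrapper
 ---> "OK" or "NO"
The wrapper's version is accepted if it starts with GuestProcess's protocol version followed
by a dot (e.g. "3.0" or "3.1" for protocol version 3). Otherwise GuestProcess answers "NO"
and bails out.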
30 
31 Then, the child process (i.e. the runnable) will send its default parameters to GuestProcess.
32 This fills the usual param_defaults() section of the Runnable:
33  <--- { ... param_defaults ... }
34  ---> "OK"
35 
36 The child process then goes to sleep, waiting for jobs to be seeded. Meanwhile,
37 GuestProcess enters a number of life_cycle() executions (as triggered by Worker).
38 Each one first sends a JSON object to the child process to initialize the job parameters
39  ---> {
40  "input_job": {
41  "parameters": { ... the unsubstituted job parameters as compiled by Worker ... },
42  // followed by several attributes of the job
43  "input_id": { ... },
44  "dbID": XXX,
45  "retry_count": XXX
46  },
47  "execute_writes": [1|0],
48  "debug": XXX
49  }
50  <--- "OK"
51 
52 From this point, GuestProcess acts as a server, listening to events sent by the child.
53 Events are JSON objects composed of an "event" field (the name of the event) and a
54 "content" field (the payload). Events can be of the following kinds (with the expected
55 response from GuestProcess):
56 
57  <--- JOB_STATUS_UPDATE
58  // The content is one of "PRE_CLEANUP", "FETCH_INPUT", "RUN", "WRITE_OUTPUT", "POST_HEALTHCHECK", "POST_CLEANUP"
59  ---> "OK"
60 
61  <--- WARNING
62  // The content is a JSON object:
63  {
64  "message": "XXX",
65  "is_error": [true|false],
66  }
67  ---> "OK"
68 
69  <--- DATAFLOW
70  // The content is a JSON object:
71  {
72  "branch_name_or_code": XXX,
73  "output_ids": an array or a hash,
74  "params": {
75  "substituted": { ... the parameters that are currently substituted ... }
76  "unsubstituted": { ... the parameters that have not yet been substituted ... }
77  }
78  }
79  ---> dbIDs of the jobs that have been created
80 
81  <--- WORKER_TEMP_DIRECTORY
82  // The content is the "worker_temp_directory_name" as defined in the runnable (or null otherwise)
83  ---> returns the temporary directory of the worker
84 
85  <--- JOB_END
86  // The content is a JSON object describing the final state of the job
87  {
88  "complete": [true|false],
89  "job": {
90  "autoflow": [true|false],
91  "lethal_for_worker": [true|false],
92  "transient_error": [true|false],
93  },
94  "params": {
95  "substituted": { ... the parameters that are currently substituted ... }
96  "unsubstituted": { ... the parameters that have not yet been substituted ... }
97  }
98  }
99  ---> "OK"
100 
101 
102 =head1 LICENSE
103 
104 Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
105 Copyright [2016-2022] EMBL-European Bioinformatics Institute
106 
107 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
108 You may obtain a copy of the License at
109 
110  http://www.apache.org/licenses/LICENSE-2.0
111 
112 Unless required by applicable law or agreed to in writing, software distributed under the License
113 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
114 See the License for the specific language governing permissions and limitations under the License.
115 
116 =head1 CONTACT
117 
118 Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
119 
120 =cut
121 
122 
123 package Bio::EnsEMBL::Hive::GuestProcess;
124 
125 use strict;
126 use warnings;
127 
128 use JSON;
129 use IO::Handle;
130 
131 use Data::Dumper;
132 
133 use base ('Bio::EnsEMBL::Hive::Process');
134 
135 
# -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------

our $GUESTPROCESS_PROTOCOL_VERSION = '3';       # Make sure you change this number whenever an incompatible change is introduced


=head2 get_protocol_version

  Example     : my $version = Bio::EnsEMBL::Hive::GuestProcess->get_protocol_version();
  Description : Returns the version number of the communication protocol
  Returntype  : String

=cut

sub get_protocol_version {
    return $GUESTPROCESS_PROTOCOL_VERSION;
}


=head2 check_version_compatibility

  Arg[1]      : String $other_version: the protocol version announced by the wrapper
  Example     : $process->check_version_compatibility('3.0');
  Description : Tells whether the wrapper speaks a compatible version of the protocol,
                i.e. whether its version string starts with our protocol version
                followed by a dot (e.g. '3.0' and '3.1' are compatible with '3').
  Returntype  : Integer: 1 if compatible, 0 otherwise (including when $other_version is undef)
  Exceptions  : none

=cut

sub check_version_compatibility {
    my ($self, $other_version) = @_;

    my $gpv = $self->get_protocol_version();

    # \Q...\E so that any character of our own version (e.g. a dot) is matched literally
    return ((defined $other_version) and ($other_version =~ /^\Q$gpv\E\./)) ? 1 : 0;
}

# -------------------------------------- </versioning of the GuestProcess interface> ------------------------------------------------------
163 
164 
=head2 new

  Arg[1]      : $debug: the debug level. Levels above 1 also print the GuestProcess protocol messages
  Arg[2]      : $language: the programming language the external runnable is in
  Arg[3]      : $module: the name of the runnable (usually a package name)
  Example     : my $process = Bio::EnsEMBL::Hive::GuestProcess->new($debug, $language, $module);
  Description : Constructor. Forks a child process that executes the wrapper registered
                for $language, checks that both sides speak a compatible version of the
                protocol, and retrieves the default parameters of the runnable.
  Returntype  : Bio::EnsEMBL::Hive::GuestProcess
  Exceptions  : if $language or $module is not defined properly, if the pipes /
                child process could not be created, or if the protocol versions
                are not compatible

=cut

sub new {

    my ($class, $debug, $language, $module) = @_;

    die "GuestProcess must be told which language to interface with" unless $language;

    my $wrapper = _get_wrapper_for_language($language);
    die "GuestProcess must be told which module to run" unless $module;

    # Naming: the PARENT_* ends are used by the child to talk to its parent,
    # the CHILD_* ends are used by the parent to talk to its child.
    my ($PARENT_RDR, $PARENT_WTR, $CHILD_WTR, $CHILD_RDR);
    pipe($PARENT_RDR, $CHILD_WTR) or die "Could not create a pipe to send data to the child: $!";
    pipe($CHILD_RDR, $PARENT_WTR) or die "Could not create a pipe to get data from the child: $!";

    my $protocol_debug = ($debug && ($debug > 1)); # Only advanced levels of debug will show the GuestProcess protocol messages
    if ($protocol_debug) {
        print "PARENT_RDR is ", fileno($PARENT_RDR), "\n";
        print "PARENT_WTR is ", fileno($PARENT_WTR), "\n";
        print "CHILD_RDR is ", fileno($CHILD_RDR), "\n";
        print "CHILD_WTR is ", fileno($CHILD_WTR), "\n";
    }

    my $pid = fork();
    die "cannot fork: $!" unless defined $pid;

    if ($pid == 0) {
        # In the child
        close $CHILD_RDR;
        close $CHILD_WTR;
        print "child is PID $$\n" if $protocol_debug;

        # Do not close the non-standard file descriptors on exec(): the child process will need them !
        use Fcntl;
        foreach my $fh ($PARENT_RDR, $PARENT_WTR) {
            my $flags = fcntl($fh, F_GETFD, 0);
            fcntl($fh, F_SETFD, $flags & ~FD_CLOEXEC);
        }

        exec($wrapper, 'run', $module, fileno($PARENT_RDR), fileno($PARENT_WTR), $debug // 0) or do {
            # exec() only returns on failure. The child must not go on running the
            # parent's code (e.g. unwinding into the caller's eval blocks, or running
            # destructors of objects shared with the parent), so it reports the error
            # and terminates immediately.
            warn "Could not execute the wrapper '$wrapper': $!\n";
            require POSIX;
            POSIX::_exit(1);
        };
    }

    # In the parent
    close $PARENT_RDR;
    close $PARENT_WTR;
    print "parent is PID $$\n" if $protocol_debug;

    $CHILD_WTR->autoflush(1);

    my $self = bless {}, $class;

    $self->child_out($CHILD_RDR);
    $self->child_in($CHILD_WTR);
    $self->child_pid($pid);
    $self->json_formatter( JSON->new()->indent(0) );
    $self->{'_protocol_debug'} = $protocol_debug; # controls the GuestProcess protocol, not the worker

    $self->print_debug('CHECK VERSION NUMBER');
    my $other_version = $self->read_message()->{content};
    if (!$self->check_version_compatibility($other_version)) {
        $self->send_response('NO');
        die "eHive's protocol version is '".$self->get_protocol_version."' but the wrapper's is '".($other_version // 'undef')."'\n";
    }
    $self->send_response('OK');

    $self->print_debug("BEFORE READ PARAM_DEFAULTS");
    $self->param_defaults( $self->read_message()->{content} );
    $self->send_response('OK');

    $self->print_debug("INIT DONE");

    return $self;
}
250 
251 
=head2 _get_wrapper_for_language

  Arg[1]      : String $language: the language the runnable is written in
  Example     : my $wrapper = _get_wrapper_for_language($language);
  Description : Finds the wrapper that understands the given language. The path given
                in the environment variable EHIVE_WRAPPER_<LANGUAGE> (upper-cased) takes
                precedence over the embedded $EHIVE_ROOT_DIR/wrappers/<language>/wrapper
  Returntype  : String: the path to the wrapper
  Exceptions  : Dies if the wrapper doesn't exist, is empty, or is not executable

=cut

sub _get_wrapper_for_language {
    my ($language) = @_;

    # A user-defined wrapper wins; the embedded path is only built when there is none
    my $user_override_variable = 'EHIVE_WRAPPER_' . uc($language);
    my $wrapper = $ENV{$user_override_variable}
               || sprintf('%s/wrappers/%s/wrapper', $ENV{'EHIVE_ROOT_DIR'}, $language);

    # Guard clauses, checked in this order so that the most basic problem is reported
    die "$language is currently not supported\n"           unless -e $wrapper;
    die "The wrapper '$wrapper' is an empty file !\n"        unless -s $wrapper;
    die "No permissions to execute the wrapper '$wrapper'\n" unless -x $wrapper;

    return $wrapper;
}
275 
276 
=head2 _get_all_registered_wrappers

  Example     : my $all_languages = Bio::EnsEMBL::Hive::GuestProcess::_get_all_registered_wrappers()
  Description : Lists all the languages and wrappers that are registered (either
                via an EHIVE_WRAPPER_<LANGUAGE> environment variable, or via a "wrapper"
                file under $EHIVE_ROOT_DIR/wrappers/<language>/).
                As in _get_wrapper_for_language(), the environment variables take
                precedence over the embedded wrappers.
  Returntype  : Hashref { String => String }: language name => path to its wrapper
                (languages coming from environment variables are lower-cased)
  Exceptions  : None

=cut

sub _get_all_registered_wrappers {
    my %all_found;

    # Embedded wrappers: $EHIVE_ROOT_DIR/wrappers/<language>/wrapper.
    # readdir() is used rather than glob() so that paths containing spaces work,
    # and the language is taken from the directory entry itself rather than from
    # a regex over the whole path (which a root directory containing "/wrappers/"
    # would fool).
    my $root_dir = $ENV{'EHIVE_ROOT_DIR'};
    if (defined $root_dir and opendir(my $dh, "$root_dir/wrappers")) {
        foreach my $language (readdir $dh) {
            next if $language =~ /^\./;    # ".", ".." and hidden entries (glob's "*" skipped them too)
            my $wrapper = "$root_dir/wrappers/$language/wrapper";
            $all_found{$language} = $wrapper if -e $wrapper;
        }
        closedir $dh;
    }

    # User-defined wrappers override the embedded ones, as in _get_wrapper_for_language()
    foreach my $variable (keys %ENV) {
        if ($variable =~ /^EHIVE_WRAPPER_(.+)$/) {
            $all_found{lc $1} = $ENV{$variable};
        }
    }

    return \%all_found;
}
301 
302 
=head2 DESTROY

  Description : Destructor: tells the child to exit by sending an empty JSON object
  Returntype  : none
  Exceptions  : none. Failing to reach the child (e.g. because it has already
                exited) is ignored.

=cut

sub DESTROY {
    my $self = shift;
    $self->print_debug("DESTROY");

    # The handle may be missing (e.g. during global destruction, or if it was never set)
    my $child_in = $self->child_in;
    return unless $child_in;

    # The child may already have exited (e.g. after a version mismatch). Writing
    # to a pipe nobody reads raises SIGPIPE, whose default action would silently
    # kill this process and hide any error being propagated, so ignore it here.
    local $SIG{PIPE} = 'IGNORE';
    $child_in->print("{}\n");
}
316 
317 
=head2 print_debug

  Arg[1]      : String $msg: the message to print
  Example     : $process->print_debug("debug message");
  Description : When $self->{'_protocol_debug'} is set, prints $msg on the currently
                selected output handle (STDOUT by default), prefixed with "PERL" and
                the PID of the child process. Does nothing otherwise.
  Returntype  : none

=cut

sub print_debug {
    my ($self, $msg) = @_;
    return unless $self->{'_protocol_debug'};
    my $line = sprintf("PERL %d: %s\n", $self->child_pid, $msg);
    print $line;
}
330 
331 ##############
332 # Attributes #
333 ##############
334 
335 
=head2 child_in

  Example     : my $child_in = $process->child_in();
  Example     : $process->child_in(*CHILD_WTR);
  Description : Getter/Setter for the file handle that allows talking to the
                child process (messages written to it are read by the child).
  Returntype  : IO::Handle
  Exceptions  : none

=cut

sub child_in {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_child_in'} = $new_value[0];
    }
    return $self->{'_child_in'};
}
352 
=head2 child_out

  Example     : my $child_out = $process->child_out();
  Example     : $process->child_out(*CHILD_RDR);
  Description : Getter/Setter for the file handle that allows receiving data
                from the child process (the child writes its messages to it).
  Returntype  : IO::Handle
  Exceptions  : none

=cut

sub child_out {
    my $self = $_[0];
    $self->{'_child_out'} = $_[1] if @_ > 1;
    return $self->{'_child_out'};
}
369 
=head2 child_pid

  Example     : my $child_pid = $process->child_pid();
  Example     : $process->child_pid($child_pid);
  Description : Getter/Setter for the process ID of the child (the wrapper process)
  Returntype  : integer
  Exceptions  : none

=cut

sub child_pid {
    my ($self, @args) = @_;
    $self->{'_child_pid'} = $args[0] if scalar(@args);
    return $self->{'_child_pid'};
}
385 
386 
=head2 json_formatter

  Example     : my $json_formatter = $object_name->json_formatter();
  Example     : $object_name->json_formatter($json_formatter);
  Description : Getter/Setter for the JSON formatter used to encode the messages
                sent to the child and to decode the ones it sends back.
  Returntype  : instance of JSON
  Exceptions  : none

=cut

sub json_formatter {
    my $self = shift;
    return $self->{'_json_formatter'} unless @_;
    $self->{'_json_formatter'} = shift;
    return $self->{'_json_formatter'};
}
402 
403 
404 ################################
405 # Communication with the child #
406 ################################
407 
=head2 send_message

  Arg[1]      : $struct: the Perl structure to send (e.g. a hashref)
  Example     : $process->send_message($perl_structure);
  Description : Send the Perl structure to the child process via the pipe,
                serialized as a single line of JSON.
  Returntype  : none
  Exceptions  : raised by JSON. Dies if the message cannot be written to the pipe.

=cut

sub send_message {
    my ($self, $struct) = @_;
    my $j = $self->json_formatter->encode($struct);
    $self->print_debug("send_message $j");
    # A silently lost message would desynchronise the protocol with the child
    $self->child_in->print($j."\n") or die "Could not send a message to the child process: $!";
}
424 
425 
=head2 send_response

  Arg[1]      : $response: the response to send (e.g. 'OK')
  Example     : $process->send_response('OK');
  Description : Wrapper around send_message to send a response to the child.
                The response is wrapped as { 'response' => $response }.
  Returntype  : none
  Exceptions  : raised by send_message

=cut

sub send_response {
    my ($self, $response) = @_;
    my %envelope = ('response' => $response);
    return $self->send_message(\%envelope);
}
439 
440 
=head2 read_message

  Example     : my $msg = $process->read_message();
  Description : Wait for and read the next message coming from the child.
                Each message is a single line of JSON on the pipe, which is
                decoded back into a Perl structure.
  Returntype  : Perl structure
  Exceptions  : dies if the pipe is closed before a message arrives, or
                anything raised by JSON / IO::Handle

=cut

sub read_message {
    my $self = shift;

    my $line = $self->child_out->getline();
    if (!defined $line) {
        die "Did not receive any messages";
    }
    chomp $line;

    $self->print_debug("read_message: $line");
    my $decoded = $self->json_formatter->decode($line);
    return $decoded;
}
460 
461 
=head2 wait_for_OK

  Example     : $process->wait_for_OK();
  Description : Wait for the child process to send the OK signal, i.e. a message
                of the form { "response": "OK" }
  Returntype  : none
  Exceptions  : dies if the message is not a response or the response is not OK,
                or anything raised by L<read_message()>

=cut

sub wait_for_OK {
    my $self = shift;
    my $msg = $self->read_message();

    exists $msg->{'response'}
        or die "Response message does not look like a response";

    my $response = $msg->{'response'};
    die "Received response is not OK" unless !ref($response) && $response eq 'OK';
    return;
}
477 
478 
479 ###########################
480 # Hive::Process interface #
481 ###########################
482 
483 
=head2 param_defaults

  Example     : my $param_defaults = $runnable->param_defaults();
  Example     : $runnable->param_defaults($param_defaults);
  Description : Getter/Setter for the default parameters of this runnable.
                Hive only uses it as a getter, but here we need a setter too:
                the defaults are declared by the runnable in the child process
                and stored at the Perl layer once they have been received.
  Returntype  : Hashref
  Exceptions  : none

=cut

sub param_defaults {
    my ($self, @maybe_defaults) = @_;
    $self->{'_param_defaults'} = $maybe_defaults[0] if @maybe_defaults;
    return $self->{'_param_defaults'};
}
502 
503 
=head2 life_cycle

  Example     : my $partial_timings = $runnable->life_cycle();
  Description : Runs the life-cycle of the input job and returns the timings
                of each Runnable method (fetch_input, run, etc).
                The job parameters are first sent to the child process, then
                GuestProcess serves the events sent by the child until JOB_END.
                See the description of this module for details about the protocol
  Returntype  : Hashref: job status => time elapsed in that status (from the Stopwatch)
  Exceptions  : Dies on an unknown event, or if the finalisation of the job fails
                (e.g. cached semaphored fans without a funnel). In the latter case
                the child still receives its "OK" response to JOB_END first.

=cut

sub life_cycle {
    my $self = shift;

    $self->print_debug("LIFE_CYCLE");

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    my %struct = (
        input_job => {
            parameters => $job->{_unsubstituted_param_hash},
            input_id => $job->input_id,
            dbID => defined $job->dbID ? $job->dbID + 0 : 0,
            retry_count => $job->retry_count + 0,
        },
        execute_writes => $self->execute_writes || 0,
        debug => $self->debug || 0,
    );
    $self->print_debug("SEND JOB PARAM");
    $self->send_message(\%struct);
    $self->wait_for_OK();

    # A simple event loop
    while (1) {
        $self->print_debug("WAITING IN LOOP");

        my $msg = $self->read_message;
        my $event = $msg->{event} // '';     # a missing event name ends up as an unknown event below
        my $content = $msg->{content};
        $self->print_debug("processing event '$event'");

        if ($event eq 'JOB_STATUS_UPDATE') {
            $job_partial_timing{$job->status} = $partial_stopwatch->get_elapsed() if ($job->status ne 'READY') and ($job->status ne 'CLAIMED');
            $self->enter_status(uc $content);
            $partial_stopwatch->restart();
            $self->send_response('OK');

        } elsif ($event eq 'WARNING') {
            $self->warning($content->{message}, $content->{is_error}?'WORKER_ERROR':'INFO');
            $self->send_response('OK');

        } elsif ($event eq 'DATAFLOW') {
            $job->{_param_hash} = $content->{params}->{substituted};
            $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};
            my $d = $self->dataflow_output_id($content->{output_ids}, $content->{branch_name_or_code});
            $self->send_response($d);

        } elsif ($event eq 'WORKER_TEMP_DIRECTORY') {
            $self->{worker_temp_directory_name} = $content;
            my $wtd = $self->worker_temp_directory;
            $self->send_response($wtd);

        } elsif ($event eq 'JOB_END') {
            # The child is waiting for a response to JOB_END (see the protocol
            # description), so it must be answered even if the finalisation fails.
            # Otherwise the protocol would be out of sync for any further job.
            my $finalised = eval { $self->_finalise_job_end($job, $content); 1 };
            my $error = $@;
            $self->send_response('OK');
            die $error unless $finalised;
            return \%job_partial_timing;

        } else {
            die "Unknown event '$event' coming from the child";
        }
    }
}

# Applies the final state reported by the child in a JOB_END event to $job,
# and runs the end-of-job checks duplicated from Bio::EnsEMBL::Hive::Process.
sub _finalise_job_end {
    my ($self, $job, $content) = @_;

    # Especially here we need to be careful about boolean values
    # They are coded as JSON::true and JSON::false which have
    # different meanings in text / number contexts, so they are
    # all converted to plain 0/1 before being stored in the job
    $job->autoflow(($job->autoflow and $content->{job}->{autoflow}) ? 1 : 0);
    $job->lethal_for_worker($content->{job}->{lethal_for_worker}?1:0);
    $job->transient_error($content->{job}->{transient_error}?1:0);
    $job->{_param_hash} = $content->{params}->{substituted};
    $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};

    # This piece of code is duplicated from Process
    if ($content->{complete}) {
        if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
            $self->say_with_header( ': AUTOFLOW input->output' );
            $job->dataflow_output_id();
        }

        my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
        if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
            $job->transient_error(0);
            die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
        }
    } else {
        $job->died_somewhere(1);
    }
    return;
}
600 
=head2 worker_temp_directory_name

  Example     : $process->worker_temp_directory_name();
  Description : Returns the name of the temp directory for this module.
                The value stored in $self is set when the child sends a
                WORKER_TEMP_DIRECTORY event (see life_cycle()), and is returned
                here if it is true. Otherwise the name given by the base class
                (Bio::EnsEMBL::Hive::Process) is used. This allows runnables
                to redefine the name.
  Returntype  : string
  Exceptions  : none

=cut

sub worker_temp_directory_name {
    my $self = shift;
    my $runnable_choice = $self->{worker_temp_directory_name};
    return $runnable_choice ? $runnable_choice : $self->SUPER::worker_temp_directory_name();
}
618 
619 
620 
621 
### Summary of Process methods ###
# How the methods of the base class (Bio::EnsEMBL::Hive::Process) relate to GuestProcess

## Have to be redefined
# life_cycle
# param_defaults
# worker_temp_directory_name

## Needed, can be reused from the base class
# worker_temp_directory
# input_job
# execute_writes
# debug
# dataflow_output_id
# enter_status -> worker / say_with_header
# warning
# cleanup_worker_temp_directory

## Invalid in this context
# strict_hash_format
# fetch_input
# run
# write_output
# db
# dbc
# data_dbc
# input_id
# complete_early
# throw


# A Perl module must end with a true value
1;
public String get_protocol_version()
protected String _get_wrapper_for_language()
public Bio::EnsEMBL::Hive::GuestProcess new()