ensembl-hive-python3  2.5
Process.py
Go to the documentation of this file.
1 
2 # Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
3 # Copyright [2016-2022] EMBL-European Bioinformatics Institute
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import eHive.Params
18 
19 import os
20 import sys
21 import json
22 import numbers
23 import unittest
24 import warnings
25 import traceback
26 
# Sent to the parent process as the content of the 'VERSION' message, before
# any job is processed (see BaseRunnable.__process_life_cycle)
__version__ = "3.0"

__doc__ = """
This module mainly implements python's counterpart of GuestProcess. Read
the latter for more information about the JSON protocol used to communicate.
"""
33 
class Job:
    """Plain attribute holder for the information of the job being run.

    BaseRunnable fills it in (dbID, input_id, retry_count, autoflow,
    lethal_for_worker, transient_error); it has no behavior of its own.
    """
37 
class CompleteEarlyException(Exception):
    """Raised by a BaseRunnable subclass to end the job early, but successfully."""
class JobFailedException(Exception):
    """Raised by a BaseRunnable subclass to end the job early, as a failure."""
class HiveJSONMessageException(Exception):
    """Signals that a message received from GuestProcess could not be parsed as JSON."""
class LostHiveConnectionException(Exception):
    """Signals that the communication pipe with the Perl side has been lost."""
50 
51 
class BaseRunnable(object):
    """This is the counterpart of GuestProcess. Note that most of the methods
    are private to be hidden in the derived classes.

    This class can be used as a base-class for people to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_cleanup()).
    Jobs are supposed to raise CompleteEarlyException in case they complete before
    reaching the end of their life-cycle. They can also raise JobFailedException
    to indicate a general failure.
    """

    # Private BaseRunnable interface

    def __init__(self, read_fileno, write_fileno, debug):
        """Open the pipes shared with the parent process and run the protocol loop.

        read_fileno / write_fileno: file descriptors of the pipes used to talk
        to the parent (GuestProcess) process.
        debug: debug level; debug messages go to stderr when it is > 1.
        """
        # We need the binary mode to disable the buffering
        self.__read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.__write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.__pid = os.getpid()
        self.debug = debug
        # __process_life_cycle() is name-mangled and called nowhere else, so the
        # constructor is what drives the whole communication with the parent
        self.__process_life_cycle()

    def __print_debug(self, *args):
        """Print args on stderr, prefixed with this process' PID, when debug > 1."""
        if self.debug > 1:
            print("PYTHON {0}".format(self.__pid), *args, file=sys.stderr)

    def __write_line(self, line):
        """Write one protocol line (a JSON string, newline appended) to the parent.

        Raises LostHiveConnectionException if the parent closed its end of the pipe.
        """
        # json.dumps escapes non-ASCII characters by default, so the payload is
        # plain ASCII; encoding as UTF-8 is only a safety net
        try:
            self.__write_pipe.write((line + "\n").encode('utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __send_message(self, event, content):
        """Serialize {'event': event, 'content': content} as one line of JSON and send it to the parent process."""
        def default_json_encoder(o):
            # Objects json cannot handle are replaced by a placeholder string
            # instead of aborting the whole message
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        self.__write_line(j)

    def __send_response(self, response):
        """Send {"response": str(response)} to the parent process."""
        self.__print_debug('__send_response:', response)
        # json.dumps escapes quotes and backslashes, which a hand-built string
        # would not; for plain values such as 'OK' the bytes are unchanged
        self.__write_line(json.dumps({'response': str(response)}))

    def __read_message(self):
        """Read one line from the parent and parse it as JSON.

        Raises LostHiveConnectionException if the pipe is broken or closed (EOF),
        HiveJSONMessageException if the line is not valid UTF-8 JSON.
        """
        self.__print_debug("__read_message ...")
        try:
            line = self.__read_pipe.readline()
        except BrokenPipeError:
            raise LostHiveConnectionException("__read_pipe") from None
        if not line:
            # readline() returns b'' at end-of-file: the parent closed the pipe
            raise LostHiveConnectionException("__read_pipe")
        try:
            text = line.decode()
            self.__print_debug(" ... -> ", text.rstrip("\n"))
            return json.loads(text)
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            # (UnicodeDecodeError is a ValueError too)
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expect the reply to be {"response": "OK"}.

        Raises HiveJSONMessageException on any other reply.
        """
        self.__send_message(event, content)
        response = self.__read_message()
        if not isinstance(response, dict) or response.get('response') != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __process_life_cycle(self):
        """Simple loop: send the version and the parameter defaults, then run one
        job life-cycle per message received, until a message without 'input_job'."""
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())

        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent.

        config is the message received from the parent. This method reads
        config['input_job'] ('parameters', 'dbID', 'input_id', 'retry_count'),
        config['debug'] and config['execute_writes'].
        """
        self.__print_debug("__life_cycle")

        # Params
        self.__params = eHive.Params.ParamContainer(config['input_job']['parameters'], self.debug > 1)

        # Job attributes
        self.input_job = Job()
        for x in ['dbID', 'input_id', 'retry_count']:
            setattr(self.input_job, x, config['input_job'][x])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Worker attributes
        self.debug = config['debug']

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            steps.append('write_output')
            steps.append('post_healthcheck')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            self.warning(e.args[0] if len(e.args) else repr(e), False)
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            # Deliberately catch everything the runnable may raise (including
            # SystemExit) so that the failure is reported to the parent.
            # __traceback(2) drops the __job_life_cycle and
            # __run_method_if_exists frames from the reported stack trace.
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        try:
            self.__run_method_if_exists('post_cleanup')
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        job_end_structure = {'complete': not died_somewhere, 'job': {}, 'params': {'substituted': self.__params.param_hash, 'unsubstituted': self.__params.unsubstituted_param_hash}}
        for x in ['autoflow', 'lethal_for_worker', 'transient_error']:
            job_end_structure['job'][x] = getattr(self.input_job, x)
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output",
        "post_healthcheck", "post_cleanup".
        We only call the method (and notify the parent) if it exists, to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, skipped_traces):
        """Format the exception being handled, dropping the first "skipped_traces"
        frames of its stack trace (the eHive part)."""
        (etype, value, tb) = sys.exc_info()
        s1 = traceback.format_exception_only(etype, value)
        l = traceback.extract_tb(tb)[skipped_traces:]
        s2 = traceback.format_list(l)
        return "".join(s1 + s2)

    # Public BaseRunnable interface

    def warning(self, message, is_error=False):
        """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code=1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns.

        Dataflowing on branch 1 disables the job's autoflow.
        """
        if branch_name_or_code == 1:
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {'output_ids': output_ids, 'branch_name_or_code': branch_name_or_code, 'params': {'substituted': self.__params.param_hash, 'unsubstituted': self.__params.unsubstituted_param_hash}})
        return self.__read_message()['response']

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.
        Runnables can implement "worker_temp_directory_name()" to return the name
        they would like to use.

        The parent is only asked once; the path is then cached on the instance.
        """
        try:
            return self.__created_worker_temp_directory
        except AttributeError:
            pass
        template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None
        self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
        self.__created_worker_temp_directory = self.__read_message()['response']
        return self.__created_worker_temp_directory

    # Param interface

    def param_defaults(self):
        """Returns the defaults parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens or the value is None. The exception is
        marked as non-transient."""
        # transient_error is only restored on success: any exception raised
        # below leaves the job flagged as non-transient
        t = self.input_job.transient_error
        self.input_job.transient_error = False
        v = self.__params.get_param(param_name)
        if v is None:
            raise eHive.Params.NullParamException(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined: it emits a ParamWarning and returns None."""
        # As a setter
        if len(args):
            return self.__params.set_param(param_name, args[0])

        # As a getter
        try:
            return self.__params.get_param(param_name)
        except KeyError as e:
            warnings.warn("parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e), eHive.Params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted, None if the substitution fails, False if it is missing.
        Exceptions other than KeyError raised during substitution propagate."""
        if not self.__params.has_param(param_name):
            return False
        try:
            self.__params.get_param(param_name)
            return True
        except KeyError:
            return None

    def param_is_defined(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted to a defined value, None if the substitution fails,
        False if it is missing or evaluates as None"""
        e = self.param_exists(param_name)
        if not e:
            # False or None
            return e
        try:
            return self.__params.get_param(param_name) is not None
        except KeyError:
            return False
class RunnableTest(unittest.TestCase):
    """Tests of BaseRunnable's param* accessors on a runnable without pipes."""

    def test_job_param(self):
        class FakeRunnableWithParams(BaseRunnable):
            def __init__(self, d):
                # Skip BaseRunnable.__init__, which opens pipes from file
                # descriptors; only set what the param* methods read
                self._BaseRunnable__params = eHive.Params.ParamContainer(d)
                self.input_job = Job()
                self.input_job.transient_error = True
        # 'c' is substituted from a missing parameter, 'e' refers to itself
        j = FakeRunnableWithParams({
            'a': 3,
            'b': None,
            'c': '#other#',
            'e': '#e#'
        })

        # param_exists
        self.assertIs(j.param_exists('a'), True, '"a" exists')
        self.assertIs(j.param_exists('b'), True, '"b" exists')
        self.assertIs(j.param_exists('c'), None, '"c"\'s existence is unclear')
        self.assertIs(j.param_exists('d'), False, '"d" doesn\'t exist')
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_exists('e')

        # param_is_defined
        self.assertIs(j.param_is_defined('a'), True, '"a" is defined')
        self.assertIs(j.param_is_defined('b'), False, '"b" is not defined')
        self.assertIs(j.param_is_defined('c'), None, '"c"\'s defined-ness is unclear')
        self.assertIs(j.param_is_defined('d'), False, '"d" is not defined (it doesn\'t exist)')
        # param_is_defined() calls param_exists() first, which raises for 'e'
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_is_defined('e')

        # param
        # assertEqual, not assertIs: integer identity is a CPython cache detail
        self.assertEqual(j.param('a'), 3, '"a" is 3')
        self.assertIs(j.param('b'), None, '"b" is None')
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs(j.param('c'), None, '"c"\'s value is unclear')
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs(j.param('d'), None, '"d" is not defined (it doesn\'t exist)')
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param('e')

        # param_required
        self.assertEqual(j.param_required('a'), 3, '"a" is 3')
        with self.assertRaises(eHive.Params.NullParamException):
            j.param_required('b')
        with self.assertRaises(KeyError):
            j.param_required('c')
        with self.assertRaises(KeyError):
            j.param_required('d')
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_required('e')
340 
def __read_message(self)
Read a message from the parent and parse it.
Definition: Process.py:111
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fa...
Definition: Process.py:289
def __print_debug(self, args)
Definition: Process.py:79
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
Definition: Process.py:260
def __send_message(self, event, content)
serializes the message in JSON and sends it to the parent process
Definition: Process.py:87
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
Definition: Process.py:205
def param_defaults(self)
Returns the defaults parameters for this runnable.
Definition: Process.py:253
def __send_message_and_wait_for_OK(self, event, content)
Send a message and expects a response to be 'OK'.
Definition: Process.py:125
Raised when the process has lost the communication pipe with the Perl side.
Definition: Process.py:53
Equivalent of eHive's Param module.
Definition: Params.py:64
This is the counterpart of GuestProcess.
Definition: Process.py:66
Raised when we could not parse the JSON message coming from GuestProcess.
Definition: Process.py:49
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Definition: Process.py:225
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination...
Definition: Process.py:45
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
Definition: Process.py:133
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Definition: Process.py:241
Raised when a parameter cannot be required because it is null (None)
Definition: Params.py:57
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
Definition: Process.py:274
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
Definition: Process.py:212
def __job_life_cycle(self, config)
Job's life-cycle.
Definition: Process.py:147
def __init__(self, read_fileno, write_fileno, debug)
Definition: Process.py:71
Used by Process.BaseRunnable.
Definition: Params.py:32
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
Definition: Process.py:230
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value...
Definition: Process.py:302
Dummy class to hold job-related information.
Definition: Process.py:36
Raised when parameters depend on each other, forming a loop.
Definition: Params.py:52
def __send_response(self, response)
Sends a response message to the parent process.
Definition: Process.py:101