ensembl-hive-python3  2.8.1
process.py
Go to the documentation of this file.
1 
2 # See the NOTICE file distributed with this work for additional information
3 # regarding copyright ownership.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 """
18 This module mainly implements Python's counterpart of GuestProcess. Read
19 the latter for more information about the JSON protocol used to communicate.
20 """
21 
22 import json
23 import os
24 import sys
25 import traceback
26 import unittest
27 import warnings
28 
29 from . import params
30 
31 __version__ = "5.0"
32 
33 
class Job:
    """Plain attribute holder for job-related information.

    BaseRunnable creates one instance per job and attaches to it the
    attributes dbID, input_id and retry_count (copied from the job
    description sent by the parent process) as well as the flags autoflow,
    lethal_for_worker and transient_error.
    """
37 
class CompleteEarlyException(Exception):
    """Can be raised by a derived class of BaseRunnable to indicate an early successful termination"""
class JobFailedException(Exception):
    """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination"""
class HiveJSONMessageException(Exception):
    """Raised when we could not parse the JSON message coming from GuestProcess"""
class LostHiveConnectionException(Exception):
    """Raised when the process has lost the communication pipe with the Perl side"""
50 
51 
class BaseRunnable:
    """Python counterpart of GuestProcess.

    Most methods are private (name-mangled) so that they stay hidden from the
    derived classes. Derived classes are expected to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_cleanup()).

    A job may raise CompleteEarlyException to stop before reaching the end of
    its life-cycle while still being reported as complete. Any other
    exception, JobFailedException for instance, marks the job as failed.
    """

    # Cached reply to the WORKER_TEMP_DIRECTORY request. Declared at class
    # level so that worker_temp_directory() also works when a derived class
    # overrides __init__() without calling this one.
    __created_worker_temp_directory = None

    # Private BaseRunnable interface

    def __init__(self, read_fileno, write_fileno, debug):
        """Wrap the two file descriptors used to talk to the parent process.

        read_fileno  -- file descriptor the parent's messages are read from
        write_fileno -- file descriptor our messages are written to
        debug        -- debug level; debug lines go to stderr when it is > 1
        """
        # We need the binary mode to disable the buffering
        self.__read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.__write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.__pid = os.getpid()
        self.debug = debug
        # NOTE(review): nothing visible here starts __process_life_cycle();
        # confirm whether the constructor is meant to call it.

    def __print_debug(self, *args):
        """Print args on stderr, prefixed with our PID, when debug > 1"""
        if self.debug > 1:
            print("PYTHON {0}".format(self.__pid), *args, file=sys.stderr)

    # FIXME: we can probably merge __send_message and __send_response

    def __send_message(self, event, content):
        """Serializes the message in JSON and sends it to the parent process.

        Objects that JSON cannot encode are replaced with the string
        'UNSERIALIZABLE OBJECT'. Raises LostHiveConnectionException if the
        pipe is broken.
        """
        def default_json_encoder(o):
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        # UTF8 encoding has never been tested. Just hope it works :)
        try:
            self.__write_pipe.write(bytes(j + "\n", 'utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __send_response(self, response):
        """Sends a response message to the parent process.

        The response is stringified and sent as {"response": "<text>"}.
        Raises LostHiveConnectionException if the pipe is broken.
        """
        self.__print_debug('__send_response:', response)
        # Let json do the quoting: concatenating the text by hand produces
        # invalid JSON as soon as it contains a quote or a backslash.
        j = json.dumps({'response': str(response)})
        try:
            self.__write_pipe.write(bytes(j + "\n", 'utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __read_message(self):
        """Read a message (one line of JSON) from the parent and parse it.

        Raises LostHiveConnectionException when the pipe is broken or closed,
        and HiveJSONMessageException when the line is not valid JSON.
        """
        try:
            self.__print_debug("__read_message ...")
            line = self.__read_pipe.readline()
            if not line:
                # An empty read means the other end has closed the pipe
                raise LostHiveConnectionException("__read_pipe")
            self.__print_debug(" ... -> ", line[:-1].decode())
            return json.loads(line.decode())
        except BrokenPipeError:
            raise LostHiveConnectionException("__read_pipe") from None
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expect the response to be 'OK'.

        Raises HiveJSONMessageException for any other response.
        """
        self.__send_message(event, content)
        response = self.__read_message()
        if response['response'] != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __process_life_cycle(self):
        """Simple loop: wait for job parameters, do the job's life-cycle.

        Returns when a message without an 'input_job' entry is received.
        """
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())
        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent.

        config is the parsed message describing the job: it provides
        'input_job' (with 'parameters', 'dbID', 'input_id', 'retry_count'),
        'debug' and 'execute_writes'.
        """
        self.__print_debug("__life_cycle")

        # Parameters
        self.__params = params.ParamContainer(config['input_job']['parameters'], self.debug > 1)

        # Job attributes
        self.input_job = Job()
        for x in ['dbID', 'input_id', 'retry_count']:
            setattr(self.input_job, x, config['input_job'][x])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Worker attributes
        self.debug = config['debug']

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            # pre_cleanup only makes sense on a retry
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            steps.append('write_output')
            steps.append('post_healthcheck')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            # Early but successful: logged as a plain message, not an error
            self.warning(e.args[0] if e.args else repr(e), False)
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except Exception as e:
            died_somewhere = True
            self.warning(self.__traceback(e, 2), True)

        # post_cleanup is attempted whatever happened in the steps above
        try:
            self.__run_method_if_exists('post_cleanup')
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except Exception as e:
            died_somewhere = True
            self.warning(self.__traceback(e, 2), True)

        job_end_structure = {
            'complete': not died_somewhere,
            'job': {},
            'params': {
                'substituted': self.__params.param_hash,
                'unsubstituted': self.__params.unsubstituted_param_hash,
            },
        }
        for x in ['autoflow', 'lethal_for_worker', 'transient_error']:
            job_end_structure['job'][x] = getattr(self.input_job, x)
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output",
        "post_healthcheck", "post_cleanup".
        We only call the method if it exists to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, exception, skipped_traces):
        """Format the exception followed by its stack trace, with the first
        "skipped_traces" entries (the eHive part) removed"""
        summary = traceback.format_exception_only(type(exception), exception)
        frames = traceback.extract_tb(exception.__traceback__)[skipped_traces:]
        return "".join(summary + traceback.format_list(frames))

    # Public BaseRunnable interface

    def warning(self, message, is_error=False):
        """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code=1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns"""
        if branch_name_or_code == 1:
            # An explicit dataflow on branch 1 switches off the autoflow
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {
            'output_ids': output_ids,
            'branch_name_or_code': branch_name_or_code,
            'params': {
                'substituted': self.__params.param_hash,
                'unsubstituted': self.__params.unsubstituted_param_hash,
            },
        })
        return self.__read_message()['response']

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.

        The parent process is only asked once; the answer is cached.
        """
        if self.__created_worker_temp_directory is None:
            self.__send_message('WORKER_TEMP_DIRECTORY', None)
            self.__created_worker_temp_directory = self.__read_message()['response']
        return self.__created_worker_temp_directory

    # Param interface

    def param_defaults(self):
        """Returns the default parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens or the value is None. The exception is
        marked as non-transient."""
        t = self.input_job.transient_error
        # Deliberately left at False if the lookup below raises: that is how
        # the failure gets marked as non-transient
        self.input_job.transient_error = False
        v = self.__params.get_param(param_name)
        if v is None:
            raise params.NullParamException(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined"""
        # As a setter
        if args:
            return self.__params.set_param(param_name, args[0])

        # As a getter
        try:
            return self.__params.get_param(param_name)
        except KeyError as e:
            warnings.warn("parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e), params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted, None if the substitution fails, False if it is missing"""
        if not self.__params.has_param(param_name):
            return False
        try:
            self.__params.get_param(param_name)
            return True
        except KeyError:
            return None

    def param_is_defined(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted to a defined value, None if the substitution fails,
        False if it is missing or evaluates as None"""
        e = self.param_exists(param_name)
        if not e:
            # False or None
            return e
        try:
            return self.__params.get_param(param_name) is not None
        except KeyError:
            return False
285 
class BaseRunnableTestCase(unittest.TestCase):
    """Unit tests for the parameter interface of BaseRunnable"""

    def test_job_param(self):
        """Exercise param_exists, param_is_defined, param and param_required
        on parameters that are plain, None, unresolvable, missing and
        self-referencing."""

        class FakeRunnableWithParams(BaseRunnable):
            """BaseRunnable with just the state the param methods read"""

            def __init__(self, d):
                # BaseRunnable.__init__() is deliberately not called: the
                # param methods don't need the communication pipes
                self._BaseRunnable__params = params.ParamContainer(d)
                self.input_job = Job()
                self.input_job.transient_error = True

        j = FakeRunnableWithParams({
            'a': 3,
            'b': None,
            'c': '#other#',
            'e': '#e#'
        })

        # param_exists
        # (assertIs on purpose: True, False and None must be told apart)
        self.assertIs(j.param_exists('a'), True, '"a" exists')
        self.assertIs(j.param_exists('b'), True, '"b" exists')
        self.assertIs(j.param_exists('c'), None, '"c"\'s existence is unclear')
        self.assertIs(j.param_exists('d'), False, '"d" doesn\'t exist')
        with self.assertRaises(params.ParamInfiniteLoopException):
            j.param_exists('e')

        # param_is_defined
        self.assertIs(j.param_is_defined('a'), True, '"a" is defined')
        self.assertIs(j.param_is_defined('b'), False, '"b" is not defined')
        self.assertIs(j.param_is_defined('c'), None, '"c"\'s defined-ness is unclear')
        self.assertIs(j.param_is_defined('d'), False, '"d" is not defined (it doesn\'t exist)')
        with self.assertRaises(params.ParamInfiniteLoopException):
            j.param_is_defined('e')

        # param
        # (values are compared with assertEqual: integer identity is an
        # implementation detail of the interpreter)
        self.assertEqual(j.param('a'), 3, '"a" is 3')
        self.assertIs(j.param('b'), None, '"b" is None')
        with self.assertWarns(params.ParamWarning):
            self.assertIs(j.param('c'), None, '"c"\'s value is unclear')
        with self.assertWarns(params.ParamWarning):
            self.assertIs(j.param('d'), None, '"d" is not defined (it doesn\'t exist)')
        with self.assertRaises(params.ParamInfiniteLoopException):
            j.param('e')

        # param_required
        self.assertEqual(j.param_required('a'), 3, '"a" is 3')
        with self.assertRaises(params.NullParamException):
            j.param_required('b')
        with self.assertRaises(KeyError):
            j.param_required('c')
        with self.assertRaises(KeyError):
            j.param_required('d')
        with self.assertRaises(params.ParamInfiniteLoopException):
            j.param_required('e')
336 
eHive.process.BaseRunnable
This is the counterpart of GuestProcess.
Definition: process.py:60
eHive.process.BaseRunnable.dataflow
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
Definition: process.py:229
eHive.process.BaseRunnableTestCase
Definition: process.py:302
eHive.process.BaseRunnableTestCase.test_job_param
def test_job_param(self)
Definition: process.py:303
eHive.process.BaseRunnable.worker_temp_directory
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Definition: process.py:237
eHive.process.BaseRunnable.input_job
input_job
Definition: process.py:154
eHive.process.HiveJSONMessageException
Raised when we could not parse the JSON message coming from GuestProcess.
Definition: process.py:45
eHive.process.BaseRunnable.param_defaults
def param_defaults(self)
Returns the default parameters for this runnable.
Definition: process.py:247
eHive.process.BaseRunnable.param_exists
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fa...
Definition: process.py:280
eHive.process.BaseRunnable.__print_debug
def __print_debug(self, *args)
Definition: process.py:73
eHive.process.CompleteEarlyException
Can be raised by a derived class of BaseRunnable to indicate an early successful termination.
Definition: process.py:39
eHive.process.LostHiveConnectionException
Raised when the process has lost the communication pipe with the Perl side.
Definition: process.py:48
eHive.params.ParamContainer
Equivalent of eHive's Param module.
Definition: params.py:57
eHive.process.BaseRunnable.__process_life_cycle
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
Definition: process.py:132
eHive.process.BaseRunnable.param_required
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
Definition: process.py:253
eHive.process.BaseRunnable.__traceback
def __traceback(self, exception, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
Definition: process.py:214
eHive.process.BaseRunnable.__init__
def __init__(self, read_fileno, write_fileno, debug)
Definition: process.py:65
eHive.process.BaseRunnable.__created_worker_temp_directory
__created_worker_temp_directory
Definition: process.py:135
eHive.process.JobFailedException
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination.
Definition: process.py:42
eHive.process.BaseRunnable.__params
__params
Definition: process.py:151
eHive.process.BaseRunnable.__run_method_if_exists
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
Definition: process.py:206
eHive.process.BaseRunnable.warning
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Definition: process.py:225
eHive.process.Job
Dummy class to hold job-related information.
Definition: process.py:35
eHive.process.BaseRunnable.__read_message
def __read_message(self)
Read a message from the parent and parse it.
Definition: process.py:108
eHive.process.BaseRunnable.__job_life_cycle
def __job_life_cycle(self, config)
Job's life-cycle.
Definition: process.py:147
eHive.process.BaseRunnable.__read_pipe
__read_pipe
Definition: process.py:67
eHive.process.BaseRunnable.__send_response
def __send_response(self, response)
Sends a response message to the parent process.
Definition: process.py:97
eHive.params.ParamInfiniteLoopException
Raised when parameters depend on each other, forming a loop.
Definition: params.py:47
eHive.process.BaseRunnable.__send_message
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process.
Definition: process.py:82
eHive.params.ParamWarning
Used by process.BaseRunnable.
Definition: params.py:31
eHive.params.NullParamException
Raised when a parameter cannot be required because it is null (None)
Definition: params.py:51
eHive.process.BaseRunnable.param
def param(self, param_name, *args)
When called as a setter: sets the value of the parameter "param_name".
Definition: process.py:266
eHive.process.BaseRunnable.__pid
__pid
Definition: process.py:69
eHive.process.BaseRunnable.__send_message_and_wait_for_OK
def __send_message_and_wait_for_OK(self, event, content)
Sends a message and expects the response to be 'OK'.
Definition: process.py:123
eHive.process.BaseRunnable.debug
debug
Definition: process.py:70
eHive.process.BaseRunnableTestCase.input_job
input_job
Definition: process.py:307
eHive.process.BaseRunnable.param_is_defined
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value,...
Definition: process.py:292
eHive.process.BaseRunnableTestCase._BaseRunnable__params
_BaseRunnable__params
Definition: process.py:306
eHive.process.BaseRunnable.__write_pipe
__write_pipe
Definition: process.py:68