|
ensembl-hive-python3
2.8.1
|
Go to the documentation of this file.
18 This module mainly implements Python's counterpart of GuestProcess. Read
19 the latter for more information about the JSON protocol used to communicate.
35 """Dummy class to hold job-related information"""
38 class CompleteEarlyException(Exception):
39 """Can be raised by a derived class of BaseRunnable to indicate an early successful termination"""
42 """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination"""
45 """Raised when we could not parse the JSON message coming from GuestProcess"""
48 """Raised when the process has lost the communication pipe with the Perl side"""
53 """This is the counterpart of GuestProcess. Note that most of the methods
54 are private to be hidden in the derived classes.
56 This class can be used as a base-class for people to redefine fetch_input(),
57 run() and/or write_output() (and/or pre_cleanup(), post_cleanup()).
58 Jobs are supposed to raise CompleteEarlyException in case they complete before
59 reaching the end of their life-cycle. They can also raise JobFailedException to indicate a general failure
65 def __init__(self, read_fileno, write_fileno, debug):
67 self.
__read_pipe = os.fdopen(read_fileno, mode=
'rb', buffering=0)
68 self.
__write_pipe = os.fdopen(write_fileno, mode=
'wb', buffering=0)
75 print(
"PYTHON {0}".format(self.
__pid), *args, file=sys.stderr)
80 """Serializes the message in JSON and sends it to the parent process"""
81 def default_json_encoder(o):
82 self.
__print_debug(
"Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
83 return 'UNSERIALIZABLE OBJECT'
84 j = json.dumps({
'event': event,
'content': content}, indent=
None, default=default_json_encoder)
89 except BrokenPipeError:
93 """Sends a response message to the parent process"""
97 self.
__write_pipe.write(bytes(
'{"response": "' + str(response) +
'"}\n',
'utf-8'))
98 except BrokenPipeError:
102 """Read a message from the parent and parse it"""
107 return json.loads(l.decode())
108 except BrokenPipeError:
110 except ValueError
as e:
112 raise HiveJSONMessageException
from e
115 """Sends a message and expects the response to be 'OK'"""
118 if response[
'response'] !=
'OK':
122 """Simple loop: wait for job parameters, do the job's life-cycle"""
129 if 'input_job' not in config:
130 self.
__print_debug(
"no params, this is the end of the wrapper")
135 """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent"""
143 for x
in [
'dbID',
'input_id',
'retry_count']:
144 setattr(self.
input_job, x, config[
'input_job'][x])
150 self.
debug = config[
'debug']
153 steps = [
'fetch_input',
'run' ]
155 steps.insert(0,
'pre_cleanup')
156 if config[
'execute_writes']:
157 steps.append(
'write_output')
158 steps.append(
'post_healthcheck')
163 died_somewhere =
False
167 except CompleteEarlyException
as e:
168 self.
warning(e.args[0]
if len(e.args)
else repr(e),
False)
169 except LostHiveConnectionException
as e:
172 except Exception
as e:
173 died_somewhere =
True
178 except LostHiveConnectionException
as e:
181 except Exception
as e:
182 died_somewhere =
True
185 job_end_structure = {
'complete' :
not died_somewhere,
'job': {},
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}}
186 for x
in [
'autoflow',
'lethal_for_worker',
'transient_error' ]:
187 job_end_structure[
'job'][x] = getattr(self.
input_job, x)
191 """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
192 We only call the method if it exists, to save a trip to the database."""
193 if hasattr(self, method):
195 getattr(self, method)()
198 """Remove "skipped_traces" lines from the stack trace (the eHive part)"""
199 s1 = traceback.format_exception_only(type(exception), exception)
200 l = traceback.extract_tb(exception.__traceback__)[skipped_traces:]
201 s2 = traceback.format_list(l)
202 return "".join(s1+s2)
208 def warning(self, message, is_error = False):
209 """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
212 def dataflow(self, output_ids, branch_name_or_code = 1):
213 """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns"""
214 if branch_name_or_code == 1:
216 self.
__send_message(
'DATAFLOW', {
'output_ids': output_ids,
'branch_name_or_code': branch_name_or_code,
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}})
220 """Returns the full path of the temporary directory created by the worker.
231 """Returns the default parameters for this runnable"""
235 """Returns the value of the parameter "param_name" or raises an exception
236 if anything wrong happens or the value is None. The exception is
237 marked as non-transient."""
240 v = self.
__params.get_param(param_name)
246 def param(self, param_name, *args):
247 """When called as a setter: sets the value of the parameter "param_name".
248 When called as a getter: returns the value of the parameter "param_name".
249 It does not raise an exception if the parameter (or another one in the
250 substitution stack) is undefined"""
253 return self.
__params.set_param(param_name, args[0])
257 return self.
__params.get_param(param_name)
258 except KeyError
as e:
259 warnings.warn(
"parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e),
params.ParamWarning, 2)
263 """Returns True if the parameter exists and can be successfully
264 substituted, None if the substitution fails, False if it is missing"""
265 if not self.
__params.has_param(param_name):
274 """Returns True if the parameter exists and can be successfully
275 substituted to a defined value, None if the substitution fails,
276 False if it is missing or evaluates as None"""
282 return self.
__params.get_param(param_name)
is not None
287 def test_job_param(self):
293 j = FakeRunnableWithParams({
301 self.assertIs( j.param_exists(
'a'),
True,
'"a" exists' )
302 self.assertIs( j.param_exists(
'b'),
True,
'"b" exists' )
303 self.assertIs( j.param_exists(
'c'),
None,
'"c"\'s existence is unclear' )
304 self.assertIs( j.param_exists(
'd'),
False,
'"d" doesn\'t exist' )
309 self.assertIs( j.param_is_defined(
'a'),
True,
'"a" is defined' )
310 self.assertIs( j.param_is_defined(
'b'),
False,
'"b" is not defined' )
311 self.assertIs( j.param_is_defined(
'c'),
None,
'"c"\'s defined-ness is unclear' )
312 self.assertIs( j.param_is_defined(
'd'),
False,
'"d" is not defined (it doesn\'t exist)' )
314 j.param_is_defined(
'e')
317 self.assertIs( j.param(
'a'), 3,
'"a" is 3' )
318 self.assertIs( j.param(
'b'),
None,
'"b" is None' )
320 self.assertIs( j.param(
'c'),
None,
'"c"\'s value is unclear' )
322 self.assertIs( j.param(
'd'),
None,
'"d" is not defined (it doesn\'t exist)' )
327 self.assertIs( j.param_required(
'a'), 3,
'"a" is 3' )
329 j.param_required(
'b')
330 with self.assertRaises(KeyError):
331 j.param_required(
'c')
332 with self.assertRaises(KeyError):
333 j.param_required(
'd')
335 j.param_required(
'e')
This is the counterpart of GuestProcess.
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Raised when we could not parse the JSON message coming from GuestProcess.
def param_defaults(self)
Returns the default parameters for this runnable.
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fa...
def __print_debug(self, *args)
Can be raised by a derived class of BaseRunnable to indicate an early successful termination.
Raised when the process has lost the communication pipe with the Perl side.
Equivalent of eHive's Param module.
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
def __traceback(self, exception, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
def __init__(self, read_fileno, write_fileno, debug)
__created_worker_temp_directory
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination.
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Dummy class to hold job-related information.
def __read_message(self)
Read a message from the parent and parse it.
def __job_life_cycle(self, config)
Job's life-cycle.
def __send_response(self, response)
Sends a response message to the parent process.
Raised when parameters depend on each other, forming a loop.
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process.
Used by process.BaseRunnable.
Raised when a parameter cannot be required because it is null (None)
def param(self, param_name, *args)
When called as a setter: sets the value of the parameter "param_name".
def __send_message_and_wait_for_OK(self, event, content)
Sends a message and expects the response to be 'OK'.
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value,...