30 This module mainly implements python's counterpart of GuestProcess. Read 31 the latter for more information about the JSON protocol used to communicate. 35 """Dummy class to hold job-related information""" 38 class CompleteEarlyException(Exception):
39 """Can be raised by a derived class of BaseRunnable to indicate an early successful termination""" 42 """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination""" 44 class HiveJSONMessageException(Exception):
45 """Raised when we could not parse the JSON message coming from GuestProcess""" 48 """Raised when the process has lost the communication pipe with the Perl side""" 53 """This is the counterpart of GuestProcess. Note that most of the methods 54 are private to be hidden in the derived classes. 56 This class can be used as a base-class for people to redefine fetch_input(), 57 run() and/or write_output() (and/or pre_cleanup(), post_cleanup()). 58 Jobs are supposed to raise CompleteEarlyException in case they complete before 59 reaching. They can also raise JobFailedException to indicate a general failure 65 def __init__(self, read_fileno, write_fileno, debug):
67 self.
__read_pipe = os.fdopen(read_fileno, mode=
'rb', buffering=0)
68 self.
__write_pipe = os.fdopen(write_fileno, mode=
'wb', buffering=0)
69 self.
__pid = os.getpid()
75 print(
"PYTHON {0}".format(self.
__pid), *args, file=sys.stderr)
80 """Serializes the message in JSON and sends it to the parent process""" 81 def default_json_encoder(o):
82 self.
__print_debug(
"Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
83 return 'UNSERIALIZABLE OBJECT' 84 j = json.dumps({
'event': event,
'content': content}, indent=
None, default=default_json_encoder)
89 except BrokenPipeError
as e:
93 """Sends a response message to the parent process""" 97 self.
__write_pipe.write(bytes(
'{"response": "' + str(response) +
'"}\n',
'utf-8'))
98 except BrokenPipeError
as e:
102 """Read a message from the parent and parse it""" 107 return json.loads(l.decode())
108 except BrokenPipeError
as e:
110 except ValueError
as e:
112 raise HiveJSONMessageException
from e
115 """Sends a message and expects the response to be 'OK'""" 118 if response[
'response'] !=
'OK':
122 """Simple loop: wait for job parameters, do the job's life-cycle""" 129 if 'input_job' not in config:
130 self.
__print_debug(
"no params, this is the end of the wrapper")
135 """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent""" 143 for x
in [
'dbID',
'input_id',
'retry_count']:
144 setattr(self.
input_job, x, config[
'input_job'][x])
150 self.
debug = config[
'debug']
153 steps = [
'fetch_input',
'run' ]
155 steps.insert(0,
'pre_cleanup')
156 if config[
'execute_writes']:
157 steps.append(
'write_output')
158 steps.append(
'post_healthcheck')
163 died_somewhere =
False 167 except CompleteEarlyException
as e:
168 self.
warning(e.args[0]
if len(e.args)
else repr(e),
False)
169 except LostHiveConnectionException
as e:
173 died_somewhere =
True 178 except LostHiveConnectionException
as e:
182 died_somewhere =
True 185 job_end_structure = {
'complete' :
not died_somewhere,
'job': {},
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}}
186 for x
in [
'autoflow',
'lethal_for_worker',
'transient_error' ]:
187 job_end_structure[
'job'][x] = getattr(self.
input_job, x)
191 """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup". 192 We only call the method if it exists, to save a trip to the database.""" 193 if hasattr(self, method):
195 getattr(self, method)()
198 """Remove "skipped_traces" lines from the stack trace (the eHive part)""" 199 (etype, value, tb) = sys.exc_info()
200 s1 = traceback.format_exception_only(etype, value)
201 l = traceback.extract_tb(tb)[skipped_traces:]
202 s2 = traceback.format_list(l)
203 return "".join(s1+s2)
209 def warning(self, message, is_error = False):
210 """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not""" 213 def dataflow(self, output_ids, branch_name_or_code = 1):
214 """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns""" 215 if branch_name_or_code == 1:
217 self.
__send_message(
'DATAFLOW', {
'output_ids': output_ids,
'branch_name_or_code': branch_name_or_code,
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}})
221 """Returns the full path of the temporary directory created by the worker. 222 Runnables can implement "worker_temp_directory_name()" to return the name 223 they would like to use 226 template_name = self.worker_temp_directory_name()
if hasattr(self,
'worker_temp_directory_name')
else None 235 """Returns the default parameters for this runnable""" 239 """Returns the value of the parameter "param_name" or raises an exception 240 if anything wrong happens or the value is None. The exception is 241 marked as non-transient.""" 244 v = self.
__params.get_param(param_name)
250 def param(self, param_name, *args):
251 """When called as a setter: sets the value of the parameter "param_name". 252 When called as a getter: returns the value of the parameter "param_name". 253 It does not raise an exception if the parameter (or another one in the 254 substitution stack) is undefined""" 257 return self.
__params.set_param(param_name, args[0])
261 return self.
__params.get_param(param_name)
262 except KeyError
as e:
263 warnings.warn(
"parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e),
eHive.Params.ParamWarning, 2)
267 """Returns True if the parameter exists and can be successfully 268 substituted, None if the substitution fails, False if it is missing""" 269 if not self.
__params.has_param(param_name):
278 """Returns True if the parameter exists and can be successfully 279 substituted to a defined value, None if the substitution fails, 280 False if it is missing or evaluates as None""" 286 return self.
__params.get_param(param_name)
is not None 291 def test_job_param(self):
297 j = FakeRunnableWithParams({
305 self.assertIs( j.param_exists(
'a'),
True,
'"a" exists' )
306 self.assertIs( j.param_exists(
'b'),
True,
'"b" exists' )
307 self.assertIs( j.param_exists(
'c'),
None,
'"c"\'s existence is unclear' )
308 self.assertIs( j.param_exists(
'd'),
False,
'"d" doesn\'t exist' )
313 self.assertIs( j.param_is_defined(
'a'),
True,
'"a" is defined' )
314 self.assertIs( j.param_is_defined(
'b'),
False,
'"b" is not defined' )
315 self.assertIs( j.param_is_defined(
'c'),
None,
'"c"\'s defined-ness is unclear' )
316 self.assertIs( j.param_is_defined(
'd'),
False,
'"d" is not defined (it doesn\'t exist)' )
318 j.param_is_defined(
'e')
321 self.assertIs( j.param(
'a'), 3,
'"a" is 3' )
322 self.assertIs( j.param(
'b'),
None,
'"b" is None' )
324 self.assertIs( j.param(
'c'),
None,
'"c"\'s value is unclear' )
326 self.assertIs( j.param(
'd'),
None,
'"d" is not defined (it doesn\'t exist)' )
331 self.assertIs( j.param_required(
'a'), 3,
'"a" is 3' )
333 j.param_required(
'b')
334 with self.assertRaises(KeyError):
335 j.param_required(
'c')
336 with self.assertRaises(KeyError):
337 j.param_required(
'd')
339 j.param_required(
'e')
def __read_message(self)
Read a message from the parent and parse it.
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fa...
__created_worker_temp_directory
def __print_debug(self, args)
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
def param_defaults(self)
Returns the default parameters for this runnable.
def __send_message_and_wait_for_OK(self, event, content)
Sends a message and expects the response to be 'OK'.
Raised when the process has lost the communication pipe with the Perl side.
Equivalent of eHive's Param module.
This is the counterpart of GuestProcess.
Raised when we could not parse the JSON message coming from GuestProcess.
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination...
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Raised when a parameter cannot be required because it is null (None)
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
def __job_life_cycle(self, config)
Job's life-cycle.
def __init__(self, read_fileno, write_fileno, debug)
Used by Process.BaseRunnable.
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value...
Dummy class to hold job-related information.
Raised when parameters depend on each other, forming a loop.
def __send_response(self, response)
Sends a response message to the parent process.