18 All testing functions, e.g. how to test a Runnable
26 from .params
import ParamContainer
27 from .process
import Job, CompleteEarlyException
28 from .utils
import find_module
# Lightweight, value-comparable records describing the events a Runnable
# can emit while it is being tested.
WarningEvent = collections.namedtuple('WarningEvent', ('message', 'is_error'))
DataflowEvent = collections.namedtuple('DataflowEvent', ('output_ids', 'branch_name_or_code'))
CompleteEarlyEvent = collections.namedtuple('CompleteEarlyEvent', ('message',))
FailureEvent = collections.namedtuple('FailureEvent', ('exception', 'args'))
37 def testRunnable(testcase, runnableClass, inputParameters, refEvents, config=None):
38 """Method to test a Runnable
41 testcase: instance of unittest.TestCase, which is used to do the actual tests.
42 runnableClass: Runnable being tested. Can be a string of the actual type.
43 inputParameters: dictionary of input parameters. Will override the Runnable's
44 param_defaults() dictionary.
45 refEvents: list of "events" the Runnable is expected to raise (in the right
46 order). Accepted events are
51 config: extra configuration options, given as a dictionary. Accepted keys are
52 - is_retry: bool or int, default False.
53 whether the job is considered a retry (i.e. whether
54 pre_cleanup should run).
55 - no_write: bool, default False.
56 whether write_output is skipped.
57 - no_cleanup: bool, default False.
58 whether the temporary directory is removed at the end
60 - debug: int, default 0.
62 - test_autoflow: bool, default not set.
63 when set, check that this is the final value of the
64 job's autoflow attribute.
65 - test_lethal_for_worker: bool, default not set.
66 when set, check that this is the final value
67 of the job's lethal_for_worker attribute.
68 - test_transient_error: bool, default not set.
69 when set, check that this is the final value
70 of the job's transient_error attribute.
74 if isinstance(runnableClass, str):
77 class RunnableTester(runnableClass):
78 """Helper class to provide a test-enabled version of the requested Runnable"""
81 """Entry point of RunnableTester. Run everything in order"""
83 self.__job_life_cycle()
86 def __configure(self):
87 """Initialise all the parameters the Runnable may need"""
# A missing (or otherwise falsy) config becomes an empty dict, so the
# .get() lookups made on it always work.
90 self.__config = config
or {}
# Work on a copy: __compare_next_event pops entries off this list as events
# are matched, which must not alter the list owned by the caller.
91 self.__refEvents = refEvents.copy()
# Defaults first, then the test's inputParameters, so the latter win.
95 paramsDict.update(runnable.param_defaults())
96 paramsDict.update(inputParameters)
98 self._BaseRunnable__params = params
# The job's input_id is the string form of the input parameters.
103 job.input_id = str(inputParameters)
# 'is_retry' may be a bool or an int; either way it is stored as an integer
# retry count (False -> 0, True -> 1).
104 job.retry_count = int(self.__config.get(
'is_retry',
False))
# Starting values of the flags that __final_tests can check once the job
# has ended.
106 job.lethal_for_worker =
False
107 job.transient_error =
True
# Debug level, 0 unless the config says otherwise.
110 self.debug = self.__config.get(
'debug', 0)
112 def __job_life_cycle(self):
113 """Run the job's life cycle. This must match BaseRunnable.__job_life_cycle"""
# Steps attempted on every run, in execution order.
116 steps = [
'fetch_input',
'run']
# 'pre_cleanup' goes first, and only when the job counts as a retry.
117 if self.input_job.retry_count:
118 steps.insert(0,
'pre_cleanup')
# The 'no_write' option gates 'write_output'.
# NOTE(review): indentation is lost in this listing - confirm whether
# 'post_healthcheck' is gated by 'no_write' as well.
119 if not self.__config.get(
'no_write'):
120 steps.append(
'write_output')
121 steps.append(
'post_healthcheck')
# No temporary directory is tracked until worker_temp_directory() makes one.
124 self.__created_worker_temp_directory =
None
# Every step is optional: it only runs if the Runnable defines it.
128 self.__run_method_if_exists(s)
# An early completion is checked against the expected events.
# NOTE(review): the statement binding `event` is not visible in this listing.
129 except CompleteEarlyException
as e:
132 self.__compare_next_event(event)
# Any other exception is routed through __handle_exception.
133 except Exception
as e:
134 self.__handle_exception(e)
# 'post_cleanup' is attempted separately, with the same exception handling.
137 self.__run_method_if_exists(
'post_cleanup')
138 except Exception
as e:
139 self.__handle_exception(e)
# The temporary directory is removed unless 'no_cleanup' is set.
141 if not self.__config.get(
'no_cleanup'):
142 self.__cleanup_worker_temp_directory()
def __run_method_if_exists(self, method):
    """Invoke the life-cycle step called `method` (e.g. "fetch_input",
    "run", "write_output") when the Runnable defines it; do nothing
    otherwise."""
    try:
        step = getattr(self, method)
    except AttributeError:
        # Optional step that this Runnable does not implement
        return
    step()
150 def __handle_exception(self, e):
151 """Capture and check the Runnable's own exceptions whilst letting
152 the testcase's exceptions pass through"""
# Index 2 of a traceback entry is the function name. A frame named
# '__compare_next_event' means the exception was raised while an event was
# being checked against the reference list (presumably by the testcase's
# own assertions rather than by the Runnable - confirm).
153 if any(f
for f
in traceback.extract_tb(e.__traceback__)
if f[2] ==
'__compare_next_event'):
# NOTE(review): neither the body of the `if` above nor the statement
# binding `event` is visible in this listing.
158 self.__compare_next_event(event)
def __final_tests(self):
    """Checks performed once the job has ended: every expected event must
    have been emitted, and the job attributes named in the config must
    hold the requested final values."""
    leftover = self.__refEvents
    testcase.assertFalse(leftover, msg='The job has now ended and {} events have not been emitted'.format(len(leftover)))
    # A "test_<attribute>" key in the config holds the expected final value
    # of the job's <attribute>; attributes without such a key are ignored.
    for attr in ('autoflow', 'lethal_for_worker', 'transient_error'):
        config_key = "test_" + attr
        if config_key not in self.__config:
            continue
        testcase.assertEqual(getattr(self.input_job, attr), self.__config[config_key], msg='Final value of {}'.format(attr))
def worker_temp_directory(self):
    """Return the path of a temporary directory for the duration of the
    test, creating it on first use.
    This functionality is normally handled by the Perl side (via
    GuestProcess) and therefore has to be reimplemented here."""
    existing = self.__created_worker_temp_directory
    if existing is not None:
        # Already created by an earlier call: hand back the same path
        return existing
    self.__created_worker_temp_directory = tempfile.mkdtemp()
    return self.__created_worker_temp_directory
def __cleanup_worker_temp_directory(self):
    """Delete the temporary directory handed out by worker_temp_directory,
    if one was created.
    As with its creation, this is normally handled by the Perl side and
    has to be reimplemented here."""
    temp_dir = self.__created_worker_temp_directory
    if not temp_dir:
        # worker_temp_directory was never called: nothing to remove
        return
    shutil.rmtree(temp_dir)
188 def warning(self, message, is_error=False):
189 """Test that the warning event generated is expected"""
# NOTE(review): the statement binding `event` is not visible in this
# listing - presumably a WarningEvent built from message and is_error; confirm.
# The event is checked against the next expected one.
191 self.__compare_next_event(event)
193 def dataflow(self, output_ids, branch_name_or_code=1):
194 """Test that the dataflow event generated is expected"""
# A dataflow on branch 1 (the default) switches the job's autoflow off.
195 if branch_name_or_code == 1:
196 self.input_job.autoflow =
False
# NOTE(review): the statement binding `event` is not visible in this
# listing - presumably a DataflowEvent built from the two arguments; confirm.
# The event is checked against the next expected one.
198 self.__compare_next_event(event)
def __compare_next_event(self, event):
    """Check an event the Runnable has just generated against the
    reference list: it must equal the next expected event, which is
    consumed by the comparison."""
    pending = self.__refEvents
    # Fail with an explicit message when nothing more is expected
    testcase.assertTrue(pending, msg='No more events are expected but {} was raised'.format(event))
    expected = pending.pop(0)
    testcase.assertEqual(event, expected)
209 runnable = RunnableTester.__new__(RunnableTester)