ensembl-hive-python3  2.7.0
tests.py
Go to the documentation of this file.
1 
2 # See the NOTICE file distributed with this work for additional information
3 # regarding copyright ownership.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 """
18 All testing functions, e.g. how to test a Runnable
19 """
20 
21 import collections
22 import tempfile
23 import shutil
24 import traceback
25 
26 from .params import ParamContainer
27 from .process import Job, CompleteEarlyException
28 from .utils import find_module
29 
# Event records a job may emit while it runs. testRunnable() compares the
# events actually produced against the expected ones with plain tuple equality.

# A call to warning(message, is_error)
WarningEvent = collections.namedtuple('WarningEvent', 'message is_error')
# A call to dataflow(output_ids, branch_name_or_code)
DataflowEvent = collections.namedtuple('DataflowEvent', 'output_ids branch_name_or_code')
# A CompleteEarlyException raised by the job, with its (optional) message
CompleteEarlyEvent = collections.namedtuple('CompleteEarlyEvent', 'message')
# Any other exception raised by the job: its type and its args tuple
FailureEvent = collections.namedtuple('FailureEvent', 'exception args')
35 
36 
def testRunnable(testcase, runnableClass, inputParameters, refEvents, config=None):
    """Method to test a Runnable

    The Runnable is subclassed into a test-enabled version whose warning()
    and dataflow() methods, instead of talking to a worker, compare each
    emitted event against the next expected one in refEvents. Mismatches
    are reported through the assert* methods of testcase.

    Args:
        testcase: instance of unittest.TestCase, which is used to do the actual tests.
        runnableClass: Runnable being tested. Can be a string, in which case
            the actual type is looked up with find_module().
        inputParameters: dictionary of input parameters. Will override the Runnable's
            param_defaults() dictionary.
        refEvents: iterable of "events" the Runnable is expected to raise (in the
            right order). It is copied, never modified. Accepted events are
              - WarningEvent.
              - DataflowEvent.
              - CompleteEarlyEvent.
              - FailureEvent.
        config: extra configuration options, given as a dictionary. Accepted keys are
              - is_retry: bool or int, default False.
                          whether the job is considered a retry (i.e. whether
                          pre_cleanup should run).
              - no_write: bool, default False.
                          when true, write_output and post_healthcheck are skipped.
              - no_cleanup: bool, default False.
                          when true, the temporary directory is kept (not
                          removed) at the end of the run.
              - debug: int, default 0.
                          the debug level.
              - test_autoflow: bool, default not set.
                          when set, check that this is the final value of the
                          job's autoflow attribute.
              - test_lethal_for_worker: bool, default not set.
                          when set, check that this is the final value
                          of the job's lethal_for_worker attribute.
              - test_transient_error: bool, default not set.
                          when set, check that this is the final value
                          of the job's transient_error attribute.
    """

    # Find the actual class (type)
    if isinstance(runnableClass, str):
        runnableClass = find_module(runnableClass)

    class RunnableTester(runnableClass):
        """Helper class to provide a test-enabled version of the requested Runnable"""

        def runTests(self):
            """Entry point of RunnableTester. Run everything in order"""
            self.__configure()
            self.__job_life_cycle()
            self.__final_tests()

        def __configure(self):
            """Initialise all the parameters the Runnable may need"""

            # Copy all the input parameters in the class instance itself
            self.__config = config or {}
            # list() both copies (the original is never modified) and accepts
            # any iterable of events, not only lists
            self.__refEvents = list(refEvents)

            # Build the parameter hash: defaults first, then the overrides
            paramsDict = {}
            paramsDict.update(self.param_defaults())
            paramsDict.update(inputParameters)
            params = ParamContainer(paramsDict)
            # Written under the name-mangled form of BaseRunnable's private
            # "__params" attribute, since __init__ is not run
            self._BaseRunnable__params = params

            # Build the Job object
            job = Job()
            job.dbID = None
            job.input_id = str(inputParameters)  # FIXME: this should be a Perl stringification, not a Python one
            job.retry_count = int(self.__config.get('is_retry', False))
            job.autoflow = True
            job.lethal_for_worker = False
            job.transient_error = True
            self.input_job = job

            self.debug = self.__config.get('debug', 0)

        def __job_life_cycle(self):
            """Run the job's life cycle. This must match BaseRunnable.__job_life_cycle"""

            # Which methods should be run
            steps = ['fetch_input', 'run']
            if self.input_job.retry_count:
                steps.insert(0, 'pre_cleanup')
            if not self.__config.get('no_write'):
                steps.append('write_output')
                steps.append('post_healthcheck')

            # We need to manage the temp directory since GuestProcess/Worker are not around
            self.__created_worker_temp_directory = None

            try:
                try:
                    for s in steps:
                        self.__run_method_if_exists(s)
                except CompleteEarlyException as e:
                    # CompleteEarlyException must be declared in the test plan
                    event = CompleteEarlyEvent(e.args[0] if e.args else None)
                    self.__compare_next_event(event)
                except Exception as e:
                    self.__handle_exception(e)

                try:
                    self.__run_method_if_exists('post_cleanup')
                except Exception as e:
                    self.__handle_exception(e)
            finally:
                # Done in a "finally" so that a failing test (an exception
                # propagating out of the blocks above) does not leak the
                # temporary directory
                if not self.__config.get('no_cleanup'):
                    self.__cleanup_worker_temp_directory()

        def __run_method_if_exists(self, method):
            """Run the method (one of "fetch_input", "run", "write_output",
            etc) if defined in the Runnable."""
            if hasattr(self, method):
                getattr(self, method)()

        def __handle_exception(self, e):
            """Capture and check the Runnable's own exceptions whilst letting
            the testcase's exceptions pass through"""
            # An exception whose traceback goes through __compare_next_event
            # comes from the testcase (an event did not match), not from the job.
            # f[2] is the function name of the traceback entry.
            if any(f[2] == '__compare_next_event' for f in traceback.extract_tb(e.__traceback__)):
                raise e
            else:
                # Job exception: check whether it is expected
                event = FailureEvent(type(e), e.args)
                self.__compare_next_event(event)

        def __final_tests(self):
            """Extra tests once the job has ended"""
            testcase.assertFalse(self.__refEvents, msg='The job has now ended and {} events have not been emitted'.format(len(self.__refEvents)))

            # Job attributes that the Runnable could have set and we want to test
            for attr in ['autoflow', 'lethal_for_worker', 'transient_error']:
                tattr = "test_" + attr
                if tattr in self.__config:
                    testcase.assertEqual(getattr(self.input_job, attr), self.__config[tattr], msg='Final value of {}'.format(attr))

        # Overridden BaseRunnable interface

        def worker_temp_directory(self):
            """Provide a temporary directory for the duration of the test.
            This functionality was handled by the Perl side (via GuestProcess)
            but has to be reimplemented."""
            # Created lazily, on first request only
            if self.__created_worker_temp_directory is None:
                self.__created_worker_temp_directory = tempfile.mkdtemp()
            return self.__created_worker_temp_directory

        def __cleanup_worker_temp_directory(self):
            """Remove the temporary directory created by worker_temp_directory.
            Again, this was handled by the Perl side but has to be
            reimplemented."""
            if self.__created_worker_temp_directory:
                shutil.rmtree(self.__created_worker_temp_directory)

        def warning(self, message, is_error=False):
            """Test that the warning event generated is expected"""
            event = WarningEvent(message, is_error)
            self.__compare_next_event(event)

        def dataflow(self, output_ids, branch_name_or_code=1):
            """Test that the dataflow event generated is expected"""
            # An explicit dataflow on branch 1 turns the autoflow off
            if branch_name_or_code == 1:
                self.input_job.autoflow = False
            event = DataflowEvent(output_ids, branch_name_or_code)
            self.__compare_next_event(event)
            # Dummy return value handed back to the Runnable
            return [1]

        def __compare_next_event(self, event):
            """Helper method for warning and dataflow.
            Check that the event that has been generated is expected."""
            testcase.assertTrue(self.__refEvents, msg='No more events are expected but {} was raised'.format(event))
            testcase.assertEqual(event, self.__refEvents.pop(0))

    # Build the Runnable
    # NOTE: not __init__ because we can't provide file descriptors, etc
    runnable = RunnableTester.__new__(RunnableTester)
    runnable.runTests()
eHive.tests.DataflowEvent
DataflowEvent
Definition: tests.py:32
eHive.tests.WarningEvent
WarningEvent
Definition: tests.py:31
eHive.tests.testRunnable
def testRunnable(testcase, runnableClass, inputParameters, refEvents, config=None)
Method to test a Runnable.
Definition: tests.py:71
eHive.tests.CompleteEarlyEvent
CompleteEarlyEvent
Definition: tests.py:33
eHive.params.ParamContainer
Equivalent of eHive's Param module.
Definition: params.py:57
eHive.tests.FailureEvent
FailureEvent
Definition: tests.py:34
eHive.process.Job
Dummy class to hold job-related information.
Definition: process.py:35
eHive.utils.find_module
def find_module(module_name)
Find and instantiate a Runnable, given its name.
Definition: utils.py:28