ensembl-hive-python3  2.8.1
TestRunnable.py
Go to the documentation of this file.
1 
2 # See the NOTICE file distributed with this work for additional information
3 # regarding copyright ownership.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 # standaloneJob.pl eHive.examples.TestRunnable -language python3
18 
19 import os
20 import subprocess
21 import unittest
22 
23 import eHive
24 
class TestRunnable(eHive.BaseRunnable):
    """Simple Runnable to test as a standaloneJob"""

    def param_defaults(self):
        """Return the default parameters for this runnable.

        'alpha' and 'beta' are the two integers summed in run().
        'gamma' and 'delta' are strings containing '#' characters;
        presumably they exercise eHive's '#name#' parameter substitution
        (a full '#alpha#' reference vs. a lone '#') -- confirm against
        the eHive parameter documentation.
        """
        return {
            'alpha' : 37,
            'beta' : 78,
            'gamma' : '#alpha#',
            'delta' : 'one#hash',
        }

    def fetch_input(self):
        """Emit a warning, remember the worker's temp directory and print the parameters.

        Every parameter is read with param_required(), so a missing value
        raises instead of being printed as None.
        """
        self.warning("Fetch the world !")
        print("alpha is", self.param_required('alpha'))
        print("beta is", self.param_required('beta'))
        # Both the print below and run() read self.temp_dir, so it must be
        # assigned before either of them is reached.
        self.temp_dir = self.worker_temp_directory()
        print("my directory name is", self.temp_dir)
        print("gamma is", self.param_required('gamma'))
        print("delta is", self.param_required('delta'))

    def run(self):
        """Sum 'alpha' and 'beta', store the result as 'gamma' and create a marker file.

        Raises:
            eHive.CompleteEarlyException: if the sum is 0 (nothing to do).
            OverflowError: if the sum is greater than 255.
        """
        self.warning("Run the world !")
        s = self.param('alpha') + self.param('beta')
        print("set gamma to", s)
        if s == 0:
            raise eHive.CompleteEarlyException('Nothing to do')
        if s > 255:
            raise OverflowError(s)
        # Overwrite the default 'gamma' with the computed sum so that
        # write_output() dataflows it.
        self.param('gamma', s)
        # Create an empty "hello" file in the temp directory; write_output()
        # later reports whether it exists.
        self.greeting_path = os.path.join(self.temp_dir, "hello")
        subprocess.check_call(["touch", self.greeting_path])

    def write_output(self):
        """Emit a warning, dataflow 'gamma' on branch 2 and report on the marker file."""
        self.warning("Write to the world !")
        print("gamma is", self.param('gamma'))
        self.dataflow( {'gamma': self.param('gamma')}, 2 )
        print("Greetings in place:", os.path.exists(self.greeting_path))
62 
63 
class TestRunnableTestCase(unittest.TestCase):
    """Check the events emitted by TestRunnable for three sets of input parameters."""

    def test_TestRunnable(self):
        """Run TestRunnable through eHive.testRunnable and compare the emitted events.

        Each call passes the parameter overrides for the job followed by
        the exact list of events the runnable is expected to produce.
        """
        # Defaults only: alpha + beta = 37 + 78 = 115, which is dataflown
        # as 'gamma' on branch 2 after all three stages have run.
        eHive.testRunnable(self,
            TestRunnable,
            {},
            [
                eHive.WarningEvent('Fetch the world !', is_error=False),
                eHive.WarningEvent('Run the world !', is_error=False),
                eHive.WarningEvent('Write to the world !', is_error=False),
                eHive.DataflowEvent({'gamma': 115}, branch_name_or_code=2),
            ],
        )
        # alpha = 237: 237 + 78 = 315 exceeds 255, so run() raises
        # OverflowError(315) and write_output() is never reached.
        eHive.testRunnable(self,
            TestRunnable,
            {
                'alpha': 237,
            },
            [
                eHive.WarningEvent('Fetch the world !', is_error=False),
                eHive.WarningEvent('Run the world !', is_error=False),
                eHive.FailureEvent(OverflowError, (315,)),
            ],
        )
        # beta = -37: 37 + (-37) = 0, so run() completes early with
        # 'Nothing to do' and write_output() is never reached.
        eHive.testRunnable(self,
            TestRunnable,
            {
                'beta': -37,
            },
            [
                eHive.WarningEvent('Fetch the world !', is_error=False),
                eHive.WarningEvent('Run the world !', is_error=False),
                eHive.CompleteEarlyEvent('Nothing to do'),
            ],
        )
eHive.process.BaseRunnable
This is the counterpart of GuestProcess.
Definition: process.py:60
eHive.process.BaseRunnable.dataflow
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
Definition: process.py:229
eHive.process.BaseRunnable.worker_temp_directory
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Definition: process.py:237
eHive.process.CompleteEarlyException
Can be raised by a derived class of BaseRunnable to indicate an early successful termination.
Definition: process.py:39
eHive.process.BaseRunnable.param_required
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
Definition: process.py:253
eHive.examples.TestRunnable.TestRunnableTestCase
Definition: TestRunnable.py:64
eHive.process.BaseRunnable.warning
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an error.
Definition: process.py:225
eHive.examples.TestRunnable.TestRunnable.temp_dir
temp_dir
Definition: TestRunnable.py:40
eHive.examples.TestRunnable.TestRunnable.write_output
def write_output(self)
Definition: TestRunnable.py:57
eHive.process.BaseRunnable.param
def param(self, param_name, *args)
When called as a setter: sets the value of the parameter "param_name".
Definition: process.py:266
eHive.examples.TestRunnable.TestRunnable.run
def run(self)
Definition: TestRunnable.py:45
eHive.examples.TestRunnable.TestRunnableTestCase.test_TestRunnable
def test_TestRunnable(self)
Definition: TestRunnable.py:66
eHive.examples.TestRunnable.TestRunnable.fetch_input
def fetch_input(self)
Definition: TestRunnable.py:36
eHive.examples.TestRunnable.TestRunnable.greeting_path
greeting_path
Definition: TestRunnable.py:54
eHive.examples.TestRunnable.TestRunnable.param_defaults
def param_defaults(self)
Returns the default parameters for this runnable.
Definition: TestRunnable.py:28
eHive.examples.TestRunnable.TestRunnable
Simple Runnable to test as a standaloneJob.
Definition: TestRunnable.py:26