def allocDelayProg(size, duration):
"""Python program as str that allocates object of the specified size
in Gigabytes and then waits for the specified time
size - allocation size, bytes; None to skip allocation
duration - execution duration, sec; None to skip the sleep
return - Python program as str
"""
return """from __future__ import print_function, division # Required for stderr output, must be the first import
import sys
import time
import array
if {size} is not None:
a = array.array('b', [0])
asize = sys.getsizeof(a)
# Note: allocate at least empty size, i.e. empty list
buffer = a * int(max({size} - asize + 1, 0))
#if _DEBUG_TRACE:
# print(''.join((' allocDelayProg(), allocated ', str(sys.getsizeof(buffer))
# , ' bytes for ', str({duration}),' sec')), file=sys.stderr)
if {duration} is not None:
time.sleep({duration})
""".format(size=size, duration=duration)
python类skip()的实例源码
def skip_if_no_uuid(f):
"""Decorator to skip a test if uuid is not supported by Py/PG."""
@wraps(f)
def skip_if_no_uuid_(self):
try:
import uuid # noqa
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def skip_if_no_uuid(f):
"""Decorator to skip a test if uuid is not supported by Py/PG."""
@wraps(f)
def skip_if_no_uuid_(self):
try:
import uuid # noqa
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def test_callback_is_run(self):
await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
exchange_name="fake_exch",
exchange_type="direct",
routing_keys=["fake_routing_key"],
callback = self.fake_callback)
await self.GIVEN_MessagePublished(exchange_name="fake_exch",
msg="fake_message",
routing_key="fake_routing_key")
await self.WHEN_ProcessEventsNTimes(2)
self.THEN_CallbackReceivesMessage("fake_message")
# @unittest.skip("skipped")
def test_callback_is_not_run_if_routing_key_mismatch(self):
fake_callback = Mock()
await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
exchange_name="fake_exch",
exchange_type="direct",
routing_keys=["fake_routing_key"],
callback = fake_callback)
await self.GIVEN_MessagePublished(exchange_name="fake_exch",
msg="fake_message",
routing_key="another_routing_key")
await self.WHEN_ProcessEventsOnce()
self.THEN_CallbackIsNotRun(fake_callback)
# @unittest.skip("skipped")
def test_callback_run_if_multiple_routing_keys(self):
await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
exchange_name="fake_exch",
exchange_type="direct",
routing_keys=["fake_routing_key","fake_routing_key2"],
callback = self.fake_callback)
await self.GIVEN_MessagePublished(exchange_name="fake_exch",
msg="fake_message",
routing_key="fake_routing_key2")
await self.WHEN_ProcessEventsOnce()
self.THEN_CallbackReceivesMessage("fake_message")
# @unittest.skip("skipped")
def test_callback_run_if_two_exclusive_queues_registered(self):
await self.GIVEN_ConsumerRegistered(queue_name = None,
exchange_name="fake_exch",
exchange_type="direct",
routing_keys=["fake_routing_key"],
callback = self.fake_callback)
await self.GIVEN_ConsumerRegistered(queue_name = None,
exchange_name="fake_exch",
exchange_type="direct",
routing_keys=["fake_routing_key"],
callback = self.fake_callback2)
await self.GIVEN_MessagePublished(exchange_name="fake_exch",
msg="fake_message",
routing_key="fake_routing_key")
await self.WHEN_ProcessEventsOnce()
self.THEN_CallbackReceivesMessage("fake_message")
self.THEN_Callback2ReceivesMessage("fake_message")
# @unittest.skip("skipped")
def test_routes_to_all_consumer_queues(self):
await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue1",
exchange_name="fake_exch",
exchange_type="fanout",
routing_keys="",
callback = self.fake_callback)
await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue2",
exchange_name="fake_exch",
exchange_type="fanout",
routing_keys="",
callback = self.fake_callback2)
await self.GIVEN_MessagePublished(exchange_name="fake_exch",
msg="fake_msg",
routing_key="")
await self.WHEN_ProcessEventsNTimes(1)
self.THEN_CallbackReceivesMessage("fake_msg")
self.THEN_Callback2ReceivesMessage("fake_msg")
# @unittest.skip("skipped")
def test_direct_runs_callback(self):
await self.GIVEN_ProducerRegistered(exchange_name="fake_direct_exch",
exchange_type="direct")
await self.GIVEN_ConsumerRegistered(queue_name="fake_direct_consumer_queue",
exchange_name="fake_direct_exch",
exchange_type="direct",
routing_keys=["fake_routing_key"],
callback = self.fake_callback)
await self.GIVEN_MessagePublished(exchange_name="fake_direct_exch",
msg="fake_message",
routing_key="fake_routing_key")
await self.WHEN_ProcessEventsOnce()
self.THEN_CallbackReceivesMessage("fake_message")
# @unittest.skip("skipped")
def test_topic_runs_callback(self):
await self.GIVEN_ProducerRegistered(exchange_name="fake_topic_exch",
exchange_type="topic")
await self.GIVEN_ConsumerRegistered(queue_name="fake_topic_consumer_queue",
exchange_name="fake_topic_exch",
exchange_type="topic",
routing_keys=["*.red","ball.*"],
callback = self.fake_callback)
await self.GIVEN_MessagePublished(exchange_name="fake_topic_exch",
msg="fake_message",
routing_key="apple.red")
await self.WHEN_ProcessEventsOnce()
self.THEN_CallbackReceivesMessage("fake_message")
# @unittest.skip("skipped")
def test_skipping(self):
class Foo(unittest.TestCase):
def test_skip_me(self):
self.skipTest("skip")
events = []
result = LoggingResult(events)
test = Foo("test_skip_me")
test.run(result)
self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
self.assertEqual(result.skipped, [(test, "skip")])
# Try letting setUp skip the test now.
class Foo(unittest.TestCase):
def setUp(self):
self.skipTest("testing")
def test_nothing(self): pass
events = []
result = LoggingResult(events)
test = Foo("test_nothing")
test.run(result)
self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
self.assertEqual(result.skipped, [(test, "testing")])
self.assertEqual(result.testsRun, 1)
def test_decorated_skip(self):
def decorator(func):
def inner(*a):
return func(*a)
return inner
class Foo(unittest.TestCase):
@decorator
@unittest.skip('testing')
def test_1(self):
pass
result = unittest.TestResult()
test = Foo("test_1")
suite = unittest.TestSuite([test])
suite.run(result)
self.assertEqual(result.skipped, [(test, "testing")])
def test_class_not_setup_or_torndown_when_skipped(self):
class Test(unittest.TestCase):
classSetUp = False
tornDown = False
@classmethod
def setUpClass(cls):
Test.classSetUp = True
@classmethod
def tearDownClass(cls):
Test.tornDown = True
def test_one(self):
pass
Test = unittest.skip("hop")(Test)
self.runTests(Test)
self.assertFalse(Test.classSetUp)
self.assertFalse(Test.tornDown)
def test_notification_pub_sub_mix(self):
"""
Test notification messaging pattern mixed with basic pub/sub calls.
"""
self.m.register_notification_endpoint(self._simple_subscribe_cbf1, "test.notification1")
self.m.subscribe(self._simple_subscribe_cbf1, "test.notification2")
time.sleep(0.5) # give broker some time to register subscriptions
# send publish to notify endpoint
self.m.publish("test.notification1", "my-notification1")
self.assertEqual(self.wait_for_messages()[0], "my-notification1")
# send notify to subscribe endpoint
self.m.notify("test.notification2", "my-notification2")
#res = self.wait_for_messages(n_messages=2)
self.assertTrue(self.wait_for_particular_messages("my-notification1"))
self.assertTrue(self.wait_for_particular_messages("my-notification2"))
#@unittest.skip("disabled")
def test_double_subscriptions(self):
"""
Ensure that messages are delivered to all subscriptions of a topic.
(e.g. identifies queue setup problems)
:return:
"""
self.m.subscribe(self._simple_subscribe_cbf1, "test.interleave")
self.m.subscribe(self._simple_subscribe_cbf2, "test.interleave")
time.sleep(0.5)
# send publish to notify endpoint
self.m.publish("test.interleave", "my-notification1")
# enusre that it is received by each subscription
self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=0))
self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=1))
#@unittest.skip("disabled")
def test_strings(self):
clause = self.parser.parse_line_clause_body('X is \'bar\', S is format_str(\'test %d %s foo\', 42, X)')
solutions = self.rt.search(clause)
self.assertEqual (solutions[0]['S'].s, 'test 42 bar foo')
clause = self.parser.parse_line_clause_body('X is \'foobar\', sub_string(X, 0, 2, _, Y)')
solutions = self.rt.search(clause)
self.assertEqual (solutions[0]['Y'].s, 'fo')
clause = self.parser.parse_line_clause_body('atom_chars(foo, X), atom_chars(Y, "bar").')
solutions = self.rt.search(clause)
self.assertEqual (solutions[0]['X'].s, 'foo')
self.assertEqual (solutions[0]['Y'].name, 'bar')
# @unittest.skip("temporarily disabled")
def test_setz_multi(self):
# self.rt.set_trace(True)
clause = self.parser.parse_line_clause_body('I is ias2, setz(ias (I, a, _), a), setz(ias (I, b, _), b), setz(ias (I, c, _), c), ias(I, X, Y). ')
solutions = self.rt.search(clause)
logging.debug(repr(solutions))
self.assertEqual (len(solutions), 3)
self.assertEqual (solutions[0]['X'].name, 'a')
self.assertEqual (solutions[0]['Y'].name, 'a')
self.assertEqual (solutions[1]['X'].name, 'b')
self.assertEqual (solutions[1]['Y'].name, 'b')
self.assertEqual (solutions[2]['X'].name, 'c')
self.assertEqual (solutions[2]['Y'].name, 'c')
# @unittest.skip("temporarily disabled")
def test_custom_builtins(self):
global recorded_moves
self.parser.compile_file('samples/hanoi2.pl', UNITTEST_MODULE)
clause = self.parser.parse_line_clause_body('move(3,left,right,center)')
logging.debug('clause: %s' % clause)
# register our custom builtin
recorded_moves = []
self.rt.register_builtin('record_move', record_move)
solutions = self.rt.search(clause)
logging.debug('solutions: %s' % repr(solutions))
self.assertEqual (len(solutions), 1)
self.assertEqual (len(recorded_moves), 7)
#@unittest.skip("temporarily disabled")
def test_double_negation(self):
self.parser.compile_file('samples/not_test.pl', UNITTEST_MODULE)
clause = self.parser.parse_line_clause_body('not(not(chancellor(helmut_kohl))).')
logging.debug('clause: %s' % clause)
solutions = self.rt.search(clause, {})
logging.debug('solutions: %s' % repr(solutions))
self.assertEqual (len(solutions), 1)
clause = self.parser.parse_line_clause_body('not(not(chancellor(angela_merkel))).')
logging.debug('clause: %s' % clause)
solutions = self.rt.search(clause, {})
logging.debug('solutions: %s' % repr(solutions))
self.assertEqual (len(solutions), 1)
clause = self.parser.parse_line_clause_body('not(not(chancellor(X))).')
logging.debug('clause: %s' % clause)
solutions = self.rt.search(clause, {})
logging.debug('solutions: %s' % repr(solutions))
self.assertEqual (len(solutions), 2)
# @unittest.skip("temporarily disabled")
def test_parse_line_clauses(self):
line = 'time_span(TE) :- date_time_stamp(+(D, 1.0)).'
tree = self.parser.parse_line_clauses(line)
logging.debug (unicode(tree[0].body))
self.assertEqual (tree[0].body.name, 'date_time_stamp')
self.assertEqual (tree[0].head.name, 'time_span')
line = 'time_span(tomorrow, TS, TE) :- context(currentTime, T), stamp_date_time(T, date(Y, M, D, H, Mn, S, "local")), date_time_stamp(date(Y, M, +(D, 1.0), 0.0, 0.0, 0.0, "local"), TS), date_time_stamp(date(Y, M, +(D, 1.0), 23.0, 59.0, 59.0, "local"), TE).'
tree = self.parser.parse_line_clauses(line)
logging.debug (unicode(tree[0].body))
self.assertEqual (tree[0].head.name, 'time_span')
self.assertEqual (tree[0].body.name, 'and')
self.assertEqual (len(tree[0].body.args), 4)
# @unittest.skip("temporarily disabled")
def test_cut(self):
self.parser.compile_file('samples/cut_test.pl', UNITTEST_MODULE)
# self.rt.set_trace(True)
clause = self.parser.parse_line_clause_body(u'bar(R, X)')
logging.debug(u'clause: %s' % clause)
solutions = self.rt.search(clause)
logging.debug('solutions: %s' % repr(solutions))
self.assertEqual (len(solutions), 4)
self.assertEqual (solutions[0]['R'].s, "one")
self.assertEqual (solutions[1]['R'].s, "two")
self.assertEqual (solutions[2]['R'].s, "many")
self.assertEqual (solutions[3]['R'].s, "many")
# @unittest.skip("temporarily disabled")
def test_create_signal_graph(self):
node_signal = sptgraph.create_node_signal(gen_signal(), 'baseID', 'layer', False)
for is_directed in [True, False]:
g = utils.networkx_to_graphlab(gen_graph(is_directed))
sg = sptgraph.merge_signal_on_graph(g, node_signal, 'baseID', 'layer', use_fast=False, verbose=False)
# Node 5 is never activated
self.assertEqual(4, len(sg.vertices))
if not is_directed:
self.assertEqual(8, len(sg.edges))
else:
self.assertEqual(4, len(sg.edges))
actual_columns = set(sg.vertices.column_names())
expected_columns = {'layers', 'node_weight', '__id'}
self.assertItemsEqual(expected_columns, actual_columns)
# @unittest.skip('Skipping aggregate layers')
def test_aggregate_layers(self):
signal = gl.SFrame(gen_signal())
nb_layers = signal['layer'].max() + 1 # starts at 0
# Python 'slow'
original = sptgraph.create_node_signal(signal, 'baseID', 'layer', False)
# Fast c++ version
res = sptgraph_fast.aggregate_layers(signal, 'baseID', 'layer', nb_layers)
# Transform output to compare
l1 = original['layers'].apply(int)
l2 = res['layers'].apply(utils.reform_layer_int_from_blocks)
m = l1 == l2
self.assertTrue(m.all(), 'Layers should be equal')
# @unittest.skip('Skipping rebuilt_bitset')
def test_directed_no_self_edge(self):
# Directed no self-edge
g = sptgraph.create_spatio_temporal_graph(gen_graph(True), gen_signal(), False, verbose=False)
# The graph has only 1 connected component
h = components.find_connected_components(g)
cc = components.create_component_sframe(h)
comps = components.extract_components(h, cc)
self.assertEqual(1, len(comps))
# We remove the edge (7, 12) to create 2 weakly connected components
nodes = g.vertices
edges = g.edges.add_row_number('eid')
to_remove = g.get_edges(7, 12)
edges = edges[edges['eid'] != to_remove['eid'][0]]
g = gl.SGraph(nodes, edges)
h = components.find_connected_components(g)
cc = components.create_component_sframe(h)
comps = components.extract_components(h, cc)
self.assertEqual(2, len(comps))
# @unittest.skip('Skipping undirected self edge')
def test_bayes_factor_delta(self):
ndecimals = 5
res = self.getExperiment(['normal_same']).delta(method='bayes_factor', num_iters=2000)
variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)
self.assertEqual(aStats['stop'], True, ndecimals)
self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals)
self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals)
self.assertEqual(aStats['treatment_sample_size'], 6108)
self.assertEqual(aStats['control_sample_size'], 3892)
self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals)
self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals)
self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)
# @unittest.skip("sometimes takes too much time")
def test_bayes_precision_delta(self):
ndecimals = 5
res = self.getExperiment(['normal_same']).delta(method='bayes_precision', num_iters=2000)
variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)
self.assertEqual(aStats['stop'], True, ndecimals)
self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals)
self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals)
self.assertEqual(aStats['treatment_sample_size'], 6108)
self.assertEqual(aStats['control_sample_size'], 3892)
self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals)
self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals)
self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)
# @unittest.skip("sometimes takes too much time")
def test_bayes_factor(self):
"""
Check the Bayes factor function.
"""
res = es.bayes_factor(self.rand_s1, self.rand_s2, num_iters=2000)
self.assertEqual (res['stop'], True)
self.assertAlmostEqual (res['delta'], -0.15887364780635896)
value025 = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 2.5, 'value')
value975 = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 97.5, 'value')
self.assertAlmostEqual (value025, -0.24293384641452503)
self.assertAlmostEqual (value975, -0.075064346336461404)
self.assertEqual (res['treatment_sample_size'], 1000)
self.assertEqual (res['control_sample_size'], 1000)
self.assertAlmostEqual (res['treatment_mean'], -0.045256707490195384)
self.assertAlmostEqual (res['control_mean'], 0.11361694031616358)
# @unittest.skip("sometimes takes too much time")
def test_get_by_user(self):
response = self.admin_open(self.item_url('FooBarUser'))
self.assertAPIError(response, 404, 'UserNotFound')
response = self.admin_open(self.item_url(self.user.username))
self.assert200(response) # only Admin has permission
validator = UsageResponseValidator()
if not validator.validate_get(response.json):
self.fail(validator.errors)
# first user only
self.assertEqual(len(response.json['data']['ip_usage']), 2)
# first user only
self.assertEqual(len(response.json['data']['pd_usage']), 2)
# @unittest.skip('')
def skip(reason, condition=None):
"""
Mark a test case or method for skipping.
@param reason: message
@type reason: str
@param condition: skip only if the specified condition is True
@type condition: bool/expression
"""
if isinstance(reason, types.FunctionType):
raise TypeError('skip: no reason specified')
if condition is None:
return unittest.skip(reason)
else:
return unittest.skipIf(condition, reason)
def test_download_one_file(self):
try:
shutil.rmtree("/tmp/nitty_test")
except:
pass
remote_path = "/alice/cern.ch/user/c/cbourjau/nitty_test/AnalysisResults.root"
utils.download_file(remote_path,
"/tmp/nitty_test/AnalysisResults.root")
self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root"))
shutil.rmtree("/tmp/nitty_test")
# without specifiying dest file name:
utils.download_file(remote_path,
"/tmp/nitty_test/")
self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root"))
shutil.rmtree("/tmp/nitty_test")
# @skip("Skip download test")