python类skip()的实例源码

mpetests.py 文件源码 项目:PyExPool 作者: eXascaleInfolab 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def allocDelayProg(size, duration):
    """Build the source text of a Python program that allocates, then sleeps.

    Both arguments are substituted textually into the generated program, so
    passing None makes the matching ``if ... is not None`` guard false and
    that step is skipped when the program runs.

    size  - allocation size, bytes; None to skip allocation
    duration  - execution duration, sec; None to skip the sleep

    return  - Python program as str
    """
    template = """from __future__ import print_function, division  # Required for stderr output, must be the first import
import sys
import time
import array

if {size} is not None:
    a = array.array('b', [0])
    asize = sys.getsizeof(a)
    # Note: allocate at least empty size, i.e. empty list
    buffer = a * int(max({size} - asize + 1, 0))
    #if _DEBUG_TRACE:
    # print(''.join(('  allocDelayProg(), allocated ', str(sys.getsizeof(buffer))
    #   , ' bytes for ', str({duration}),' sec')), file=sys.stderr)
if {duration} is not None:
    time.sleep({duration})
"""
    return template.format(size=size, duration=duration)
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG.

    The decorated test runs only when the ``uuid`` module imports and a
    ``uuid`` row is found in ``pg_type`` through ``self.conn``; otherwise
    ``self.skipTest`` is called with the reason.
    """
    @wraps(f)
    def skip_if_no_uuid_(self):
        # Client side: the interpreter itself must provide the uuid module.
        try:
            import uuid             # noqa
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        # Server side: query the type catalog, rolling back whatever happens.
        try:
            cursor = self.conn.cursor()
            cursor.execute("select typname from pg_type where typname = 'uuid'")
            row = cursor.fetchone()
        finally:
            self.conn.rollback()

        if not row:
            return self.skipTest("uuid type not available on the server")
        return f(self)

    return skip_if_no_uuid_
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG.

    Two checks are made before calling the test: the ``uuid`` module must be
    importable, and ``pg_type`` (queried through ``self.conn``) must contain
    a ``uuid`` entry. Failing either one ends in ``self.skipTest``.
    """
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid             # noqa
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            probe = self.conn.cursor()
            probe.execute("select typname from pg_type where typname = 'uuid'")
            found = probe.fetchone()
        finally:
            # Leave the connection clean whether or not the probe worked.
            self.conn.rollback()

        if found:
            outcome = f(self)
        else:
            outcome = self.skipTest("uuid type not available on the server")
        return outcome

    return skip_if_no_uuid_
test_unit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_callback_is_run(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)
        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")
        await self.WHEN_ProcessEventsNTimes(2)

        self.THEN_CallbackReceivesMessage("fake_message")



    # @unittest.skip("skipped")
test_unit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_callback_is_not_run_if_routing_key_mismatch(self):
        fake_callback = Mock()
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="another_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackIsNotRun(fake_callback)


    # @unittest.skip("skipped")
test_unit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_callback_run_if_multiple_routing_keys(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key","fake_routing_key2"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                        msg="fake_message",
                                        routing_key="fake_routing_key2")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")


    # @unittest.skip("skipped")
test_unit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_callback_run_if_two_exclusive_queues_registered(self):
        await self.GIVEN_ConsumerRegistered(queue_name = None,
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)

        await self.GIVEN_ConsumerRegistered(queue_name = None,
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback2)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")
        self.THEN_Callback2ReceivesMessage("fake_message")


    # @unittest.skip("skipped")
test_unit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_routes_to_all_consumer_queues(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue1",
                                      exchange_name="fake_exch",
                                      exchange_type="fanout",
                                      routing_keys="",
                                      callback = self.fake_callback)

        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue2",
                                      exchange_name="fake_exch",
                                      exchange_type="fanout",
                                      routing_keys="",
                                      callback = self.fake_callback2)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_msg",
                                    routing_key="")

        await self.WHEN_ProcessEventsNTimes(1)

        self.THEN_CallbackReceivesMessage("fake_msg")
        self.THEN_Callback2ReceivesMessage("fake_msg")


# @unittest.skip("skipped")
test_integration.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_direct_runs_callback(self):
        await self.GIVEN_ProducerRegistered(exchange_name="fake_direct_exch",
                                      exchange_type="direct")

        await self.GIVEN_ConsumerRegistered(queue_name="fake_direct_consumer_queue",
                                      exchange_name="fake_direct_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_direct_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")

    # @unittest.skip("skipped")
test_integration.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_topic_runs_callback(self):
        await self.GIVEN_ProducerRegistered(exchange_name="fake_topic_exch",
                                      exchange_type="topic")

        await self.GIVEN_ConsumerRegistered(queue_name="fake_topic_consumer_queue",
                                      exchange_name="fake_topic_exch",
                                      exchange_type="topic",
                                      routing_keys=["*.red","ball.*"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_topic_exch",
                                    msg="fake_message",
                                    routing_key="apple.red")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")

    # @unittest.skip("skipped")
test_skipping.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_skipping(self):
    """Check that skipTest() is reported as a skip, both from a test body
    and from setUp.

    Each run must log startTest/addSkip/stopTest and record the test with
    its reason in ``result.skipped``.
    """
    expected_events = ['startTest', 'addSkip', 'stopTest']

    def run_logged(case):
        # Run *case* against a fresh LoggingResult; return (events, result).
        log = []
        outcome = LoggingResult(log)
        case.run(outcome)
        return log, outcome

    class Foo(unittest.TestCase):
        def test_skip_me(self):
            self.skipTest("skip")

    test = Foo("test_skip_me")
    events, result = run_logged(test)
    self.assertEqual(events, expected_events)
    self.assertEqual(result.skipped, [(test, "skip")])

    # Try letting setUp skip the test now.
    class Foo(unittest.TestCase):
        def setUp(self):
            self.skipTest("testing")

        def test_nothing(self):
            pass

    test = Foo("test_nothing")
    events, result = run_logged(test)
    self.assertEqual(events, expected_events)
    self.assertEqual(result.skipped, [(test, "testing")])
    self.assertEqual(result.testsRun, 1)
test_skipping.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_decorated_skip(self):
        def decorator(func):
            def inner(*a):
                return func(*a)
            return inner

        class Foo(unittest.TestCase):
            @decorator
            @unittest.skip('testing')
            def test_1(self):
                pass

        result = unittest.TestResult()
        test = Foo("test_1")
        suite = unittest.TestSuite([test])
        suite.run(result)
        self.assertEqual(result.skipped, [(test, "testing")])
test_setups.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_class_not_setup_or_torndown_when_skipped(self):
        class Test(unittest.TestCase):
            classSetUp = False
            tornDown = False
            @classmethod
            def setUpClass(cls):
                Test.classSetUp = True
            @classmethod
            def tearDownClass(cls):
                Test.tornDown = True
            def test_one(self):
                pass

        Test = unittest.skip("hop")(Test)
        self.runTests(Test)
        self.assertFalse(Test.classSetUp)
        self.assertFalse(Test.tornDown)
test_messaging.py 文件源码 项目:son-mano-framework 作者: sonata-nfv 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_notification_pub_sub_mix(self):
    """
    Test notification messaging pattern mixed with basic pub/sub calls.
    """
    notify_topic = "test.notification1"
    subscribe_topic = "test.notification2"

    self.m.register_notification_endpoint(self._simple_subscribe_cbf1, notify_topic)
    self.m.subscribe(self._simple_subscribe_cbf1, subscribe_topic)
    time.sleep(0.5)  # give broker some time to register subscriptions

    # A plain publish aimed at the notification endpoint.
    self.m.publish(notify_topic, "my-notification1")
    first_message = self.wait_for_messages()[0]
    self.assertEqual(first_message, "my-notification1")

    # A notify aimed at the plain subscription.
    self.m.notify(subscribe_topic, "my-notification2")
    for expected in ("my-notification1", "my-notification2"):
        self.assertTrue(self.wait_for_particular_messages(expected))

    #@unittest.skip("disabled")
test_messaging.py 文件源码 项目:son-mano-framework 作者: sonata-nfv 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_double_subscriptions(self):
    """
    Ensure that messages are delivered to all subscriptions of a topic.
    (e.g. identifies queue setup problems)
    :return:
    """
    topic = "test.interleave"
    for callback in (self._simple_subscribe_cbf1, self._simple_subscribe_cbf2):
        self.m.subscribe(callback, topic)
    time.sleep(0.5)

    # Publish once on the shared topic.
    self.m.publish(topic, "my-notification1")

    # Ensure that it is received by each subscription (buffers 0 and 1).
    for buffer_index in (0, 1):
        self.assertTrue(
            self.wait_for_particular_messages("my-notification1", buffer=buffer_index))

    #@unittest.skip("disabled")
test_builtins.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_strings(self):

        clause = self.parser.parse_line_clause_body('X is \'bar\', S is format_str(\'test %d %s foo\', 42, X)')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['S'].s, 'test 42 bar foo')

        clause = self.parser.parse_line_clause_body('X is \'foobar\', sub_string(X, 0, 2, _, Y)')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['Y'].s, 'fo')

        clause = self.parser.parse_line_clause_body('atom_chars(foo, X), atom_chars(Y, "bar").')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['X'].s, 'foo')
        self.assertEqual (solutions[0]['Y'].name, 'bar')

    # @unittest.skip("temporarily disabled")
test_builtins.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_setz_multi(self):

        # self.rt.set_trace(True)

        clause = self.parser.parse_line_clause_body('I is ias2, setz(ias (I, a, _), a), setz(ias (I, b, _), b), setz(ias (I, c, _), c), ias(I, X, Y). ')
        solutions = self.rt.search(clause)
        logging.debug(repr(solutions))
        self.assertEqual (len(solutions), 3)
        self.assertEqual (solutions[0]['X'].name, 'a')
        self.assertEqual (solutions[0]['Y'].name, 'a')
        self.assertEqual (solutions[1]['X'].name, 'b')
        self.assertEqual (solutions[1]['Y'].name, 'b')
        self.assertEqual (solutions[2]['X'].name, 'c')
        self.assertEqual (solutions[2]['Y'].name, 'c')

    # @unittest.skip("temporarily disabled")
test_embedding.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_custom_builtins(self):
    """Register ``record_move`` as a custom builtin, search the
    ``move(3,left,right,center)`` goal from samples/hanoi2.pl and expect one
    solution with seven entries in the module-level ``recorded_moves``.
    """
    global recorded_moves

    self.parser.compile_file('samples/hanoi2.pl', UNITTEST_MODULE)

    goal = self.parser.parse_line_clause_body('move(3,left,right,center)')
    logging.debug('clause: %s' % goal)

    # Start from an empty log, then register our custom builtin.
    recorded_moves = []
    self.rt.register_builtin('record_move', record_move)

    found = self.rt.search(goal)
    logging.debug('solutions: %s' % repr(found))

    self.assertEqual(len(found), 1)
    self.assertEqual(len(recorded_moves), 7)

    #@unittest.skip("temporarily disabled")
test_negation.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_double_negation(self):
    """Search three ``not(not(chancellor(...)))`` goals against
    samples/not_test.pl and check the number of solutions for each.
    """
    self.parser.compile_file('samples/not_test.pl', UNITTEST_MODULE)

    # (query, expected number of solutions), run in this order.
    expectations = (
        ('not(not(chancellor(helmut_kohl))).', 1),
        ('not(not(chancellor(angela_merkel))).', 1),
        ('not(not(chancellor(X))).', 2),
    )
    for query, expected_count in expectations:
        clause = self.parser.parse_line_clause_body(query)
        logging.debug('clause: %s' % clause)
        solutions = self.rt.search(clause, {})
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual(len(solutions), expected_count)

    # @unittest.skip("temporarily disabled")
test_basics.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_parse_line_clauses(self):
    """Parse a one-goal clause and a four-goal clause and check the head
    and body names of the first parsed clause in each case.
    """
    parse = self.parser.parse_line_clauses

    # Single goal: the body is the goal itself.
    first = parse('time_span(TE) :- date_time_stamp(+(D, 1.0)).')[0]
    logging.debug(unicode(first.body))
    self.assertEqual(first.body.name, 'date_time_stamp')
    self.assertEqual(first.head.name, 'time_span')

    # Four goals: the body is an 'and' with four arguments.
    first = parse('time_span(tomorrow, TS, TE) :- context(currentTime, T), stamp_date_time(T, date(Y, M, D, H, Mn, S, "local")), date_time_stamp(date(Y, M, +(D, 1.0), 0.0, 0.0, 0.0, "local"), TS), date_time_stamp(date(Y, M, +(D, 1.0), 23.0, 59.0, 59.0, "local"), TE).')[0]
    logging.debug(unicode(first.body))
    self.assertEqual(first.head.name, 'time_span')
    self.assertEqual(first.body.name, 'and')
    self.assertEqual(len(first.body.args), 4)

    # @unittest.skip("temporarily disabled")
test_basics.py 文件源码 项目:zamia-prolog 作者: gooofy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cut(self):
    """Search ``bar(R, X)`` against samples/cut_test.pl and expect four
    solutions whose R values are "one", "two", "many", "many".
    """
    self.parser.compile_file('samples/cut_test.pl', UNITTEST_MODULE)

    # self.rt.set_trace(True)

    clause = self.parser.parse_line_clause_body(u'bar(R, X)')
    logging.debug(u'clause: %s' % clause)
    solutions = self.rt.search(clause)
    logging.debug('solutions: %s' % repr(solutions))

    self.assertEqual(len(solutions), 4)
    for index, expected in enumerate(("one", "two", "many", "many")):
        self.assertEqual(solutions[index]['R'].s, expected)

    # @unittest.skip("temporarily disabled")
test_sptgraphfast.py 文件源码 项目:sptgraph 作者: epfl-lts2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_create_signal_graph(self):
    """Merge the node signal onto a directed and an undirected graph and
    check vertex count, edge count and vertex column names for each.
    """
    node_signal = sptgraph.create_node_signal(gen_signal(), 'baseID', 'layer', False)

    for is_directed in (True, False):
        graph = utils.networkx_to_graphlab(gen_graph(is_directed))
        sg = sptgraph.merge_signal_on_graph(graph, node_signal, 'baseID', 'layer',
                                            use_fast=False, verbose=False)

        # Node 5 is never activated
        self.assertEqual(4, len(sg.vertices))

        # 4 edges when directed, 8 when undirected.
        expected_edges = 4 if is_directed else 8
        self.assertEqual(expected_edges, len(sg.edges))

        actual_columns = set(sg.vertices.column_names())
        self.assertItemsEqual({'layers', 'node_weight', '__id'}, actual_columns)

    # @unittest.skip('Skipping aggregate layers')
test_sptgraphfast.py 文件源码 项目:sptgraph 作者: epfl-lts2 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_aggregate_layers(self):
    """Compare the 'layers' column from ``sptgraph.create_node_signal``
    with the one from ``sptgraph_fast.aggregate_layers``.
    """
    signal = gl.SFrame(gen_signal())
    nb_layers = signal['layer'].max() + 1  # starts at 0

    # Python 'slow'
    slow = sptgraph.create_node_signal(signal, 'baseID', 'layer', False)
    # Fast c++ version
    fast = sptgraph_fast.aggregate_layers(signal, 'baseID', 'layer', nb_layers)

    # Transform output to compare
    slow_layers = slow['layers'].apply(int)
    fast_layers = fast['layers'].apply(utils.reform_layer_int_from_blocks)
    matches = slow_layers == fast_layers
    self.assertTrue(matches.all(), 'Layers should be equal')

    # @unittest.skip('Skipping rebuilt_bitset')
test_components.py 文件源码 项目:sptgraph 作者: epfl-lts2 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_directed_no_self_edge(self):
    """Expect one extracted component for the directed graph, and two once
    the edge (7, 12) has been removed.
    """
    def count_components(graph):
        # Run the component pipeline and return how many were extracted.
        h = components.find_connected_components(graph)
        cc = components.create_component_sframe(h)
        return len(components.extract_components(h, cc))

    # Directed no self-edge
    g = sptgraph.create_spatio_temporal_graph(gen_graph(True), gen_signal(), False, verbose=False)

    # The graph has only 1 connected component
    self.assertEqual(1, count_components(g))

    # We remove the edge (7, 12) to create 2 weakly connected components
    nodes = g.vertices
    edges = g.edges.add_row_number('eid')
    doomed_id = g.get_edges(7, 12)['eid'][0]
    g = gl.SGraph(nodes, edges[edges['eid'] != doomed_id])

    self.assertEqual(2, count_components(g))

    # @unittest.skip('Skipping undirected self edge')
test_experiment.py 文件源码 项目:expan 作者: zalando 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_bayes_factor_delta(self):
    """Run ``delta(method='bayes_factor')`` on the 'normal_same' KPI and
    check the delta statistics reported for variant 'A'.
    """
    ndecimals = 5
    res = self.getExperiment(['normal_same']).delta(method='bayes_factor', num_iters=2000)

    variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
    aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
    self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)

    # Exact comparisons: assertEqual's third positional argument is the
    # failure message, not a precision, so ndecimals is not passed here.
    self.assertEqual(aStats['stop'], True)
    self.assertEqual(aStats['number_of_iterations'], 2000)

    self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
    self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals)

    self.assertEqual(aStats['treatment_sample_size'], 6108)
    self.assertEqual(aStats['control_sample_size'], 3892)

    self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals)
    self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals)

    self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)


    # @unittest.skip("sometimes takes too much time")
test_experiment.py 文件源码 项目:expan 作者: zalando 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_bayes_precision_delta(self):
    """Run ``delta(method='bayes_precision')`` on the 'normal_same' KPI and
    check the delta statistics reported for variant 'A'.
    """
    ndecimals = 5
    res = self.getExperiment(['normal_same']).delta(method='bayes_precision', num_iters=2000)

    variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
    aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
    self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)

    # Exact comparisons: assertEqual's third positional argument is the
    # failure message, not a precision, so ndecimals is not passed here.
    self.assertEqual(aStats['stop'], True)
    self.assertEqual(aStats['number_of_iterations'], 2000)

    self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
    self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals)

    self.assertEqual(aStats['treatment_sample_size'], 6108)
    self.assertEqual(aStats['control_sample_size'], 3892)

    self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals)
    self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals)

    self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)


    # @unittest.skip("sometimes takes too much time")
test_early_stopping.py 文件源码 项目:expan 作者: zalando 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_bayes_factor(self):
    """
    Check the Bayes factor function.
    """
    res = es.bayes_factor(self.rand_s1, self.rand_s2, num_iters=2000)

    self.assertEqual(res['stop'], True)
    self.assertAlmostEqual(res['delta'], -0.15887364780635896)

    # Pick the 2.5 and 97.5 percentile entries out of the interval list.
    lower = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 2.5, 'value')
    upper = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 97.5, 'value')
    self.assertAlmostEqual(lower, -0.24293384641452503)
    self.assertAlmostEqual(upper, -0.075064346336461404)

    for size_key in ('treatment_sample_size', 'control_sample_size'):
        self.assertEqual(res[size_key], 1000)

    self.assertAlmostEqual(res['treatment_mean'], -0.045256707490195384)
    self.assertAlmostEqual(res['control_mean'], 0.11361694031616358)

    # @unittest.skip("sometimes takes too much time")
test_usage.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_get_by_user(self):
    """Request usage for an unknown user (expects the 404 'UserNotFound'
    API error) and for ``self.user`` (expects a valid response holding two
    'ip_usage' and two 'pd_usage' entries).
    """
    unknown = self.admin_open(self.item_url('FooBarUser'))
    self.assertAPIError(unknown, 404, 'UserNotFound')

    response = self.admin_open(self.item_url(self.user.username))
    self.assert200(response)  # only Admin has permission

    validator = UsageResponseValidator()
    valid = validator.validate_get(response.json)
    if not valid:
        self.fail(validator.errors)

    # first user only (applies to both usage lists)
    for usage_key in ('ip_usage', 'pd_usage'):
        self.assertEqual(len(response.json['data'][usage_key]), 2)

    # @unittest.skip('')
__init__.py 文件源码 项目:CSB 作者: csb-toolbox 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def skip(reason, condition=None):
    """
    Mark a test case or method for skipping.

    @param reason: message
    @type reason: str
    @param condition: skip only if the specified condition is True
    @type condition: bool/expression

    @return: the decorator built by C{unittest.skip} when no condition is
             given, otherwise the one built by C{unittest.skipIf}
    @raise TypeError: if a function is passed in place of the reason
    """
    # A function in the reason slot means the decorator was applied bare.
    if isinstance(reason, types.FunctionType):
        raise TypeError('skip: no reason specified')

    unconditional = condition is None
    return unittest.skip(reason) if unconditional else unittest.skipIf(condition, reason)
tests.py 文件源码 项目:nittygriddy 作者: cbourjau 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_download_one_file(self):
    """Download one remote file twice - once to an explicit file path, once
    to a directory path - and check the local file exists each time.
    """
    local_dir = "/tmp/nitty_test"
    local_file = "/tmp/nitty_test/AnalysisResults.root"
    remote_path = "/alice/cern.ch/user/c/cbourjau/nitty_test/AnalysisResults.root"

    # Start from a clean slate. ignore_errors replaces the former bare
    # ``except: pass``, which also swallowed KeyboardInterrupt and SystemExit.
    shutil.rmtree(local_dir, ignore_errors=True)

    utils.download_file(remote_path, local_file)
    self.assertTrue(os.path.isfile(local_file))
    shutil.rmtree(local_dir)

    # without specifying dest file name:
    utils.download_file(remote_path, "/tmp/nitty_test/")
    self.assertTrue(os.path.isfile(local_file))
    shutil.rmtree(local_dir)

    # @skip("Skip download test")


问题


面经


文章

微信
公众号

扫码关注公众号