python类raises()的实例源码

test_yql_object.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_one(self):
        """Test that accessing one result raises exception"""
        yqlobj.one()
test_services.py 文件源码 项目:django-user-verification 作者: Fueled 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_send_verification_with_no_number(self):
        """ Test that send_verification with no number specified
            raises a ValueError
        """
        self.service.send_verification('', self.request)
test_services.py 文件源码 项目:django-user-verification 作者: Fueled 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_create_url_with_no_key(self):
        """ Test that create_url with no key
            raises ValueError
        """
        self.service.create_url(self.request, None)
test_watchers.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_bad_watch_func2(self):
        counter = 0

        @self.client.DataWatch(self.path)
        def changed(d, stat):
            if counter > 0:
                raise Exception("oops")

        raises(Exception)(changed)

        counter += 1
        self.client.set(self.path, b'asdfasdf')
test_watchers.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_bad_children_watch_func(self):
        counter = 0

        @self.client.ChildrenWatch(self.path)
        def changed(children):
            if counter > 0:
                raise Exception("oops")

        raises(Exception)(changed)
        counter += 1
        self.client.create(self.path + '/' + 'smith')
test_watchers.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_exception(self):
        from kazoo.exceptions import NoNodeError
        watcher = self._makeOne(self.client, self.path, 0.1)
        result = watcher.start()

        @raises(NoNodeError)
        def testit():
            result.get()
        testit()
test_eventlet_handler.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_with_no_block(self):
        handler = eventlet_handler.SequentialEventletHandler()

        @raises(handler.timeout_exception)
        def test_no_block(r):
            r.get(block=False)

        with start_stop_one(handler):
            r = handler.async_result()
            test_no_block(r)
            r.set(1)
            self.assertEqual(1, r.get())
test_connection.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bad_deserialization(self):
        async_object = self.client.handler.async_result()
        self.client._queue.append(
            (Delete(self.client.chroot, -1), async_object))
        self.client._connection._write_sock.send(b'\0')

        @raises(ValueError)
        def testit():
            async_object.get()
        testit()
test_connection.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_read_only(self):
        from kazoo.exceptions import NotReadOnlyCallError
        from kazoo.protocol.states import KeeperState

        client = self.client
        states = []
        ev = threading.Event()

        @client.add_listener
        def listen(state):
            states.append(state)
            if client.client_state == KeeperState.CONNECTED_RO:
                ev.set()
        try:
            self.cluster[1].stop()
            self.cluster[2].stop()
            ev.wait(6)
            eq_(ev.is_set(), True)
            eq_(client.client_state, KeeperState.CONNECTED_RO)

            # Test read only command
            eq_(client.get_children('/'), [])

            # Test error with write command
            @raises(NotReadOnlyCallError)
            def testit():
                client.create('/fred')
            testit()

            # Wait for a ping
            time.sleep(15)
        finally:
            client.remove_listener(listen)
            self.cluster[1].run()
            self.cluster[2].run()
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_version_no_connection(self):
        @raises(ConnectionLoss)
        def testit():
            self.client.server_version()
        self.client.stop()
        testit()
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bad_creates(self):
        args_list = [(True,), ('/smith', 0), ('/smith', b'', 'bleh'),
                     ('/smith', b'', None, 'fred'),
                     ('/smith', b'', None, True, 'fred')]

        @raises(TypeError)
        def testit(args):
            t = self.client.transaction()
            t.create(*args)

        for args in args_list:
            testit(args)
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_bad_deletes(self):
        args_list = [(True,), ('/smith', 'woops'), ]

        @raises(TypeError)
        def testit(args):
            t = self.client.transaction()
            t.delete(*args)

        for args in args_list:
            testit(args)
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bad_sets(self):
        args_list = [(42, 52), ('/smith', False), ('/smith', b'', 'oops')]

        @raises(TypeError)
        def testit(args):
            t = self.client.transaction()
            t.set_data(*args)

        for args in args_list:
            testit(args)
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_bad_checks(self):
        args_list = [(42, 52), ('/smith', 'oops')]

        @raises(TypeError)
        def testit(args):
            t = self.client.transaction()
            t.check(*args)

        for args in args_list:
            testit(args)
test_client.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_bad_commit(self):
        t = self.client.transaction()

        @raises(ValueError)
        def testit():
            t.commit()

        t.committed = True
        testit()
test_threading_handler.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_exception_raising(self):
        h = self._makeOne()

        @raises(h.timeout_exception)
        def testit():
            raise h.timeout_exception("This is a timeout")
        testit()
test_threading_handler.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_get_with_nowait(self):
        mock_handler = mock.Mock()
        async = self._makeOne(mock_handler)
        timeout = self._makeHandler().timeout_exception

        @raises(timeout)
        def test_it():
            async.get(block=False)
        test_it()

        @raises(timeout)
        def test_nowait():
            async.get_nowait()
        test_nowait()
test_gevent_handler.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_exception_raising(self):
        h = self._makeOne()

        @raises(h.timeout_exception)
        def testit():
            raise h.timeout_exception("This is a timeout")
        testit()
test_datasets.py 文件源码 项目:deep-action-proposals 作者: escorciav 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_DatasetBase():
    ds = DatasetBase()
    nt.raises(NotImplementedError, ds.segments_info)
    nt.raises(NotImplementedError, ds.video_info)
test_integration.py 文件源码 项目:stor 作者: counsyl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_custom_encoding_text(self):
            test_file = self.test_dir / 'test_file.txt'
            with stor.open(test_file, mode='w', encoding='utf-16') as fp:
                fp.write(STRING_STRING)

            with stor.open(test_file, mode='r', encoding='utf-16') as fp:
                result = fp.read()
            assert result == STRING_STRING

            with pytest.raises(UnicodeDecodeError):
                with stor.open(test_file, mode='r', encoding='utf-8') as fp:
                    result = fp.read()


问题


面经


文章

微信
公众号

扫码关注公众号