python类ANY的实例源码

test_handlers_clusters.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_delete_cluster_member_with_container_manager(self):
        """
        Verify that delete_cluster_member handles a container manager
        """
        bus = mock.MagicMock()
        cluster = Cluster.new(
            name='test', hostset=['127.0.0.1'],
            container_manager=C.CONTAINER_MANAGER_OPENSHIFT)

        bus.storage.get_cluster.return_value = cluster
        bus.storage.save.return_value = None

        self.assertEquals(
            create_jsonrpc_response(ID, []),
            clusters.delete_cluster_member.handler(CHECK_CLUSTER_REQUEST, bus))

        # Verify we had a 'container.remove_node'
        bus.request.assert_called_with('container.remove_node', params=mock.ANY)
test_authentication_manager.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_authentication_manager_multi_complex_deny(self):
        """
        Verify AuthenticationManager handles the complex forbidden case with multiple authenticators.
        """
        start_response = mock.MagicMock()
        response_code = '402 Payment Required'
        expected_result = [bytes('$$$', 'utf8')]
        def complex_auth(environ, start_response):
            start_response(response_code, [])
            return expected_result

        self.authentication_manager.authenticators = [
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
            mock.MagicMock(authenticate=complex_auth),
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        ]
        result = self.authentication_manager(create_environ(), start_response)
        self.assertEquals(expected_result, result)
        start_response.assert_called_once_with(response_code, mock.ANY)
test_authentication_manager.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_authentication_manager_multi_complex_allow(self):
        """
        Verify AuthenticationManager handles the complex allow case with multiple authenticators.
        """
        expected_result = [bytes('itrustyou', 'utf8')]
        def complex_auth(environ, start_response):
            start_response('200 OK', [])
            return expected_result

        start_response = mock.MagicMock()
        self.authentication_manager.authenticators = [
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
            mock.MagicMock(authenticate=complex_auth),
            mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        ]
        result = self.authentication_manager(create_environ(), start_response)
        self.assertEquals(expected_result, result)
        start_response.assert_called_once_with('200 OK', mock.ANY)
test_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })
test_algorithm.py 文件源码 项目:aioredlock 作者: joanvila 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (False, 1),
            (True, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is True
test_algorithm.py 文件源码 项目:aioredlock 作者: joanvila 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_lock_expire_retries(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (False, 1),
            (False, 1),
            (False, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is False
test_algorithm.py 文件源码 项目:aioredlock 作者: joanvila 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_lock_one_timeout(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (True, 1500),
            (True, 1)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is True
test_algorithm.py 文件源码 项目:aioredlock 作者: joanvila 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_lock_expire_retries_for_timeouts(self, lock_manager_redis_patched, locked_lock):
        lock_manager, redis = lock_manager_redis_patched
        redis.set_lock = CoroutineMock(side_effect=[
            (True, 1100),
            (True, 1001),
            (True, 2000)
        ])

        lock = await lock_manager.lock('resource')

        calls = [
            call('resource', ANY),
            call('resource', ANY),
            call('resource', ANY)
        ]
        redis.set_lock.assert_has_calls(calls)
        assert lock.resource == 'resource'
        assert lock.id == ANY
        assert lock.valid is False
qvm_backup_restore.py 文件源码 项目:qubes-core-admin-client 作者: QubesOS 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_000_simple(self, mock_backup, mock_getpass, mock_input):
        mock_getpass.return_value = 'testpass'
        mock_input.return_value = 'Y'
        vm1 = BackupVM()
        vm1.name = 'test-vm'
        vm1.backup_path = 'path/in/backup'
        vm1.template = None
        vm1.klass = 'StandaloneVM'
        vm1.label = 'red'
        mock_restore_info = {
            1: BackupRestore.VMToRestore(vm1),
        }
        mock_backup.configure_mock(**{
            'return_value.get_restore_summary.return_value': '',
            'return_value.get_restore_info.return_value': mock_restore_info,
        })
        with mock.patch('qubesadmin.tools.qvm_backup_restore.handle_broken') \
                as mock_handle_broken:
            qubesadmin.tools.qvm_backup_restore.main(['/some/path'],
                app=self.app)
            mock_handle_broken.assert_called_once_with(
                self.app, mock.ANY, mock_restore_info)
        mock_backup.assert_called_once_with(
            self.app, '/some/path', None, 'testpass')
        self.assertAllCalled()
test_api.py 文件源码 项目:mal 作者: ryukinix 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_search_found(self, mock_requests_get):
        search_query = 'test'
        search_results = MalSearchResponseBuilder()
        search_results.add_result({'title': search_query})
        mock_requests_get.return_value = mock.Mock(
            status_code=200,
            text=search_results.get_response_xml()
        )
        results = self.mal.search(search_query)

        mock_requests_get.assert_called_with(
            ANY,
            params=dict(q=search_query),
            auth=ANY,
            headers=ANY
        )

        self.assertTrue(len(results) == 1)
        first_result = results[0]
        self.assertTrue(first_result['title'] == search_query)
test_api.py 文件源码 项目:mal 作者: ryukinix 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_search_found_more_than_one(self, mock_requests_get):
        search_query = 'test'
        search_results = MalSearchResponseBuilder()
        search_results.add_result({'title': 'test1'})
        search_results.add_result({'title': 'test2'})
        mock_requests_get.return_value = mock.Mock(
            status_code=200,
            text=search_results.get_response_xml()
        )
        results = self.mal.search(search_query)

        mock_requests_get.assert_called_with(
            ANY,
            params=dict(q=search_query),
            auth=ANY,
            headers=ANY
        )

        self.assertTrue(len(results) > 1)
        for result in results:
            self.assertTrue(search_query in result['title'])
test_api.py 文件源码 项目:mal 作者: ryukinix 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_update_post(self, mock_requests_post):
        item_id = 1
        entry = {'episode': 10}
        expected_xml = '<entry><episode>10</episode></entry>'
        expected_response_code = 200

        xml_header='<?xml version="1.0" encoding="UTF-8"?>'
        mock_requests_post.return_value = mock.Mock(
            status_code=expected_response_code
        )
        result = self.mal.update(item_id, entry)

        mock_requests_post.assert_called_with(
            'https://myanimelist.net/api/animelist/update/{0}.xml'.format(
                item_id),
            data={'data': xml_header + expected_xml},
            auth=(MOCK_USER, MOCK_PASS),
            headers=ANY
        )

        self.assertTrue(result == expected_response_code)
test_scheduler.py 文件源码 项目:aiojobs 作者: aio-libs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_exception_non_waited_job(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    async def coro():
        await asyncio.sleep(0, loop=loop)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await asyncio.sleep(0.05, loop=loop)

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
test_scheduler.py 文件源码 项目:aiojobs 作者: aio-libs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_exception_on_close(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    fut = asyncio.Future()

    async def coro():
        fut.set_result(None)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await scheduler.close()

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
test_scheduler.py 文件源码 项目:aiojobs 作者: aio-libs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_timeout_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler,
                                     close_timeout=0.01)
    fut1 = asyncio.Future()
    fut2 = asyncio.Future()

    async def coro():
        try:
            await fut1
        except asyncio.CancelledError:
            await fut2

    job = await scheduler.spawn(coro())
    await asyncio.sleep(0.001, loop=loop)
    await scheduler.close()
    assert job.closed
    assert fut1.cancelled()
    expect = {'message': 'Job closing timed out',
              'job': job,
              'exception': mock.ANY}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
test_scheduler.py 文件源码 项目:aiojobs 作者: aio-libs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_exception_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    fut = asyncio.Future()
    exc = RuntimeError()

    async def coro():
        fut.set_result(None)
        raise exc

    job = await scheduler.spawn(coro())
    await fut
    await scheduler.close()
    assert job.closed
    expect = {'message': 'Job processing failed',
              'job': job,
              'exception': exc}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
test_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })
test_runserver_main.py 文件源码 项目:aiohttp-devtools 作者: aio-libs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
test_data.py 文件源码 项目:dila 作者: socialwifi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_data_preserves_translated_strings(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    string_pk = data.add_or_update_base_string(resource_pk, 'x', comment='comment', context='ctx')
    data.set_translated_string('pl', string_pk, translation='y', translator_comment='tcomment')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='y',
            comment='comment',
            translator_comment='tcomment',
            resource_pk=resource_pk,
            plural_translations=None,
        )]
test_data.py 文件源码 项目:dila 作者: socialwifi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_adding_the_same_sting(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    data.add_or_update_base_string(resource_pk, 'x', comment='comment', context='ctx')
    data.add_or_update_base_string(resource_pk, 'x', comment='lolz', context='ctx')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='',
            comment='lolz',
            translator_comment='',
            resource_pk=resource_pk,
            plural_translations=None,
        )]


问题


面经


文章

微信
公众号

扫码关注公众号