def test_delete_cluster_member_with_container_manager(self):
    """
    Verify that delete_cluster_member handles a container manager.

    The mocked storage returns a cluster configured with the OpenShift
    container manager; the handler must answer with an empty JSON-RPC
    result and its last bus request must be 'container.remove_node'.
    """
    bus = mock.MagicMock()
    cluster = Cluster.new(
        name='test', hostset=['127.0.0.1'],
        container_manager=C.CONTAINER_MANAGER_OPENSHIFT)
    bus.storage.get_cluster.return_value = cluster
    bus.storage.save.return_value = None
    # assertEqual: the assertEquals alias is deprecated and was removed
    # in Python 3.12.
    self.assertEqual(
        create_jsonrpc_response(ID, []),
        clusters.delete_cluster_member.handler(CHECK_CLUSTER_REQUEST, bus))
    # Verify we had a 'container.remove_node'
    bus.request.assert_called_with('container.remove_node', params=mock.ANY)
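# Source code examples for the Python class ANY
# File source: test_authentication_manager.py
# Project: commissaire-http
# Author: projectatomic
# Project source
# File source
# Views: 16
# Favorites: 0
# Likes: 0
# Comments: 0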
def test_authentication_manager_multi_complex_deny(self):
    """
    Verify AuthenticationManager handles the complex forbidden case with multiple authenticators.

    The second of three authenticators is a WSGI-style callable that calls
    start_response itself with a non-200 status and returns a body; the
    manager must hand that body back and start the response exactly once
    with that status.
    """
    start_response = mock.MagicMock()
    response_code = '402 Payment Required'
    expected_result = [bytes('$$$', 'utf8')]

    def complex_auth(environ, start_response):
        start_response(response_code, [])
        return expected_result

    self.authentication_manager.authenticators = [
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        mock.MagicMock(authenticate=complex_auth),
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
    ]
    result = self.authentication_manager(create_environ(), start_response)
    # assertEqual: the assertEquals alias is deprecated and was removed
    # in Python 3.12.
    self.assertEqual(expected_result, result)
    start_response.assert_called_once_with(response_code, mock.ANY)
# File source: test_authentication_manager.py
# Project: commissaire-http
# Author: projectatomic
# Project source
# File source
# Views: 17
# Favorites: 0
# Likes: 0
# Comments: 0
def test_authentication_manager_multi_complex_allow(self):
    """
    Verify AuthenticationManager handles the complex allow case with multiple authenticators.

    The second of three authenticators is a WSGI-style callable that calls
    start_response with '200 OK' and returns a body; the manager must hand
    that body back and start the response exactly once with '200 OK'.
    """
    expected_result = [bytes('itrustyou', 'utf8')]

    def complex_auth(environ, start_response):
        start_response('200 OK', [])
        return expected_result

    start_response = mock.MagicMock()
    self.authentication_manager.authenticators = [
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        mock.MagicMock(authenticate=complex_auth),
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
    ]
    result = self.authentication_manager(create_environ(), start_response)
    # assertEqual: the assertEquals alias is deprecated and was removed
    # in Python 3.12.
    self.assertEqual(expected_result, result)
    start_response.assert_called_once_with('200 OK', mock.ANY)
def test_callback_with_exception(self):
    """Check that a raising callback is reported to the loop's exception handler."""
    def callback():
        raise ValueError()

    self.loop = mock.Mock()
    self.loop.call_exception_handler = mock.Mock()

    handle = asyncio.Handle(callback, (), self.loop)
    handle._run()

    # Context dict the handle is expected to pass to the loop.
    expected_context = {
        'message': test_utils.MockPattern('Exception in callback.*'),
        'exception': mock.ANY,
        'handle': handle,
        'source_traceback': handle._source_traceback,
    }
    self.loop.call_exception_handler.assert_called_with(expected_context)
async def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
    """
    Verify lock() retries set_lock once and then yields a valid lock.

    set_lock is mocked to return (False, 1) and then (True, 1).
    Presumably the tuple is (success, elapsed time) -- confirm against
    the redis wrapper.

    NOTE(review): declared ``async def`` because the body uses ``await``;
    an asyncio-aware pytest marker/plugin is assumed to be applied
    outside this view -- confirm.
    """
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (False, 1),
        (True, 1),
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    # ANY compares equal to everything, so this only checks that the
    # attribute exists.
    assert lock.id == ANY
    assert lock.valid is True
async def test_lock_expire_retries(self, lock_manager_redis_patched, locked_lock):
    """
    Verify lock() returns an invalid lock after three failed set_lock calls.

    set_lock is mocked to return (False, 1) three times. Presumably the
    tuple is (success, elapsed time) -- confirm against the redis wrapper.

    NOTE(review): declared ``async def`` because the body uses ``await``;
    an asyncio-aware pytest marker/plugin is assumed to be applied
    outside this view -- confirm.
    """
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (False, 1),
        (False, 1),
        (False, 1),
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
        call('resource', ANY),
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    # ANY compares equal to everything, so this only checks that the
    # attribute exists.
    assert lock.id == ANY
    assert lock.valid is False
async def test_lock_one_timeout(self, lock_manager_redis_patched, locked_lock):
    """
    Verify lock() retries after a (True, 1500) result and then succeeds.

    set_lock is mocked to return (True, 1500) and then (True, 1).
    Presumably the second element is an elapsed time that exceeds the
    lock's validity window on the first attempt -- confirm against the
    lock manager.

    NOTE(review): declared ``async def`` because the body uses ``await``;
    an asyncio-aware pytest marker/plugin is assumed to be applied
    outside this view -- confirm.
    """
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (True, 1500),
        (True, 1),
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    # ANY compares equal to everything, so this only checks that the
    # attribute exists.
    assert lock.id == ANY
    assert lock.valid is True
async def test_lock_expire_retries_for_timeouts(self, lock_manager_redis_patched, locked_lock):
    """
    Verify lock() returns an invalid lock when every attempt times out.

    set_lock is mocked to return (True, 1100), (True, 1001) and
    (True, 2000). Presumably the second element is an elapsed time that
    exceeds the lock's validity window each time -- confirm against the
    lock manager.

    NOTE(review): declared ``async def`` because the body uses ``await``;
    an asyncio-aware pytest marker/plugin is assumed to be applied
    outside this view -- confirm.
    """
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (True, 1100),
        (True, 1001),
        (True, 2000),
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
        call('resource', ANY),
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    # ANY compares equal to everything, so this only checks that the
    # attribute exists.
    assert lock.id == ANY
    assert lock.valid is False
def test_000_simple(self, mock_backup, mock_getpass, mock_input):
    """
    Run qvm_backup_restore.main on a single mocked VM.

    Checks that handle_broken receives the restore info and that the
    backup object is created with the path and the passphrase returned
    by the mocked getpass.
    """
    mock_getpass.return_value = 'testpass'
    mock_input.return_value = 'Y'

    backup_vm = BackupVM()
    backup_vm.name = 'test-vm'
    backup_vm.backup_path = 'path/in/backup'
    backup_vm.template = None
    backup_vm.klass = 'StandaloneVM'
    backup_vm.label = 'red'

    mock_restore_info = {1: BackupRestore.VMToRestore(backup_vm)}
    backup_attributes = {
        'return_value.get_restore_summary.return_value': '',
        'return_value.get_restore_info.return_value': mock_restore_info,
    }
    mock_backup.configure_mock(**backup_attributes)

    patch_target = 'qubesadmin.tools.qvm_backup_restore.handle_broken'
    with mock.patch(patch_target) as mock_handle_broken:
        qubesadmin.tools.qvm_backup_restore.main(
            ['/some/path'], app=self.app)
        mock_handle_broken.assert_called_once_with(
            self.app, mock.ANY, mock_restore_info)

    mock_backup.assert_called_once_with(
        self.app, '/some/path', None, 'testpass')
    self.assertAllCalled()
def test_search_found(self, mock_requests_get):
    """
    Verify search() returns the single result from a 200 response.

    requests.get is mocked to return an XML body with one entry whose
    title equals the query; the call must pass the query as ``q``.
    """
    search_query = 'test'
    search_results = MalSearchResponseBuilder()
    search_results.add_result({'title': search_query})
    mock_requests_get.return_value = mock.Mock(
        status_code=200,
        text=search_results.get_response_xml()
    )

    results = self.mal.search(search_query)

    mock_requests_get.assert_called_with(
        ANY,
        params=dict(q=search_query),
        auth=ANY,
        headers=ANY
    )
    # assertEqual reports both operands on failure, unlike
    # assertTrue(a == b).
    self.assertEqual(len(results), 1)
    first_result = results[0]
    self.assertEqual(first_result['title'], search_query)
def test_search_found_more_than_one(self, mock_requests_get):
    """
    Verify search() returns every result from a multi-entry response.

    requests.get is mocked to return an XML body with two entries
    ('test1', 'test2'); every returned title must contain the query.
    """
    search_query = 'test'
    search_results = MalSearchResponseBuilder()
    search_results.add_result({'title': 'test1'})
    search_results.add_result({'title': 'test2'})
    mock_requests_get.return_value = mock.Mock(
        status_code=200,
        text=search_results.get_response_xml()
    )

    results = self.mal.search(search_query)

    mock_requests_get.assert_called_with(
        ANY,
        params=dict(q=search_query),
        auth=ANY,
        headers=ANY
    )
    # Dedicated assertions report the offending values on failure,
    # unlike assertTrue on a comparison.
    self.assertGreater(len(results), 1)
    for result in results:
        self.assertIn(search_query, result['title'])
def test_update_post(self, mock_requests_post):
    """
    Verify update() posts the entry as XML and returns the status code.

    The entry {'episode': 10} must be sent as an XML document (header
    plus ``<entry>`` body) in the ``data`` form field to the animelist
    update URL for the item, using the mock credentials.
    """
    item_id = 1
    entry = {'episode': 10}
    expected_xml = '<entry><episode>10</episode></entry>'
    expected_response_code = 200
    xml_header = '<?xml version="1.0" encoding="UTF-8"?>'
    mock_requests_post.return_value = mock.Mock(
        status_code=expected_response_code
    )

    result = self.mal.update(item_id, entry)

    mock_requests_post.assert_called_with(
        'https://myanimelist.net/api/animelist/update/{0}.xml'.format(
            item_id),
        data={'data': xml_header + expected_xml},
        auth=(MOCK_USER, MOCK_PASS),
        headers=ANY
    )
    # assertEqual reports both operands on failure, unlike
    # assertTrue(a == b).
    self.assertEqual(result, expected_response_code)
async def test_exception_non_waited_job(make_scheduler, loop):
    """
    Verify a failing, un-awaited job is reported to the exception handler.

    A job that raises RuntimeError is spawned and never awaited; after a
    short sleep the scheduler must be empty and the handler must have
    been called with the 'Job processing failed' context.

    NOTE(review): declared ``async def`` because the body uses ``await``.
    NOTE(review): ``asyncio.sleep(..., loop=...)`` is rejected by
    Python 3.10+; kept as-is here -- confirm the supported versions.
    """
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    async def coro():
        await asyncio.sleep(0, loop=loop)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    # Give the job time to run and fail.
    await asyncio.sleep(0.05, loop=loop)

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    # source_traceback is only expected when the loop is in debug mode.
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
async def test_exception_on_close(make_scheduler, loop):
    """
    Verify a job failure surfaced by scheduler.close() reaches the handler.

    A job that raises RuntimeError is spawned, then the scheduler is
    closed; afterwards it must be empty and the handler must have been
    called with the 'Job processing failed' context.

    NOTE(review): declared ``async def`` because the body uses ``await``.
    """
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    fut = asyncio.Future()

    async def coro():
        fut.set_result(None)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await scheduler.close()

    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    # source_traceback is only expected when the loop is in debug mode.
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
async def test_timeout_on_closing(make_scheduler, loop):
    """
    Verify a job that does not finish on cancel triggers a close timeout.

    The job waits on fut1 and, when cancelled, waits on fut2, which this
    test never resolves. With close_timeout=0.01 the scheduler must mark
    the job closed and report 'Job closing timed out' to the handler.

    NOTE(review): declared ``async def`` because the body uses ``await``.
    NOTE(review): ``asyncio.sleep(..., loop=...)`` is rejected by
    Python 3.10+; kept as-is here -- confirm the supported versions.
    """
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler,
                                     close_timeout=0.01)

    fut1 = asyncio.Future()
    fut2 = asyncio.Future()

    async def coro():
        try:
            await fut1
        except asyncio.CancelledError:
            # Block again so the job cannot complete within close_timeout.
            await fut2

    job = await scheduler.spawn(coro())
    await asyncio.sleep(0.001, loop=loop)

    await scheduler.close()

    assert job.closed
    assert fut1.cancelled()

    expect = {'message': 'Job closing timed out',
              'job': job,
              'exception': mock.ANY}
    # source_traceback is only expected when the loop is in debug mode.
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
async def test_exception_on_closing(make_scheduler, loop):
    """
    Verify a job that already failed is reported when the scheduler closes.

    The job resolves ``fut`` and then raises RuntimeError; the test waits
    for ``fut``, closes the scheduler, and expects the handler to be
    called with the 'Job processing failed' context for that exact job
    and exception.

    NOTE(review): declared ``async def`` because the body uses ``await``.
    """
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)

    fut = asyncio.Future()
    exc = RuntimeError()

    async def coro():
        fut.set_result(None)
        raise exc

    job = await scheduler.spawn(coro())
    # Wait until the job body has started running.
    await fut

    await scheduler.close()

    assert job.closed

    expect = {'message': 'Job processing failed',
              'job': job,
              'exception': exc}
    # source_traceback is only expected when the loop is in debug mode.
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)
def test_callback_with_exception(self):
    """Run a Handle whose callback raises and check the reported context."""
    def callback():
        raise ValueError()

    loop_stub = mock.Mock()
    self.loop = loop_stub
    self.loop.call_exception_handler = mock.Mock()

    h = asyncio.Handle(callback, (), self.loop)
    h._run()

    message_pattern = test_utils.MockPattern('Exception in callback.*')
    self.loop.call_exception_handler.assert_called_with({
        'message': message_pattern,
        'exception': mock.ANY,
        'handle': h,
        'source_traceback': h._source_traceback,
    })
def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    """
    Verify serve_main_app serves a module-level ``app`` instance.

    Writes a minimal aiohttp ``app.py`` into the temporary working
    directory, schedules the loop to stop after 0.5s, runs
    serve_main_app, and checks that the server was created on
    0.0.0.0:8000 with backlog 128 and that modify_main_app received the
    config.
    """
    # The handler body must be indented; otherwise the generated app.py
    # is not valid Python and cannot be imported.
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch('aiohttp_devtools.runserver.serve.modify_main_app')
    # Stop the loop shortly after start so serve_main_app returns.
    loop.call_later(0.5, loop.stop)

    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')

    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000, backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)
def test_data_preserves_translated_strings(db_connection):
    """A translation and its translator comment are returned for the language."""
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    string_pk = data.add_or_update_base_string(
        resource_pk, 'x', comment='comment', context='ctx')
    data.set_translated_string(
        'pl', string_pk, translation='y', translator_comment='tcomment')

    preserved_strings = list(data.get_translated_strings('pl', resource_pk))

    # The primary key is not fixed by the test, so any value matches.
    expected_string = dila.application.structures.TranslatedStringData(
        pk=mock.ANY,
        base_string='x',
        plural='',
        context='ctx',
        translation='y',
        comment='comment',
        translator_comment='tcomment',
        resource_pk=resource_pk,
        plural_translations=None,
    )
    assert preserved_strings == [expected_string]
def test_adding_the_same_sting(db_connection):
    """Adding the same base string twice keeps one entry with the latest comment."""
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    # Same resource, string and context; only the comment differs.
    for comment_text in ('comment', 'lolz'):
        data.add_or_update_base_string(
            resource_pk, 'x', comment=comment_text, context='ctx')

    preserved_strings = list(data.get_translated_strings('pl', resource_pk))

    # The primary key is not fixed by the test, so any value matches.
    expected_string = dila.application.structures.TranslatedStringData(
        pk=mock.ANY,
        base_string='x',
        plural='',
        context='ctx',
        translation='',
        comment='lolz',
        translator_comment='',
        resource_pk=resource_pk,
        plural_translations=None,
    )
    assert preserved_strings == [expected_string]