def test_checkout_exception(self):
    """Checkout view reports the error and creates nothing when checkout raises.

    ``Lendable.checkout`` is patched to raise while the checkout URL is
    requested (following redirects); afterwards the first message in the
    response context and the lendable count are checked.
    """
    self.c.login(username=self.user.username, password='str0ngpa$$w0rd')

    checkout_url = reverse('library:checkout', args=['lendable'])
    failure = Exception('Checkout Failed!')

    # Make the model-level checkout blow up for the duration of the request.
    with patch.object(Lendable, 'checkout', side_effect=failure):
        response = self.c.get(checkout_url, follow=True)

    # The exception text must be the first message handed to the template.
    shown_messages = list(response.context['messages'])
    self.assertEqual(shown_messages[0].message, 'Checkout Failed!')

    # No lendable may have been created.
    self.assertEqual(Lendable.all_types.count(), 0)
# Python: example source code for object() (the snippets below use patch.object)
def test_set_username(self):
"""Test set username method."""
# NOTE(review): indentation was lost in this copy of the file, so it is not
# possible to tell from here whether the final '???' scenario runs inside the
# second patch or after it -- confirm against the original before re-indenting.
#
# Scenario 1: iam_user_exists is patched to report False and the username is
# expected to come out as plain 'John'. The "J?hn" in the original comment
# presumably stood for a non-ASCII spelling whose character was lost.
# Test unicode converts to ascii J?hn to John
with patch.object(AmazonAccountUtils,
'iam_user_exists',
return_value=False):
self.aws_account._set_username()
self.assertEqual(self.aws_account.username, 'John')
# Scenario 2: iam_user_exists is patched to report True; the username must
# then differ from 'John' and be exactly 20 characters long.
# Test random username generated if iam user exists
with patch.object(AmazonAccountUtils,
'iam_user_exists',
return_value=True):
self.aws_account._set_username()
self.assertNotEqual(self.aws_account.username, 'John')
self.assertEqual(len(self.aws_account.username), 20)
# Scenario 3: the user's name is set to '???' (presumably non-ASCII characters
# in the original source); the result must again be a different,
# 20-character username.
# Test random username is generated when regex invalid
# after conversion. ??? converts to ???.
self.user.username = '???'
self.aws_account._set_username()
self.assertNotEqual(self.aws_account.username, '???')
self.assertEqual(len(self.aws_account.username), 20)
def setUp(self):
    """Install mocks on ``module`` and override its storage configuration.

    Stores the started mocks as ``self.request`` / ``self.services``, a copy
    of the pre-test config as ``self.original_config``, the bucket name as
    ``self.bucket`` and a boto3 S3 client as ``self.s3``.
    """
    # Every patch started below is stopped again when the test finishes.
    self.addCleanup(patch.stopall)

    # Swap the module-level ``request`` and ``services`` objects for mocks.
    self.request = patch.object(module, 'request').start()
    self.services = patch.object(module, 'services').start()

    # Snapshot the configuration before overwriting the storage settings.
    self.original_config = dict(module.config)
    self.bucket = 'buckbuck'
    overrides = (
        ('STORAGE_BUCKET_NAME', self.bucket),
        ('STORAGE_ACCESS_KEY_ID', ''),
        ('STORAGE_SECRET_ACCESS_KEY', ''),
        ('ACCESS_KEY_EXPIRES_IN', ''),
        ('STORAGE_PATH_PATTERN', '{owner}/{dataset}/{path}'),
    )
    for key, value in overrides:
        module.config[key] = value

    self.s3 = boto3.client('s3')
def async_test_scarlett_os(loop):
    """Create a ``ScarlettSystem`` on ``loop`` configured for the test suite.

    The config is pointed at the test config directory, given fixed location
    and unit settings, and the system state is set to ``running`` before the
    object is returned.
    """
    # Left disabled in the original:
    # loop._thread_ident = threading.get_ident()
    system = s.ScarlettSystem(loop)

    config = system.config
    config.location_name = 'test scarlett'
    config.config_dir = get_test_config_dir()
    config.latitude = 32.87336
    config.longitude = -117.22743
    config.elevation = 0
    config.time_zone = date_utility.get_time_zone('US/Pacific')
    config.units = METRIC_SYSTEM
    config.skip_pip = True

    # Left disabled in the original:
    # if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
    #     yield from loop.run_in_executor(None, loader.prepare, ss)

    system.state = s.CoreState.running
    return system
def test_close_disabled(self):
    """With closing switched off, a stale issue only receives the warning.

    The label date is set beyond the close deadline and no comment exists
    yet, but ``app.stale_issue_close`` is patched to ``False``. Exactly one
    comment (the warning, not the 'epilogue') must be posted and the issue
    must not be closed.
    """
    self.get_issues.return_value = ['123']
    self.get_label_added_date.return_value = now() - 34443
    self.find_comments.return_value = []

    with app.app_context(), patch.object(app, 'stale_issue_close', False):
        process_issues('repo', 'installation')

    warning = ISSUE_CLOSE_WARNING.format(pasttime='9 hours ago',
                                         futuretime='5 hours')
    assert self.submit_comment.call_count == 1
    self.submit_comment.assert_called_with(warning)
    assert self.close.call_count == 0
def setup_method(self, method):
    """Patch the repo / pull-request handler hooks used by the tests.

    For every patched attribute the patcher is kept as ``self.patch_*`` and
    the started mock under a shorter name, so tests can set return values
    and inspect calls. ``method`` is not used here.
    """
    # Repository-level hooks.
    self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
    self.autoclose_stale = self.patch_repo_config.start()

    self.patch_open_pull_requests = patch.object(RepoHandler,
                                                 'open_pull_requests')
    self.open_pull_requests = self.patch_open_pull_requests.start()

    # Pull-request properties are replaced with ``PropertyMock`` instances.
    self.patch_labels = patch.object(PullRequestHandler, 'labels',
                                     new_callable=PropertyMock)
    self.labels = self.patch_labels.start()

    self.patch_last_commit_date = patch.object(PullRequestHandler,
                                               'last_commit_date',
                                               new_callable=PropertyMock)
    self.last_commit_date = self.patch_last_commit_date.start()

    # Pull-request methods.
    self.patch_find_comments = patch.object(PullRequestHandler,
                                            'find_comments')
    self.find_comments = self.patch_find_comments.start()

    self.patch_submit_comment = patch.object(PullRequestHandler,
                                             'submit_comment')
    self.submit_comment = self.patch_submit_comment.start()

    self.patch_close = patch.object(PullRequestHandler, 'close')
    self.close = self.patch_close.start()
def test_close_disabled(self):
    """With closing switched off, a stale pull request only gets the warning.

    The last commit date is beyond the close deadline and no comment exists
    yet, but ``app.stale_pull_requests_close`` is patched to ``False``.
    Exactly one comment (the warning, not the 'epilogue') must be posted and
    the pull request must not be closed.
    """
    self.open_pull_requests.return_value = ['123']
    self.last_commit_date.return_value = now() - 241
    self.find_comments.return_value = []

    with app.app_context(), \
            patch.object(app, 'stale_pull_requests_close', False):
        process_pull_requests('repo', 'installation')

    warning = PULL_REQUESTS_CLOSE_WARNING.format(pasttime='4 minutes',
                                                 futuretime='20 seconds')
    assert self.submit_comment.call_count == 1
    self.submit_comment.assert_called_with(warning)
    assert self.close.call_count == 0
def test_request_error_handler(api_path, _error_handler=True):
    """Run a request through the client and check which hooks were used.

    ``client.request`` and ``client.error_handler`` are both patched; a
    ``requests.Request`` built from ``api_path`` is then run to completion
    on the client's loop.

    Args:
        api_path: API path object; its ``_client`` attribute is the client
            under test.
        _error_handler: Value passed as ``_error_handling``; the error
            handler mock must have been called exactly when this is true.
    """
    client = api_path._client

    with patch.object(client, 'request', side_effect=dummy) as client_request:
        with patch.object(client, 'error_handler',
                          side_effect=dummy_error_handler) as error_handler:
            client.loop.run_until_complete(
                requests.Request(api_path, 'get',
                                 _error_handling=_error_handler,
                                 test=1, _test=2)
            )

            # ``Mock.called_with`` is not an assertion: on a mock it is just
            # an auto-created (always truthy) attribute, so asserting on it
            # verifies nothing. ``assert_called_with`` checks the arguments.
            client_request.assert_called_with(method='get',
                                              url=url,
                                              skip_params=False,
                                              test=2,
                                              params={'test': 1})
            assert error_handler.called is _error_handler
async def test_error_handler_service_unavailable():
    """Check ``utils.error_handler`` against a request that keeps answering 503.

    The stub request raises on its first three invocations and returns a
    plain ``MockResponse`` on the fourth; the module-level ``tries`` counter
    records how many invocations are left before it succeeds.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error); it needs an asyncio-aware test runner.
    """
    global tries
    tries = 4

    async def service_unavailable(**kwargs):
        # Uses the module-level counter so the state survives across calls.
        global tries
        tries -= 1

        if tries > 0:
            response = MockResponse(status=503)
            raise await exceptions.throw(response)
        else:
            return MockResponse()

    with pytest.raises(exceptions.ServiceUnavailable):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            await utils.error_handler(service_unavailable)()

    # NOTE(review): nesting reconstructed from a copy without indentation.
    # Both statements below are kept after the ``pytest.raises`` block because
    # anything following the raising call inside it could never execute.
    assert sleep.called

    await utils.error_handler(service_unavailable)()
async def test_streaming_apis(dummy_client):
    """Check which client method each API attribute dispatches to.

    For ``dummy_client``, ``.api`` must hit ``request`` and ``.stream`` must
    hit ``stream_request``. For a client created with
    ``streaming_apis={'api'}`` the roles are swapped.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error); it needs an asyncio-aware test runner.
    """
    with patch.object(dummy_client, 'request', side_effect=dummy) as request:
        await dummy_client.api.test.get()
        assert request.called

    with patch.object(dummy_client, 'stream_request') as request:
        dummy_client.stream.test.get()
        assert request.called

    # A client for which 'api' is declared to be a streaming API.
    client = BasePeonyClient("", "", streaming_apis={'api'})

    with patch.object(client, 'stream_request') as request:
        client.api.test.get()
        assert request.called

    with patch.object(client, 'request', side_effect=dummy) as request:
        await client.stream.test.get()
        assert request.called
async def test_request_proxy(dummy_client):
    """Check that the ``proxy`` argument reaches the session's ``request``.

    ``session.request`` is replaced by a class whose constructor raises
    ``RuntimeError`` carrying the ``proxy`` keyword it received, so the
    exception text reveals what was forwarded.

    Declared ``async`` because the body uses ``async with`` / ``await``
    (a syntax error inside a plain ``def``); it needs an asyncio-aware
    test runner.
    """
    class RaiseProxy:
        """Stand-in for the request call that reports ``proxy`` by raising."""

        def __init__(self, *args, proxy=None, **kwargs):
            raise RuntimeError(proxy)

    async with aiohttp.ClientSession() as session:
        with patch.object(session, 'request', side_effect=RaiseProxy):
            # NOTE(review): if no RuntimeError is raised the test passes
            # without checking anything -- confirm whether that is intended.
            try:
                await dummy_client.request(method='get',
                                           url="http://hello.com",
                                           proxy="http://some.proxy.com",
                                           session=session,
                                           future=asyncio.Future())
            except RuntimeError as e:
                assert str(e) == "http://some.proxy.com"
async def test_chunked_upload_fail(dummy_peony_client, medias):
    """A failing chunked video upload raises ``MediaProcessingError``.

    The client's ``request`` is replaced by a ``DummyRequest`` and
    ``asyncio.sleep`` by a mock; after the expected error the last sleep
    call must have been ``sleep(5)``.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error); it needs an asyncio-aware test runner.
    """
    media = medias['video']
    media_data = await media.download()

    chunk_size = 1024**2  # 1 MiB chunks

    dummy_request = DummyRequest(dummy_peony_client, media, chunk_size, True)

    with patch.object(dummy_peony_client, 'request',
                      side_effect=dummy_request):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            with pytest.raises(peony.exceptions.MediaProcessingError):
                await dummy_peony_client.upload_media(
                    media_data, chunk_size=chunk_size, chunked=True
                )

            # Outside ``pytest.raises`` so that the check actually runs.
            sleep.assert_called_with(5)
async def test_upload_from_url(dummy_peony_client, medias, media_request, url):
    """Uploading from a URL downloads the media and posts its content.

    The client's session ``get`` and the client's ``request`` are replaced
    by local coroutines that assert on the arguments they receive.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error); it needs an asyncio-aware test runner.
    """
    async def dummy_get(get_url):
        # The download must target the URL passed to ``upload_media``.
        assert get_url == url
        return media_request

    async def dummy_request(url, method, future, data=None, skip_params=None):
        # Note: ``url`` here is the request URL and shadows the fixture.
        assert url == dummy_peony_client.upload.media.upload.url()
        assert method.lower() == 'post'
        assert data['media'] == media_request.content
        assert skip_params
        future.set_result(None)

    with patch.object(dummy_peony_client, '_session') as session:
        session.get = dummy_get
        with patch.object(dummy_peony_client, 'request',
                          side_effect=dummy_request):
            await dummy_peony_client.upload_media(url)
async def test_stream_reconnection_disconnection(stream):
    """Check the reconnection delays reported while in ``DISCONNECTION`` state.

    Item 0 must be the 'connected' notice; after that, odd items announce a
    delay of ``DISCONNECTION_TIMEOUT * (turn + 1) / 2`` and even items a
    stream restart. The loop stops once the delay would exceed
    ``MAX_DISCONNECTION_TIMEOUT``, at which point the reported delay must
    equal that maximum.

    Declared ``async`` because the body uses ``async for`` (a syntax error
    inside a plain ``def``); it needs an asyncio-aware test runner.
    """
    async def dummy(*args, **kwargs):
        # No-op replacement for ``asyncio.sleep`` so the test does not wait.
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_disconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == DISCONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                    if timeout > MAX_DISCONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_DISCONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_reconnection_reconnect(stream):
    """Check the reconnection delays reported while in ``RECONNECTION`` state.

    Item 0 must be the 'connected' notice; after that, odd items announce a
    delay of ``RECONNECTION_TIMEOUT * 2**(turn // 2)`` and even items a
    stream restart. The loop stops once the delay would exceed
    ``MAX_RECONNECTION_TIMEOUT``, at which point the reported delay must
    equal that maximum.

    Declared ``async`` because the body uses ``async for`` (a syntax error
    inside a plain ``def``); it needs an asyncio-aware test runner.
    """
    async def dummy(*args, **kwargs):
        # No-op replacement for ``asyncio.sleep`` so the test does not wait.
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_reconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == RECONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = RECONNECTION_TIMEOUT * 2**(turn // 2)

                    if timeout > MAX_RECONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_RECONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_reconnection_enhance_your_calm(stream):
    """Check the delays reported while in ``ENHANCE_YOUR_CALM`` state.

    Item 0 must be the 'connected' notice; after that, odd items announce a
    delay of ``ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)`` and even items
    a stream restart. The loop is cut off after 100 items.

    Declared ``async`` because the body uses ``async for`` (a syntax error
    inside a plain ``def``); it needs an asyncio-aware test runner.
    """
    async def dummy(*args, **kwargs):
        # No-op replacement for ``asyncio.sleep`` so the test does not wait.
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                # The test itself ends the iteration after 100 items.
                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_cancel(event_loop):
    """Cancel a task that iterates a stream forever and wait for it to end.

    One task keeps running ``test_stream_iteration`` on the stream while a
    second task cancels it after a short sleep; both are awaited under a
    one second timeout.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error); it needs an asyncio-aware test runner.
    """
    async def cancel(task):
        # Give the streaming task a moment to start before cancelling it.
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        # Runs until cancelled.
        while True:
            await test_stream_iteration(stream)

    # NOTE(review): the synchronous ``with`` on ``ClientSession`` and
    # ``aiohttp.Timeout`` look tied to an older aiohttp release -- confirm
    # against the version this project pins.
    with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])
async def post(self, _data=None, _headers=None, **kwargs):
    """Fake token endpoint: validate the request and return a fixed token.

    Declared ``async`` because the body awaits (``await`` inside a plain
    ``def`` is a syntax error).

    Args:
        _data: Optional form data; when given, its url-encoded form must be
            ``b"access_token=abc"``.
        _headers: Optional headers; when given, ``Authorization`` must be the
            Basic auth value for the fixed test key.
        **kwargs: Accepted and ignored.

    Returns:
        ``{'access_token': "abc"}``.
    """
    # Count every call so tests can check how often the endpoint was hit.
    self.count += 1

    if _data is not None:
        with patch.object(oauth.aiohttp.payload, 'BytesPayload',
                          side_effect=dummy_func):
            assert _data._gen_form_urlencoded() == b"access_token=abc"

    if _headers is not None:
        key = "1234567890:0987654321"
        auth = base64.b64encode(key.encode('utf-8')).decode('utf-8')
        assert _headers['Authorization'] == 'Basic ' + auth

    # This is needed to run `test_oauth2_concurrent_refreshes`:
    # without it, refresh tasks would be executed sequentially.
    # In a sense it simulates a request being fetched.
    await asyncio.sleep(0.001)
    return {'access_token': "abc"}
def test_full_program_user_serialization_with__no_passed_course(self):
    """
    Full ProgramEnrollment serialization reports zero passed courses when
    ``MMTrack.count_courses_passed`` returns 0.
    """
    with patch.object(MMTrack, 'count_courses_passed', return_value=0):
        self.profile.refresh_from_db()
        program = self.program_enrollment.program

        expected = dict(
            id=program.id,
            enrollments=self.serialized_enrollments,
            grade_average=75,
            is_learner=True,
            num_courses_passed=0,
            total_courses=1,
        )
        actual = UserProgramSearchSerializer.serialize(self.program_enrollment)

        assert actual == expected
def test_create(self):
    """Route a 'create' message for ``testmodel`` and check the outcome.

    One ``TestModel`` row must exist afterwards and the reply payload must
    carry the serialized object with status 201.
    """
    request = {
        'action': 'create',
        'pk': None,
        'request_id': 'client-request-id',
        'data': {'name': 'some-thing'}
    }
    message = self._build_message("testmodel", request)
    json_content = self._send_and_consume("websocket.receive", message)

    # Exactly one object should now exist.
    self.assertEqual(TestModel.objects.count(), 1)

    # The reply should echo the serializer output for that object.
    created = TestModel.objects.first()
    expected = {
        'action': 'create',
        'data': TestModelSerializer(created).data,
        'errors': [],
        'request_id': 'client-request-id',
        'response_status': 201
    }
    self.assertEqual(json_content['payload'], expected)
def test_create_failure(self):
    """Route a 'create' message with empty data and check the error reply.

    No ``TestModel`` row may be created and the payload must report the
    missing ``name`` field with status 400.
    """
    request = {
        'action': 'create',
        'pk': None,
        'request_id': 'client-request-id',
        'data': {},
    }
    message = self._build_message("testmodel", request)
    json_content = self._send_and_consume('websocket.receive', message)

    # Nothing should have been created.
    self.assertEqual(TestModel.objects.count(), 0)

    # The reply should describe the validation error.
    expected = {
        'action': 'create',
        'data': None,
        'request_id': 'client-request-id',
        'errors': [{'name': ['This field is required.']}],
        'response_status': 400
    }
    self.assertEqual(json_content['payload'], expected)
def test_bad_permission_reply(self):
    """A denied permission check produces a 401 'Permission Denied' reply.

    ``TestModelResourceBinding.has_permission`` is replaced by a mock that
    returns ``False``; the mock must have been called.
    """
    mock_has_perm = Mock(return_value=False)

    with patch.object(TestModelResourceBinding, 'has_permission',
                      mock_has_perm):
        request = {
            'action': 'named_detail',
            'pk': 546,
            'data': {},
            'request_id': 'client-request-id'
        }
        message = self._build_message('testmodel', request)
        json_content = self._send_and_consume('websocket.receive', message)

    expected = {
        'action': 'named_detail',
        'errors': ['Permission Denied'],
        'data': None,
        'response_status': 401,
        'request_id': 'client-request-id'
    }
    self.assertEqual(json_content['payload'], expected)
    self.assertEqual(mock_has_perm.called, True)
def test_is_authenticated_permission(self):
    """With ``IsAuthenticated`` required, only a logged-in client gets 200.

    The same 'test_list' message is sent twice: once anonymously (401
    expected) and once after force-logging in a new user (200 expected).
    """
    with patch.object(TestModelResourceBinding, 'permission_classes',
                      (IsAuthenticated,)):
        content = {
            'action': 'test_list',
            'pk': None,
            'data': {},
            'request_id': 'client-request-id',
        }

        # Anonymous request: must be blocked.
        anonymous_reply = self._send_and_consume(
            'websocket.receive', self._build_message('testmodel', content))
        self.assertEqual(anonymous_reply['payload']['response_status'], 401)

        # Log a user in and repeat the request.
        user = User.objects.create(username="testuser", password="123")
        self.client.force_login(user)
        self.client._session_cookie = True

        authenticated_reply = self._send_and_consume(
            'websocket.receive', self._build_message('testmodel', content))
        self.assertEqual(
            authenticated_reply['payload']['response_status'], 200)
def test_parent_inheritance(aiohttp_session):
    """A child API object takes session, loop and request kwargs from its parent.

    The child's ``request_kwargs`` must contain the parent's entries plus
    its own.
    """
    parent_kwargs = {'arg1': 10, 'arg2': 20}
    own_kwargs = {'arg3': 30}

    parent = base.BaseApiObject(None,
                                request_kwargs=parent_kwargs,
                                client_session=aiohttp_session)
    child = base.BaseApiObject(parent, request_kwargs=own_kwargs)

    assert child.client_session == aiohttp_session
    assert child.loop == aiohttp_session.loop

    merged_kwargs = {'arg1': 10, 'arg2': 20, 'arg3': 30}
    assert child.request_kwargs == merged_kwargs
def test_result_list(session):
    """A ``ResultList`` built from a two-item response with no paging links.

    Both ``next`` and ``previous`` must be ``None`` (as attributes and when
    fetched), the length must be 2 and the items must expose ``attr1``.
    """
    response = {
        "_metadata": {
            "count": 2,
            "next": None,
            "previous": None,
        },
        "results": [
            {"attr1": "value1", "attr2": "value2"},
            {"attr1": "value3"},
        ],
    }
    obj = base.ResultList(parent=session,
                          item_class=MockDataObject,
                          resp=response)

    run = session.loop.run_until_complete
    next_list = run(obj.get_next())
    previous_list = run(obj.get_previous())

    # No neighbouring pages, neither as attributes nor when fetched.
    assert obj.next is None
    assert obj.previous is None
    assert next_list is None
    assert previous_list is None

    assert len(obj) == 2
    assert sorted(item.attr1 for item in obj) == ["value1", "value3"]
def test_fetch_by_trigger(self):
    """``EventManager.fetch_by_trigger`` requests ``event/<id>`` with paging params."""
    client = Client(self.api_url)
    event_manager = EventManager(client)

    trigger_id = '1'
    trigger = Trigger(client, 'Name', ['tag'], ['target'], 0, 1,
                      id=trigger_id)

    with patch.object(client, 'get', return_value={'list': []}) as get_mock:
        event_manager.fetch_by_trigger(trigger)

    self.assertTrue(get_mock.called)

    # First page, maximum page size.
    expected_params = {'p': 0, 'size': MAX_FETCH_LIMIT}
    get_mock.assert_called_with('event/{}'.format(trigger_id),
                                params=expected_params)
def test_fetch_by_id(self):
    """``TriggerManager.fetch_by_id`` returns a trigger carrying the requested id.

    ``client.get`` is mocked to answer first with a state payload and then
    with the trigger payload.
    """
    client = Client(self.api_url)
    manager = TriggerManager(client)

    trigger_id = '1'
    state_payload = {
        'state': 'OK',
        'trigger_id': trigger_id
    }
    trigger_payload = {
        'id': trigger_id,
        'name': 'trigger_name',
        'tags': ['tag'],
        'targets': ['pattern'],
        'warn_value': 0,
        'error_value': 1
    }

    # Successive ``get`` calls return the two payloads in this order.
    responses = [state_payload, trigger_payload]
    with patch.object(client, 'get', side_effect=responses) as get_mock:
        fetched = manager.fetch_by_id(trigger_id)

    self.assertTrue(get_mock.called)
    self.assertEqual(trigger_id, fetched.id)
def test_add_bad_response(self):
    """``ContactManager.add`` raises ``ResponseStructureError`` on an empty PUT reply.

    Both ``client.put`` and ``client.get`` must have been called, and the
    PUT must carry the contact value and type as JSON.
    """
    client = Client(self.api_url)
    manager = ContactManager(client)
    contact_value = '#channel'

    with patch.object(client, 'put', return_value={}) as put_mock:
        with patch.object(client, 'get',
                          return_value={'contacts': []}) as get_mock:
            with self.assertRaises(ResponseStructureError):
                manager.add(contact_value, CONTACT_SLACK)

    self.assertTrue(put_mock.called)
    self.assertTrue(get_mock.called)

    expected_json = {'value': contact_value, 'type': CONTACT_SLACK}
    put_mock.assert_called_with('contact', json=expected_json)
def test_get_invalid_response(self):
    """``Client.get`` raises ``InvalidJSONError`` when given a ``FakeResponse``.

    ``requests.get`` is mocked to return the fake response; the test also
    checks the URL, headers and auth it was called with.
    """
    response = FakeResponse()

    # Only ``return_value`` is configured. The earlier ``side_effects=``
    # keyword was a misspelling of ``side_effect`` that merely set an unused
    # attribute on the mock, so the helper it pointed at never ran.
    with patch.object(requests, 'get', return_value=response) as mock_get:
        test_path = 'test_path'
        client = Client(TEST_API_URL, AUTH_HEADERS)

        with self.assertRaises(InvalidJSONError):
            client.get(test_path)

        self.assertTrue(mock_get.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_get.assert_called_with(expected_url_call,
                                    headers=AUTH_HEADERS,
                                    auth=None)
def test_put_invalid_response(self):
    """``Client.put`` raises ``InvalidJSONError`` when given a ``FakeResponse``.

    ``requests.put`` is mocked to return the fake response; the test also
    checks the URL, data, headers and auth it was called with.
    """
    test_data = {'test': 'test'}
    response = FakeResponse()

    # Only ``return_value`` is configured. The earlier ``side_effects=``
    # keyword was a misspelling of ``side_effect`` that merely set an unused
    # attribute on the mock, so the helper it pointed at never ran.
    with patch.object(requests, 'put', return_value=response) as mock_put:
        test_path = 'test_path'
        client = Client(TEST_API_URL, AUTH_HEADERS)

        with self.assertRaises(InvalidJSONError):
            client.put(test_path, data=test_data)

        self.assertTrue(mock_put.called)
        expected_url_call = TEST_API_URL + '/' + test_path
        mock_put.assert_called_with(expected_url_call,
                                    data=test_data,
                                    headers=AUTH_HEADERS,
                                    auth=None)