def test_to_internal_value_non_existent(km_user_factory):
    """
    Converting an ID that matches no media resource must fail validation.

    The field's ``context`` property is patched to return a dictionary
    holding a freshly created ``km_user``; ``to_internal_value(1)`` is
    then expected to raise ``ValidationError``.
    """
    context_patcher = mock.patch(
        'know_me.serializers.fields.MediaResourceField.context',
        new_callable=mock.PropertyMock,
    )

    with context_patcher as patched_context:
        # The user is created only after the patch is active, as before.
        patched_context.return_value = {'km_user': km_user_factory()}
        resource_field = fields.MediaResourceField()

        with pytest.raises(ValidationError):
            resource_field.to_internal_value(1)
# Example source code for the Python class PropertyMock()
def test_to_representation(media_resource_factory, serializer_context):
    """
    ``.to_representation()`` must match the serializer's output.

    A media resource is created and serialized with
    ``MediaResourceSerializer``; the field, with its ``context`` property
    patched to return ``serializer_context``, has to produce the same
    data for that resource.
    """
    media_resource = media_resource_factory()
    reference_serializer = MediaResourceSerializer(
        media_resource,
        context=serializer_context,
    )
    context_patcher = mock.patch(
        'know_me.serializers.fields.MediaResourceField.context',
        new_callable=mock.PropertyMock,
    )

    with context_patcher as patched_context:
        patched_context.return_value = serializer_context
        resource_field = fields.MediaResourceField()

        actual = resource_field.to_representation(media_resource)
        assert actual == reference_serializer.data
def test_lists_unreadable_android_directory_using_helper_method(self, mock_runner):
    """
    Listing an unreadable, unwritable Android directory prints only the
    permission summary.

    The runner's last message is a successful response whose ``data``
    property describes ``/foo/bar`` as neither readable nor writable and
    containing no files.
    """
    directory_listing = {
        'path': '/foo/bar',
        'readable': False,
        'writable': False,
        'files': {}
    }

    response = mock.Mock()
    response.is_successful.return_value = True
    # A property has to live on the type, hence type(response).
    type(response).data = mock.PropertyMock(return_value=directory_listing)
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_ls_android, ['/foo/bar']) as captured:
        output = captured

    # NOTE(review): compared verbatim; if spacing between the two fields
    # was collapsed when this snippet was copied, restore it - confirm.
    self.assertEqual(output, '\nReadable: No Writable: No\n')
def test_prints_ios_environment_via_platform_helpers(self, mock_runner):
    """
    The iOS environment helper prints a Name/Path table.

    The runner's last message is a successful response whose ``data``
    property maps ``foo`` to ``/bar``; the captured output must equal the
    expected table text exactly.
    """
    response = mock.Mock()
    response.is_successful.return_value = True
    # A property has to live on the type, hence type(response).
    type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_get_ios_environment) as captured:
        output = captured

    # NOTE(review): compared verbatim; if column padding was collapsed
    # when this snippet was copied, restore it from real output - confirm.
    expected_output = """
Name Path
------ ------
foo /bar
"""

    self.assertEqual(output, expected_output)
def test_prints_android_environment_via_platform_helpers(self, mock_runner):
    """
    The Android environment helper prints a Name/Path table.

    The runner's last message is a successful response whose ``data``
    property maps ``foo`` to ``/bar``; the captured output must equal the
    expected table text exactly.
    """
    response = mock.Mock()
    response.is_successful.return_value = True
    # A property has to live on the type, hence type(response).
    type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_get_android_environment) as captured:
        output = captured

    # NOTE(review): compared verbatim; if column padding was collapsed
    # when this snippet was copied, restore it from real output - confirm.
    expected_output = """
Name Path
------ ------
foo /bar
"""

    self.assertEqual(output, expected_output)
def test_get_ssh_args_instance_name(env_mock, exit_mock):
    """
    ``cli.get_ssh_args`` resolves key path, user name and IP for a named
    instance.

    The project is looked up from the (patched) current working
    directory and the instance is fetched by the name given on the
    command line. ``key_path`` and ``ip`` are property mocks so that the
    attribute reads themselves can be asserted.
    """
    with patch('os.getcwd', return_value='/path/to/cwd'):
        # Project exposing ``key_path`` as a property.
        project = MagicMock()
        key_path_mock = PropertyMock(return_value='/path/to/key.pem')
        type(project).key_path = key_path_mock

        env_mock.is_initialized.return_value = True
        env_mock.return_value.find_project.return_value = project

        # Instance exposing ``ip`` as a property and a user-name getter.
        instance = MagicMock()
        ip_mock = PropertyMock(return_value='0.0.0.0')
        type(instance).ip = ip_mock
        instance.get_user_name.return_value = 'test_user'
        project.get_instance.return_value = instance

        args = cli.get_ssh_args(['fooinst'])

        # Collaborators were consulted with the expected arguments.
        env_mock.return_value.find_project.assert_called_with('/path/to/cwd')
        project.get_instance.assert_called_with('fooinst')
        key_path_mock.assert_called_with()
        instance.get_user_name.assert_called_with()
        ip_mock.assert_called_with()

        # Result is exactly (key path, user name, ip).
        assert len(args) == 3
        assert (args[0], args[1], args[2]) == (
            '/path/to/key.pem',
            'test_user',
            '0.0.0.0',
        )
def test_get_access_key(self, mock_session):
    """
    ``AwsClient.get_access_key`` returns the access key exposed by the
    session's credentials.

    The credentials object returned by the mocked session carries an
    ``access_key`` property that yields ``'test'``.
    """
    credentials = MagicMock()
    # A property has to live on the type, hence type(credentials).
    type(credentials).access_key = PropertyMock(return_value='test')
    mock_session.return_value.get_credentials.return_value = credentials

    retrieved_key = AwsClient().get_access_key()

    self.assertEqual(retrieved_key, 'test')
def test_add_lambda_permissions(self, mock_lambda_client, mock_config, mock_uuid):
    """
    ``AwsClient.add_lambda_permissions`` grants S3 permission to invoke
    the configured lambda.

    The config mock exposes ``lambda_name`` as a property returning
    ``'test_name'`` and the UUID mock returns ``'test_uuid'``. The lambda
    client must be called once, and an ``add_permission`` call with the
    expected keyword arguments must appear among its recorded calls.
    """
    type(mock_config).lambda_name = PropertyMock(return_value='test_name')
    mock_uuid.return_value = 'test_uuid'

    AwsClient().add_lambda_permissions('test_bucket')

    self.assertEqual(mock_lambda_client.call_count, 1)
    expected_call = call().add_permission(
        Action='lambda:InvokeFunction',
        FunctionName='test_name',
        Principal='s3.amazonaws.com',
        SourceArn='arn:aws:s3:::test_bucket',
        StatementId='test_uuid',
    )
    # assertIn reports the expected call and the recorded calls on
    # failure, where assertTrue(... in ...) only says "False is not true".
    self.assertIn(expected_call, mock_lambda_client.mock_calls)
def setup_method(self, method):
    """
    Create and start the patches shared by the tests of this class.

    Every patcher is stored on ``self`` as ``patch_*`` and the object
    returned by starting it is stored under a shorter attribute name.
    Nothing in this method stops the patchers; presumably a matching
    teardown outside this view does - confirm.
    """
    # Closed PR already tested in TestHook.test_invalid()
    open_pr_payload = {
        'state': 'open',
        'user': {'login': 'user'},
        'head': {
            'repo': {'full_name': 'repo'},
            'ref': 'branch',
        },
    }

    # Build all patchers first ...
    self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
    self.patch_pr_json = patch.object(
        PullRequestHandler,
        'json',
        new_callable=PropertyMock,
        return_value=open_pr_payload,
    )
    self.patch_pr_comments = patch.object(
        PullRequestHandler, 'find_comments', return_value=[],
    )
    self.patch_pr_labels = patch.object(
        PullRequestHandler, 'labels', new_callable=PropertyMock,
    )
    self.patch_submit_comment = patch.object(
        PullRequestHandler, 'submit_comment', return_value='url',
    )
    self.patch_set_status = patch.object(PullRequestHandler, 'set_status')
    self.patch_check_changelog = patch(
        'changebot.blueprints.pull_request_checker.check_changelog_consistency'
    )

    # ... then activate them in the same order, keeping the mocks.
    self.changelog_cfg = self.patch_repo_config.start()
    self.patch_pr_json.start()
    self.comment_ids = self.patch_pr_comments.start()
    self.labels = self.patch_pr_labels.start()
    self.submit_comment = self.patch_submit_comment.start()
    self.set_status = self.patch_set_status.start()
    self.issues = self.patch_check_changelog.start()
def test_is_closed(self, state, answer):
    """
    ``is_closed`` must be exactly ``answer`` when the issue JSON reports
    ``state``.

    ``IssueHandler.json`` is replaced by a property mock returning
    ``{'state': state}``.
    """
    json_patcher = patch(
        'changebot.github.github_api.IssueHandler.json',
        new_callable=PropertyMock,
        return_value={'state': state},
    )
    with json_patcher:
        assert self.issue.is_closed is answer
def test_company_profile_edit_condition_address_feature_flag_off(
    letter_sent, preverified, expected, settings, sso_request
):
    """
    ``condition_show_address()`` must be exactly ``expected`` when the
    Companies House OAuth2 feature flag is disabled.

    The view's ``company_profile`` attribute is replaced by a dictionary
    carrying the ``letter_sent`` and ``preverified`` values (presumably
    supplied by a parametrize decorator outside this view).
    """
    settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = False

    view = views.CompanyProfileEditView()
    view.request = sso_request

    company_profile = {
        'is_verification_letter_sent': letter_sent,
        'verified_with_preverified_enrolment': preverified,
    }
    # The original passed a PropertyMock *instance* as ``new_callable``.
    # ``patch`` calls ``new_callable`` once to build the replacement, so
    # what ended up on the view was the mock's return value - this plain
    # dictionary. Passing it via ``new`` does the same thing explicitly.
    with patch.object(view, 'company_profile', new=company_profile):
        assert view.condition_show_address() is expected
def mock_snowboy(request):
    """
    Replace the Snowboy detector's library import path for one test.

    ``SnowboyWakewordDetector.wakeword_library_import_path`` is patched
    with a property mock returning ``'unittest.mock.Mock'``. The mock is
    yielded to the consumer and the patch is undone afterwards.
    Presumably registered as a pytest fixture by a decorator outside this
    view; ``request`` is accepted but not used here.
    """
    path = (
        'command_lifecycle.wakeword.SnowboyWakewordDetector.'
        'wakeword_library_import_path'
    )
    # ``with`` undoes the patch even when the generator is closed early
    # or an exception is thrown into it, which a bare start()/stop()
    # pair around ``yield`` does not guarantee.
    with patch(path, PropertyMock(return_value='unittest.mock.Mock')) as stub:
        yield stub
def build_request(self, method_return_value, FILES_return_value):
    """
    Build a mock request whose ``method`` and ``FILES`` are properties.

    Both attributes are installed as ``PropertyMock`` objects on the
    request's type, so callers can assert on how often they were read.

    :param method_return_value: value produced by reading
        ``request.method``.
    :param FILES_return_value: value produced by reading
        ``request.FILES``.
    :returns: tuple ``(request, method_property, files_property)`` - the
        mock request and the two property mocks backing it.
    """
    request = MagicMock()
    # Every MagicMock instance has its own class, so attaching properties
    # to type(request) does not leak into other mocks.
    request_type = type(request)

    method_property = PropertyMock(return_value=method_return_value)
    request_type.method = method_property

    files_property = PropertyMock(return_value=FILES_return_value)
    request_type.FILES = files_property

    return request, method_property, files_property
def test_get_service_input(self):
    """
    ``get_service_input`` reads the form's ``cleaned_data`` exactly once
    and returns it.

    ``cleaned_data`` is a property mock yielding an empty dictionary, so
    the read itself can be asserted.
    """
    cleaned_data_property = PropertyMock(return_value={})
    form = MagicMock()
    # A property has to live on the type, hence type(form).
    type(form).cleaned_data = cleaned_data_property

    service_input = MockView().get_service_input(form)

    cleaned_data_property.assert_called_once_with()
    self.assertEqual({}, service_input)
def test_old_tracer_correlation(self):
    """
    ``old_tracer_correlation`` is 2.0 for a summed squared displacement
    of 10.0 over 5 jumps.
    """
    simulation = self.simulation
    simulation.atoms = Mock()
    # NOTE(review): this PropertyMock is set on a Mock *instance*, not on
    # its type, so it acts as an ordinary callable mock returning 10.0.
    # Presumably the code under test calls sum_dr_squared() - confirm.
    simulation.atoms.sum_dr_squared = PropertyMock(return_value=10.0)
    simulation.number_of_jumps = 5

    self.assertEqual(simulation.old_tracer_correlation, 2.0)
def test_collective_diffusion_coefficient_per_atom(self):
    """
    The per-atom collective diffusion coefficient is 4.0 when the total
    coefficient is 8.0 and there are 2 atoms.

    ``Simulation.collective_diffusion_coefficient`` is replaced by a
    property mock returning 8.0.
    """
    simulation = self.simulation
    coefficient_patcher = patch(
        'lattice_mc.simulation.Simulation.collective_diffusion_coefficient',
        new_callable=PropertyMock,
        return_value=8.0,
    )

    with coefficient_patcher:
        simulation.number_of_atoms = 2
        self.assertEqual(
            simulation.collective_diffusion_coefficient_per_atom,
            4.0,
        )
def test_worker_thread_transfer():
    """
    The transfer worker handles both queued items and records the one
    exception raised while processing.

    Two items are queued. ``_process_synccopy_descriptor`` returns
    ``None`` for the first call and raises for the second, while
    ``termination_check`` reads ``False``, ``False``, then ``True``.
    """
    sync_copy = ops.SyncCopy(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    for _ in range(2):
        sync_copy._transfer_queue.put(mock.MagicMock())
    sync_copy._process_synccopy_descriptor = mock.MagicMock()
    sync_copy._process_synccopy_descriptor.side_effect = [None, Exception()]

    termination_patcher = mock.patch(
        'blobxfer.operations.synccopy.SyncCopy.termination_check',
        new_callable=mock.PropertyMock,
        side_effect=[False, False, True],
    )
    with termination_patcher:
        sync_copy._worker_thread_transfer()

        assert sync_copy._process_synccopy_descriptor.call_count == 2
        assert len(sync_copy._exceptions) == 1
def test_check_for_uploads_from_md5():
    """
    Only a non-empty MD5 result triggers the post-MD5 skip check.

    The done queue yields ``None``, a result, then ``None``;
    ``termination_check_md5`` reads ``False`` three times and then
    ``True``. ``_post_md5_skip_on_check`` must be called exactly once.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    uploader._md5_offload = mock.MagicMock()
    uploader._post_md5_skip_on_check = mock.MagicMock()
    uploader._md5_offload.pop_done_queue.side_effect = [
        None, mock.MagicMock(), None
    ]

    termination_patcher = mock.patch(
        'blobxfer.operations.upload.Uploader.termination_check_md5',
        new_callable=mock.PropertyMock,
        side_effect=[False, False, False, True, True],
    )
    with termination_patcher:
        uploader._check_for_uploads_from_md5()

        assert uploader._post_md5_skip_on_check.call_count == 1
def test_worker_thread_disk():
    """
    The disk worker processes one queued item and records a failure.

    Scenario 1: ``_process_data`` succeeds and must be called once.
    Scenario 2: ``_process_data`` raises and exactly one exception must
    be recorded. In both, ``termination_check`` reads ``False`` then
    ``True`` and the disk queue provides a single three-element item.
    """
    termination_target = (
        'blobxfer.operations.download.Downloader.termination_check'
    )

    def _build_downloader():
        # Downloader with one disk thread and one queued work item.
        downloader = ops.Downloader(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        downloader._general_options.concurrency.disk_threads = 1
        downloader._disk_queue = mock.MagicMock()
        downloader._disk_queue.get.side_effect = [
            (mock.MagicMock(), mock.MagicMock(), mock.MagicMock()),
        ]
        downloader._process_data = mock.MagicMock()
        return downloader

    # Scenario 1: processing succeeds.
    with mock.patch(
            termination_target,
            new_callable=mock.PropertyMock) as patched_tc:
        d = _build_downloader()
        # Set only after construction, as in the original ordering.
        patched_tc.side_effect = [False, True]

        d._worker_thread_disk()
        assert d._process_data.call_count == 1

    # Scenario 2: processing raises and the error is collected.
    with mock.patch(
            termination_target,
            new_callable=mock.PropertyMock) as patched_tc:
        d = _build_downloader()
        d._process_data.side_effect = Exception()
        patched_tc.side_effect = [False, True]

        d._worker_thread_disk()
        assert len(d._exceptions) == 1
def test_events_by_id_multi_page(self, mocker, page1_response, page2_response):
    """
    Requesting event 1 lowers ``lowest_event_id`` from 2 to 1.

    ``floto.api.Swf.client`` is replaced by a property mock returning an
    ``SwfMock`` that has ``page2_response`` registered under ``'page2'``;
    the history itself is built from ``page1_response``.
    """
    swf_client = SwfMock()
    swf_client.pages['page2'] = page2_response
    mocker.patch(
        'floto.api.Swf.client',
        new_callable=PropertyMock,
        return_value=swf_client,
    )

    history = floto.History(
        domain='d', task_list='tl', response=page1_response)

    assert history.lowest_event_id == 2
    assert history.get_event(1)
    assert history.lowest_event_id == 1