def test_supplier_sectors_edit_standalone_view_api_success(
    mock_update_profile,
    has_company_client,
    company_profile_sectors_standalone_data,
    api_response_200,
    sso_user,
):
    """Posting the standalone sectors form triggers exactly one profile update.

    The mocked update-profile call must receive the user's SSO session id
    together with the sector and export-destination data.
    """
    mock_update_profile.return_value = api_response_200
    expected_payload = {
        'sectors': ['AGRICULTURE_HORTICULTURE_AND_FISHERIES'],
        'export_destinations': ['CN', 'IN'],
        'export_destinations_other': 'West Philadelphia',
    }

    has_company_client.post(
        reverse('company-edit-sectors'),
        company_profile_sectors_standalone_data,
    )

    assert mock_update_profile.call_count == 1
    assert mock_update_profile.call_args == call(
        sso_session_id=sso_user.session_id,
        data=expected_payload,
    )
# Example source code for the Python class call()
def test_verify_company_address_end_to_end(
    mock_update_profile,
    settings,
    has_company_client,
    send_verification_letter_end_to_end,
):
    """Run the verification-letter flow end to end with the OAuth2 flag on.

    The final response must render the view's SENT template, and the mocked
    update-profile call must happen once with the postal name and session id.
    """
    settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = True
    letter_view = views.SendVerificationLetterView
    expected_update = call(
        data={'postal_full_name': 'Jeremy'},
        sso_session_id='213',
    )

    response = send_verification_letter_end_to_end()

    assert response.status_code == 200
    assert response.template_name == letter_view.templates[letter_view.SENT]
    assert mock_update_profile.call_count == 1
    assert mock_update_profile.call_args == expected_update
def test_wait(mocker, config):  # pylint: disable=W0621
    """Assert runner waits before re-running the monitoring script"""
    n_retries = 5
    d_interval = 10

    # Every subprocess run reports CRITICAL.
    failed_run = mocker.Mock()
    failed_run.returncode = Codes.CRITICAL.value
    mocker.patch('subprocess.run', return_value=failed_run)
    mocker.patch('cachetclient.cachet.Components.put')
    time_sleep = mocker.patch('time.sleep')

    with pytest.raises(SystemExit):
        unit.main(
            script=mocker.Mock(),
            component_id=99,
            config_file=config,
            retries=n_retries,
            interval=d_interval,
        )

    # One sleep of `d_interval` seconds is expected per retry.
    assert time_sleep.call_args_list == [call(d_interval)] * n_retries
# File source: test_create_amazon_refresh_token.py
# Project: alexa-browser-client
# Author: richtier
# Project source
# File source
# Views: 16
# Favorites: 0
# Likes: 0
# Comments: 0
def test_create_amazon_refresh_token_no_defaults(mock_http_server, settings):
    """Run the management command with an explicit address and port.

    The mocked HTTP server must be built once with the given address/port and
    the credentials taken from settings, and the command must report the URL
    it is serving on.
    """
    settings.ALEXA_VOICE_SERVICE_CLIENT_ID = 'my-client-id'
    settings.ALEXA_VOICE_SERVICE_CLIENT_SECRET = 'my-client-secret'
    settings.ALEXA_VOICE_SERVICE_DEVICE_TYPE_ID = 'my-device-type-id'
    captured_stdout = StringIO()
    cli_options = ['--address=127.0.0.1', '--port=9000']

    call_command(
        'create_amazon_refresh_token', *cli_options, stdout=captured_stdout
    )

    expected_server_call = call(
        server_address=('127.0.0.1', 9000),
        RequestHandlerClass=handlers.AmazonAlexaServiceLoginHandler,
        client_id='my-client-id',
        client_secret='my-client-secret',
        device_type_id='my-device-type-id',
    )
    assert mock_http_server.call_count == 1
    assert mock_http_server.call_args == expected_server_call
    assert 'running server on http://127.0.0.1:9000' in captured_stdout.getvalue()
def test_internal_find_index_called_for_each_item(self, monkeypatch, tmp_folder):
    """Check that ``_find_index`` has been called for every scanned item.

    Per the original note, the index lookup is only used from within
    ``_insert_sorted()`` during async scanning, so one call per item is
    expected. Testing for: def _find_index(self, summary, sort_by).

    :param monkeypatch: pytest monkey patch fixture
    :type monkeypatch: _pytest.monkeypatch.MonkeyPatch
    :param tmp_folder: Test params and dummy test folder factory fixture pair.
    :type tmp_folder: (dict, dirtools.tests.factory.DummyFolderFactory)
    """
    params, factory = tmp_folder
    sort_key = params['sort_by']
    folder = Folder(factory.path, sort_key, params['level'])

    # Replace the real lookup with a spy that always answers index 0.
    index_spy = Mock(return_value=0)
    monkeypatch.setattr(folder, '_find_index', index_spy)

    # Checked while iterating: the call for an item must already be recorded
    # by the time that item is yielded.
    for entry in folder.items(humanise=False):
        assert call(entry, sort_key) in index_spy.call_args_list
def test_cubic_lattice( self, mock_lattice, mock_site ):
    """Check the sites and lattice built by ``init_lattice.cubic_lattice``.

    For a 2x2x2 lattice with unit spacing, the mocked ``Site`` must be called
    once per site with the expected number, coordinates, neighbour list,
    fourth positional argument and label. The mocked ``Lattice`` must receive
    the created sites and the cell lengths, and its return value must be
    handed back to the caller.
    """
    a, b, c = 2, 2, 2
    spacing = 1.0
    # Each Site(...) call returns the next integer, standing in for a site.
    mock_site.side_effect = range(2*2*2)
    mock_lattice.return_value = 'bar'
    lattice = init_lattice.cubic_lattice( a, b, c, spacing )
    expected_site_calls = [ [ 1, np.array( [ 0., 0., 0. ] ), [ 2, 2, 3, 3, 5, 5 ], 0.0, 'L' ],
                            [ 2, np.array( [ 1., 0., 0. ] ), [ 1, 1, 4, 4, 6, 6 ], 0.0, 'L' ],
                            [ 3, np.array( [ 0., 1., 0. ] ), [ 4, 4, 1, 1, 7, 7 ], 0.0, 'L' ],
                            [ 4, np.array( [ 1., 1., 0. ] ), [ 3, 3, 2, 2, 8, 8 ], 0.0, 'L' ],
                            [ 5, np.array( [ 0., 0., 1. ] ), [ 6, 6, 7, 7, 1, 1 ], 0.0, 'L' ],
                            [ 6, np.array( [ 1., 0., 1. ] ), [ 5, 5, 8, 8, 2, 2 ], 0.0, 'L' ],
                            [ 7, np.array( [ 0., 1., 1. ] ), [ 8, 8, 5, 5, 3, 3 ], 0.0, 'L' ],
                            [ 8, np.array( [ 1., 1., 1. ] ), [ 7, 7, 6, 6, 4, 4 ], 0.0, 'L' ] ]
    # zip() stops at the shorter sequence, so without this count check the
    # loop below would pass vacuously if fewer sites were created.
    self.assertEqual( mock_site.call_count, len( expected_site_calls ) )
    # The loop variable is not named `call`, to avoid shadowing mock.call.
    for site_call, e in zip( mock_site.mock_calls, expected_site_calls ):
        args = site_call[1] # positional arguments of the recorded call
        self.assertEqual( args[0], e[0] ) # site number
        np.testing.assert_array_equal( args[1], e[1] ) # site coordinates
        self.assertEqual( args[2], e[2] ) # neighbour lists
        self.assertEqual( args[3], e[3] ) # presumably the site energy -- TODO confirm
        self.assertEqual( args[4], e[4] ) # site label
    self.assertEqual( mock_lattice.mock_calls[0][1][0], list(range(2*2*2)) )
    self.assertEqual( lattice, 'bar' )
    np.testing.assert_array_equal( mock_lattice.mock_calls[0][2]['cell_lengths'], np.array( [ a, b, c, ] ) )
def test_potential_jumps_if_lattice_is_mostly_empty( self, mock_Jump ): # lattice is mostly vacant
    """Check ``potential_jumps()`` when 1 of 4 sites is occupied.

    The single occupied site has neighbours 2 and 3; both are looked up via
    the patched ``site_with_id`` and reported as unoccupied. One ``Jump`` is
    expected per neighbour, built from (occupied site, neighbour, nn_energy,
    cn_energies, jump_lookup_table).
    """
    jumps = [ Mock( spec=Jump ), Mock( spec=Jump ) ]
    mock_Jump.side_effect = jumps
    self.lattice.number_of_occupied_sites = 1
    # NOTE(review): the original assigned `self.lattice_number_of_sites`, an
    # attribute on the test case itself, which has no effect on the lattice.
    # Presumably `self.lattice.number_of_sites` was intended -- confirm
    # against the Lattice class.
    self.lattice.number_of_sites = 4
    site = Mock( spec=Site )
    site.neighbours = [ 2, 3 ]
    occupied_sites = [ site ]
    unoccupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ]
    self.lattice.nn_energy = 'A'
    self.lattice.cn_energies = 'B'
    self.lattice.jump_lookup_table = 'C'
    for s in unoccupied_sites:
        s.is_occupied = False
    with patch( 'lattice_mc.lattice.Lattice.occupied_sites' ) as mock_occupied_sites, \
         patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id:
        mock_occupied_sites.return_value = occupied_sites
        # Only the two neighbours of the occupied site are looked up.
        mock_site_with_id.side_effect = unoccupied_sites[:2]
        potential_jumps = self.lattice.potential_jumps()
        self.assertEqual( potential_jumps, jumps )
        self.assertEqual( mock_Jump.mock_calls[0][1], ( site, unoccupied_sites[0], 'A', 'B', 'C' ) )
        self.assertEqual( mock_Jump.mock_calls[1][1], ( site, unoccupied_sites[1], 'A', 'B', 'C' ) )
        mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
def test_potential_jumps_if_lattice_is_mostly_filled( self, mock_Jump ): # lattice is mostly occupied
    """Check ``potential_jumps()`` when 3 of 4 sites are occupied.

    The single vacant site has neighbours 2 and 3; both are looked up via the
    patched ``site_with_id`` and reported as occupied. One ``Jump`` is
    expected per neighbour, built from (occupied neighbour, vacant site,
    nn_energy, cn_energies, jump_lookup_table).
    """
    mock_Jump.side_effect = [ 'jump1', 'jump2' ]
    self.lattice.number_of_occupied_sites = 3
    # NOTE(review): the original assigned `self.lattice_number_of_sites`, an
    # attribute on the test case itself, which has no effect on the lattice.
    # Presumably `self.lattice.number_of_sites` was intended -- confirm
    # against the Lattice class.
    self.lattice.number_of_sites = 4
    site = Mock( spec=Site )
    site.neighbours = [ 2, 3 ]
    vacant_sites = [ site ]
    occupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ]
    for s in occupied_sites:
        s.is_occupied = True
    self.lattice.nn_energy = 'A'
    self.lattice.cn_energies = 'B'
    self.lattice.jump_lookup_table = 'C'
    with patch( 'lattice_mc.lattice.Lattice.vacant_sites' ) as mock_vacant_sites, \
         patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id:
        mock_vacant_sites.return_value = vacant_sites
        # Only the two neighbours of the vacant site are looked up.
        mock_site_with_id.side_effect = occupied_sites[:2]
        jumps = self.lattice.potential_jumps()
        self.assertEqual( jumps, [ 'jump1', 'jump2' ] )
        self.assertEqual( mock_Jump.mock_calls[0][1], ( occupied_sites[0], site, 'A', 'B', 'C' ) )
        self.assertEqual( mock_Jump.mock_calls[1][1], ( occupied_sites[1], site, 'A', 'B', 'C' ) )
        mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
def test_run_pipeline_pass_skip_parse_context(mocked_work_dir,
                                              mocked_get_pipe_def,
                                              mocked_get_parsed_context,
                                              mocked_run_step_group):
    """run_pipeline with parse_input=False skips context parsing.

    The working-dir and parsed-context mocks must stay untouched, the pipeline
    definition must be fetched once, and the step groups must run with an
    empty context.
    """
    pypyr.pipelinerunner.run_pipeline(pipeline_name='arb pipe',
                                      working_dir='arb/dir',
                                      parse_input=False)

    mocked_work_dir.assert_not_called()
    mocked_get_pipe_def.assert_called_once_with(pipeline_name='arb pipe',
                                                working_dir='arb/dir')
    mocked_get_parsed_context.assert_not_called()

    # 'steps' runs first, followed by 'on_success'.
    expected_run_step_groups = [
        call(context={},
             pipeline_definition='pipe def',
             step_group_name=group_name)
        for group_name in ('steps', 'on_success')
    ]
    mocked_run_step_group.assert_has_calls(expected_run_step_groups)
def test6_RGBLED_set(self, mock_wiringpi_softPwmCreate, mock_wiringpi_softPwmWrite,
                     mock_wiringpi_ISR, mock_wiringpi_pinMode, mock_wiringpi_setup):
    """Constructing an RGBLED configures its three pins; set() writes the levels.

    Checks the pinMode and softPwmCreate calls made on construction, the
    softPwmWrite calls made by ``set``, and that ``get`` returns the levels.
    """
    pins = (10, 8, 6)  # red, green, blue GPIO pins
    led = RGBLED(*pins)

    mock_wiringpi_pinMode.assert_has_calls([call(pin, 1) for pin in pins])
    mock_wiringpi_softPwmCreate.assert_has_calls(
        [call(pin, 0, 100) for pin in pins])

    levels = (1, 2, 3)  # red, green, blue levels
    led.set(*levels)

    mock_wiringpi_softPwmWrite.assert_has_calls(
        [call(pin, level) for pin, level in zip(pins, levels)])
    self.assertEqual(led.get(), (1, 2, 3))
def test_send_message_to_chat(self, mock_requests):
    """The send task returns the sent message and posts it to ``sendMessage``.

    The returned message must echo the text, parse mode, reply id and chat id
    of the update, and the mocked request must include a call to the
    configured Telegram URL with the update as JSON and the configured timeout.
    """
    parsed_update = parse_update(TelegramUpdates.TEXT_OK_ID_OK_TEXT)
    parsed_update['parse_mode'] = 'HTML'
    parsed_update['text'] = 'Sentiment'

    result = send_message_to_chat.delay(parsed_update).get(timeout=5)

    # on success the Message sent to the chat is returned
    for field in ('text', 'parse_mode', 'reply_to_message_id', 'chat_id'):
        self.assertEqual(result[field], parsed_update[field])

    mock_requests.assert_called()
    expected_request = call(
        current_app.config['TELEGRAM_URL'] + 'sendMessage',
        json=parsed_update,
        timeout=current_app.config['TELEGRAM_REQUEST_TIMEOUT_SEC'],
    )
    self.assertIn(expected_request, mock_requests.call_args_list)
def test_build_bundled_pdfs_if_some_are_not_prefilled(
        self, logger, get_parser, SimpleUploadedFile, slack, SubService):
    """A bundle with fewer filled pdfs than submissions is reported and refilled.

    Expects one error log and one slack message, two reads of the individual
    filled pdfs, one save, and a fill call for each submission.
    """
    get_parser.return_value.join_pdfs.return_value = b'pdf'

    # two submissions, but only one filled pdf
    submissions = [Mock(), Mock()]
    bundle = Mock(pk=2)
    bundle.should_have_a_pdf.return_value = True
    bundle.get_individual_filled_pdfs.return_value = [Mock()]
    bundle.submissions.all.return_value = submissions
    bundle.organization.pk = 1

    # run
    BundlesService.build_bundled_pdf_if_necessary(bundle)

    error_msg = "Submissions for ApplicationBundle(pk=2) lack pdfs"
    logger.error.assert_called_once_with(error_msg)
    slack.assert_called_once_with(error_msg)
    filled_pdf_lookups = bundle.get_individual_filled_pdfs.mock_calls
    self.assertEqual(len(filled_pdf_lookups), 2)
    bundle.save.assert_called_once_with()
    expected_fill_calls = [call(submission) for submission in submissions]
    SubService.fill_pdfs_for_submission.assert_has_calls(
        expected_fill_calls, any_order=True)
def test_generate_endpoint_parser_noparam(addargument):
    """Generate a parser from endpoint metadata - no params"""
    endpoint_name = 'put-stuff'
    endpoint_metadata = {
        'path': 'stuff',
        'method': 'PUT',
        'help': "Changes stuff",
        'params': {},
    }
    root_parser = ArgumentParser()

    generate_endpoint_parser(
        root_parser.add_subparsers(), endpoint_name, endpoint_metadata)

    # With no params, only the automatic help option is registered: once for
    # the main parser and once for the 'put-stuff' subparser.
    help_option = mock.call('-h', '--help', action='help',
                            default=mock.ANY, help=mock.ANY)
    addargument.assert_has_calls([help_option, help_option])
async def test_download_lstree_git(self):
    """Downloading a ``repo<tree>`` git URL clones the repo and lists the tree.

    Declared ``async`` because the body awaits ``_do_download``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    git_url = 'git://githoobie.com/lol.git<%s>' % tree_object
    await self._do_download(git_url)

    # ensure git config was appended to, and that the output file was
    # opened
    self.open.assert_any_call('/t/mut/lol.git/config', 'a')
    self.open.assert_any_call('/t/res/lol.git', 'w+')

    # ensure the sequence of git commands were called
    assert self.subprocess.run.mock_calls == [
        call(['git', 'clone', '--bare',
              'git://githoobie.com/lol.git', '/t/mut/lol.git']),
        call(['git', 'ls-tree', '-r', '--long', '--full-tree',
              tree_object],
             cwd='/t/mut/lol.git', stdout=self.open()),
    ]
async def test_download_directory_git(self):
    """Downloading a ``repo<tree></>`` git URL archives the tree as a directory.

    Declared ``async`` because the body awaits ``_do_download``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    path = '/'
    git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path)
    await self._do_download(git_url)
    self._check_config()

    # ensure the sequence of git commands were called
    assert self.subprocess.run.mock_calls == [
        call(['git', 'clone', '--bare',
              'git://githoobie.com/lol.git', '/t/mut/lol.git'],
             cwd='/t/mut'),
        call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
             cwd='/t/mut/lol.git', stdout=-1),
        call(['git', 'archive', '--prefix=/t/res/lol.git',
              '--format=directory', tree_object],
             cwd='/t/mut/lol.git'),
    ]
async def test_download_lstree_git(self):
    """Downloading a ``repo<tree>`` git URL clones, verifies and lists the tree.

    Declared ``async`` because the body awaits ``_do_download``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    git_url = 'git://githoobie.com/lol.git<%s>' % tree_object
    await self._do_download(git_url)

    # ensure git config was appended to, and that the output file was
    # opened
    self.open.assert_any_call('/t/mut/lol.git/config', 'a')
    self.open.assert_any_call('/t/res/lol.git', 'w+')

    # ensure the sequence of git commands were called
    assert self.subprocess.run.mock_calls == [
        call(['git', 'clone', '--bare',
              'git://githoobie.com/lol.git', '/t/mut/lol.git'],
             cwd='/t/mut'),
        call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
             cwd='/t/mut/lol.git', stdout=-1),
        call(['git', 'ls-tree', '-r', '--long', '--full-tree',
              tree_object],
             cwd='/t/mut/lol.git', stdout=self.open()),
    ]
async def test_update_when_hash_not_present(self):
    """When every git command returns empty stdout, a ``git fetch`` is expected.

    Declared ``async`` because the body awaits ``_do_download``; ``await``
    inside a plain ``def`` is a syntax error.
    """
    class FakeResult:
        # Stand-in for the subprocess result: every command yields no output.
        stdout = b''

    self.subprocess.run.return_value = FakeResult
    path = 'README.md'
    git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path)
    await self._do_download(git_url)
    self._check_config()

    # ensure the sequence of git commands were called
    assert self.subprocess.run.mock_calls == [
        call(['git', 'clone', '--bare',
              'git://githoobie.com/lol.git', '/t/mut/lol.git'],
             cwd='/t/mut'),
        call(['git', 'rev-parse', '--quiet', '--verify', tree_object],
             cwd='/t/mut/lol.git', stdout=-1),
        call(['git', 'fetch'], cwd='/t/mut/lol.git'),
        call(['git', 'archive', '--output=/t/res/README.md',
              '--format=raw', tree_object, 'README.md'],
             cwd='/t/mut/lol.git'),
    ]
def test_calls_update_streams_with_content(self, mock_update):
    """The mocked update hook is called when content is created, not on later saves."""
    # Creation triggers exactly one call with the new content.
    created = ContentFactory()
    mock_update.assert_called_once_with(created)

    # Saving an edit must not trigger another call.
    mock_update.reset_mock()
    created.text = "update!"
    created.save()
    self.assertFalse(mock_update.called)
def test_local_content_with_parent_sent_as_reply(self, mock_send):
    """Creating local content with a parent sends reply notifications, then the reply."""
    author = UserFactory()
    parent_content = ContentFactory(author=author.profile)
    # Discard calls recorded while creating the parent.
    mock_send.reset_mock()

    reply = ContentFactory(author=author.profile, parent=parent_content)

    self.assertTrue(reply.local)
    expected_calls = [
        call(send_reply_notifications, reply.id),
        call(send_reply, reply.id),
    ]
    assert mock_send.call_args_list == expected_calls