def test_make_provider_token_calls_jwt_encode_with_correct_args(self):
"""
Test that APNSConnection.make_provider_token() calls jwt.encode() with the correct
arguments. jwt.encode() returns a different result each time, even for identical input,
so we cannot simply compare against an expected return value.
"""
issued_at = time.time()
connection = jwt_apns_client.APNSConnection(
team_id='TEAMID',
apns_key_id='KEYID',
apns_key_path=self.KEY_FILE_PATH)
with mock.patch('jwt_apns_client.utils.jwt.encode') as mock_encode:
connection.make_provider_token(issued_at=issued_at)
mock_encode.assert_called_with(
{
'iss': connection.team_id,
'iat': issued_at
},
connection.secret,
algorithm=connection.algorithm,
headers=connection.get_token_headers())
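The same assert-on-arguments pattern works for any collaborator with non-deterministic output. A minimal, self-contained sketch using only the standard library (make_request_id is a hypothetical helper, not part of jwt_apns_client):

import uuid
from unittest import mock


def make_request_id(prefix):
    # uuid4() returns a different value on every call, much like jwt.encode() above.
    return '{}-{}'.format(prefix, uuid.uuid4())


def test_make_request_id_calls_uuid4():
    with mock.patch('uuid.uuid4', return_value='fixed') as mock_uuid4:
        assert make_request_id('req') == 'req-fixed'
        mock_uuid4.assert_called_once_with()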
def test_redis_status(self, mock_redis):
setattr(settings, 'CACHEOPS_REDIS', {'host': 'foo', 'port': 1337})
mock_redis.StrictRedis.return_value.ping.return_value = 'PONG'
mock_redis.StrictRedis.return_value.info.return_value = {
'redis_version': '1.0.0'}
status = redis_status.check(request=None)
assert status['ping'] == 'PONG'
assert status['version'] == '1.0.0'
# @mock.patch('heartbeat.checkers.redis_status.redis')
# def test_redis_connection_error(self, mock_redis):
# setattr(settings, 'CACHEOPS_REDIS', {'host': 'foo', 'port': 1337})
# mock_ping = mock_redis.StrictRedis.return_value.ping
# mock_ping.side_effect = ConnectionError('foo')
# status = redis.check(request=None)
# assert status['error'] == 'foo', status
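The commented-out test relies on side_effect to simulate a failing connection. A library-independent sketch of the same idea (fetch_status is a hypothetical helper, not the heartbeat API):

from unittest import mock


def fetch_status(client):
    try:
        return {'ping': client.ping()}
    except ConnectionError as exc:
        return {'error': str(exc)}


def test_fetch_status_reports_connection_error():
    client = mock.Mock()
    # Assigning an exception instance to side_effect makes every call raise it.
    client.ping.side_effect = ConnectionError('foo')
    assert fetch_status(client) == {'error': 'foo'}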
def test_db_version(self):
import django
if django.VERSION >= (1, 7):
cursor = 'django.db.backends.utils.CursorWrapper'
else:
cursor = 'django.db.backends.util.CursorWrapper'
with mock.patch(cursor) as mock_cursor:
mock_cursor.return_value.fetchone.return_value = ['1.0.0']
dbs = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'foo'
}
}
setattr(settings, 'DATABASES', dbs)
dbs = databases.check(request=None)
assert len(dbs) == 1
assert dbs[0]['version'] == '1.0.0'
def call_runscheduler(loops=1, mock_call_command=None):
ctx = {'sleep_count': 0}
def fake_sleep(seconds):
ctx['sleep_count'] += 1
if ctx['sleep_count'] > loops:
raise KeyboardInterrupt()
if mock_call_command is None:
mock_call_command = mock.MagicMock()
with mock.patch.object(runscheduler, 'call_command', mock_call_command):
with mock.patch.object(runscheduler, 'logger') as mock_logger:
with mock.patch('time.sleep', fake_sleep):
with pytest.raises(KeyboardInterrupt):
call_command('runscheduler')
return mock_call_command, mock_logger
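A side_effect sequence is another way to break out of an otherwise endless loop, without the counter dict used above; a minimal sketch with a hypothetical run_forever helper:

from unittest import mock

import pytest


def run_forever(sleep):
    while True:
        sleep(1)


def test_run_forever_can_be_interrupted():
    # Two normal iterations, then an exception that escapes the loop.
    fake_sleep = mock.Mock(side_effect=[None, None, KeyboardInterrupt()])
    with pytest.raises(KeyboardInterrupt):
        run_forever(fake_sleep)
    assert fake_sleep.call_count == 3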
def test_args(self):
"""Test WriteTensorBoard argument handling and ``SummaryWriter`` creation."""
model = self.get_model()
with self.assertRaises(AssertionError):
_ = WriteTensorBoard(output_dir=self.tmpdir, model=42)
with self.assertRaises(AssertionError):
_ = WriteTensorBoard(output_dir=self.tmpdir, model=model, on_missing_variable='not-recognized')
with self.assertRaises(AssertionError):
_ = WriteTensorBoard(output_dir=self.tmpdir, model=model, on_unknown_type='not-recognized')
with mock.patch('tensorflow.summary.FileWriter', autospec=True) as mocked_writer:
_ = WriteTensorBoard(output_dir=self.tmpdir, model=model, flush_secs=42, visualize_graph=True)
mocked_writer.assert_called_with(logdir=self.tmpdir, flush_secs=42, graph=model.graph)
_ = WriteTensorBoard(output_dir=self.tmpdir, model=model)
mocked_writer.assert_called_with(logdir=self.tmpdir, flush_secs=10, graph=None)
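autospec=True (used above when patching FileWriter) makes the mock enforce the real callable's signature; a standard-library-only sketch:

import os
from unittest import mock

import pytest


def test_autospec_enforces_real_signature():
    with mock.patch('os.path.join', autospec=True) as mock_join:
        mock_join('a', 'b')       # matches the real signature, fine
        with pytest.raises(TypeError):
            mock_join()           # the real os.path.join needs at least one argument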
def test_missing_variable(self):
"""Test if ``WriteTensorBoard`` handles missing image variables as expected."""
bad_epoch_data = {'valid': {}}
with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
# test ignore
hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='ignore')
with LogCapture(level=logging.INFO) as log_capture:
hook.after_epoch(42, bad_epoch_data)
log_capture.check()
# test warn
warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='warn')
with LogCapture(level=logging.INFO) as log_capture2:
warn_hook.after_epoch(42, bad_epoch_data)
log_capture2.check(('root', 'WARNING', '`plot` not found in epoch data.'))
# test error
raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='error')
with self.assertRaises(KeyError):
raise_hook.after_epoch(42, bad_epoch_data)
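mock.patch.dict('sys.modules', ...) as used above is a convenient way to stub out an optional import; a minimal sketch:

import sys
from unittest import mock


def test_optional_dependency_can_be_stubbed():
    fake_cv2 = mock.MagicMock()
    # Inside the block `import cv2` resolves to the stand-in; sys.modules is
    # restored to its previous state on exit.
    with mock.patch.dict('sys.modules', {'cv2': fake_cv2}):
        import cv2
        assert cv2 is fake_cv2
    assert sys.modules.get('cv2') is not fake_cv2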
def test_params_dict_after_font_size(self):
with patch('matplotlib.rcParams.update') as mock_update, \
patch('matplotlib.pyplot.switch_backend') as mock_switch:
old_params = dict(plt.rcParams)
with lp.temp_params(font_size=10, params_dict={
'axes.labelsize': 12,
'legend.fontsize': 12,
}):
called_with = mock_update.call_args[0][0]
assert called_with['font.size'] == 10
assert called_with['axes.labelsize'] == 12
assert called_with['axes.titlesize'] == 10
assert called_with['legend.fontsize'] == 12
assert called_with['xtick.labelsize'] == 10
assert called_with['ytick.labelsize'] == 10
mock_update.assert_called_with(old_params)
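call_args is an (args, kwargs) pair, so call_args[0][0] above is the first positional argument passed to rcParams.update; the same inspection in isolation, with a hypothetical configure helper:

from unittest import mock


def configure(update):
    update({'font.size': 10, 'axes.labelsize': 12})


def test_inspect_call_args():
    spy = mock.Mock()
    configure(spy)
    passed = spy.call_args[0][0]   # first positional argument of the last call
    assert passed['font.size'] == 10
    assert passed['axes.labelsize'] == 12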
def test_parameters_passed_custom_kwargs(self):
params = inspect.signature(lp.figure).parameters
with patch('matplotlib.figure.Figure.set_size_inches'), \
patch('latexipy._latexipy.save_figure') as mock_save_figure:
with lp.figure('filename', directory='directory', exts='exts',
mkdir='mkdir'):
pass
mock_save_figure.assert_called_once_with(
filename='filename',
directory='directory',
exts='exts',
mkdir='mkdir',
from_context_manager=True,
)
def setUp(self):
# disable logging while testing
logging.disable(logging.CRITICAL)
self.patched = {}
if hasattr(self, 'patch_these'):
for patch_this in self.patch_these:
namespace = patch_this[0] if isinstance(patch_this, (list, set)) else patch_this
patcher = mock.patch(namespace)
mocked = patcher.start()
mocked.reset_mock()
self.patched[namespace] = mocked
if isinstance(patch_this, (list, set)) and len(patch_this) > 0:
retval = patch_this[1]
if callable(retval):
retval = retval()
mocked.return_value = retval
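When patchers are started manually like this, each start() should be paired with a stop(), typically via addCleanup() or in tearDown() (the class above presumably stops its patchers elsewhere); a minimal sketch of that idiom:

import time
import unittest
from unittest import mock


class PatcherLifecycleTest(unittest.TestCase):
    def setUp(self):
        patcher = mock.patch('time.sleep')
        self.mock_sleep = patcher.start()
        # Guarantee the patch is undone even if the test fails.
        self.addCleanup(patcher.stop)

    def test_sleep_is_patched(self):
        time.sleep(10)
        self.mock_sleep.assert_called_once_with(10)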
def test_publish_sns_event():
TEST_ARN = 'arn:abc'
TEST_MESSAGE = "message"
with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
mock_send_sns_message.return_value = {"MessageId": "1234"}
message_publisher = publish_sns_message(TEST_ARN)
message_publisher(TEST_MESSAGE)
mock_send_sns_message.assert_called_once_with(TopicArn=TEST_ARN, Message=TEST_MESSAGE)
with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message:
mock_send_sns_message.return_value = None
message_publisher = publish_sns_message(TEST_ARN)
with pytest.raises(Exception):
message_publisher(TEST_MESSAGE)
mock_send_sns_message.assert_called_once_with(TopicArn=TEST_ARN, Message=TEST_MESSAGE)
def test_yml_invalid(mocker, SETTINGS):
GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
from jenkins_epo.extensions.core import YamlExtension
ext = YamlExtension('ext', Mock())
ext.current = ext.bot.current
ext.current.yaml = {}
ext.current.errors = []
GITHUB.fetch_file_contents = CoroutineMock(return_value="{INVALID")
head = ext.current.head
head.repository.url = 'https://github.com/owner/repo.git'
head.repository.jobs = []
yield from ext.run()
assert GITHUB.fetch_file_contents.mock_calls
assert ext.current.errors
def test_yml_found(mocker, SETTINGS):
GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
Job = mocker.patch('jenkins_epo.extensions.core.Job')
from jenkins_epo.extensions.core import YamlExtension
Job.jobs_filter = ['*', '-skip']
SETTINGS.update(YamlExtension.SETTINGS)
ext = YamlExtension('ext', Mock())
ext.current = ext.bot.current
ext.current.yaml = {'job': dict()}
GITHUB.fetch_file_contents = CoroutineMock(
return_value="job: command\nskip: command",
)
head = ext.current.head
head.repository.url = 'https://github.com/owner/repo.git'
head.repository.jobs = {}
yield from ext.run()
assert GITHUB.fetch_file_contents.mock_calls
assert 'job' in ext.current.job_specs
assert 'skip' not in ext.current.job_specs
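These two tests use pytest-mock's mocker fixture, which wraps mock.patch and undoes every patch automatically when the test ends; a minimal sketch (assumes pytest-mock is installed):

import os


def test_mocker_patch_is_undone_automatically(mocker):
    mocked = mocker.patch('os.getcwd', return_value='/fake/dir')
    assert os.getcwd() == '/fake/dir'
    assert mocked.call_count == 1
    # No explicit stop() needed; pytest-mock restores os.getcwd after the test.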
def test_default(self):
"""Does the default method print the proper information?"""
with patch('sys.stdout', new=StringIO()) as fake_out:
self.my_log.describe(mode="default")
output = fake_out.getvalue()
self.assertIn("Log containing 4 records from local git created at ", output)
self.assertIn("\nOrigin:", output)
self.assertNotIn("Filters:", output)
self.assertIn("\nNumber of authors: 4\n", output)
self.assertIn("\nNumber of files: 7\n", output)
self.assertIn("\nMost common email address domains:", output)
self.assertIn("\n\t @gmail.com [4 users]\n", output)
self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
self.assertIn("\nChange distribution summary:\n", output)
self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
self.assertIn("\nNumber of merges: 0\n", output)
self.assertIn("\nNumber of parsing errors: 0\n", output)
def test_not_default(self):
""" Does a non-default method print the proper information?
Note: At this point, default is the only setting so they end up being the same."""
with patch('sys.stdout', new=StringIO()) as fake_out:
self.my_log.describe(mode="not default")
output = fake_out.getvalue()
self.assertIn("Log containing 4 records from local git created at ", output)
self.assertIn("\nOrigin:", output)
self.assertNotIn("Filters:", output)
self.assertIn("\nNumber of authors: 4\n", output)
self.assertIn("\nNumber of files: 7\n", output)
self.assertIn("\nMost common email address domains:", output)
self.assertIn("\n\t @gmail.com [4 users]\n", output)
self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
self.assertIn("\nChange distribution summary:\n", output)
self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
self.assertIn("\nNumber of merges: 0\n", output)
self.assertIn("\nNumber of parsing errors: 0\n", output)
def test_whole(self):
"""Is the entire output as expected?"""
with patch('sys.stdout', new=StringIO()) as fake_out:
self.my_log.describe()
out = fake_out.getvalue()
self.assertRegex(out, "Log containing 4 records from local git created at ....-..-.. ..:..:..\.......\.\n"
"Origin: .*\n"
"Number of authors: 4\n"
"Number of files: 7\n"
"Most common email address domains:\n"
"\t @gmail.com \[4 users\]\n"
"Date range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n"
"Change distribution summary:\n"
"\t Files changed: Mean = 2.75, SD = 0.829\n"
"\t Line insertions: Mean = 2.75, SD = 0.829\n"
"\t Line deletions: Mean = nan, SD = nan\n"
"Number of merges: 0\n"
"Number of parsing errors: 0\n")
def test_exclude(self):
"""Does exclude prevent statistics from being printed?"""
with patch('sys.stdout', new=StringIO()) as fake_out:
self.my_log.describe(exclude=['merges', 'errors', 'files', 'summary', 'changes', 'path', 'filters',
'authors', 'dates', 'emails'])
output = fake_out.getvalue()
self.assertNotIn("Log containing 4 records from local git created at ", output)
self.assertNotIn("\nOrigin:", output)
self.assertNotIn("Filters:", output)
self.assertNotIn("\nNumber of authors: 4\n", output)
self.assertNotIn("\nNumber of files: 7\n", output)
self.assertNotIn("\nMost common email address domains:", output)
self.assertNotIn("\n\t @gmail.com [4 users]\n", output)
self.assertNotIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
self.assertNotIn("\nChange distribution summary:\n", output)
self.assertNotIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
self.assertNotIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
self.assertNotIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
self.assertNotIn("\nNumber of merges: 0\n", output)
self.assertNotIn("\nNumber of parsing errors: 0\n", output)
self.assertEqual(output, "")
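All of the describe() tests above capture output by swapping sys.stdout for an in-memory buffer; the technique in isolation, with a hypothetical report function:

from io import StringIO
from unittest import mock


def report():
    print('Number of records: 4')


def test_capture_stdout():
    with mock.patch('sys.stdout', new=StringIO()) as fake_out:
        report()
    assert 'Number of records: 4' in fake_out.getvalue()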
def test_update_record_values(api_client, zone, boto_client):
G(m.Zone)
record_data = {
'values': ['1.2.3.4']
}
response = api_client.patch(
'/zones/{}/records/{}'.format(zone.id, hash_test_record(zone)),
data=record_data)
assert response.data == {
**get_test_record(zone),
**record_data
}
assert aws_strip_ns_and_soa(
boto_client.list_resource_record_sets(HostedZoneId=zone.r53_zone.id), zone.root
) == sorted([
record_data_to_aws({
**get_test_record(zone),
**record_data
}, zone.root)
], key=aws_sort_key)
def test_update_record_ttl(api_client, zone, boto_client):
G(m.Zone)
record_data = {
'ttl': 580
}
response = api_client.patch(
'/zones/{}/records/{}'.format(zone.id, hash_test_record(zone)),
data=record_data
)
assert response.data == {
**get_test_record(zone),
**record_data
}
assert aws_strip_ns_and_soa(
boto_client.list_resource_record_sets(HostedZoneId=zone.r53_zone.id), zone.root
) == sorted([
record_data_to_aws({
**get_test_record(zone),
**record_data
}, zone.root)
], key=aws_sort_key)
def test_get_checks_as_dict(self):
foo = ServiceCheck(timestamp=42, hostname='server1',
name='foo', status=0, output='foo out')
bar = ServiceCheck(timestamp=42, hostname='server1',
name='bar', status=1, output='bar out')
with mock.patch.dict('sauna.check_results', foo=foo, bar=bar):
self.assertDictEqual(base.AsyncConsumer.get_checks_as_dict(), {
'foo': {
'status': 'OK',
'code': 0,
'timestamp': 42,
'output': 'foo out'
},
'bar': {
'status': 'WARNING',
'code': 1,
'timestamp': 42,
'output': 'bar out'
}
})
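mock.patch.dict works on any dictionary, not just sys.modules; changes made inside the block (here to a hypothetical module-level registry) are rolled back on exit:

from unittest import mock

registry = {'existing': 1}


def test_patch_dict_restores_contents():
    with mock.patch.dict(registry, {'added': 2}):
        assert registry == {'existing': 1, 'added': 2}
    assert registry == {'existing': 1}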
From test_pa_start_django_webapp_with_virtualenv.py in the pythonanywhere/helper_scripts project:
def test_calls_all_stuff_in_right_order(self):
with patch('scripts.pa_start_django_webapp_with_virtualenv.DjangoProject') as mock_DjangoProject:
main('www.domain.com', 'django.version', 'python.version', nuke='nuke option')
assert mock_DjangoProject.call_args == call('www.domain.com')
assert mock_DjangoProject.return_value.method_calls == [
call.sanity_checks(nuke='nuke option'),
call.create_virtualenv('python.version', 'django.version', nuke='nuke option'),
call.run_startproject(nuke='nuke option'),
call.find_django_files(),
call.update_settings_file(),
call.run_collectstatic(),
call.create_webapp(nuke='nuke option'),
call.add_static_file_mappings(),
call.update_wsgi_file(),
call.webapp.reload(),
]
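method_calls records calls to a mock's methods, including nested attributes such as webapp.reload(), in the order they happened, which is what the assertion above depends on; a reduced sketch with a hypothetical provision helper:

from unittest import mock
from unittest.mock import call


def provision(project):
    project.sanity_checks(nuke=False)
    project.create_virtualenv('3.10')
    project.webapp.reload()


def test_method_calls_record_order():
    project = mock.Mock()
    provision(project)
    assert project.method_calls == [
        call.sanity_checks(nuke=False),
        call.create_virtualenv('3.10'),
        call.webapp.reload(),
    ]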
Also from test_pa_start_django_webapp_with_virtualenv.py in the pythonanywhere/helper_scripts project:
def test_actually_creates_django_project_in_virtualenv_with_hacked_settings_and_static_files(
self, fake_home, virtualenvs_folder, api_token
):
with patch('scripts.pa_start_django_webapp_with_virtualenv.DjangoProject.update_wsgi_file'):
with patch('pythonanywhere.api.call_api'):
main('mydomain.com', '1.9.2', '2.7', nuke=False)
django_version = subprocess.check_output([
virtualenvs_folder / 'mydomain.com/bin/python',
'-c',
'import django; print(django.get_version())'
]).decode().strip()
assert django_version == '1.9.2'
with open(fake_home / 'mydomain.com/mysite/settings.py') as f:
lines = f.read().split('\n')
assert "MEDIA_ROOT = os.path.join(BASE_DIR, 'media')" in lines
assert "ALLOWED_HOSTS = ['mydomain.com']" in lines
assert 'base.css' in os.listdir(fake_home / 'mydomain.com/static/admin/css')
def test_interactive(self, mock_print, mock_markdown,
mock_print_heading, mock_import_readline):
def MockInputFactory(return_values):
_counter = -1
def mock_input(prompt=''):
nonlocal _counter
_counter += 1
if _counter < len(return_values):
return return_values[_counter]
elif _counter == len(return_values):
raise EOFError
else:
raise KeyboardInterrupt
return mock_input
return_values = ['foo', 'bar', 'baz']
with patch('builtins.input', MockInputFactory(return_values)):
cli.interactive(sentinel.RendererCls)
mock_import_readline.assert_called_with()
mock_print_heading.assert_called_with(sentinel.RendererCls)
mock_markdown.assert_called_with(['foo\n', 'bar\n', 'baz\n'],
sentinel.RendererCls)
calls = [call('\nrendered text', end=''), call('\nExiting.')]
mock_print.assert_has_calls(calls)
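builtins.input can be replaced with any callable, not only a Mock, just as MockInputFactory above returns a plain closure; a minimal sketch with a hypothetical ask_name helper:

from unittest import mock


def ask_name():
    return input('Name: ').strip().title()


def test_ask_name_with_patched_input():
    with mock.patch('builtins.input', lambda prompt='': '  ada lovelace '):
        assert ask_name() == 'Ada Lovelace'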
def test_munge_config(self):
def mock_find(path):
return os.path.join('/some/path/static/', path)
with mock.patch('webpack.conf.find', new=mock_find) as find:
munged = get_munged_config(WEBPACK_CONFIG)
expected_output = WEBPACK_CONFIG_OUTPUT.format(
url=settings.STATIC_URL,
root=settings.STATIC_ROOT
)
self.assertEqual(
munged,
expected_output
)
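Passing new= (as with new=mock_find above) swaps in a ready-made replacement instead of an auto-created MagicMock; a standard-library sketch with a hypothetical static_url helper:

import os.path
from unittest import mock


def static_url(path):
    return os.path.join('/static', path)


def test_patch_with_new_object():
    def fake_join(*parts):
        return '|'.join(parts)

    with mock.patch('os.path.join', new=fake_join):
        assert static_url('css/app.css') == '/static|css/app.css'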
def test_init_working_dir():
"""init_working_dir should initialize a valid git repo, and clean up after itself"""
repo_url = "https://github.com/mitodl/release-script.git"
access_token = 'fake_access_token'
with patch('release.check_call', autospec=True) as check_call_mock, init_working_dir(
access_token, repo_url,
) as other_directory:
assert os.path.exists(other_directory)
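# After the context manager exits, init_working_dir is expected to have cleaned up the directory.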
assert not os.path.exists(other_directory)
calls = check_call_mock.call_args_list
assert [call[0][0] for call in calls] == [
['git', 'init'],
['git', 'remote', 'add', 'origin', url_with_access_token(access_token, repo_url)],
['git', 'fetch'],
['git', 'checkout', '-t', 'origin/master'],
]
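call_args_list keeps every call in order; call[0][0] above extracts each call's first positional argument. The same inspection in isolation, with a hypothetical deploy helper:

from unittest import mock


def deploy(run):
    run(['git', 'fetch'])
    run(['git', 'checkout', 'master'])


def test_commands_run_in_order():
    runner = mock.Mock()
    deploy(runner)
    assert [c[0][0] for c in runner.call_args_list] == [
        ['git', 'fetch'],
        ['git', 'checkout', 'master'],
    ]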
def randommock():
"""Returns a contextmanager that mocks random.random() at a specific value
Usage::
def test_something(randommock):
with randommock(0.55):
# test stuff...
"""
@contextlib.contextmanager
def _randommock(value):
with mock.patch('random.random') as mock_random:
mock_random.return_value = value
yield
return _randommock
def test_set_exc_handler_broken(self):
def run_loop():
def zero_error():
1/0
self.loop.call_soon(zero_error)
self.loop._run_once()
def handler(loop, context):
raise AttributeError('spam')
self.loop._process_events = mock.Mock()
self.loop.set_exception_handler(handler)
with mock.patch('asyncio.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern(
'Unhandled error in exception handler'),
exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
def test_create_connection_timeout(self, m_socket):
# Ensure that the socket is closed on timeout
sock = mock.Mock()
m_socket.socket.return_value = sock
def getaddrinfo(*args, **kw):
fut = asyncio.Future(loop=self.loop)
addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
('127.0.0.1', 80))
fut.set_result([addr])
return fut
self.loop.getaddrinfo = getaddrinfo
with mock.patch.object(self.loop, 'sock_connect',
side_effect=asyncio.TimeoutError):
coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
with self.assertRaises(asyncio.TimeoutError):
self.loop.run_until_complete(coro)
self.assertTrue(sock.close.called)
def setUp(self):
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
st = mock.Mock()
st.st_mode = stat.S_IFIFO
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
def setUp(self):
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
st = mock.Mock()
st.st_mode = stat.S_IFSOCK
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
def waitpid_mocks(func):
def wrapped_func(self):
def patch(target, wrapper):
return mock.patch(target, wraps=wrapper,
new_callable=mock.Mock)
with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
patch('os.waitpid', self.waitpid) as m_waitpid:
func(self, WaitPidMocks(m_waitpid,
m_WIFEXITED, m_WIFSIGNALED,
m_WEXITSTATUS, m_WTERMSIG,
))
return wrapped_func
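The wraps= argument used by this decorator forwards every call to the real function while still recording it, so behaviour is unchanged but assertions remain possible; a minimal standard-library sketch:

import os
from unittest import mock


def test_wraps_records_calls_but_keeps_behaviour():
    with mock.patch('os.path.basename', wraps=os.path.basename) as m_basename:
        assert os.path.basename('/tmp/file.txt') == 'file.txt'
        m_basename.assert_called_once_with('/tmp/file.txt')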