def test_send_batch_400_no_raise(self, mock_post):
    """
    Test that if raise_for_status is False, we don't raise an exception for a 400 response
    """
    mock_post.return_value = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json()
    )

    chunk_size = 10
    recipient_tuples = [("{0}@example.com".format(letter), None) for letter in string.ascii_letters]
    assert len(recipient_tuples) == 52
    with override_settings(
        MAILGUN_RECIPIENT_OVERRIDE=None,
    ):
        resp_list = MailgunClient.send_batch(
            'email subject', 'email body', recipient_tuples, chunk_size=chunk_size, raise_for_status=False
        )

    # 52 recipients split into chunks of 10 -> 6 batches, so 6 responses and 6 POSTs.
    assert len(resp_list) == 6
    for resp in resp_list:
        assert resp.status_code == HTTP_400_BAD_REQUEST
    assert mock_post.call_count == 6
    assert mock_post.return_value.raise_for_status.called is False
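For reference, a minimal sketch of the chunking arithmetic the assertions above rely on; chunks_sketch is an illustrative helper, not MailgunClient's actual implementation:

def chunks_sketch(items, chunk_size):
    """Yield successive chunk_size-sized slices of items (illustrative only)."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]

# 52 recipients split into chunks of 10 -> 5 full chunks + 1 chunk of 2 = 6 POSTs.
assert len(list(chunks_sketch(list(range(52)), 10))) == 6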
Python override_settings() usage examples (source code)
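Before the project-specific snippets, here is a minimal, self-contained sketch of the two standard ways to apply django.test.override_settings (as a context manager and as a decorator); the setting name and values are arbitrary:

from django.conf import settings
from django.test import TestCase, override_settings


class OverrideSettingsUsageSketch(TestCase):

    def test_as_context_manager(self):
        with override_settings(DEFAULT_FROM_EMAIL='noreply@example.com'):
            # The override is active only inside the block.
            self.assertEqual(settings.DEFAULT_FROM_EMAIL, 'noreply@example.com')

    @override_settings(DEFAULT_FROM_EMAIL='noreply@example.com')
    def test_as_decorator(self):
        # The override is active for the whole test method.
        self.assertEqual(settings.DEFAULT_FROM_EMAIL, 'noreply@example.com')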
def test_get_expire_at_browser_close(self):
    # Tests get_expire_at_browser_close() with different settings and different
    # set_expiry() calls.
    with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=False):
        self.session.set_expiry(10)
        self.assertFalse(self.session.get_expire_at_browser_close())

        self.session.set_expiry(0)
        self.assertTrue(self.session.get_expire_at_browser_close())

        self.session.set_expiry(None)
        self.assertFalse(self.session.get_expire_at_browser_close())

    with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=True):
        self.session.set_expiry(10)
        self.assertFalse(self.session.get_expire_at_browser_close())

        self.session.set_expiry(0)
        self.assertTrue(self.session.get_expire_at_browser_close())

        self.session.set_expiry(None)
        self.assertTrue(self.session.get_expire_at_browser_close())
def test_actual_expiry(self):
    # this doesn't work with JSONSerializer (serializing timedelta)
    with override_settings(SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'):
        self.session = self.backend()  # reinitialize after overriding settings

        # Regression test for #19200
        old_session_key = None
        new_session_key = None
        try:
            self.session['foo'] = 'bar'
            self.session.set_expiry(-timedelta(seconds=10))
            self.session.save()
            old_session_key = self.session.session_key
            # With an expiry date in the past, the session expires instantly.
            new_session = self.backend(self.session.session_key)
            new_session_key = new_session.session_key
            self.assertNotIn('foo', new_session)
        finally:
            self.session.delete(old_session_key)
            self.session.delete(new_session_key)
def test_invalidate_fernet_cached_properties(self):
    """
    Tests that fernet field properties are properly invalidated.
    """
    def verify_model_field_keys(model, field_name, expected_keys_list):
        """
        Verifies that the field's cached ``keys`` property matches the expected keys list.
        """
        field = model._meta.get_field(field_name)
        # Verify keys are properly set and fetched.
        self.assertEqual(field.keys, expected_keys_list)

    self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)
    verify_model_field_keys(TranscriptCredentials, 'api_key', OLD_FERNET_KEYS_LIST)

    # Invalidate cached properties.
    utils.invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key'])

    # Prepend a new key.
    new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS
    with override_settings(FERNET_KEYS=new_keys_set):
        self.assertEqual(settings.FERNET_KEYS, new_keys_set)
        verify_model_field_keys(TranscriptCredentials, 'api_key', new_keys_set)
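For context, a hedged sketch of what a helper like invalidate_fernet_cached_properties could look like, assuming the field's keys attribute is a cached_property memoized in the field instance's __dict__ (the helper name mirrors the test; the body is illustrative, not the project's actual code):

def invalidate_fernet_cached_properties_sketch(model, field_names):
    """Illustrative only: drop memoized cached_property values so they get recomputed."""
    for field_name in field_names:
        field = model._meta.get_field(field_name)
        # A cached_property stores its computed value in the instance __dict__;
        # removing it forces a recompute from settings.FERNET_KEYS on next access.
        field.__dict__.pop('keys', None)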
def test_decrypt_different_key(self):
    """
    Tests decryption with one more key prepended. The old key the value was encrypted
    with is still in the key set, so we should still be able to decrypt it.
    """
    old_keys_set = ['test-ferent-key']
    self.assertEqual(settings.FERNET_KEYS, old_keys_set)
    new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS

    # Invalidate cached properties so that we get the latest keys.
    invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

    with override_settings(FERNET_KEYS=new_keys_set):
        self.assertEqual(settings.FERNET_KEYS, new_keys_set)
        transcript_credentials = TranscriptCredentials.objects.get(
            org=self.credentials_data['org'], provider=self.credentials_data['provider']
        )
        self.assertEqual(transcript_credentials.api_key, self.credentials_data['api_key'])
        self.assertEqual(transcript_credentials.api_secret, self.credentials_data['api_secret'])
def test_decrypt_different_key_set(self):
    """
    Tests decryption with a completely different fernet key set. The old key the value
    was encrypted with is gone, so decryption should fail with an InvalidToken error.
    """
    old_keys_set = ['test-ferent-key']
    self.assertEqual(settings.FERNET_KEYS, old_keys_set)
    new_keys_set = ['new-fernet-key']

    # Invalidate cached properties so that we get the latest keys.
    invalidate_fernet_cached_properties(TranscriptCredentials, ['api_key', 'api_secret'])

    with override_settings(FERNET_KEYS=new_keys_set):
        self.assertEqual(settings.FERNET_KEYS, new_keys_set)
        with self.assertRaises(InvalidToken):
            TranscriptCredentials.objects.get(
                org=self.credentials_data['org'], provider=self.credentials_data['provider']
            )
Source: test_re_encrypt_transcript_credentials.py (project: edx-video-pipeline, author: edx)
def test_reencrypt_transcript_credentials(self, mock_logger):
    """
    Test transcript credentials are re-encrypted correctly.
    """
    # Verify fernet keys.
    self.assertEqual(settings.FERNET_KEYS, OLD_FERNET_KEYS_LIST)

    # Verify we are able to access the record.
    self.verify_access_credentials()

    # Add a new key to the set.
    new_keys_set = ['new-fernet-key'] + settings.FERNET_KEYS
    with override_settings(FERNET_KEYS=new_keys_set):
        self.assertEqual(settings.FERNET_KEYS, new_keys_set)

        # Run re-encryption process.
        call_command('re_encrypt_transcript_credentials')

        # Verify logging.
        mock_logger.info.assert_called_with('[Transcript credentials re-encryption] Process completed.')

        # Verify we are able to access the record.
        self.verify_access_credentials()
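A rough sketch of what a re-encryption command like the one exercised above might do, assuming the fernet fields are transparently decrypted on read and encrypted with the first key in FERNET_KEYS on save (only the model name and the completion log message come from the test; everything else is illustrative):

def re_encrypt_transcript_credentials_sketch(logger):
    """Illustrative only: re-save every record so its fields are encrypted with the newest key."""
    for credentials in TranscriptCredentials.objects.all():
        credentials.save()  # decrypts with any known key, re-encrypts with the current first key
    logger.info('[Transcript credentials re-encryption] Process completed.')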
def test_db_sms_sender(self):
    sender = self._test_sms_sender()

    sms = Sms.objects.get(phone=self.phone, text=self.text)
    self.assertTrue(sms)
    sms = Sms.objects.get(client__id=1)
    self.assertTrue(sms)

    Sms.objects.all().delete()
    sender.add_sms(self.text + '1', self.phone)
    sender.add_sms(self.text + '2', self.phone)
    sender.process()

    sms_objects = Sms.objects.all().order_by('text')
    self.assertEqual(2, len(sms_objects))
    self.assertEqual([self.text + '1', self.text + '2'],
                     [m.text for m in sms_objects])

# @override_settings(
#     SMS_SENDER='vishleva.messengers.sms.providers.epochta.Epochta')
# def test_epochta_sms_sender(self):
#     self._test_sms_sender()
def test_django_user_main_attribute(self):
    backend = Saml2Backend()

    old_username_field = User.USERNAME_FIELD
    User.USERNAME_FIELD = 'slug'
    self.assertEqual(backend.get_django_user_main_attribute(), 'slug')
    User.USERNAME_FIELD = old_username_field

    with override_settings(AUTH_USER_MODEL='auth.User'):
        self.assertEqual(
            DjangoUserModel.USERNAME_FIELD,
            backend.get_django_user_main_attribute())

    with override_settings(
            AUTH_USER_MODEL='testprofiles.StandaloneUserModel'):
        self.assertEqual(
            backend.get_django_user_main_attribute(),
            'username')

    with override_settings(SAML_DJANGO_USER_MAIN_ATTRIBUTE='foo'):
        self.assertEqual(backend.get_django_user_main_attribute(), 'foo')
def test_caching_enabled(admin_client, router, destination):
    # Only sqlite3 logs a BEGIN query within a transaction.
    atomic_queries = 1 if connection.vendor == 'sqlite' else 0

    with override_settings(ROUTING_CACHE=True):
        with CaptureQueriesContext(connection=connection) as c:
            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'destination')
            first = len(c)
            assert first - atomic_queries == 5

            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'destination')
            # Should only query for user and session because of condition
            assert len(c) - first - atomic_queries == 2

        router.delete()

        with CaptureQueriesContext(connection=connection) as c:
            response = admin_client.get(router.source, follow=True)
            assert response.status_code == 200
            assert_string_equal(response.content, 'home')
            # Only the router query
            assert len(c) == 1
def setUp(self):
    class TestDirFSResource(MetaEtagMixIn, BaseFSDavResource):
        root = os.path.dirname(os.path.realpath(__file__))

        def __str__(self):
            return "<Resource object for %s>" % self.get_abs_path()

    self.dir_resource = TestDirFSResource('/')
    self.file_resource = TestDirFSResource('/test_serializers.py')

    # Re-import because override_settings may have replaced the cached webdav settings.
    from rest_framework_webdav.settings import webdav_api_settings as s2

    ser1 = Resourcetype(instance=self.dir_resource,
                        resourcetype_clss=s2.RESOURCETYPES,
                        context={
                            'depth': 1,
                        })
    self.rep1 = ser1.data

    ser2 = Resourcetype(instance=self.file_resource,
                        resourcetype_clss=s2.RESOURCETYPES,
                        context={
                            'depth': 1,
                        })
    self.rep2 = ser2.data
def test_find_templates(self):
    with override_settings(INSTALLED_APPS=[]):
        self.assertListEqual(template_classes, [])
        self.assertListEqual(layout_classes, [])

    with override_settings(INSTALLED_APPS=['dummy']):
        self.assertListEqual(template_classes, [WelcomeTemplate])
        self.assertListEqual(layout_classes, [BasicLayout])

    with override_settings(INSTALLED_APPS=['dummy2']):
        self.assertListEqual(template_classes, [DummyTemplate])
        self.assertListEqual(layout_classes, [])

    with override_settings(INSTALLED_APPS=['dummy', 'dummy2']):
        self.assertSetEqual(set(template_classes), {WelcomeTemplate, DummyTemplate})
        self.assertSetEqual(set(layout_classes), {BasicLayout})
def test_post_existing(self, rf):
    """It should fail when adding an existing
    page / language combination.
    """
    request = rf.post('/', {'parent_page': self.last_page.pk})
    assert self.last_page.language.code == 'en'

    with override_settings(WAGTAILTRANS_SYNC_TREE=False):
        view = TranslationView()
        view.request = request
        response = view.dispatch(
            request, instance_id=self.last_page.pk,
            language_code=self.default_language.code)

        assert response.status_code == 200
        assert not view.get_form().is_valid()
def apimas_context(urlconf_module, spec):
    """
    This function (decorator) is used to customize `TestCase` classes based
    on the `APIMAS` spec of an application.

    More specifically, the given spec is used to generate the `Django` urls,
    and then test functions (triggering test case scenarios for every
    collection and action) are created and bound to the decorated `TestCase`
    class.

    :param urlconf_module: Path to the module where the generated django urls
        will be added.
    :param spec: `APIMAS` specification.
    """
    def wrapper(cls):
        setattr(cls, 'spec', spec)
        adapter = DjangoRestAdapter()
        adapter.construct(spec)
        setattr(cls, 'adapter', adapter)

        urls = adapter.urls.values()
        _add_urlpatterns(urlconf_module, urls)
        _add_test_functions(cls, adapter, spec)
        return override_settings(ROOT_URLCONF=urlconf_module)(cls)
    return wrapper
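Hypothetical usage of the decorator above; the urlconf module path, spec name, and class name are illustrative:

@apimas_context('myapp.tests.generated_urls', MY_APIMAS_SPEC)
class GeneratedAPITests(TestCase):
    # Test methods for every collection/action are attached by the decorator itself,
    # and ROOT_URLCONF is overridden so the test client resolves the generated URLs.
    pass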
def test_get_context_data(self):
    with override_settings(DEBUG=True):
        self.assertDictEqual(
            MyNotification().get_context_data(),
            {'hello': 'world', 'base_url': '', 'subject': None}
        )

    self.assertDictEqual(
        MyNotification().get_context_data(),
        {'hello': 'world', 'base_url': 'http://example.com', 'subject': None}
    )
def test_get_sent_from_default(self):
    class TestNotification(TwilioTextNotification):
        from_number = None

    with override_settings(TWILIO_DEFAULT_FROM_NUMBER='1231231234'):
        self.assertEqual(TestNotification().get_sent_from(), '1231231234')
def test_unimportable_sendable_email_raises_import_error():
    with pytest.raises(ImportError):
        with override_settings(SENDABLE_EMAILS=['boop']):
            pass  # pragma: no cover

    # This is weird, but required for the next test not to explode.
    # I think it's because the exception above was raised in a way that "broke"
    # override_settings, preventing it from restoring the old value.
    delattr(settings, 'SENDABLE_EMAILS')


def test_non_sendable_email_raises_improperly_configured_error():
    with pytest.raises(ImproperlyConfigured):
        with override_settings(SENDABLE_EMAILS=['unittest.TestCase']):
            pass  # pragma: no cover

    # This is weird, but required for the next test not to explode.
    # I think it's because the exception above was raised in a way that "broke"
    # override_settings, preventing it from restoring the old value.
    delattr(settings, 'SENDABLE_EMAILS')
def client():
    with override_settings(SENDABLE_EMAILS=[MY_SENDABLE_EMAIL],
                           ROOT_URLCONF=__name__):
        yield Client()
def enable(self):
    return override_settings(TEMPLATES=[{
        **settings.TEMPLATES[0],
        'BACKEND': self.ENGINE_BACKENDS[self]
    }])
def render_html(self, *args, **kwargs):
    """
    Renders the template.

    :rtype: str
    """
    # Build absolute static/media URLs so that asset references in the rendered
    # HTML keep working outside the request/response cycle (e.g. in emails).
    static_url = '%s://%s%s' % (self.request.scheme, self.request.get_host(), settings.STATIC_URL)
    media_url = '%s://%s%s' % (self.request.scheme, self.request.get_host(), settings.MEDIA_URL)

    with override_settings(STATIC_URL=static_url, MEDIA_URL=media_url):
        template = loader.get_template(self.template_name)
        context = self.get_context_data(*args, **kwargs)
        html = template.render(context)

    return html
def test_check_custom_user_model(self):
    # Django doesn't re-register admins when using `override_settings`,
    # so we have to do it manually in this test case.
    admin.site.register(get_user_model(), HijackUserAdmin)

    warnings = checks.check_custom_user_model(HijackAdminConfig)
    self.assertFalse(warnings)

    admin.site.unregister(get_user_model())
def test_check_custom_user_model_default_admin(self):
    # Django doesn't re-register admins when using `override_settings`,
    # so we have to do it manually in this test case.
    admin.site.register(get_user_model(), UserAdmin)

    warnings = checks.check_custom_user_model(HijackAdminConfig)
    expected_warnings = [
        Warning(
            'django-hijack-admin does not work out the box with a custom user model.',
            hint='Please mix HijackUserAdminMixin into your custom UserAdmin.',
            obj=settings.AUTH_USER_MODEL,
            id='hijack_admin.W001',
        )
    ]
    self.assertEqual(warnings, expected_warnings)

    admin.site.unregister(get_user_model())
def test_disabled_eraserhead(self):
    """ When eraserhead is disabled, request signal handlers shouldn't be connected """
    request_started_receivers_count_before = len(request_started.receivers)
    request_finished_receivers_count_before = len(request_finished.receivers)

    with override_settings(INSTALLED_APPS=("eraserhead.apps.EraserheadConfig",)):
        apps.get_app_config('eraserhead')

        self.assertEqual(request_started_receivers_count_before, len(request_started.receivers))
        self.assertEqual(request_finished_receivers_count_before, len(request_finished.receivers))
def test_enabled_eraserhead(self):
    """ When eraserhead is enabled, request signal handlers should be connected """
    request_started_receivers_count_before = len(request_started.receivers)
    request_finished_receivers_count_before = len(request_finished.receivers)

    with override_settings(INSTALLED_APPS=("eraserhead.apps.EraserheadConfig",)):
        apps.get_app_config('eraserhead')

        self.assertEqual(request_started_receivers_count_before + 1, len(request_started.receivers))
        self.assertEqual(request_finished_receivers_count_before + 1, len(request_finished.receivers))
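A hedged sketch of the AppConfig.ready() pattern the two tests above exercise: handlers are connected to the request signals only when the app is enabled. The ERASERHEAD_ENABLED flag and handler names are assumptions for illustration, not necessarily eraserhead's real ones:

from django.apps import AppConfig
from django.conf import settings
from django.core.signals import request_finished, request_started


class EraserheadConfigSketch(AppConfig):
    name = 'eraserhead'

    def ready(self):
        # Connect the profiling handlers only when explicitly enabled in settings
        # (flag name is illustrative).
        if getattr(settings, 'ERASERHEAD_ENABLED', False):
            request_started.connect(self._on_request_started)
            request_finished.connect(self._on_request_finished)

    def _on_request_started(self, sender, **kwargs):
        pass  # start collecting queryset usage statistics here

    def _on_request_finished(self, sender, **kwargs):
        pass  # report collected statistics here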
def handle(self, *args, **options):
    random.seed(12345)

    if options.get('list_scenarios'):
        self.stdout.write('Scenarios:\n')
        for num, (_, name) in enumerate(DashboardStates()):
            self.stdout.write(" {:03}_{}\n".format(num, name))
        return

    os.environ['DJANGO_LIVE_TEST_SERVER_ADDRESS'] = '0.0.0.0:7000-8000'

    if not os.environ.get('WEBPACK_DEV_SERVER_HOST'):
        # This should only happen if the user is running in an environment without Docker,
        # which isn't allowed for this command.
        raise Exception('Missing environment variable WEBPACK_DEV_SERVER_HOST.')
    if os.environ.get('RUNNING_SELENIUM') != 'true':
        raise Exception(
            "This management command must be run with ./scripts/test/run_snapshot_dashboard_states.sh"
        )

    # We need to use pytest here instead of invoking the tests directly so that the test database
    # is used. Using override_settings(DATABASE...) causes a warning message and is not reliable.
    global RUNNING_DASHBOARD_STATES  # pylint: disable=global-statement
    RUNNING_DASHBOARD_STATES = True
    global DASHBOARD_STATES_OPTIONS  # pylint: disable=global-statement
    DASHBOARD_STATES_OPTIONS = options

    with override_settings(
        ELASTICSEARCH_INDEX='testindex',
    ):
        pytest_args = ["{}::test_dashboard_states".format(__file__), "-s"]
        if options.get('create_db'):
            pytest_args.append('--create-db')
        sys.exit(pytest.main(args=pytest_args))
def test_500_error_context_logged_in(self):
    """
    Assert context values for 500 error page when logged in
    """
    with mute_signals(post_save):
        profile = self.create_and_login_user()
    self.client.force_login(profile.user)

    with override_settings(EMAIL_SUPPORT='support'), patch(
        'ui.templatetags.render_bundle._get_bundle'
    ) as get_bundle:
        response = self.client.get('/500/')
        assert response.context['authenticated'] is True
        assert response.context['name'] == profile.preferred_name
        assert response.context['support_email'] == 'support'
        assert response.context['is_public'] is True
        assert response.context['has_zendesk_widget'] is True
        self.assertContains(response, 'Share this page', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

        bundles = [bundle[0][1] for bundle in get_bundle.call_args_list]
        assert set(bundles) == {
            'common',
            'public',
            'sentry_client',
            'style',
            'style_public',
            'zendesk_widget',
        }
def valid_settings():
    """
    Fixture that provides valid (passes checks in configure()) configuration
    """
    with override_settings(**DEFAULT_SETTINGS):
        yield DEFAULT_SETTINGS
def invalid_settings(request):
    """
    Fixture that runs a test against a set of invalid configurations
    """
    settings = copy.copy(DEFAULT_SETTINGS)
    settings.update(request.param)

    with override_settings(**settings):
        yield settings
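Since invalid_settings reads request.param, it has to be parametrized, either with params= on the fixture itself or indirectly from the test. A sketch of the indirect form; the overridden setting names and the configure() call are illustrative, not the project's real ones:

import pytest
from django.core.exceptions import ImproperlyConfigured


@pytest.mark.parametrize('invalid_settings', [
    {'STORAGE_BUCKET_NAME': None},   # hypothetical setting overrides
    {'STORAGE_ACCESS_KEY': ''},
], indirect=True)
def test_configure_rejects_invalid_settings(invalid_settings):
    with pytest.raises(ImproperlyConfigured):
        configure()  # hypothetical entry point that validates settings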
def test_s3_store_configure(s3_store, key):
    """Test configure() against missing settings"""
    with override_settings(**{
        key: None,
    }):
        with pytest.raises(ImproperlyConfigured):
            s3_store.configure()
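The key fixture is not shown here; one plausible shape, with purely illustrative setting names, is a parametrized fixture that yields each required setting in turn so the test above runs once per missing setting:

import pytest


@pytest.fixture(params=[
    'STORAGE_ACCESS_KEY',      # illustrative names, one per required setting
    'STORAGE_SECRET_KEY',
    'STORAGE_BUCKET_NAME',
])
def key(request):
    return request.param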