def test_redirect(self):
g1 = Gate()
self.assertEqual(g1.login_url, settings.BAYA_LOGIN_URL)
g2 = Gate(login_url=None)
self.assertEqual(g2.login_url, settings.BAYA_LOGIN_URL)
custom_login = "/testlogin/"
g3 = Gate(login_url=custom_login)
self.assertEqual(g3.login_url, custom_login)
self.assertEqual(six.text_type((g3 + g2).login_url),
six.text_type(custom_login))
self.assertEqual((g2 + g3).login_url, custom_login)
with override_settings(BAYA_LOGIN_URL="/testlogin/"):
g4 = Gate()
self.assertEqual(g4.login_url, "/testlogin/")
with override_settings(BAYA_LOGIN_URL=None):
g5 = Gate()
self.assertEqual(g5.login_url, "/login/")
python类override_settings()的实例源码
def test_redirect(self):
g1 = Gate()
self.assertEqual(g1.login_url, settings.BAYA_LOGIN_URL)
g2 = Gate(login_url=None)
self.assertEqual(g2.login_url, settings.BAYA_LOGIN_URL)
custom_login = "/testlogin/"
g3 = Gate(login_url=custom_login)
self.assertEqual(g3.login_url, custom_login)
self.assertEqual(six.text_type((g3 + g2).login_url),
six.text_type(custom_login))
self.assertEqual((g2 + g3).login_url, custom_login)
with override_settings(BAYA_LOGIN_URL="/testlogin/"):
g4 = Gate()
self.assertEqual(g4.login_url, "/testlogin/")
with override_settings(BAYA_LOGIN_URL=None):
g5 = Gate()
self.assertEqual(g5.login_url, "/login/")
def test_user_info_with_custom_user(django_elasticapm_client, client):
with override_settings(AUTH_USER_MODEL='testapp.MyUser'):
from django.contrib.auth import get_user_model
MyUser = get_user_model()
user = MyUser(my_username='admin')
user.set_password('admin')
user.save()
assert client.login(username='admin', password='admin')
with pytest.raises(Exception):
client.get(reverse('elasticapm-raise-exc'))
assert len(django_elasticapm_client.events) == 1
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert 'user' in event['context']
user_info = event['context']['user']
assert 'is_authenticated' in user_info
assert user_info['is_authenticated']
assert 'username' in user_info
assert user_info['username'] == 'admin'
assert 'email' not in user_info
def test_response_middlware_exception(django_elasticapm_client, client):
if django.VERSION[:2] < (1, 3):
return
with override_settings(**middleware_setting(django.VERSION,
['tests.contrib.django.testapp.middleware.BrokenResponseMiddleware'])):
with pytest.raises(ImportError):
client.get(reverse('elasticapm-no-error'))
assert len(django_elasticapm_client.events) == 1
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert 'exception' in event
exc = event['exception']
assert exc['type'] == 'ImportError'
assert exc['message'] == 'ImportError: response'
assert event['culprit'] == 'tests.contrib.django.testapp.middleware.process_response'
def test_broken_500_handler_with_middleware(django_elasticapm_client, client):
with override_settings(BREAK_THAT_500=True):
client.handler = MockMiddleware(MockClientHandler())
with override_settings(**middleware_setting(django.VERSION, [])):
with pytest.raises(Exception):
client.get(reverse('elasticapm-raise-exc'))
assert len(django_elasticapm_client.events) == 2
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert 'exception' in event
exc = event['exception']
assert exc['type'] == 'Exception'
assert exc['message'] == 'Exception: view exception'
assert event['culprit'] == 'tests.contrib.django.testapp.views.raise_exc'
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert 'exception' in event
exc = event['exception']
assert exc['type'] == 'ValueError'
assert exc['message'] == 'ValueError: handler500'
assert event['culprit'] == 'tests.contrib.django.testapp.urls.handler500'
def test_404_middleware(django_elasticapm_client, client):
with override_settings(**middleware_setting(django.VERSION,
['elasticapm.contrib.django.middleware.Catch404Middleware'])):
resp = client.get('/non-existant-page')
assert resp.status_code == 404
assert len(django_elasticapm_client.events) == 1
event = django_elasticapm_client.events.pop(0)['errors'][0]
assert event['log']['level'] == 'info'
assert event['log']['logger_name'] == 'http404'
assert 'request' in event['context']
request = event['context']['request']
assert request['url']['raw'] == u'http://testserver/non-existant-page'
assert request['method'] == 'GET'
assert request['body'] == None
def test_transaction_metrics(django_elasticapm_client, client):
django_elasticapm_client.instrumentation_store.get_all() # clear the store
with override_settings(**middleware_setting(
django.VERSION, ['elasticapm.contrib.django.middleware.TracingMiddleware']
)):
assert len(django_elasticapm_client.instrumentation_store) == 0
client.get(reverse('elasticapm-no-error'))
assert len(django_elasticapm_client.instrumentation_store) == 1
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert len(transactions) == 1
transaction = transactions[0]
assert transaction['duration'] > 0
assert transaction['result'] == 'HTTP 2xx'
assert transaction['name'] == 'GET tests.contrib.django.testapp.views.no_error'
def test_request_metrics_301_append_slash(django_elasticapm_client, client):
django_elasticapm_client.instrumentation_store.get_all() # clear the store
# enable middleware wrapping
django_elasticapm_client.config.instrument_django_middleware = True
from elasticapm.contrib.django.middleware import TracingMiddleware
TracingMiddleware._elasticapm_instrumented = False
with override_settings(
APPEND_SLASH=True,
**middleware_setting(django.VERSION, [
'elasticapm.contrib.django.middleware.TracingMiddleware',
'django.middleware.common.CommonMiddleware',
])
):
client.get(reverse('elasticapm-no-error-slash')[:-1])
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert transactions[0]['name'] in (
# django <= 1.8
'GET django.middleware.common.CommonMiddleware.process_request',
# django 1.9+
'GET django.middleware.common.CommonMiddleware.process_response',
)
assert transactions[0]['result'] == 'HTTP 3xx'
def test_request_metrics_301_prepend_www(django_elasticapm_client, client):
django_elasticapm_client.instrumentation_store.get_all() # clear the store
# enable middleware wrapping
django_elasticapm_client.config.instrument_django_middleware = True
from elasticapm.contrib.django.middleware import TracingMiddleware
TracingMiddleware._elasticapm_instrumented = False
with override_settings(
PREPEND_WWW=True,
**middleware_setting(django.VERSION, [
'elasticapm.contrib.django.middleware.TracingMiddleware',
'django.middleware.common.CommonMiddleware',
])
):
client.get(reverse('elasticapm-no-error'))
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert transactions[0]['name'] == 'GET django.middleware.common.CommonMiddleware.process_request'
assert transactions[0]['result'] == 'HTTP 3xx'
def test_request_metrics_contrib_redirect(django_elasticapm_client, client):
django_elasticapm_client.instrumentation_store.get_all() # clear the store
# enable middleware wrapping
django_elasticapm_client.config.instrument_django_middleware = True
from elasticapm.contrib.django.middleware import TracingMiddleware
TracingMiddleware._elasticapm_instrumented = False
s = Site.objects.get(pk=1)
Redirect.objects.create(site=s, old_path='/redirect/me/', new_path='/here/')
with override_settings(
**middleware_setting(django.VERSION, [
'elasticapm.contrib.django.middleware.TracingMiddleware',
'django.contrib.redirects.middleware.RedirectFallbackMiddleware',
])
):
response = client.get('/redirect/me/')
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert transactions[0]['name'] == 'GET django.contrib.redirects.middleware.RedirectFallbackMiddleware.process_response'
assert transactions[0]['result'] == 'HTTP 3xx'
def test_stacktrace_filtered_for_elasticapm(client, django_elasticapm_client):
with mock.patch(
"elasticapm.traces.TransactionsStore.should_collect") as should_collect:
should_collect.return_value = False
with override_settings(**middleware_setting(django.VERSION,
['elasticapm.contrib.django.middleware.TracingMiddleware'])):
resp = client.get(reverse("render-heavy-template"))
assert resp.status_code == 200
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert transactions[0]['result'] == 'HTTP 2xx'
spans = transactions[0]['spans']
expected_signatures = ['transaction', 'list_users.html',
'something_expensive']
assert spans[1]['name'] == 'list_users.html'
# Top frame should be inside django rendering
assert spans[1]['stacktrace'][0]['module'].startswith('django.template')
def test_perf_template_render(benchmark, client, django_elasticapm_client):
responses = []
with mock.patch("elasticapm.traces.TransactionsStore.should_collect") as should_collect:
should_collect.return_value = False
with override_settings(**middleware_setting(django.VERSION,
['elasticapm.contrib.django.middleware.TracingMiddleware'])):
benchmark(lambda: responses.append(
client_get(client, reverse("render-heavy-template"))
))
for resp in responses:
assert resp.status_code == 200
transactions = django_elasticapm_client.instrumentation_store.get_all()
# If the test falls right at the change from one minute to another
# this will have two items.
assert len(transactions) == len(responses)
for transaction in transactions:
assert len(transaction['spans']) == 2
assert transaction['result'] == 'HTTP 2xx'
def test_perf_database_render(benchmark, client, django_elasticapm_client):
responses = []
django_elasticapm_client.instrumentation_store.get_all()
with mock.patch("elasticapm.traces.TransactionsStore.should_collect") as should_collect:
should_collect.return_value = False
with override_settings(**middleware_setting(django.VERSION,
['elasticapm.contrib.django.middleware.TracingMiddleware'])):
benchmark(lambda: responses.append(
client_get(client, reverse("render-user-template"))
))
for resp in responses:
assert resp.status_code == 200
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert len(transactions) == len(responses)
for transaction in transactions:
assert len(transaction['spans']) in (102, 103)
def test_template_rendering_django18_jinja2(should_collect, django_elasticapm_client, client):
should_collect.return_value = False
with override_settings(
TEMPLATES=TEMPLATES,
**middleware_setting(django.VERSION,
['elasticapm.contrib.django.middleware.TracingMiddleware'])
):
client.get(reverse('render-jinja2-template'))
client.get(reverse('render-jinja2-template'))
client.get(reverse('render-jinja2-template'))
transactions = django_elasticapm_client.instrumentation_store.get_all()
assert len(transactions) == 3
spans = transactions[0]['spans']
assert len(spans) == 1, [t['name'] for t in spans]
kinds = ['template.jinja2']
assert set([t['type'] for t in spans]) == set(kinds)
assert spans[0]['type'] == 'template.jinja2'
assert spans[0]['name'] == 'jinja2_template.html'
assert spans[0]['parent'] is None
def test_get_expire_at_browser_close(self):
# Tests get_expire_at_browser_close with different settings and different
# set_expiry calls
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=False):
self.session.set_expiry(10)
self.assertFalse(self.session.get_expire_at_browser_close())
self.session.set_expiry(0)
self.assertTrue(self.session.get_expire_at_browser_close())
self.session.set_expiry(None)
self.assertFalse(self.session.get_expire_at_browser_close())
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=True):
self.session.set_expiry(10)
self.assertFalse(self.session.get_expire_at_browser_close())
self.session.set_expiry(0)
self.assertTrue(self.session.get_expire_at_browser_close())
self.session.set_expiry(None)
self.assertTrue(self.session.get_expire_at_browser_close())
def test_actual_expiry(self):
# this doesn't work with JSONSerializer (serializing timedelta)
with override_settings(SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'):
self.session = self.backend() # reinitialize after overriding settings
# Regression test for #19200
old_session_key = None
new_session_key = None
try:
self.session['foo'] = 'bar'
self.session.set_expiry(-timedelta(seconds=10))
self.session.save()
old_session_key = self.session.session_key
# With an expiry date in the past, the session expires instantly.
new_session = self.backend(self.session.session_key)
new_session_key = new_session.session_key
self.assertNotIn('foo', new_session)
finally:
self.session.delete(old_session_key)
self.session.delete(new_session_key)
def test_upload_video(self):
mock_backend = Mock(return_value=Mock(
upload_video=Mock(),
start_transcoding=Mock(return_value=[]),
iter_formats=Mock(return_value=[]),
))
factories.VideoUploadUrlFactory(
was_used=False,
public_video_id='videoid',
expires_at=time() + 3600
)
file_object = Mock()
file_object.name = "Some video.mp4"
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.upload_video('videoid', file_object)
self.assertEqual(1, models.Video.objects.count())
self.assertEqual(1, models.VideoUploadUrl.objects.count())
video = models.Video.objects.get()
video_upload_url = models.VideoUploadUrl.objects.get()
self.assertEqual("Some video.mp4", video.title)
self.assertLess(10, len(video.public_thumbnail_id))
self.assertTrue(video_upload_url.was_used)
def test_upload_url_invalidated_after_failed_upload(self):
mock_backend = Mock(return_value=Mock(
upload_video=Mock(side_effect=ValueError),
))
factories.VideoUploadUrlFactory(
was_used=False,
public_video_id='videoid',
expires_at=time() + 3600
)
file_object = Mock()
file_object.name = "Some video.mp4"
with override_settings(PLUGIN_BACKEND=mock_backend):
self.assertRaises(ValueError, tasks.upload_video, 'videoid', file_object)
self.assertEqual(0, models.Video.objects.count())
self.assertEqual(0, models.VideoUploadUrl.objects.available().count())
def test_transcode_video_success(self):
factories.VideoFactory(public_id='videoid', public_thumbnail_id='thumbid')
mock_backend = Mock(return_value=Mock(
start_transcoding=Mock(return_value=['job1']),
check_progress=Mock(return_value=(42, True)),
iter_formats=Mock(return_value=[('SD', 128)]),
create_thumbnail=Mock(),
))
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video('videoid')
self.assertEqual(1, models.ProcessingState.objects.count())
video_processing_state = models.ProcessingState.objects.get()
self.assertEqual(models.ProcessingState.STATUS_SUCCESS, video_processing_state.status)
self.assertEqual("", video_processing_state.message)
self.assertEqual(42, video_processing_state.progress)
mock_backend.return_value.create_thumbnail.assert_called_once_with('videoid', 'thumbid')
mock_backend.return_value.check_progress.assert_called_once_with('job1')
self.assertEqual(1, models.VideoFormat.objects.count())
video_format = models.VideoFormat.objects.get()
self.assertEqual('videoid', video_format.video.public_id)
self.assertEqual('SD', video_format.name)
self.assertEqual(128, video_format.bitrate)
def test_video_transcoding_failure_invalidates_cache(self):
# Login
user = models.User.objects.create(username="test", is_active=True)
user.set_password("password")
user.save()
self.client.login(username="test", password="password")
factories.VideoFactory(public_id='videoid', owner=user)
mock_backend = Mock(return_value=Mock(
start_transcoding=Mock(return_value=['job']),
check_progress=Mock(side_effect=exceptions.TranscodingFailed),
))
video_pre_transcoding = self.client.get(reverse("api:v1:video-detail", kwargs={"id": 'videoid'})).json()
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video('videoid')
video_post_transcoding = self.client.get(reverse("api:v1:video-detail", kwargs={"id": 'videoid'})).json()
self.assertEqual('pending', video_pre_transcoding['processing']['status'])
self.assertEqual('failed', video_post_transcoding['processing']['status'])
def test_transcode_video_twice(self):
factories.VideoFactory(public_id='videoid')
mock_backend = Mock(return_value=Mock(
start_transcoding=Mock(return_value=['job1']),
iter_formats=Mock(return_value=[]),
))
# First attempt: failure
mock_backend.return_value.check_progress = Mock(side_effect=exceptions.TranscodingFailed)
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video('videoid')
# Second attempt: success
mock_backend.return_value.check_progress = Mock(return_value=(100, True))
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video('videoid')
video_processing_state = models.ProcessingState.objects.get()
self.assertEqual(models.ProcessingState.STATUS_SUCCESS, video_processing_state.status)
self.assertEqual("", video_processing_state.message)
self.assertEqual(100, video_processing_state.progress)
def test_transcode_video_restart(self):
video = factories.VideoFactory(public_id='videoid')
models.ProcessingState.objects.filter(video=video).update(status=models.ProcessingState.STATUS_RESTART)
mock_backend = Mock(return_value=Mock(
start_transcoding=Mock(return_value=[]),
iter_formats=Mock(return_value=[]),
))
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video_restart()
mock_backend.return_value.start_transcoding.assert_called_once_with('videoid')
self.assertEqual(
models.ProcessingState.STATUS_SUCCESS,
models.ProcessingState.objects.get(video=video).status
)
def test_transcode_video_restart_fails(self):
video = factories.VideoFactory(public_id='videoid')
models.ProcessingState.objects.filter(video=video).update(status=models.ProcessingState.STATUS_RESTART)
mock_backend = Mock(return_value=Mock(
start_transcoding=Mock(return_value=[1]),
check_progress=Mock(side_effect=exceptions.TranscodingFailed),
))
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video_restart()
self.assertEqual(
models.ProcessingState.STATUS_FAILED,
models.ProcessingState.objects.get(video=video).status
)
mock_backend.return_value.delete_video.assert_not_called()
def test_video_is_deleted_during_transcoding(self):
factories.VideoFactory(public_id='videoid')
def start_transcoding(video_id):
models.Video.objects.filter(public_id='videoid').delete()
return []
mock_backend = Mock(return_value=Mock(
start_transcoding=start_transcoding,
iter_formats=Mock(return_value=[]),
))
with override_settings(PLUGIN_BACKEND=mock_backend):
tasks.transcode_video('videoid')
self.assertEqual(0, models.Video.objects.count())
self.assertEqual(0, models.ProcessingState.objects.count())
mock_backend.return_value.delete_video.assert_called_once()
def test_apns_config(self):
message = {
"device": self.apns_device,
"message": "msg",
"data": {}
}
# test with no settings
self.assertRaises(ImproperlyConfigured, apns_send_message, **message)
# test without certificate set
with override_settings(UNIVERSAL_NOTIFICATIONS_MOBILE_APPS={"app1": {"GCM_API_KEY": "key"}}):
self.assertRaises(ImproperlyConfigured, apns_send_message, **message)
# test unreadable certificate
with override_settings(UNIVERSAL_NOTIFICATIONS_MOBILE_APPS={"app1": {"APNS_CERTIFICATE": "123d"}}):
self.assertRaises(ImproperlyConfigured, apns_send_message, **message)
def setUp(self):
self.css = '<link rel="stylesheet" href="/static/css/one.css" type="text/css" />'
self.tmpdir = mkdtemp()
new_static_root = os.path.join(self.tmpdir, "static")
copytree(settings.STATIC_ROOT, new_static_root)
self.override_settings = self.settings(
COMPRESS_ENABLED=True,
COMPRESS_PRECOMPILERS=(),
COMPRESS_DEBUG_TOGGLE='nocompress',
DEBUG=True,
STATIC_ROOT=new_static_root,
COMPRESS_ROOT=new_static_root,
STATICFILES_DIRS=[settings.COMPRESS_ROOT]
)
self.override_settings.__enter__()
def test_get_expire_at_browser_close(self):
# Tests get_expire_at_browser_close with different settings and different
# set_expiry calls
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=False):
self.session.set_expiry(10)
self.assertFalse(self.session.get_expire_at_browser_close())
self.session.set_expiry(0)
self.assertTrue(self.session.get_expire_at_browser_close())
self.session.set_expiry(None)
self.assertFalse(self.session.get_expire_at_browser_close())
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=True):
self.session.set_expiry(10)
self.assertFalse(self.session.get_expire_at_browser_close())
self.session.set_expiry(0)
self.assertTrue(self.session.get_expire_at_browser_close())
self.session.set_expiry(None)
self.assertTrue(self.session.get_expire_at_browser_close())
def test_actual_expiry(self):
# this doesn't work with JSONSerializer (serializing timedelta)
with override_settings(SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'):
self.session = self.backend() # reinitialize after overriding settings
# Regression test for #19200
old_session_key = None
new_session_key = None
try:
self.session['foo'] = 'bar'
self.session.set_expiry(-timedelta(seconds=10))
self.session.save()
old_session_key = self.session.session_key
# With an expiry date in the past, the session expires instantly.
new_session = self.backend(self.session.session_key)
new_session_key = new_session.session_key
self.assertNotIn('foo', new_session)
finally:
self.session.delete(old_session_key)
self.session.delete(new_session_key)
def override_options(options):
"""
A context manager for overriding specific configuration
Options.
"""
from django.conf import settings
from sentry.options import default_manager
wrapped = default_manager.store.get
def new_get(key, **kwargs):
try:
return options[key.name]
except KeyError:
return wrapped(key, **kwargs)
# Patch options into SENTRY_OPTIONS as well
new_options = settings.SENTRY_OPTIONS.copy()
new_options.update(options)
with override_settings(SENTRY_OPTIONS=new_options):
with patch.object(default_manager.store, 'get', side_effect=new_get):
yield
def test_form_submission_without_sessions(self):
"""
Test that a (normal, non-Crispy) form submission results in an
additional Sample instance.
"""
# Disable session MW
MW = list(settings.MIDDLEWARE_CLASSES)
MW.remove('django.contrib.sessions.middleware.SessionMiddleware')
with override_settings(MiddleWare=MW):
# First get the number Sample-instances
current_samples = Sample.objects.count()
sample_name = self.get_random_string(10)
sample_msg = self.get_random_string(100)
action_url = reverse(
'cmsplugin_form_handler:process_form',
args=(self.model_form_plugin.pk, )
)
response = self.client.post(action_url, {
'name': sample_name,
'message': sample_msg,
'cmsplugin_form_source_url': '/en/',
}, follow=True)
self.assertEqual(response.status_code, 200)
assert Sample.objects.count() > current_samples