def record_factory(zone, created=None, **validated_data):
record_type = validated_data.pop('type')
if record_type == POLICY_ROUTED:
assert len(validated_data['values']) == 1
policy_id = validated_data['values'][0]
try:
policy = models.Policy.objects.get(id=policy_id)
except models.Policy.DoesNotExist:
raise SuspiciousOperation("Policy {} does not exists.".format(
policy_id))
record_model = models.PolicyRecord.new_or_deleted(name=validated_data['name'], zone=zone)
obj = PolicyRecord(
policy_record=record_model,
zone=zone.r53_zone,
policy=policy,
dirty=True,
created=created,
)
else:
obj = Record(zone=zone.r53_zone, type=record_type, created=created, **validated_data)
return obj
python类SuspiciousOperation()的实例源码
def _normalize_name(self, name):
"""
Normalizes the name so that paths like /path/to/ignored/../foo.txt
work. We check to make sure that the path pointed to is not outside
the directory specified by the LOCATION setting.
"""
base_path = force_text(self.location)
base_path = base_path.rstrip('/')
final_path = urljoin(base_path.rstrip('/') + "/", name)
base_path_len = len(base_path)
if (not final_path.startswith(base_path) or
final_path[base_path_len:base_path_len + 1] not in ('', '/')):
raise SuspiciousOperation("Attempted access to '%s' denied." %
name)
return final_path.lstrip('/')
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def load(self):
session_data = {}
try:
with open(self._key_to_file(), "rb") as session_file:
file_data = session_file.read()
# Don't fail if there is no data in the session file.
# We may have opened the empty placeholder file.
if file_data:
try:
session_data = self.decode(file_data)
except (EOFError, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self.create()
# Remove expired sessions.
expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
if expiry_age <= 0:
session_data = {}
self.delete()
self.create()
except (IOError, SuspiciousOperation):
self._session_key = None
return session_data
def load(self):
try:
data = self._cache.get(self.cache_key)
except Exception:
# Some backends (e.g. memcache) raise an exception on invalid
# cache keys. If this happens, reset the session. See #17810.
data = None
if data is None:
# Duplicate DBStore.load, because we need to keep track
# of the expiry date to set it properly in the cache.
try:
s = self.model.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
data = self.decode(s.session_data)
self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
except (self.model.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self._session_key = None
data = {}
return data
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
return {}
def verify_step2(request, uidb64, token):
bytes_uid = urlsafe_base64_decode(uidb64)
try:
uid = int(bytes_uid)
except ValueError:
raise SuspiciousOperation('verify_step2 received invalid base64 user ID: {}'.format(
bytes_uid))
if uid != request.user.id:
raise PermissionDenied('UID mismatch - user is {}, request was for {}'.format(
request.user.id, uid))
user = get_object_or_404(models.User, pk=uid)
if not verify_token_generator.check_token(user, token):
raise Http404('token invalid')
if not user.email_verified:
user.email_verified = True
user.save()
messages.success(request, _('Your email has been verified successfully. Thanks!'))
else:
messages.info(request, _('Your email address has already been verified.'))
return redirect('index')
def load(self):
session_data = {}
try:
with open(self._key_to_file(), "rb") as session_file:
file_data = session_file.read()
# Don't fail if there is no data in the session file.
# We may have opened the empty placeholder file.
if file_data:
try:
session_data = self.decode(file_data)
except (EOFError, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self.create()
# Remove expired sessions.
expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
if expiry_age <= 0:
session_data = {}
self.delete()
self.create()
except (IOError, SuspiciousOperation):
self._session_key = None
return session_data
def load(self):
try:
data = self._cache.get(self.cache_key)
except Exception:
# Some backends (e.g. memcache) raise an exception on invalid
# cache keys. If this happens, reset the session. See #17810.
data = None
if data is None:
# Duplicate DBStore.load, because we need to keep track
# of the expiry date to set it properly in the cache.
try:
s = self.model.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
data = self.decode(s.session_data)
self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
except (self.model.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self._session_key = None
data = {}
return data
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
return {}
def load(self):
session_data = {}
try:
with open(self._key_to_file(), "rb") as session_file:
file_data = session_file.read()
# Don't fail if there is no data in the session file.
# We may have opened the empty placeholder file.
if file_data:
try:
session_data = self.decode(file_data)
except (EOFError, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self.create()
# Remove expired sessions.
expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
if expiry_age <= 0:
session_data = {}
self.delete()
self.create()
except (IOError, SuspiciousOperation):
self._session_key = None
return session_data
def load(self):
try:
data = self._cache.get(self.cache_key)
except Exception:
# Some backends (e.g. memcache) raise an exception on invalid
# cache keys. If this happens, reset the session. See #17810.
data = None
if data is None:
# Duplicate DBStore.load, because we need to keep track
# of the expiry date to set it properly in the cache.
try:
s = self.model.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
data = self.decode(s.session_data)
self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
except (self.model.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self._session_key = None
data = {}
return data
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
return {}
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
if allowed_protocols is None:
allowed_protocols = ["http", "https"]
parsed = urlparse.urlparse(url)
# perform security checks to ensure no malicious intent
# (i.e., an XSS attack with a data URL)
safe = True
if parsed.scheme and parsed.scheme not in allowed_protocols:
if raise_on_fail:
raise SuspiciousOperation("Unsafe redirect to URL with protocol '%s'" % parsed.scheme)
safe = False
if allowed_host and parsed.netloc and parsed.netloc != allowed_host:
if raise_on_fail:
raise SuspiciousOperation("Unsafe redirect to URL not matching host '%s'" % allowed_host)
safe = False
return safe
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
if allowed_protocols is None:
allowed_protocols = ["http", "https"]
parsed = urlparse(url)
# perform security checks to ensure no malicious intent
# (i.e., an XSS attack with a data URL)
safe = True
if parsed.scheme and parsed.scheme not in allowed_protocols:
if raise_on_fail:
raise SuspiciousOperation("Unsafe redirect to URL with protocol '{0}'".format(parsed.scheme))
safe = False
if allowed_host and parsed.netloc and parsed.netloc != allowed_host:
if raise_on_fail:
raise SuspiciousOperation("Unsafe redirect to URL not matching host '{0}'".format(allowed_host))
safe = False
return safe
def load(self):
try:
s = Session.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
self.user_id = s.user_id
# do not overwrite user_agent/ip, as those might have been updated
if self.user_agent != s.user_agent or self.ip != s.ip:
self.modified = True
return self.decode(s.session_data)
except (Session.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
self.create()
return {}
def test_build_invalid_zip(self):
attachment_name = 'build.json'
attachment_path = os.path.join(settings.MEDIA_ROOT, 'test_attachment')
copyfile(os.path.join(FILES_PATH, 'invalid_archive.zip'),
attachment_path)
attachment = Attachment(
name=attachment_name,
buildable=True
)
attachment.file = os.path.join('..', attachment_path)
attachment.save()
kwargs = {
'key': TRACKER_ATTACHMENT_EXECUTED,
'campaign_id': 1,
'target_id': 1,
'value': 'tracker: not opened',
}
tracker = Tracker.objects.create(**kwargs)
with self.assertRaises(SuspiciousOperation):
attachment.build(tracker)
def convert_exception_to_response(get_response):
"""
Wrap the given get_response callable in exception-to-response conversion.
All exceptions will be converted. All known 4xx exceptions (Http404,
PermissionDenied, MultiPartParserError, SuspiciousOperation) will be
converted to the appropriate response, and all other exceptions will be
converted to 500 responses.
This decorator is automatically applied to all middleware to ensure that
no middleware leaks an exception and that the next middleware in the stack
can rely on getting a response instead of an exception.
"""
@wraps(get_response, assigned=available_attrs(get_response))
def inner(request):
try:
response = get_response(request)
except Exception as exc:
response = response_for_exception(request, exc)
return response
return inner
def load(self):
session_data = {}
try:
with open(self._key_to_file(), "rb") as session_file:
file_data = session_file.read()
# Don't fail if there is no data in the session file.
# We may have opened the empty placeholder file.
if file_data:
try:
session_data = self.decode(file_data)
except (EOFError, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self.create()
# Remove expired sessions.
expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
if expiry_age <= 0:
session_data = {}
self.delete()
self.create()
except (IOError, SuspiciousOperation):
self._session_key = None
return session_data
def load(self):
try:
data = self._cache.get(self.cache_key)
except Exception:
# Some backends (e.g. memcache) raise an exception on invalid
# cache keys. If this happens, reset the session. See #17810.
data = None
if data is None:
# Duplicate DBStore.load, because we need to keep track
# of the expiry date to set it properly in the cache.
try:
s = self.model.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
data = self.decode(s.session_data)
self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
except (self.model.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self._session_key = None
data = {}
return data
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def skip_suspicious_operations(record):
"""Prevent django from sending 500 error
email notifications for SuspiciousOperation
events, since they are not true server errors,
especially when related to the ALLOWED_HOSTS
configuration
background and more information:
http://www.tiwoc.de/blog/2013/03/django-prevent-email-notification-on-susp\
iciousoperation/
"""
if record.exc_info:
exc_value = record.exc_info[1]
if isinstance(exc_value, SuspiciousOperation):
return False
return True
# A sample logging configuration. The only tangible logging
# performed by this configuration is to send an email to
# the site admins on every HTTP 500 error.
# See http://docs.djangoproject.com/en/dev/topics/logging for
# more details on how to customize your logging configuration.
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' %
e.__class__.__name__)
logger.warning(force_text(e))
return {}
def _normalize_name(self, name):
"""
Normalizes the name so that paths like /path/to/ignored/../foo.txt
work. We check to make sure that the path pointed to is not outside
the directory specified by the LOCATION setting.
"""
base_path = force_text(self.location)
base_path = base_path.rstrip('/')
final_path = urljoin(base_path.rstrip('/') + "/", name)
base_path_len = len(base_path)
if (not final_path.startswith(base_path) or
final_path[base_path_len:base_path_len + 1] not in ('', '/')):
raise SuspiciousOperation("Attempted access to '%s' denied." %
name)
return final_path.lstrip('/')
def load(self):
session_data = {}
try:
with open(self._key_to_file(), "rb") as session_file:
file_data = session_file.read()
# Don't fail if there is no data in the session file.
# We may have opened the empty placeholder file.
if file_data:
try:
session_data = self.decode(file_data)
except (EOFError, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self.create()
# Remove expired sessions.
expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
if expiry_age <= 0:
session_data = {}
self.delete()
self.create()
except (IOError, SuspiciousOperation):
self._session_key = None
return session_data
def load(self):
try:
data = self._cache.get(self.cache_key)
except Exception:
# Some backends (e.g. memcache) raise an exception on invalid
# cache keys. If this happens, reset the session. See #17810.
data = None
if data is None:
# Duplicate DBStore.load, because we need to keep track
# of the expiry date to set it properly in the cache.
try:
s = self.model.objects.get(
session_key=self.session_key,
expire_date__gt=timezone.now()
)
data = self.decode(s.session_data)
self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
except (self.model.DoesNotExist, SuspiciousOperation) as e:
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
self._session_key = None
data = {}
return data
def decode(self, session_data):
encoded_data = base64.b64decode(force_bytes(session_data))
try:
# could produce ValueError if there is no ':'
hash, serialized = encoded_data.split(b':', 1)
expected_hash = self._hash(serialized)
if not constant_time_compare(hash.decode(), expected_hash):
raise SuspiciousSession("Session data corrupted")
else:
return self.serializer().loads(serialized)
except Exception as e:
# ValueError, SuspiciousOperation, unpickling exceptions. If any of
# these happen, just return an empty dictionary (an empty session).
if isinstance(e, SuspiciousOperation):
logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
logger.warning(force_text(e))
return {}