python类SuspiciousOperation()的实例源码

record.py 文件源码 项目:zinc 作者: PressLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def record_factory(zone, created=None, **validated_data):
    record_type = validated_data.pop('type')
    if record_type == POLICY_ROUTED:
        assert len(validated_data['values']) == 1
        policy_id = validated_data['values'][0]
        try:
            policy = models.Policy.objects.get(id=policy_id)
        except models.Policy.DoesNotExist:
            raise SuspiciousOperation("Policy {}  does not exists.".format(
                policy_id))
        record_model = models.PolicyRecord.new_or_deleted(name=validated_data['name'], zone=zone)
        obj = PolicyRecord(
            policy_record=record_model,
            zone=zone.r53_zone,
            policy=policy,
            dirty=True,
            created=created,
        )
    else:
        obj = Record(zone=zone.r53_zone, type=record_type, created=created, **validated_data)
    return obj
backends.py 文件源码 项目:django-aliyun-oss2-storage 作者: xiewenya 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _normalize_name(self, name):
        """
        Normalizes the name so that paths like /path/to/ignored/../foo.txt
        work. We check to make sure that the path pointed to is not outside
        the directory specified by the LOCATION setting.
        """

        base_path = force_text(self.location)
        base_path = base_path.rstrip('/')

        final_path = urljoin(base_path.rstrip('/') + "/", name)

        base_path_len = len(base_path)
        if (not final_path.startswith(base_path) or
                final_path[base_path_len:base_path_len + 1] not in ('', '/')):
            raise SuspiciousOperation("Attempted access to '%s' denied." %
                                      name)
        return final_path.lstrip('/')
base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
file.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self):
        session_data = {}
        try:
            with open(self._key_to_file(), "rb") as session_file:
                file_data = session_file.read()
            # Don't fail if there is no data in the session file.
            # We may have opened the empty placeholder file.
            if file_data:
                try:
                    session_data = self.decode(file_data)
                except (EOFError, SuspiciousOperation) as e:
                    if isinstance(e, SuspiciousOperation):
                        logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                        logger.warning(force_text(e))
                    self.create()

                # Remove expired sessions.
                expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
                if expiry_age <= 0:
                    session_data = {}
                    self.delete()
                    self.create()
        except (IOError, SuspiciousOperation):
            self._session_key = None
        return session_data
cached_db.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load(self):
        try:
            data = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            data = None

        if data is None:
            # Duplicate DBStore.load, because we need to keep track
            # of the expiry date to set it properly in the cache.
            try:
                s = self.model.objects.get(
                    session_key=self.session_key,
                    expire_date__gt=timezone.now()
                )
                data = self.decode(s.session_data)
                self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
            except (self.model.DoesNotExist, SuspiciousOperation) as e:
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                self._session_key = None
                data = {}
        return data
base.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
views.py 文件源码 项目:SpongeAuth 作者: lukegb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def verify_step2(request, uidb64, token):
    bytes_uid = urlsafe_base64_decode(uidb64)
    try:
        uid = int(bytes_uid)
    except ValueError:
        raise SuspiciousOperation('verify_step2 received invalid base64 user ID: {}'.format(
            bytes_uid))
    if uid != request.user.id:
        raise PermissionDenied('UID mismatch - user is {}, request was for {}'.format(
            request.user.id, uid))
    user = get_object_or_404(models.User, pk=uid)
    if not verify_token_generator.check_token(user, token):
        raise Http404('token invalid')

    if not user.email_verified:
        user.email_verified = True
        user.save()
        messages.success(request, _('Your email has been verified successfully. Thanks!'))
    else:
        messages.info(request, _('Your email address has already been verified.'))
    return redirect('index')
file.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def load(self):
        session_data = {}
        try:
            with open(self._key_to_file(), "rb") as session_file:
                file_data = session_file.read()
            # Don't fail if there is no data in the session file.
            # We may have opened the empty placeholder file.
            if file_data:
                try:
                    session_data = self.decode(file_data)
                except (EOFError, SuspiciousOperation) as e:
                    if isinstance(e, SuspiciousOperation):
                        logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                        logger.warning(force_text(e))
                    self.create()

                # Remove expired sessions.
                expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
                if expiry_age <= 0:
                    session_data = {}
                    self.delete()
                    self.create()
        except (IOError, SuspiciousOperation):
            self._session_key = None
        return session_data
cached_db.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load(self):
        try:
            data = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            data = None

        if data is None:
            # Duplicate DBStore.load, because we need to keep track
            # of the expiry date to set it properly in the cache.
            try:
                s = self.model.objects.get(
                    session_key=self.session_key,
                    expire_date__gt=timezone.now()
                )
                data = self.decode(s.session_data)
                self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
            except (self.model.DoesNotExist, SuspiciousOperation) as e:
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                self._session_key = None
                data = {}
        return data
base.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
file.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load(self):
        session_data = {}
        try:
            with open(self._key_to_file(), "rb") as session_file:
                file_data = session_file.read()
            # Don't fail if there is no data in the session file.
            # We may have opened the empty placeholder file.
            if file_data:
                try:
                    session_data = self.decode(file_data)
                except (EOFError, SuspiciousOperation) as e:
                    if isinstance(e, SuspiciousOperation):
                        logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                        logger.warning(force_text(e))
                    self.create()

                # Remove expired sessions.
                expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
                if expiry_age <= 0:
                    session_data = {}
                    self.delete()
                    self.create()
        except (IOError, SuspiciousOperation):
            self._session_key = None
        return session_data
cached_db.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load(self):
        try:
            data = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            data = None

        if data is None:
            # Duplicate DBStore.load, because we need to keep track
            # of the expiry date to set it properly in the cache.
            try:
                s = self.model.objects.get(
                    session_key=self.session_key,
                    expire_date__gt=timezone.now()
                )
                data = self.decode(s.session_data)
                self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
            except (self.model.DoesNotExist, SuspiciousOperation) as e:
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                self._session_key = None
                data = {}
        return data
base.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
utils.py 文件源码 项目:nrp 作者: django-rea 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
    if allowed_protocols is None:
        allowed_protocols = ["http", "https"]
    parsed = urlparse.urlparse(url)
    # perform security checks to ensure no malicious intent
    # (i.e., an XSS attack with a data URL)
    safe = True
    if parsed.scheme and parsed.scheme not in allowed_protocols:
        if raise_on_fail:
            raise SuspiciousOperation("Unsafe redirect to URL with protocol '%s'" % parsed.scheme)
        safe = False
    if allowed_host and parsed.netloc and parsed.netloc != allowed_host:
        if raise_on_fail:
            raise SuspiciousOperation("Unsafe redirect to URL not matching host '%s'" % allowed_host)
        safe = False
    return safe
utils.py 文件源码 项目:wagtail_room_booking 作者: Tamriel 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ensure_safe_url(url, allowed_protocols=None, allowed_host=None, raise_on_fail=False):
    if allowed_protocols is None:
        allowed_protocols = ["http", "https"]
    parsed = urlparse(url)
    # perform security checks to ensure no malicious intent
    # (i.e., an XSS attack with a data URL)
    safe = True
    if parsed.scheme and parsed.scheme not in allowed_protocols:
        if raise_on_fail:
            raise SuspiciousOperation("Unsafe redirect to URL with protocol '{0}'".format(parsed.scheme))
        safe = False
    if allowed_host and parsed.netloc and parsed.netloc != allowed_host:
        if raise_on_fail:
            raise SuspiciousOperation("Unsafe redirect to URL not matching host '{0}'".format(allowed_host))
        safe = False
    return safe
db.py 文件源码 项目:acacia_main 作者: AcaciaTrading 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def load(self):
        try:
            s = Session.objects.get(
                session_key=self.session_key,
                expire_date__gt=timezone.now()
            )
            self.user_id = s.user_id
            # do not overwrite user_agent/ip, as those might have been updated
            if self.user_agent != s.user_agent or self.ip != s.ip:
                self.modified = True
            return self.decode(s.session_data)
        except (Session.DoesNotExist, SuspiciousOperation) as e:
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                                           e.__class__.__name__)
                logger.warning(force_text(e))
            self.create()
            return {}
attachments.py 文件源码 项目:mercure 作者: synhack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_build_invalid_zip(self):
        attachment_name = 'build.json'
        attachment_path = os.path.join(settings.MEDIA_ROOT, 'test_attachment')
        copyfile(os.path.join(FILES_PATH, 'invalid_archive.zip'),
                 attachment_path)
        attachment = Attachment(
            name=attachment_name,
            buildable=True
        )
        attachment.file = os.path.join('..', attachment_path)
        attachment.save()

        kwargs = {
            'key': TRACKER_ATTACHMENT_EXECUTED,
            'campaign_id': 1,
            'target_id': 1,
            'value': 'tracker: not opened',
        }
        tracker = Tracker.objects.create(**kwargs)
        with self.assertRaises(SuspiciousOperation):
            attachment.build(tracker)
exception.py 文件源码 项目:Gypsy 作者: benticarlos 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def convert_exception_to_response(get_response):
    """
    Wrap the given get_response callable in exception-to-response conversion.

    All exceptions will be converted. All known 4xx exceptions (Http404,
    PermissionDenied, MultiPartParserError, SuspiciousOperation) will be
    converted to the appropriate response, and all other exceptions will be
    converted to 500 responses.

    This decorator is automatically applied to all middleware to ensure that
    no middleware leaks an exception and that the next middleware in the stack
    can rely on getting a response instead of an exception.
    """
    @wraps(get_response, assigned=available_attrs(get_response))
    def inner(request):
        try:
            response = get_response(request)
        except Exception as exc:
            response = response_for_exception(request, exc)
        return response
    return inner
file.py 文件源码 项目:Gypsy 作者: benticarlos 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def load(self):
        session_data = {}
        try:
            with open(self._key_to_file(), "rb") as session_file:
                file_data = session_file.read()
            # Don't fail if there is no data in the session file.
            # We may have opened the empty placeholder file.
            if file_data:
                try:
                    session_data = self.decode(file_data)
                except (EOFError, SuspiciousOperation) as e:
                    if isinstance(e, SuspiciousOperation):
                        logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                        logger.warning(force_text(e))
                    self.create()

                # Remove expired sessions.
                expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
                if expiry_age <= 0:
                    session_data = {}
                    self.delete()
                    self.create()
        except (IOError, SuspiciousOperation):
            self._session_key = None
        return session_data
cached_db.py 文件源码 项目:Gypsy 作者: benticarlos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load(self):
        try:
            data = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            data = None

        if data is None:
            # Duplicate DBStore.load, because we need to keep track
            # of the expiry date to set it properly in the cache.
            try:
                s = self.model.objects.get(
                    session_key=self.session_key,
                    expire_date__gt=timezone.now()
                )
                data = self.decode(s.session_data)
                self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
            except (self.model.DoesNotExist, SuspiciousOperation) as e:
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                self._session_key = None
                data = {}
        return data
base.py 文件源码 项目:DjangoBlog 作者: 0daybug 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
common.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def skip_suspicious_operations(record):
    """Prevent django from sending 500 error
    email notifications for SuspiciousOperation
    events, since they are not true server errors,
    especially when related to the ALLOWED_HOSTS
    configuration

    background and more information:
    http://www.tiwoc.de/blog/2013/03/django-prevent-email-notification-on-susp\
    iciousoperation/
    """
    if record.exc_info:
        exc_value = record.exc_info[1]
        if isinstance(exc_value, SuspiciousOperation):
            return False
    return True

# A sample logging configuration. The only tangible logging
# performed by this configuration is to send an email to
# the site admins on every HTTP 500 error.
# See http://docs.djangoproject.com/en/dev/topics/logging for
# more details on how to customize your logging configuration.
base.py 文件源码 项目:wanblog 作者: wanzifa 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
base.py 文件源码 项目:tabmaster 作者: NicolasMinghetti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
base.py 文件源码 项目:trydjango18 作者: lucifer-yqh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
base.py 文件源码 项目:trydjango18 作者: wei0104 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' %
                        e.__class__.__name__)
                logger.warning(force_text(e))
            return {}
storage_qiniu.py 文件源码 项目:YouPBX 作者: JoneXiong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _normalize_name(self, name):
        """
        Normalizes the name so that paths like /path/to/ignored/../foo.txt
        work. We check to make sure that the path pointed to is not outside
        the directory specified by the LOCATION setting.
        """

        base_path = force_text(self.location)
        base_path = base_path.rstrip('/')

        final_path = urljoin(base_path.rstrip('/') + "/", name)

        base_path_len = len(base_path)
        if (not final_path.startswith(base_path) or
                final_path[base_path_len:base_path_len + 1] not in ('', '/')):
            raise SuspiciousOperation("Attempted access to '%s' denied." %
                                      name)
        return final_path.lstrip('/')
file.py 文件源码 项目:ims 作者: ims-team 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self):
        session_data = {}
        try:
            with open(self._key_to_file(), "rb") as session_file:
                file_data = session_file.read()
            # Don't fail if there is no data in the session file.
            # We may have opened the empty placeholder file.
            if file_data:
                try:
                    session_data = self.decode(file_data)
                except (EOFError, SuspiciousOperation) as e:
                    if isinstance(e, SuspiciousOperation):
                        logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                        logger.warning(force_text(e))
                    self.create()

                # Remove expired sessions.
                expiry_age = self.get_expiry_age(expiry=self._expiry_date(session_data))
                if expiry_age <= 0:
                    session_data = {}
                    self.delete()
                    self.create()
        except (IOError, SuspiciousOperation):
            self._session_key = None
        return session_data
cached_db.py 文件源码 项目:ims 作者: ims-team 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def load(self):
        try:
            data = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            data = None

        if data is None:
            # Duplicate DBStore.load, because we need to keep track
            # of the expiry date to set it properly in the cache.
            try:
                s = self.model.objects.get(
                    session_key=self.session_key,
                    expire_date__gt=timezone.now()
                )
                data = self.decode(s.session_data)
                self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
            except (self.model.DoesNotExist, SuspiciousOperation) as e:
                if isinstance(e, SuspiciousOperation):
                    logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                    logger.warning(force_text(e))
                self._session_key = None
                data = {}
        return data
base.py 文件源码 项目:ims 作者: ims-team 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def decode(self, session_data):
        encoded_data = base64.b64decode(force_bytes(session_data))
        try:
            # could produce ValueError if there is no ':'
            hash, serialized = encoded_data.split(b':', 1)
            expected_hash = self._hash(serialized)
            if not constant_time_compare(hash.decode(), expected_hash):
                raise SuspiciousSession("Session data corrupted")
            else:
                return self.serializer().loads(serialized)
        except Exception as e:
            # ValueError, SuspiciousOperation, unpickling exceptions. If any of
            # these happen, just return an empty dictionary (an empty session).
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                logger.warning(force_text(e))
            return {}


问题


面经


文章

微信
公众号

扫码关注公众号