python类get()的实例源码

tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_save_dict(self):
        if isinstance(self.cache.client._serializer,
                      json_serializer.JSONSerializer):
            self.skipTest("Datetimes are not JSON serializable")

        if isinstance(self.cache.client._serializer,
                      msgpack_serializer.MSGPackSerializer):
            # MSGPackSerializer serializers use the isoformat for datetimes
            # https://github.com/msgpack/msgpack-python/issues/12
            now_dt = datetime.datetime.now().isoformat()
        else:
            now_dt = datetime.datetime.now()

        test_dict = {"id": 1, "date": now_dt, "name": "Foo"}

        self.cache.set("test_key", test_dict)
        res = self.cache.get("test_key")

        self.assertIsInstance(res, dict)
        self.assertEqual(res["id"], 1)
        self.assertEqual(res["name"], "Foo")
        self.assertEqual(res["date"], now_dt)
tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_timeout_parameter_as_positional_argument(self):
        self.cache.set("test_key", 222, -1)
        res = self.cache.get("test_key", None)
        self.assertIsNone(res)

        self.cache.set("test_key", 222, 1)
        res1 = self.cache.get("test_key", None)
        time.sleep(2)
        res2 = self.cache.get("test_key", None)
        self.assertEqual(res1, 222)
        self.assertEqual(res2, None)

        # nx=True should not overwrite expire of key already in db
        self.cache.set("test_key", 222, 0)
        self.cache.set("test_key", 222, -1, nx=True)
        res = self.cache.get("test_key", None)
        self.assertEqual(res, 222)
tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_sentinel_switching(self):
        if not isinstance(self.cache.client,
                          SentinelClient):
            self.skipTest("Not Sentinel clients use default master-slave setup")
        try:
            cache = caches["sample"]
            client = cache.client
            master = client.get_client(write=True)
            slave = client.get_client(write=False)

            master.set("Foo", "Bar")
            self.assertEqual(slave.get("Foo"), "Bar")
            self.assertEqual(master.info()['role'], "master")
            self.assertEqual(slave.info()['role'], "slave")
        except NotImplementedError:
            pass
tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_invalid_key(self):
        # Submitting an invalid session key (either by guessing, or if the db has
        # removed the key) results in a new key being generated.
        try:
            session = self.backend('1')
            try:
                session.save()
            except AttributeError:
                self.fail(
                    "The session object did not save properly. "
                    "Middleware may be saving cache items without namespaces."
                )
            self.assertNotEqual(session.session_key, '1')
            self.assertEqual(session.get('cat'), None)
            session.delete()
        finally:
            # Some backends leave a stale cache entry for the invalid
            # session key; make sure that entry is manually deleted
            session.delete('1')
account.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def query_user_by_id(user_id=0, use_cache=True):
        """
        ????????
        :param user_id: ???ID
        :param use_cache: ??????
        """
        key = CACHE_KEY+str(user_id)
        if use_cache:
            account = cache.get(key)
            if account:
                return account
        try:
            account = UserAccount.objects.get(id=user_id)
            cache.set(key, account, CACHE_TIME)
            return account
        except UserAccount.DoesNotExist:
            return None
token.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def query_token_by_user_id(user_id, use_cache=True):
        """
        ?????ID?????Token
        :param user_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_TOKEN_ID+str(user_id)
        if use_cache:
            token = cache.get(key)
            if token:
                return token
        try:
            token = AccessToken.objects.order_by("-id").filter(status=1).get(user_id=user_id)
            cache.set(key, token, CACHE_TIME)
            return token
        except AccessToken.DoesNotExist:
            return None
token.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def query_token(token, use_cache=True):
        """
        ????token??????
        """
        key = CACHE_TOKEN+token
        if use_cache:
            token_result = cache.get(key)
            if token_result:
                return token_result

        try:
            token = AccessToken.objects.order_by("-id").filter(status=1).get(access_token=token)
            cache.set(key, token, CACHE_TIME)
            return token
        except AccessToken.DoesNotExist:
            return None
follow.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
        """
        ???????????????
        :param user_id: ???????ID
        :param is_follow: ???True???????????????
        :param use_cache: ??????
        """
        cache_key = CACHE_FANS_COUNT_KEY + str(user_id)
        if is_follow:
            cache_key = CACHE_FOLLOW_COUNT_KEY + str(user_id)

        if use_cache:
            count = cache.get(cache_key)
            if count:
                return count

        if is_follow:
            count = UserFollow.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
        else:
            count = UserFollow.objects.filter(follow_user=user_id, status=1).aggregate(Count("id"))

        count = count["id__count"]
        cache.set(cache_key, count, CACHE_COUNT_TIME)

        return count
info.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def query_format_info_by_ease_mob(ease_mob, use_cache=True):
        """
        ?????????????????
        """
        key = CACHE_EASE_MOB_KEY + ease_mob
        if use_cache:
            result = cache.get(key)
            if result:
                return result

        try:
            user_info = UserInfo.objects.get(ease_mob=ease_mob)
            format_user_info = UserInfo.format_user_info(user_info)
            cache.set(key, format_user_info, CACHE_TIME)
            return format_user_info
        except UserInfo.DoesNotExist:
            return None
info.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def query_format_info_by_user_id(user_id, use_cache=True):
        """
        ????ID??????
        :param user_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_KEY + str(user_id)
        if use_cache:
            result = cache.get(key)
            if result:
                return result

        try:
            user_info = UserInfo.objects.get(user_id=user_id)
            format_user_info = UserInfo.format_user_info(user_info)
            cache.set(key, format_user_info, CACHE_TIME)
            return format_user_info
        except UserInfo.DoesNotExist:
            return None
article.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def query_published_article_count(user_id, use_cache=True):
        """
        ??????????????
        :param user_id: ??ID
        :param use_cache: ??????
        """
        cache_key = CACHE_ARTICLE_COUNT + str(user_id)
        if use_cache:
            count = cache.get(cache_key)
            if count:
                return count

        count = BlogArticle.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
        count = count["id__count"]
        if count:
            cache.set(cache_key, count, CACHE_TIME)
        return count
article.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def query_article_by_id(article_id=0, use_cache=True):
        """
        ????ID???????
        :param article_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_KEY_ID + str(article_id)
        if use_cache:
            cache_result = cache.get(key)
            if cache_result:
                return cache_result

        try:
            article = BlogArticle.objects.get(id=article_id)
            article = BlogArticle.format_article(article)
            cache.set(key, article, CACHE_TIME)
            return article
        except BlogArticle.DoesNotExist:
            return None
models.py 文件源码 项目:steemprojects.com 作者: noisy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def commits_over_52(self):
        cache_name = self.cache_namer(self.commits_over_52)
        value = cache.get(cache_name)
        if value is not None:
            return value
        now = datetime.now()
        commits = self.commit_set.filter(
            commit_date__gt=now - timedelta(weeks=52),
        ).values_list('commit_date', flat=True)

        weeks = [0] * 52
        for cdate in commits:
            age_weeks = (now - cdate).days // 7
            if age_weeks < 52:
                weeks[age_weeks] += 1

        value = ','.join(map(str, reversed(weeks)))
        cache.set(cache_name, value)
        return value
apps.py 文件源码 项目:django-cmsplugin-diff 作者: doctormo 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def record_plugin_history(self, sender, instance, **kwargs):
        """When a plugin is created or edited"""
        from cms.models import CMSPlugin, Page
        from .models import EditHistory
        if not isinstance(instance, CMSPlugin):
            return

        user_id = cache.get('cms-user-id')
        comment = cache.get('cms-comment')
        content = generate_content(instance)
        if content is None:
            return

        # Don't record a history of change if nothing changed.
        history = EditHistory.objects.filter(plugin_id=instance.id)
        if history.count() > 0:
            # Temporary history object for uuid
            this = EditHistory(content=content)
            latest = history.latest()
            if latest.content == content or this.uuid == latest.uuid:
                return

        EditHistory.objects.record(instance, user_id, comment, content)
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_token(self, token_only=True, scopes=None):
        if scopes is None:
            scopes = ['send_notification', 'view_room']

        cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

        def gen_token():
            data = {
                'grant_type': 'client_credentials',
                'scope': ' '.join(scopes),
            }
            resp = requests.post(
                self.token_url, data=data, auth=HTTPBasicAuth(self.id, self.secret), timeout=10
            )
            if resp.status_code == 200:
                return resp.json()
            elif resp.status_code == 401:
                raise OauthClientInvalidError(self)
            else:
                raise Exception('Invalid token: %s' % resp.text)

        if token_only:
            token = cache.get(cache_key)
            if not token:
                data = gen_token()
                token = data['access_token']
                cache.set(cache_key, token, data['expires_in'] - 20)
            return token
        return gen_token()
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_room_info(self, commit=True):
        headers = {
            'Authorization': 'Bearer %s' % self.get_token(),
            'Content-Type': 'application/json'
        }
        room = requests.get(
            urljoin(self.api_base_url, 'room/%s') % self.room_id, headers=headers, timeout=5
        ).json()
        self.room_name = room['name']
        self.room_owner_id = six.text_type(room['owner']['id'])
        self.room_owner_name = room['owner']['name']
        if commit:
            self.save()
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
        # If we get an invalid oauth client we better clean up the tenant
        # and swallow the error.
        if isinstance(exc_value, OauthClientInvalidError):
            self.tenant.delete()
            return True
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def for_request(request, body=None):
        """Creates the context for a specific request."""
        tenant, jwt_data = Tenant.objects.for_request(request, body)
        webhook_sender_id = jwt_data.get('sub')
        sender_data = None

        if body and 'item' in body:
            if 'sender' in body['item']:
                sender_data = body['item']['sender']
            elif 'message' in body['item'] and 'from' in body['item']['message']:
                sender_data = body['item']['message']['from']

        if sender_data is None:
            if webhook_sender_id is None:
                raise BadTenantError('Cannot identify sender in tenant')
            sender_data = {'id': webhook_sender_id}

        return Context(
            tenant=tenant,
            sender=HipchatUser(
                id=sender_data.get('id'),
                name=sender_data.get('name'),
                mention_name=sender_data.get('mention_name'),
            ),
            signed_request=request.GET.get('signed_request'),
            context=jwt_data.get('context') or {},
        )
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_event(self, event_id):
        try:
            event = Event.objects.get(pk=int(event_id))
        except (ValueError, Event.DoesNotExist):
            return None
        return self._ensure_and_bind_event(event)
page_processors.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_request(self, request, extra_context=None):
        """Main page processing logic"""
        context = self.get_rendering_context(request)
        if extra_context:
            context.update(extra_context)
        cache_key, seconds = self.get_cache_settings(request)
        if cache_key:
            content = cache.get(cache_key)
            if content is None:
                content = self.render(context)
                cache.set(cache_key, content, seconds)
        else:
            content = self.render(context)
        return self.create_response(request, content)


问题


面经


文章

微信
公众号

扫码关注公众号