def test_save_dict(self):
    """Store a dict in the cache and check that it reads back unchanged.

    The test is skipped for the JSON serializer. For the msgpack
    serializer the timestamp is stored as an ISO-format string instead
    of a ``datetime`` object.
    """
    serializer = self.cache.client._serializer
    if isinstance(serializer, json_serializer.JSONSerializer):
        self.skipTest("Datetimes are not JSON serializable")

    timestamp = datetime.datetime.now()
    if isinstance(serializer, msgpack_serializer.MSGPackSerializer):
        # MSGPackSerializer serializers use the isoformat for datetimes
        # https://github.com/msgpack/msgpack-python/issues/12
        timestamp = timestamp.isoformat()

    self.cache.set("test_key", {"id": 1, "date": timestamp, "name": "Foo"})
    fetched = self.cache.get("test_key")

    self.assertIsInstance(fetched, dict)
    expected_fields = (("id", 1), ("name", "Foo"), ("date", timestamp))
    for field, expected in expected_fields:
        self.assertEqual(fetched[field], expected)
# Example source code using Python's get() method.
def test_timeout_parameter_as_positional_argument(self):
    """Exercise ``cache.set`` with the timeout passed as the third positional argument.

    Covers a negative timeout, a one-second timeout that is read before
    and after a two-second sleep, and a zero timeout followed by an
    ``nx=True`` write with a negative timeout.
    """
    backend = self.cache
    key = "test_key"

    # Negative timeout: the value must not be readable afterwards.
    backend.set(key, 222, -1)
    self.assertIsNone(backend.get(key, None))

    # One-second timeout: readable immediately, gone after two seconds.
    backend.set(key, 222, 1)
    before_expiry = backend.get(key, None)
    time.sleep(2)
    after_expiry = backend.get(key, None)
    self.assertEqual(before_expiry, 222)
    self.assertEqual(after_expiry, None)

    # nx=True should not overwrite expire of key already in db
    backend.set(key, 222, 0)
    backend.set(key, 222, -1, nx=True)
    self.assertEqual(backend.get(key, None), 222)
def test_sentinel_switching(self):
    """Check that the "sample" cache hands out distinct master and slave connections.

    Skipped unless the cache client is a ``SentinelClient``. A
    ``NotImplementedError`` raised anywhere in the body is ignored.
    """
    if not isinstance(self.cache.client, SentinelClient):
        self.skipTest("Not Sentinel clients use default master-slave setup")

    try:
        sentinel = caches["sample"].client
        writer = sentinel.get_client(write=True)
        reader = sentinel.get_client(write=False)

        # A value written through the master must be readable from the slave.
        writer.set("Foo", "Bar")
        self.assertEqual(reader.get("Foo"), "Bar")

        self.assertEqual(writer.info()['role'], "master")
        self.assertEqual(reader.info()['role'], "slave")
    except NotImplementedError:
        # Deliberately tolerated by the original test.
        pass
def test_invalid_key(self):
    """Saving a session created with an invalid key must generate a new key.

    Submitting an invalid session key (either by guessing, or if the db has
    removed the key) results in a new key being generated.
    """
    # Create the session outside the try block: if the backend constructor
    # raises, the cleanup in ``finally`` would otherwise fail on the unbound
    # ``session`` name and hide the real error.
    session = self.backend('1')
    try:
        try:
            session.save()
        except AttributeError:
            self.fail(
                "The session object did not save properly. "
                "Middleware may be saving cache items without namespaces."
            )
        self.assertNotEqual(session.session_key, '1')
        self.assertIsNone(session.get('cat'))
        session.delete()
    finally:
        # Some backends leave a stale cache entry for the invalid
        # session key; make sure that entry is manually deleted
        session.delete('1')
def query_user_by_id(user_id=0, use_cache=True):
    """Return the ``UserAccount`` whose id is ``user_id``, or ``None`` if it does not exist.

    A record loaded from the database is always written to the cache,
    regardless of ``use_cache``.

    :param user_id: id of the account to look up
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_KEY + str(user_id)

    cached_account = cache.get(cache_key) if use_cache else None
    if cached_account:
        return cached_account

    try:
        fresh_account = UserAccount.objects.get(id=user_id)
        cache.set(cache_key, fresh_account, CACHE_TIME)
    except UserAccount.DoesNotExist:
        return None
    return fresh_account
def query_token_by_user_id(user_id, use_cache=True):
    """Return the ``AccessToken`` with ``status=1`` for ``user_id``, or ``None`` if there is none.

    A token loaded from the database is always written to the cache,
    regardless of ``use_cache``.

    NOTE(review): the query ends in ``.get()``, which raises when more than
    one row matches; the preceding ``order_by("-id")`` does not prevent
    that. Confirm that a user can hold only one token with ``status=1``.

    :param user_id: id of the user who owns the token
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_TOKEN_ID + str(user_id)

    cached_token = cache.get(cache_key) if use_cache else None
    if cached_token:
        return cached_token

    try:
        active_tokens = AccessToken.objects.order_by("-id").filter(status=1)
        fresh_token = active_tokens.get(user_id=user_id)
        cache.set(cache_key, fresh_token, CACHE_TIME)
    except AccessToken.DoesNotExist:
        return None
    return fresh_token
def query_token(token, use_cache=True):
    """Return the ``AccessToken`` row with ``status=1`` whose ``access_token`` equals ``token``.

    Returns ``None`` when no such row exists. A row loaded from the
    database is always written to the cache, regardless of ``use_cache``.

    :param token: access-token string to look up
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_TOKEN + token

    if use_cache:
        cached_row = cache.get(cache_key)
        if cached_row:
            return cached_row

    try:
        active_tokens = AccessToken.objects.order_by("-id").filter(status=1)
        token_row = active_tokens.get(access_token=token)
        cache.set(cache_key, token_row, CACHE_TIME)
    except AccessToken.DoesNotExist:
        return None
    return token_row
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
    """Return a ``UserFollow`` row count for a user.

    With ``is_follow`` true, rows are counted where ``user_id`` matches;
    otherwise rows are counted where ``follow_user`` matches. Only rows
    with ``status=1`` are counted. The computed count is always written
    to the cache.

    :param user_id: id of the user whose relations are counted
    :param is_follow: selects which column is matched against ``user_id`` (see above)
    :param use_cache: when true, a cached count (including 0) is returned without querying the database
    """
    key_prefix = CACHE_FOLLOW_COUNT_KEY if is_follow else CACHE_FANS_COUNT_KEY
    cache_key = key_prefix + str(user_id)

    if use_cache:
        cached_count = cache.get(cache_key)
        # Compare against None rather than truthiness: a cached 0 is a valid
        # hit and must not trigger a fresh database query on every call.
        if cached_count is not None:
            return cached_count

    if is_follow:
        relations = UserFollow.objects.filter(user_id=user_id, status=1)
    else:
        relations = UserFollow.objects.filter(follow_user=user_id, status=1)
    count = relations.aggregate(Count("id"))["id__count"]

    cache.set(cache_key, count, CACHE_COUNT_TIME)
    return count
def query_format_info_by_ease_mob(ease_mob, use_cache=True):
    """Return the formatted ``UserInfo`` whose ``ease_mob`` field equals ``ease_mob``.

    Returns ``None`` when no such record exists. A freshly formatted
    record is always written to the cache, regardless of ``use_cache``.

    :param ease_mob: string value matched against ``UserInfo.ease_mob``
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_EASE_MOB_KEY + ease_mob

    cached_info = cache.get(cache_key) if use_cache else None
    if cached_info:
        return cached_info

    try:
        record = UserInfo.objects.get(ease_mob=ease_mob)
        formatted = UserInfo.format_user_info(record)
        cache.set(cache_key, formatted, CACHE_TIME)
    except UserInfo.DoesNotExist:
        return None
    return formatted
def query_format_info_by_user_id(user_id, use_cache=True):
    """Return the formatted ``UserInfo`` for ``user_id``, or ``None`` if it does not exist.

    A freshly formatted record is always written to the cache,
    regardless of ``use_cache``.

    :param user_id: id of the user whose info is requested
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_KEY + str(user_id)

    if use_cache:
        cached_info = cache.get(cache_key)
        if cached_info:
            return cached_info

    try:
        record = UserInfo.objects.get(user_id=user_id)
        formatted = UserInfo.format_user_info(record)
        cache.set(cache_key, formatted, CACHE_TIME)
    except UserInfo.DoesNotExist:
        return None
    return formatted
def query_published_article_count(user_id, use_cache=True):
    """Return the number of ``BlogArticle`` rows with ``status=1`` for ``user_id``.

    Only a non-zero count is written to the cache, so a user with no
    matching articles is re-counted on every call.

    :param user_id: id of the user whose articles are counted
    :param use_cache: when true, a truthy cached count is returned without querying the database
    """
    cache_key = CACHE_ARTICLE_COUNT + str(user_id)

    cached_count = cache.get(cache_key) if use_cache else None
    if cached_count:
        return cached_count

    articles = BlogArticle.objects.filter(user_id=user_id, status=1)
    total = articles.aggregate(Count("id"))["id__count"]
    if total:
        cache.set(cache_key, total, CACHE_TIME)
    return total
def query_article_by_id(article_id=0, use_cache=True):
    """Return the formatted ``BlogArticle`` with id ``article_id``, or ``None`` if it does not exist.

    A freshly formatted article is always written to the cache,
    regardless of ``use_cache``.

    :param article_id: id of the article to look up
    :param use_cache: when true, a truthy cached value is returned without querying the database
    """
    cache_key = CACHE_KEY_ID + str(article_id)

    if use_cache:
        cached_article = cache.get(cache_key)
        if cached_article:
            return cached_article

    try:
        row = BlogArticle.objects.get(id=article_id)
        formatted = BlogArticle.format_article(row)
        cache.set(cache_key, formatted, CACHE_TIME)
    except BlogArticle.DoesNotExist:
        return None
    return formatted
def commits_over_52(self):
    """Return weekly commit counts for the last 52 weeks as a comma-separated string.

    The string holds 52 integers ordered from the oldest week to the most
    recent one. The result is cached under the name produced by
    ``self.cache_namer`` and a cached value is returned when present.
    """
    cache_name = self.cache_namer(self.commits_over_52)
    value = cache.get(cache_name)
    if value is not None:
        return value

    now = datetime.now()
    commit_dates = self.commit_set.filter(
        commit_date__gt=now - timedelta(weeks=52),
    ).values_list('commit_date', flat=True)

    weeks = [0] * 52
    for commit_date in commit_dates:
        # A commit dated after ``now`` yields a negative age. Left unclamped,
        # that would index from the end of the list (counting the commit in
        # the oldest weeks) or raise IndexError, so count it in the most
        # recent week instead.
        age_weeks = max((now - commit_date).days // 7, 0)
        if age_weeks < 52:
            weeks[age_weeks] += 1

    # Index 0 is the most recent week, so reverse to get oldest-first order.
    value = ','.join(map(str, reversed(weeks)))
    cache.set(cache_name, value)
    return value
def record_plugin_history(self, sender, instance, **kwargs):
    """Record an edit-history entry when a CMS plugin is created or edited.

    Nothing is recorded when ``instance`` is not a ``CMSPlugin``, when no
    content can be generated for it, or when the newest stored entry for
    the plugin has the same content or the same uuid as the new content.
    """
    from cms.models import CMSPlugin, Page
    from .models import EditHistory

    if not isinstance(instance, CMSPlugin):
        return

    user_id = cache.get('cms-user-id')
    comment = cache.get('cms-comment')

    content = generate_content(instance)
    if content is None:
        return

    # Don't record a history of change if nothing changed.
    previous_entries = EditHistory.objects.filter(plugin_id=instance.id)
    has_history = previous_entries.count() > 0
    if has_history:
        # Unsaved entry built only so its uuid can be compared.
        candidate = EditHistory(content=content)
        newest = previous_entries.latest()
        unchanged = newest.content == content or candidate.uuid == newest.uuid
        if unchanged:
            return

    EditHistory.objects.record(instance, user_id, comment, content)
def get_token(self, token_only=True, scopes=None):
    """Obtain an OAuth token for this client via the client-credentials grant.

    :param token_only: when true, return just the access-token string,
        served from the cache when a truthy value is stored there; when
        false, always request a new token and return the full JSON response
    :param scopes: list of scope names; defaults to
        ``['send_notification', 'view_room']``
    :raises OauthClientInvalidError: when the token endpoint answers 401
    :raises Exception: when the token endpoint answers anything other than 200 or 401
    """
    if scopes is None:
        scopes = ['send_notification', 'view_room']
    cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

    def request_token():
        # POST the credentials and return the decoded JSON body on success.
        payload = {
            'grant_type': 'client_credentials',
            'scope': ' '.join(scopes),
        }
        resp = requests.post(
            self.token_url,
            data=payload,
            auth=HTTPBasicAuth(self.id, self.secret),
            timeout=10,
        )
        status = resp.status_code
        if status == 200:
            return resp.json()
        if status == 401:
            raise OauthClientInvalidError(self)
        raise Exception('Invalid token: %s' % resp.text)

    if not token_only:
        return request_token()

    token = cache.get(cache_key)
    if token:
        return token

    data = request_token()
    token = data['access_token']
    # Cache for 20 seconds less than the lifetime reported by the server.
    cache.set(cache_key, token, data['expires_in'] - 20)
    return token
def update_room_info(self, commit=True):
    """Fetch this room from the API and copy its name and owner onto the instance.

    :param commit: when true, call ``self.save()`` after updating the attributes
    """
    bearer = 'Bearer %s' % self.get_token()
    request_headers = {
        'Authorization': bearer,
        'Content-Type': 'application/json'
    }
    # The room id is substituted after the URL has been joined.
    room_url = urljoin(self.api_base_url, 'room/%s') % self.room_id
    response = requests.get(room_url, headers=request_headers, timeout=5)
    room = response.json()

    self.room_name = room['name']
    owner = room['owner']
    self.room_owner_id = six.text_type(owner['id'])
    self.room_owner_name = owner['name']

    if commit:
        self.save()
def __exit__(self, exc_type, exc_value, tb):
    """Delete the tenant and suppress the error when the OAuth client is invalid.

    Returns ``True`` (suppressing the exception) only when ``exc_value`` is
    an ``OauthClientInvalidError``; otherwise returns ``None`` so any other
    exception propagates.
    """
    if not isinstance(exc_value, OauthClientInvalidError):
        return None
    # If we get an invalid oauth client we better clean up the tenant
    # and swallow the error.
    self.tenant.delete()
    return True
def for_request(request, body=None):
    """Create the context for a specific request.

    The sender is taken from ``body['item']['sender']`` or, failing that,
    from ``body['item']['message']['from']``. When the body names no
    sender, the ``sub`` claim of the JWT data is used as the sender id.

    :param request: incoming request; ``signed_request`` is read from ``request.GET``
    :param body: optional parsed request body
    :raises BadTenantError: when neither the body nor the JWT data identifies a sender
    """
    tenant, jwt_data = Tenant.objects.for_request(request, body)
    webhook_sender_id = jwt_data.get('sub')

    sender_data = None
    if body and 'item' in body:
        item = body['item']
        if 'sender' in item:
            sender_data = item['sender']
        elif 'message' in item and 'from' in item['message']:
            sender_data = item['message']['from']

    if sender_data is None and webhook_sender_id is None:
        raise BadTenantError('Cannot identify sender in tenant')
    if sender_data is None:
        # Fall back to the id carried in the JWT.
        sender_data = {'id': webhook_sender_id}

    sender = HipchatUser(
        id=sender_data.get('id'),
        name=sender_data.get('name'),
        mention_name=sender_data.get('mention_name'),
    )
    return Context(
        tenant=tenant,
        sender=sender,
        signed_request=request.GET.get('signed_request'),
        context=jwt_data.get('context') or {},
    )
def get_event(self, event_id):
    """Look up an ``Event`` by primary key and pass it through ``_ensure_and_bind_event``.

    Returns ``None`` when ``event_id`` cannot be converted with ``int()``
    (``ValueError``) or when no event has that primary key.
    """
    try:
        found = Event.objects.get(pk=int(event_id))
    except (ValueError, Event.DoesNotExist):
        return None
    else:
        return self._ensure_and_bind_event(found)
def process_request(self, request, extra_context=None):
    """Render the page for ``request`` and wrap the result in a response.

    When ``get_cache_settings`` returns a truthy cache key, the rendered
    content is read from the cache and rendered and stored only if the
    cache returns ``None``. Without a cache key the page is rendered on
    every call.

    :param request: the request being processed
    :param extra_context: optional mapping merged into the rendering context
    """
    context = self.get_rendering_context(request)
    if extra_context:
        context.update(extra_context)

    cache_key, seconds = self.get_cache_settings(request)

    if not cache_key:
        # Caching disabled for this request.
        content = self.render(context)
        return self.create_response(request, content)

    content = cache.get(cache_key)
    if content is None:
        content = self.render(context)
        cache.set(cache_key, content, seconds)
    return self.create_response(request, content)