def test_binary_string(self):
# Binary strings should be cacheable
cache = self.cache
from zlib import compress, decompress
value = 'value_to_be_compressed'
compressed_value = compress(value.encode())
# Test set
cache.set('binary1', compressed_value)
compressed_result = cache.get('binary1')
self.assertEqual(compressed_value, compressed_result)
self.assertEqual(value, decompress(compressed_result).decode())
# Test add
cache.add('binary1-add', compressed_value)
compressed_result = cache.get('binary1-add')
self.assertEqual(compressed_value, compressed_result)
self.assertEqual(value, decompress(compressed_result).decode())
# Test set_many
cache.set_many({'binary1-set_many': compressed_value})
compressed_result = cache.get('binary1-set_many')
self.assertEqual(compressed_value, compressed_result)
self.assertEqual(value, decompress(compressed_result).decode())
python类set_many()的实例源码
def test_long_timeout(self):
"""
Followe memcached's convention where a timeout greater than 30 days is
treated as an absolute expiration timestamp instead of a relative
offset (#12399).
"""
cache = self.cache
cache.set('key1', 'eggs', 60 * 60 * 24 * 30 + 1) # 30 days + 1 second
self.assertEqual(cache.get('key1'), 'eggs')
cache.add('key2', 'ham', 60 * 60 * 24 * 30 + 1)
self.assertEqual(cache.get('key2'), 'ham')
cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, 60 * 60 * 24 * 30 + 1)
self.assertEqual(cache.get('key3'), 'sausage')
self.assertEqual(cache.get('key4'), 'lobster bisque')
def get_all_contest_participants_detail(contest: Contest, users=None, privilege=False):
cache_template = PARTICIPANT_RANK_DETAIL_PRIVATE if privilege else PARTICIPANT_RANK_DETAIL
timeout = 60 if privilege else FORTNIGHT
contest_users = users if users else contest.participants_ids
cache_names = list(map(lambda x: cache_template.format(contest=contest.pk, user=x), contest_users))
cache_res = cache.get_many(cache_names)
ans = dict()
second_attempt = []
for user in contest_users:
cache_name = cache_template.format(contest=contest.pk, user=user)
if cache_name not in cache_res.keys():
second_attempt.append(user)
else:
ans[user] = cache_res[cache_name]
if second_attempt:
ans2 = recalculate_for_participants(contest, second_attempt, privilege)
cache.set_many({cache_template.format(contest=contest.pk, user=user_id): val for user_id, val in ans2.items()},
timeout * uniform(0.8, 1))
ans.update(ans2)
return ans
def test_unicode(self):
# Unicode values can be cached
cache = self.cache
stuff = {
'ascii': 'ascii_value',
'unicode_ascii': 'Iñtërnâtiônàlizætiøn1',
'Iñtërnâtiônàlizætiøn': 'Iñtërnâtiônàlizætiøn2',
'ascii2': {'x': 1}
}
# Test `set`
for (key, value) in stuff.items():
cache.set(key, value)
self.assertEqual(cache.get(key), value)
# Test `add`
for (key, value) in stuff.items():
cache.delete(key)
cache.add(key, value)
self.assertEqual(cache.get(key), value)
# Test `set_many`
for (key, value) in stuff.items():
cache.delete(key)
cache.set_many(stuff)
for (key, value) in stuff.items():
self.assertEqual(cache.get(key), value)
def test_set_many(self):
# Multiple keys can be set using set_many
cache = self.cache
cache.set_many({"key1": "spam", "key2": "eggs"})
self.assertEqual(cache.get("key1"), "spam")
self.assertEqual(cache.get("key2"), "eggs")
def test_set_many_expiration(self):
# set_many takes a second ``timeout`` parameter
cache = self.cache
cache.set_many({"key1": "spam", "key2": "eggs"}, 1)
time.sleep(2)
self.assertIsNone(cache.get("key1"))
self.assertIsNone(cache.get("key2"))
def test_zero_timeout(self):
"""
Passing in zero into timeout results in a value that is not cached
"""
cache = self.cache
cache.set('key1', 'eggs', 0)
self.assertIsNone(cache.get('key1'))
cache.add('key2', 'ham', 0)
self.assertIsNone(cache.get('key2'))
cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, 0)
self.assertIsNone(cache.get('key3'))
self.assertIsNone(cache.get('key4'))
def global_template_vars(request):
"""
Adds variables to all templates.
It uses memcached to minimize hitting the db.
"""
def get_user():
if request.user.is_authenticated():
return ExtendedUser.objects.get( id = request.user.id )
else:
return None
vars_funcs = {
'SITE_NAME': lambda: Site.objects.get_current().name,
'SITE_DOMAIN': lambda: Site.objects.get_current().domain,
'USERS_NR': lambda: User.objects.count(),
'EVENTS_NR': lambda: Event.objects.count(),
'GROUPS_NR': lambda: Group.objects.count(), }
# protocol (computed below)
vars_dic = cache.get_many( vars_funcs.keys() )
if not vars_dic:
vars_dic = {}
# we get the values
for key, func in vars_funcs.items():
vars_dic[ key ] = func()
# we put the values in the cache
cache.set_many( vars_dic )
# we add protocol
if request.is_secure():
vars_dic['PROTOCOL'] = "https"
else:
vars_dic['PROTOCOL'] = "http"
vars_dic['VERSION'] = settings.VERSION
vars_dic['MEDIA_URL'] = settings.MEDIA_URL
# TODO: think on the trick to get the user out of a signed Django-1.4 cookie
vars_dic['USER'] = get_user()
vars_dic['READ_ONLY'] = settings.READ_ONLY
# return
return vars_dic
def test_set_get_many(self):
data = {
'key1': 'value1',
'key2': 'value2'
}
await cache.set_many(data)
results = await cache.get_many(data.keys())
self.assertDictEqual(results, data)
data['non_existing'] = None
results = await cache.get_many(data.keys())
self.assertDictEqual(results, data)
def get_cached_multiple_link_load(items, time_interval):
"""Cached version of get_multiple_link_load()"""
item_map = {k: _cache_key(k, time_interval) for k in iterkeys(items)}
# cache lookup
cached = cache.get_many(item_map.values())
_logger.debug(
"get_cached_multiple_link_load: got %d/%d values from cache (%r)",
len(cached), len(items), time_interval)
# retrieve data for cache misses
misses = {k: v
for k, v in iteritems(items) if item_map[k] not in cached}
if misses:
get_multiple_link_load(misses, time_interval)
# set data from cache
reverse_item_map = {v: k for k, v in iteritems(item_map)}
for cache_key, value in iteritems(cached):
key = reverse_item_map[cache_key]
properties = items[key]
properties['load_in'], properties['load_out'] = value
# add new data to cache
missed_data = {item_map[key]: (properties['load_in'],
properties['load_out'])
for key, properties in iteritems(misses)}
_logger.debug("get_cached_multiple_link_load: caching %d values",
len(missed_data))
cache.set_many(missed_data, CACHE_TIMEOUT)
def get_cached_multiple_cpu_load(items, time_interval):
"""Cached version of get_multiple_link_load()"""
item_map = {k: _cache_key(k, time_interval) for k in iterkeys(items)}
# cache lookup
cached = cache.get_many(item_map.values())
_logger.debug(
"get_cached_multiple_cpu_load: got %d/%d values from cache (%r)",
len(cached), len(items), time_interval)
# retrieve data for cache misses
misses = {k: v
for k, v in iteritems(items) if item_map[k] not in cached}
if misses:
get_multiple_cpu_load(misses, time_interval)
# set data from cache
reverse_item_map = {v: k for k, v in iteritems(item_map)}
for cache_key, value in iteritems(cached):
key = reverse_item_map[cache_key]
properties = items[key]
properties['load'] = value
# add new data to cache
missed_data = {item_map[key]: properties['load']
for key, properties in iteritems(misses)}
_logger.debug("get_cached_multiple_cpu_load: caching %d values",
len(missed_data))
cache.set_many(missed_data, CACHE_TIMEOUT)
def get_contest_rank_list(contest: Contest, privilege=False):
def _calculate():
def find_key(tup):
if contest.penalty_counts:
return tup[1]['score'], -tup[1]['penalty']
else:
return tup[1]['score']
items = sorted(get_all_contest_participants_detail(contest, privilege=privilege).items(),
key=find_key, reverse=True)
ans = [] # ans = [(user_id, rank), ...]
last_item = None
for idx, item in enumerate(items, start=1):
if last_item and find_key(item) == find_key(last_item):
ans.append((item[0], ans[-1][1]))
else:
ans.append((item[0], idx))
last_item = item
return ans
if not privilege:
# Try to use cache
cache_name = PARTICIPANT_RANK_LIST.format(contest=contest.pk)
t = cache.get(cache_name)
if t is None:
t = _calculate()
cache.set(cache_name, t, FORTNIGHT * uniform(0.6, 1))
d = {PARTICIPANT_RANK.format(contest=contest.pk, user=user): rank for user, rank in t}
cache.set_many(d, FORTNIGHT * uniform(0.6, 1))
return t
else:
return _calculate()
def test_cache_versioning_get_set_many(self):
cache = self.cache
cache2 = LRUObjectCache('lru2', dict(VERSION=2))
cache2._cache = cache._cache
# set, using default version = 1
cache.set_many({'ford1': 37, 'arthur1': 42})
self.assertEqual(cache.get_many(['ford1', 'arthur1']), {'ford1': 37, 'arthur1': 42})
self.assertEqual(cache.get_many(['ford1', 'arthur1'], version=1), {'ford1': 37, 'arthur1': 42})
self.assertEqual(cache.get_many(['ford1', 'arthur1'], version=2), {})
self.assertEqual(cache2.get_many(['ford1', 'arthur1']), {})
self.assertEqual(cache2.get_many(['ford1', 'arthur1'], version=1), {'ford1': 37, 'arthur1': 42})
self.assertEqual(cache2.get_many(['ford1', 'arthur1'], version=2), {})
# set, default version = 1, but manually override version = 2
cache.set_many({'ford2': 37, 'arthur2': 42}, version=2)
self.assertEqual(cache.get_many(['ford2', 'arthur2']), {})
self.assertEqual(cache.get_many(['ford2', 'arthur2'], version=1), {})
self.assertEqual(cache.get_many(['ford2', 'arthur2'], version=2), {'ford2': 37, 'arthur2': 42})
self.assertEqual(cache2.get_many(['ford2', 'arthur2']), {'ford2': 37, 'arthur2': 42})
self.assertEqual(cache2.get_many(['ford2', 'arthur2'], version=1), {})
self.assertEqual(cache2.get_many(['ford2', 'arthur2'], version=2), {'ford2': 37, 'arthur2': 42})
# v2 set, using default version = 2
cache2.set_many({'ford3': 37, 'arthur3': 42})
self.assertEqual(cache.get_many(['ford3', 'arthur3']), {})
self.assertEqual(cache.get_many(['ford3', 'arthur3'], version=1), {})
self.assertEqual(cache.get_many(['ford3', 'arthur3'], version=2), {'ford3': 37, 'arthur3': 42})
self.assertEqual(cache2.get_many(['ford3', 'arthur3']), {'ford3': 37, 'arthur3': 42})
self.assertEqual(cache2.get_many(['ford3', 'arthur3'], version=1), {})
self.assertEqual(cache2.get_many(['ford3', 'arthur3'], version=2), {'ford3': 37, 'arthur3': 42})
# v2 set, default version = 2, but manually override version = 1
cache2.set_many({'ford4': 37, 'arthur4': 42}, version=1)
self.assertEqual(cache.get_many(['ford4', 'arthur4']), {'ford4': 37, 'arthur4': 42})
self.assertEqual(cache.get_many(['ford4', 'arthur4'], version=1), {'ford4': 37, 'arthur4': 42})
self.assertEqual(cache.get_many(['ford4', 'arthur4'], version=2), {})
self.assertEqual(cache2.get_many(['ford4', 'arthur4']), {})
self.assertEqual(cache2.get_many(['ford4', 'arthur4'], version=1), {'ford4': 37, 'arthur4': 42})
self.assertEqual(cache2.get_many(['ford4', 'arthur4'], version=2), {})