def test_django_cache(self):
try:
from django.conf import settings
settings.configure(CACHE_BACKEND = 'locmem://')
from django.core.cache import cache
except ImportError:
# no Django, so nothing to test
return
congress = Congress(API_KEY, cache)
self.assertEqual(congress.http.cache, cache)
self.assertEqual(congress.members.http.cache, cache)
self.assertEqual(congress.bills.http.cache, cache)
self.assertEqual(congress.votes.http.cache, cache)
try:
bills = congress.bills.introduced('house')
except Exception as e:
self.fail(e)
python类cache()的实例源码
def release(self):
"""
Release the lock.
"""
# If we went over the lock duration (timeout), we need to exit to avoid
# accidentally releasing a lock that was acquired by another process.
if not self.held:
logger.warning('Tried to release unheld lock: %r', self)
return False
try:
# XXX: There is a possible race condition here -- this could be
# actually past the timeout due to clock skew or the delete
# operation could reach the server after the timeout for a variety
# of reasons. The only real fix for this would be to use a check
# and delete operation, but that is backend dependent and not
# supported by the cache API.
self.cache.delete(self.lock_key)
except Exception as e:
logger.exception(e)
finally:
self.__acquired_at = None
return True
def handle(self, sender, signal, **kwargs):
# only add things to the activity stream if there's a user
if kwargs.get("user", None) is None:
return
target = self.getTargetObj(**kwargs)
# invalidate cache for target followers
users = followers(target)
for user in users:
update_activity_stream_for_user.delay(user.username)
update_activity_stream_for_user.delay(kwargs['user'].username, actor=True)
if signal == signals.create:
action.send(kwargs['user'], verb=self.createVerb, action_object=kwargs['instance'], target=target)
if signal == signals.modify:
action.send(kwargs['user'], verb=self.modifyVerb, action_object=kwargs['instance'], target=target)
def test_page_view_ok_with_cache(self):
page = Page.objects.create(
url='/test/',
template='<h1>Hello world!</h1>',
page_processor_config={'cache': 15} # cache for 15 seconds
)
cache_key = cachekeys.rendered_source_for_lang(page.pk, 'en')
self.assertNotIn(cache_key, cache)
response = self.client.get('/test/')
self.assertEqual(response.status_code, 200)
self.assertContains(response, '<h1>Hello world!</h1>')
self.assertIn(cache_key, cache)
# Not Found Processor:
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def apply(self, envelope):
for path in settings.TRANSACTIONAL.get('EXEC_QUEUE_POLICIES'):
if os.path.exists(path):
with open(path) as module:
ephemeral_context = {}
allowed_context = {'__builtins__': {
'settings': settings,
'cache': cache,
'logger': logger,
'print': print
}}
for mod in settings.TRANSACTIONAL.get('EXEC_QUEUE_POLICIES_CONTEXT_BUILTINS'):
allowed_context['__builtins__'][mod] = __import__(mod)
try:
exec(module.read(), allowed_context, ephemeral_context)
ephemeral_context.get('apply')(envelope)
except Exception as err:
logger.warning(
'[{}] Failed to execute "{}" ephemeral '
'policy: {}'.format(
envelope.headers.get(
settings.TRANSACTIONAL.get(
'X_MESSAGE_ID_HEADER',
'NO-MESSAGE-ID')),
path, err))
else:
logger.warning(
"[{}] Following ephemeral policy doesn't "
"exists: {}".format(
envelope.headers.get(settings.TRANSACTIONAL.get(
'X_MESSAGE_ID_HEADER', 'NO-MESSAGE-ID')),
path))
def key(cache, attr):
return 'cache.%s.%s' % (cache.__module__.split('.')[-1], attr)
def __init__(self, cache):
self.cache = cache
def __getattribute__(self, attr):
if attr == 'cache':
return BaseCache.__getattribute__(self, attr)
return wrap(getattr(self.cache, attr), key(self.cache, attr))
def patch():
cache.cache = XRayTracker(cache.cache)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def __init__(self):
super(SignatureValidator, self).__init__()
self.endpoint = SignatureOnlyEndpoint(self)
self.lti_consumer = None
self.cache = cache
# The OAuth signature uses the endpoint URL as part of the request to be
# hashed. By default, the oauthlib library rejects any URLs that do not
# use HTTPS. We turn this behavior off in order to allow edX to run without
# SSL in development mode. When the platform is deployed and running with
# SSL enabled, the URL passed to the signature verifier must start with
# 'https', otherwise the message signature would not match the one generated
# on the platform.
def validate_timestamp_and_nonce(self, client_key, timestamp, nonce, request):
"""
Verify that the request is not too old (according to the timestamp), and that the nonce value is unique.
Nonce value should not have been used already within the period of time in which the timestamp marks a
request as valid. This method signature is required by the oauthlib library.
:return: True if the OAuth nonce and timestamp are valid, False if they
are not.
"""
msg = "LTI request's {} is not valid."
log.debug('Timestamp validating is started.')
ts = int(timestamp)
ts_key = '{}_ts'.format(client_key)
cache_ts = self.cache.get(ts_key, ts)
if cache_ts > ts:
log.debug(msg.format('timestamp'))
return False
# NOTE(idegtiarov) cache data with timestamp and nonce lives for 10 seconds
self.cache.set(ts_key, ts, 10)
log.debug('Timestamp is valid.')
log.debug('Nonce validating is started.')
if self.cache.get(nonce):
log.debug(msg.format('nonce'))
return False
self.cache.set(nonce, 1, 10)
log.debug('Nonce is valid.')
return True
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)
def set_cached_choices(self,choices):
if not self.cache_config['enabled']:
return
c = caches(self.cache_config['cache'])
return c.set(self.cache_config['key']%self.field_path,choices)
def get_cached_choices(self):
if not self.cache_config['enabled']:
return None
c = caches(self.cache_config['cache'])
return c.get(self.cache_config['key']%self.field_path)