def get_cache(cache_name):
    """Return the cache registered under ``cache_name``.

    ``caches`` is called with the name when it exposes ``__call__`` and is
    indexed by the name otherwise.
    """
    if not hasattr(caches, '__call__'):
        return caches[cache_name]
    return caches(cache_name)
python类get_cache()的实例源码
def get_cache(k):
    """Return the entry stored under key ``k`` in Django's ``caches``."""
    # The import stays inside the function, so it runs at call time.
    from django.core.cache import caches as cache_registry
    backend = cache_registry[k]
    return backend
def __init__(self, session_key=None):
    """Attach the session cache, then run the parent initializer.

    The cache is looked up through ``get_cache`` using
    ``settings.SESSION_CACHE_ALIAS`` and stored on ``self._cache``;
    ``session_key`` is forwarded unchanged to the parent ``__init__``.
    """
    cache_alias = settings.SESSION_CACHE_ALIAS
    self._cache = get_cache(cache_alias)
    super(SessionStore, self).__init__(session_key)
def test_default_cache(self):
    """Saving the session must leave an entry in the 'default' cache."""
    self.session.save()
    cached = get_cache('default').get(self.session.cache_key)
    # Check for None by identity rather than with ``!=``, which would go
    # through the cached value's own comparison methods.
    self.assertIsNotNone(cached)
def test_non_default_cache(self):
    """Saving must write to the 'sessions' cache and not to 'default'."""
    # Re-initialize the session backend to make use of overridden settings.
    self.session = self.backend()
    self.session.save()
    # None checks use identity assertions instead of ``==`` / ``!=``.
    self.assertIsNone(get_cache('default').get(self.session.cache_key))
    self.assertIsNotNone(get_cache('sessions').get(self.session.cache_key))
def get_redis_connection(config, use_strict_redis=False):
    """
    Build a redis connection from a connection config mapping.

    Sources are tried in this order:

    1. ``config['URL']`` -- handed to ``from_url`` together with the
       optional ``DB`` entry.
    2. ``config['USE_REDIS_CACHE']`` -- reuse the client held by the Django
       cache registered under that alias.
    3. ``config['UNIX_SOCKET_PATH']`` together with ``DB``.
    4. ``config['HOST']``, ``PORT`` and ``DB``, with an optional
       ``PASSWORD``.

    ``use_strict_redis`` selects ``redis.StrictRedis`` instead of
    ``redis.Redis`` for the connections built in steps 1, 3 and 4.
    """
    if use_strict_redis:
        connection_class = redis.StrictRedis
    else:
        connection_class = redis.Redis

    if 'URL' in config:
        return connection_class.from_url(config['URL'], db=config.get('DB'))

    if 'USE_REDIS_CACHE' in config.keys():
        cache_alias = config['USE_REDIS_CACHE']
        try:
            from django.core.cache import caches
            backend = caches[cache_alias]
        except ImportError:
            # ``caches`` could not be imported; use ``get_cache`` instead.
            from django.core.cache import get_cache
            backend = get_cache(cache_alias)

        if not hasattr(backend, 'client'):
            # No `client` attribute: we're using django-redis-cache.
            try:
                return backend._client
            except AttributeError:
                # For django-redis-cache > 0.13.1
                return backend.get_master_client()

        # We're using django-redis. The cache's `client` attribute is a
        # pluggable backend that exposes its Redis connection.
        try:
            try:
                # django-redis >= 3.4.0 provides get_client() ...
                return backend.client.get_client()
            except AttributeError:
                # ... older versions only have the `client` attribute.
                return backend.client.client
        except NotImplementedError:
            # Fall through to the socket / host based connections below.
            pass

    if 'UNIX_SOCKET_PATH' in config:
        return connection_class(
            unix_socket_path=config['UNIX_SOCKET_PATH'],
            db=config['DB'],
        )

    return connection_class(
        host=config['HOST'],
        port=config['PORT'],
        db=config['DB'],
        password=config.get('PASSWORD', None),
    )
def create_test_db(self, verbosity=1, autoclobber=False):
    """
    Create the test database and return its name.

    The user is prompted for confirmation if the database already exists.
    This override loads the SpatiaLite initialization SQL before the
    `syncdb` command is run.
    """
    # Imported locally so django.core.management is only loaded when needed.
    from django.core.management import call_command

    test_database_name = self._get_test_db_name()

    if verbosity >= 1:
        # The database name is only shown at verbosity 2 and above.
        name_suffix = " ('%s')" % test_database_name if verbosity >= 2 else ''
        print("Creating test database for alias '%s'%s..." % (self.connection.alias, name_suffix))

    self._create_test_db(verbosity, autoclobber)

    # Drop the current connection and point its settings at the test database.
    self.connection.close()
    self.connection.settings_dict["NAME"] = test_database_name

    # The SpatiaLite initialization SQL has to be in place before `syncdb`.
    self.load_spatialite_sql()

    # Run syncdb one verbosity level below the requested one so that
    # testing is not flooded with messages (unless that was asked for).
    call_command(
        'syncdb',
        verbosity=max(verbosity - 1, 0),
        interactive=False,
        database=self.connection.alias,
        load_initial_data=False,
    )

    # Flush afterwards so that data installed by custom SQL is removed; the
    # only test data should come from test fixtures or be autogenerated by
    # post_syncdb triggers. As a side effect this loads the initial data
    # that was intentionally skipped during syncdb.
    call_command(
        'flush',
        verbosity=max(verbosity - 1, 0),
        interactive=False,
        database=self.connection.alias,
    )

    from django.core.cache import get_cache
    from django.core.cache.backends.db import BaseDatabaseCache

    # Create the cache table for every configured database cache.
    for cache_alias in settings.CACHES:
        backend = get_cache(cache_alias)
        if not isinstance(backend, BaseDatabaseCache):
            continue
        call_command('createcachetable', backend._table, database=self.connection.alias)

    # Request a cursor (even though it is not used yet); this has the side
    # effect of initializing the test database.
    self.connection.cursor()

    return test_database_name