def redis(app, config, args, kwargs):
    """Create a ``RedisCache`` from the ``CACHE_REDIS_*`` settings in *config*.

    ``kwargs`` is updated in place: ``host`` and ``port`` are always set
    (defaulting to ``'localhost'`` and ``6379``), while ``password``,
    ``key_prefix`` and ``db`` are only set when their config value is truthy.
    When ``CACHE_REDIS_URL`` is set, ``host`` is replaced by the result of
    ``redis_from_url`` and ``db`` is removed from ``kwargs`` and handed to
    that call instead.

    ``app`` is accepted but not used by this function.
    """
    host = config.get('CACHE_REDIS_HOST', 'localhost')
    port = config.get('CACHE_REDIS_PORT', 6379)
    kwargs.update(host=host, port=port)

    # (keyword argument, config key) pairs that are forwarded only when set.
    optional_settings = (
        ('password', 'CACHE_REDIS_PASSWORD'),
        ('key_prefix', 'CACHE_KEY_PREFIX'),
        ('db', 'CACHE_REDIS_DB'),
    )
    for option, setting in optional_settings:
        value = config.get(setting)
        if value:
            kwargs[option] = value

    url = config.get('CACHE_REDIS_URL')
    if url:
        # The URL takes over from the plain host; 'db' moves from kwargs
        # into the redis_from_url call (None when no db was configured).
        kwargs['host'] = redis_from_url(url, db=kwargs.pop('db', None))

    return RedisCache(*args, **kwargs)
# python类RedisCache()的实例源码 (Example source code for the Python class RedisCache())
def redis(app, config, args, kwargs):
    """Create a ``RedisCache`` from the ``CACHE_REDIS_*`` settings in *config*.

    ``kwargs`` is updated in place: ``host`` and ``port`` are always set
    (defaulting to ``'localhost'`` and ``6379``), while ``password``,
    ``key_prefix`` and ``db`` are only set when their config value is truthy.
    When ``CACHE_REDIS_URL`` is set, ``host`` is replaced by the result of
    ``redis_from_url`` and ``db`` is removed from ``kwargs`` and handed to
    that call instead.

    ``app`` is accepted but not used by this function.
    """
    host = config.get('CACHE_REDIS_HOST', 'localhost')
    port = config.get('CACHE_REDIS_PORT', 6379)
    kwargs.update(host=host, port=port)

    # (keyword argument, config key) pairs that are forwarded only when set.
    optional_settings = (
        ('password', 'CACHE_REDIS_PASSWORD'),
        ('key_prefix', 'CACHE_KEY_PREFIX'),
        ('db', 'CACHE_REDIS_DB'),
    )
    for option, setting in optional_settings:
        value = config.get(setting)
        if value:
            kwargs[option] = value

    url = config.get('CACHE_REDIS_URL')
    if url:
        # The URL takes over from the plain host; 'db' moves from kwargs
        # into the redis_from_url call (None when no db was configured).
        kwargs['host'] = redis_from_url(url, db=kwargs.pop('db', None))

    return RedisCache(*args, **kwargs)
def make_cache(self):
    """Return a ``cache.RedisCache`` keyed under ``werkzeug-test-case:``."""
    prefix = 'werkzeug-test-case:'
    return cache.RedisCache(key_prefix=prefix)
def filesystem(app, config, args, kwargs):
    """Create a ``FileSystemCache`` from ``CACHE_DIR`` and ``CACHE_THRESHOLD``.

    Both config keys are required (a missing key raises ``KeyError``).
    ``args`` and ``kwargs`` are modified in place: the cache directory is
    inserted as the first positional argument and ``threshold`` is set as a
    keyword argument. ``app`` is accepted but not used by this function.
    """
    cache_dir = config['CACHE_DIR']
    args.insert(0, cache_dir)
    kwargs['threshold'] = config['CACHE_THRESHOLD']
    return FileSystemCache(*args, **kwargs)
# RedisCache is supported since Werkzeug 0.7.
def filesystem(app, config, args, kwargs):
    """Create a ``FileSystemCache`` from ``CACHE_DIR`` and ``CACHE_THRESHOLD``.

    Both config keys are required (a missing key raises ``KeyError``).
    ``args`` and ``kwargs`` are modified in place: the cache directory is
    inserted as the first positional argument and ``threshold`` is set as a
    keyword argument. ``app`` is accepted but not used by this function.
    """
    cache_dir = config['CACHE_DIR']
    args.insert(0, cache_dir)
    kwargs['threshold'] = config['CACHE_THRESHOLD']
    return FileSystemCache(*args, **kwargs)
# RedisCache is supported since Werkzeug 0.7.
def test_redis(self):
    """
    Check that the configured 'redis' cache instance is a Redis driver
    """
    # Config file content declaring a single cache instance named 'redis'
    # that uses the Redis driver class.
    config_lines = [
        "from edmunds.cache.drivers.redis import Redis \n",
        "APP = { \n",
        " 'cache': { \n",
        " 'enabled': True, \n",
        " 'instances': [ \n",
        " { \n",
        " 'name': 'redis',\n",
        " 'driver': Redis,\n",
        " 'host': 'localhost',\n",
        " 'port': 6379,\n",
        " 'password': None,\n",
        " 'db': 0,\n",
        " 'default_timeout': 300,\n",
        " 'key_prefix': None,\n",
        " }, \n",
        " ], \n",
        " }, \n",
        "} \n",
    ]
    self.write_config(config_lines)

    # Build the application and fetch its cache driver
    application = self.create_application()
    cache_driver = application.cache()

    # The driver must be both the Redis driver and a RedisCache
    self.assert_is_instance(cache_driver, Redis)
    self.assert_is_instance(cache_driver, RedisCache)
def get_redis():
    """Return the ``RedisCache`` stored on ``current_app``, creating it once.

    On first use the ``(host, port)`` pair is read from
    ``current_app.config["REDIS_CONN"]``, a ``RedisCache`` with a default
    timeout of 300 is created, and it is stored as ``current_app.redis`` so
    later calls return that same attribute.
    """
    # Already created: hand back the stored instance.
    if hasattr(current_app, 'redis'):
        return current_app.redis

    host, port = current_app.config["REDIS_CONN"]
    current_app.redis = RedisCache(host, port, default_timeout=300)
    return current_app.redis