Python 函数 get_cache() 的实例源码

memcached_status.py 文件源码 项目:django-heartbeat 作者: pbs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_cache(cache_name):
    """Return the cache registered under ``cache_name``.

    The module-level ``caches`` object is called with the name when it
    exposes ``__call__``; otherwise it is indexed like a mapping.
    """
    lookup_by_call = hasattr(caches, '__call__')
    return caches(cache_name) if lookup_by_call else caches[cache_name]
dutils.py 文件源码 项目:YouPBX 作者: JoneXiong 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_cache(k):
        """Return the entry of Django's ``caches`` registry stored under ``k``."""
        # Imported lazily, at call time rather than at module import.
        from django.core.cache import caches as registry
        return registry[k]
cache.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, session_key=None):
        """Bind the session cache, then run the parent initializer.

        The cache is obtained through ``get_cache`` using the alias stored
        in ``settings.SESSION_CACHE_ALIAS``; ``session_key`` is passed on
        unchanged to the parent ``__init__``.
        """
        cache_alias = settings.SESSION_CACHE_ALIAS
        self._cache = get_cache(cache_alias)
        super(SessionStore, self).__init__(session_key)
tests.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_default_cache(self):
        # Once saved, the session's cache key must resolve to a value in
        # the 'default' cache.
        self.session.save()
        default_cache = get_cache('default')
        stored = default_cache.get(self.session.cache_key)
        self.assertNotEqual(stored, None)
tests.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_non_default_cache(self):
        # Re-initialize the session backend to make use of overridden settings.
        self.session = self.backend()
        self.session.save()

        # The saved session must be absent from the 'default' cache ...
        in_default = get_cache('default').get(self.session.cache_key)
        self.assertEqual(in_default, None)

        # ... and present in the 'sessions' cache.
        in_sessions = get_cache('sessions').get(self.session.cache_key)
        self.assertNotEqual(in_sessions, None)
queues.py 文件源码 项目:DCRM 作者: 82Flex 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_redis_connection(config, use_strict_redis=False):
    """
    Returns a redis connection from a connection config.

    The config keys are examined in this order:

    1. ``URL``: build the connection with ``from_url``, passing ``DB`` when
       it is present.
    2. ``USE_REDIS_CACHE``: reuse the connection held by the Django cache
       registered under that alias.
    3. ``UNIX_SOCKET_PATH``: connect over a unix socket, using ``DB``.
    4. Otherwise connect with ``HOST``, ``PORT``, ``DB`` and the optional
       ``PASSWORD``.

    ``use_strict_redis`` selects ``redis.StrictRedis`` instead of
    ``redis.Redis`` as the connection class.
    """
    if use_strict_redis:
        connection_cls = redis.StrictRedis
    else:
        connection_cls = redis.Redis

    if 'URL' in config:
        return connection_cls.from_url(config['URL'], db=config.get('DB'))

    if 'USE_REDIS_CACHE' in config:
        cache_alias = config['USE_REDIS_CACHE']

        # Prefer the ``caches`` registry; fall back to ``get_cache`` when
        # the first import path raises ImportError.
        try:
            from django.core.cache import caches
            backend = caches[cache_alias]
        except ImportError:
            from django.core.cache import get_cache
            backend = get_cache(cache_alias)

        if not hasattr(backend, 'client'):
            # We're using django-redis-cache
            try:
                return backend._client
            except AttributeError:
                # For django-redis-cache > 0.13.1
                return backend.get_master_client()

        # We're using django-redis. The cache's `client` attribute
        # is a pluggable backend that return its Redis connection as
        # its `client`
        try:
            # To get Redis connection on django-redis >= 3.4.0
            # we need to use cache.client.get_client() instead of
            # cache.client.client used in older versions
            try:
                return backend.client.get_client()
            except AttributeError:
                return backend.client.client
        except NotImplementedError:
            # Fall through to the explicit connection settings below.
            pass

    if 'UNIX_SOCKET_PATH' in config:
        return connection_cls(unix_socket_path=config['UNIX_SOCKET_PATH'], db=config['DB'])

    return connection_cls(
        host=config['HOST'],
        port=config['PORT'],
        db=config['DB'],
        password=config.get('PASSWORD'))
creation.py 文件源码 项目:tissuelab 作者: VirtualPlants 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_test_db(self, verbosity=1, autoclobber=False):
        """
        Create the test database and return its name.

        ``verbosity`` and ``autoclobber`` are forwarded to
        ``_create_test_db``. This variant loads the SpatiaLite
        initialization SQL before the `syncdb` command is run, then runs
        `flush` and creates the table for every database-backed cache
        listed in ``settings.CACHES``.
        """
        # Deferred so django.core.management is only imported when needed.
        from django.core.management import call_command

        db_name = self._get_test_db_name()

        if verbosity >= 1:
            # The database name is only shown at verbosity 2 and above.
            name_suffix = (" ('%s')" % db_name) if verbosity >= 2 else ''
            print("Creating test database for alias '%s'%s..." % (self.connection.alias, name_suffix))

        self._create_test_db(verbosity, autoclobber)

        # Point the connection at the freshly created test database.
        self.connection.close()
        self.connection.settings_dict["NAME"] = db_name

        # The SpatiaLite initialization SQL has to be in place before `syncdb`.
        self.load_spatialite_sql()

        # Management commands report at one level lower than requested, so
        # testing is not flooded with messages (unless you really ask for it).
        command_verbosity = max(verbosity - 1, 0)

        call_command(
            'syncdb',
            verbosity=command_verbosity,
            interactive=False,
            database=self.connection.alias,
            load_initial_data=False)

        # Flush so that any data installed by custom SQL is removed. The
        # only test data should come from test fixtures, or be autogenerated
        # from post_syncdb triggers. As a side effect this loads the initial
        # data that was intentionally skipped in the syncdb above.
        call_command(
            'flush',
            verbosity=command_verbosity,
            interactive=False,
            database=self.connection.alias)

        from django.core.cache import get_cache
        from django.core.cache.backends.db import BaseDatabaseCache
        for alias in settings.CACHES:
            backend = get_cache(alias)
            if not isinstance(backend, BaseDatabaseCache):
                continue
            call_command('createcachetable', backend._table, database=self.connection.alias)

        # Get a cursor (even though we don't need one yet). This has
        # the side effect of initializing the test database.
        self.connection.cursor()

        return db_name


问题


面经


文章

微信
公众号

扫码关注公众号