def __init__(self, *arg, **kw):
    """Set up the manager with a memcached client and a thread-mapped pool.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser.  The following keyword arguments are then read
    from ``kw``:

    * ``memcache_module`` -- key into ``_client_libs`` (default ``'auto'``).
    * ``protocol`` -- ``'binary'`` turns on the client's ``binary`` flag
      (default ``'text'``).
    * ``username`` / ``password`` -- passed to the client (default ``None``).
    * ``url`` -- ``';'``-separated server list; it must be supplied as a
      keyword because it is split unconditionally.

    Both ``self.mc`` and ``self.pool`` are obtained from class-level
    registries under the same ``(memcache_module, url)`` key.
    """
    super(PyLibMCNamespaceManager, self).__init__(*arg, **kw)

    module_name = kw.get('memcache_module', 'auto')
    client_lib = _client_libs[module_name]
    use_binary = kw.get('protocol', 'text') == 'binary'
    credentials = {
        'username': kw.get('username', None),
        'password': kw.get('password', None),
    }
    url = kw.get('url')
    behaviors = parse_memcached_behaviors(kw)

    # One key addresses both registries.
    # NOTE(review): presumably ``get`` builds the value with the given
    # factory only when the key is missing -- confirm against the registry.
    registry_key = (module_name, url)

    client_registry = MemcachedNamespaceManager.clients
    self.mc = client_registry.get(
        registry_key,
        client_lib.Client,
        servers=url.split(';'),
        behaviors=behaviors,
        binary=use_binary,
        **credentials
    )

    pool_registry = PyLibMCNamespaceManager.pools
    self.pool = pool_registry.get(
        registry_key, pylibmc.ThreadMappedPool, self.mc
    )
python类ThreadMappedPool()的实例源码
def __init__(self, *arg, **kw):
    """Initialise the parent, then attach a client and a pool to ``self``.

    Every argument is handed to the parent initialiser as-is.  These
    options are afterwards looked up in ``kw``:

    * ``memcache_module`` (default ``'auto'``) selects an entry of
      ``_client_libs``.
    * ``protocol`` (default ``'text'``); the client's ``binary`` flag is
      true only for ``'binary'``.
    * ``username`` and ``password`` (default ``None``) go to the client.
    * ``url`` is split on ``';'`` to form the ``servers`` list, so it has
      to be present in ``kw``.

    ``self.mc`` comes from ``MemcachedNamespaceManager.clients`` and
    ``self.pool`` from ``PyLibMCNamespaceManager.pools``; both lookups use
    the ``(memcache_module, url)`` tuple as key.
    """
    super(PyLibMCNamespaceManager, self).__init__(*arg, **kw)

    lib_name = kw.get('memcache_module', 'auto')
    lib = _client_libs[lib_name]
    wire_protocol = kw.get('protocol', 'text')
    user = kw.get('username', None)
    secret = kw.get('password', None)
    url = kw.get('url')
    behaviors = parse_memcached_behaviors(kw)

    shared_key = (lib_name, url)

    # NOTE(review): the registries' ``get`` is assumed to create-on-miss
    # using the supplied factory and arguments -- verify.
    self.mc = MemcachedNamespaceManager.clients.get(
        shared_key,
        lib.Client,
        servers=url.split(';'),
        behaviors=behaviors,
        binary=(wire_protocol == 'binary'),
        username=user,
        password=secret)

    # The pool wraps the client resolved just above.
    self.pool = PyLibMCNamespaceManager.pools.get(
        shared_key,
        pylibmc.ThreadMappedPool,
        self.mc)
def __init__(self, client, adapter, *, namespace="anom"):
    """Wrap ``client`` in a thread-mapped pool and remember the adapter.

    Parameters:
      client: Object handed to ``pylibmc.ThreadMappedPool``.
      adapter: Stored as ``self.adapter``; its ``query`` attribute is
        also exposed directly as ``self.query``.
      namespace: Keyword-only value stored as ``self.namespace``
        (default ``"anom"``).
    """
    pool = pylibmc.ThreadMappedPool(client)
    self.client_pool = pool

    self.adapter = adapter
    self.namespace = namespace

    # Re-export the adapter's ``query`` attribute on this object.
    self.query = self.adapter.query