async def test_queue_json(loop):
    """Exercise ``RedisQueue`` with ``format='json'`` against a local Redis.

    Puts a dict on the queue and checks that ``length()``, ``list()`` and
    ``get()`` hand the same dict back, then checks that ``clear()`` leaves
    the queue empty.

    Declared ``async`` because the body awaits; the test runner must be able
    to execute coroutine tests.  Needs a Redis server on ``localhost:6379``.

    Args:
        loop: event loop passed to the pool and to the queue.
    """
    config = MergeDict(
        key=str(uuid.uuid4()),  # random key for every run
        format='json',
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping serves as both the queue config and its context.
    context = config
    q = RedisQueue(config, context=context, loop=loop)
    await q.init()

    await q.put({'f': 3})
    assert 1 == await q.length()
    assert [{'f': 3}] == await q.list()
    assert {'f': 3} == await q.get()

    # The test expects get() to have removed the item: one more put -> length 1.
    await q.put({'f': 3})
    assert 1 == await q.length()

    await q.clear()
    assert not await q.length()
# Example source code for the Python create_pool() function.
def lookup(config, request):
    """Look up instance metadata for the requesting client in Redis.

    This is a generator-based coroutine (it uses ``yield from``), so it must
    be driven by a coroutine or an event loop rather than called directly.

    Args:
        config: object providing ``redis_host`` and ``redis_port``.
        request: object whose ``client`` attribute identifies the caller; it
            is stringified and passed to the Redis script as an argument.

    Returns:
        The unpickled value returned by the script, or ``None`` when
        ``request.client`` is ``None`` or the script returns nothing.
    """
    global pool
    global lock
    if request.client is None:
        return None
    # Check / lock / re-check so that only one caller creates the pool.
    if pool is None:
        with (yield from lock):
            if pool is None:
                pool = yield from aioredis.create_pool(
                    (config.redis_host, config.redis_port), minsize=2, maxsize=20)
    # Call the eval script to look up the IP and retrieve instance data.
    # Could probably be optimized by storing the script server-side
    # during initial pool creation.
    with (yield from pool) as redis:
        pickle_data = yield from redis.eval(KEY_SCRIPT, args=[KEY_IP, str(request.client)])
    if pickle_data is None:
        return None
    # SECURITY: pickle.loads can execute arbitrary code. This is only safe
    # while every writer to this Redis instance is trusted; consider a
    # data-only format (e.g. JSON) if that cannot be guaranteed.
    return pickle.loads(pickle_data)
async def setup(app):
    """Install session handling on *app*, trying three storages in order.

    1. If ``settings.SESSION_STORAGE`` exists, import that module and await
       its ``setup(app)`` to obtain a storage.
    2. If that yields nothing (or fails), create a Redis pool on
       ``localhost:6379``, keep it in ``app['redis_pool']`` and use
       ``RedisStorage``.
    3. If anything above raises, fall back to ``EncryptedCookieStorage``
       with a freshly generated Fernet key.

    Args:
        app: the application object handed to ``setup_session``.
    """
    try:
        storage = None
        if hasattr(settings, 'SESSION_STORAGE'):
            try:
                mod = importlib.import_module(settings.SESSION_STORAGE)
                storage = await mod.setup(app)
            except Exception:
                # Best effort: a broken custom storage must not stop startup.
                web_logger.warning(
                    "failed to setup {} storage. Using simple cookie storage".format(settings.SESSION_STORAGE))
        if not storage:
            app['redis_pool'] = await create_pool(('localhost', 6379))
            storage = RedisStorage(app['redis_pool'])
        setup_session(app, storage)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # and interpreter-exit signals are not swallowed.
        from cryptography import fernet
        import base64
        web_logger.warning("failed to connect to Redis server. Using simple cookie storage")
        # The key is generated per call, so it is not stable across restarts.
        fernet_key = fernet.Fernet.generate_key()
        secret_key = base64.urlsafe_b64decode(fernet_key)
        setup_session(app, EncryptedCookieStorage(secret_key))
async def startup(self):
    """Build the web application and start listening.

    Creates ``self.app``, attaches the config, a Redis-backed session
    middleware plus the module-level ``middlewares``, registers every entry
    of ``routes``, starts ``Models``, registers a shutdown hook for open
    websockets, and finally creates the server on
    ``self.config['host']`` / ``self.config['port']``.

    Sets ``self.app``, ``self.redis_pool``, ``self.handler`` and
    ``self.server``.
    """
    self.app = web.Application()
    self.app.config = self.config
    self.redis_pool = await aioredis.create_pool(('localhost', 6379))
    self.app.middlewares.extend([
        session_middleware(RedisStorage(self.redis_pool)),
        *middlewares
    ])
    for route in routes:
        # First three items are the add_route() arguments, the fourth is the name.
        self.app.router.add_route(*route[:3], name=route[3])
    self.app.models = Models(self.config)
    await self.app.models.startup()
    self.app.websockets = []

    async def on_shutdown(_app):
        """Shut down every websocket still tracked on the application."""
        for ws in _app.websockets:
            await ws.shutdown()

    self.app.on_shutdown.append(on_shutdown)
    _loop = asyncio.get_event_loop()
    self.handler = self.app.make_handler(loop=_loop)
    self.server = await _loop.create_server(self.handler, self.config['host'], self.config['port'])
async def setup(self):
    """Connect to Redis, read the bot settings and load the bot modules.

    Reads ``BotModules`` (list), ``BotTester``, ``Creator`` and
    ``DiscordCID`` from Redis database 1, imports every listed module,
    instantiates one class per module and fills ``self.command_map`` from
    each instance's ``commands``.

    ``self.references`` and ``self.command_map`` must already exist.
    """
    self.redis = await aioredis.create_pool(
        ('localhost', 6379), db=1, minsize=1, maxsize=10, encoding="utf-8")
    async with self.redis.get() as dbp:
        modules = await dbp.lrange("BotModules", 0, -1)
        self.btoken = await dbp.get("BotTester")
        self.creator = await dbp.get("Creator")
        self.cid = await dbp.get("DiscordCID")
    # Import everything first so that all modules are present in globals()
    # before any module class is instantiated.
    for mod in modules:
        globals()[mod] = importlib.import_module(mod)
    for mod in modules:
        # getmembers() returns (name, value) pairs sorted by name, so this
        # instantiates the alphabetically first class visible in the module.
        first_class = inspect.getmembers(globals()[mod], inspect.isclass)[0][1]
        self.references[mod] = first_class(self)
    for mod in self.references:
        for command in self.references[mod].commands:
            # command[0] is the command name, command[1] the value mapped to it.
            self.command_map[command[0].lower()] = command[1]
async def setup(self):
    """Connect to Redis, read the bot settings, load modules, schedule stats.

    Reads ``BotModules`` (list), ``BotToken``, ``Creator`` and
    ``DiscordCID`` from Redis database 1, imports every listed module,
    instantiates one class per module, fills ``self.command_map`` from each
    instance's ``commands`` and finally schedules ``self.update_stats`` on
    ``self.loop``.

    ``self.references`` and ``self.command_map`` must already exist.
    """
    self.redis = await aioredis.create_pool(
        ('localhost', 6379), db=1, minsize=1, maxsize=10, encoding="utf-8")
    async with self.redis.get() as dbp:
        modules = await dbp.lrange("BotModules", 0, -1)
        self.btoken = await dbp.get("BotToken")
        self.creator = await dbp.get("Creator")
        self.cid = await dbp.get("DiscordCID")
    # Import everything first so that all modules are present in globals()
    # before any module class is instantiated.
    for mod in modules:
        globals()[mod] = importlib.import_module(mod)
    for mod in modules:
        # getmembers() returns (name, value) pairs sorted by name, so this
        # instantiates the alphabetically first class visible in the module.
        first_class = inspect.getmembers(globals()[mod], inspect.isclass)[0][1]
        self.references[mod] = first_class(self)
    for mod in self.references:
        for command in self.references[mod].commands:
            # command[0] is the command name, command[1] the value mapped to it.
            self.command_map[command[0].lower()] = command[1]
    # First stats update after a delay of 1500 (seconds, if self.loop is an
    # asyncio event loop -- confirm).
    self.loop.call_later(1500, self.update_stats)
async def init(self):
    """Create the Redis pool and store it under ``self['redis_pool']``.

    The host comes from the ``REDIS_HOST`` environment variable and defaults
    to ``localhost``; the port is fixed at 6379.
    """
    redis_host = os.getenv('REDIS_HOST', 'localhost')
    self['redis_pool'] = await aioredis.create_pool((redis_host, 6379), loop=self.loop)
async def test_queue(loop):
    """Exercise ``RedisQueue`` with its default format against a local Redis.

    Puts the integer ``3`` on the queue and expects it back as ``b'3'`` from
    both ``list()`` and ``get()``, then checks that ``clear()`` leaves the
    queue empty.

    Declared ``async`` because the body awaits; the test runner must be able
    to execute coroutine tests.  Needs a Redis server on ``localhost:6379``.

    Args:
        loop: event loop passed to the pool and to the queue.
    """
    config = MergeDict(key=str(uuid.uuid4()))  # random key for every run
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping serves as both the queue config and its context.
    context = config
    q = RedisQueue(config, context=context, loop=loop)
    await q.init()

    await q.put(3)
    assert 1 == await q.length()
    assert [b'3'] == await q.list()
    assert b'3' == await q.get()

    # The test expects get() to have removed the item: one more put -> length 1.
    await q.put(3)
    assert 1 == await q.length()

    await q.clear()
    assert not await q.length()
async def test_zqueue(loop, mocker):
    """Exercise ``RedisZQueue`` ordering and de-duplication on a local Redis.

    Four puts with three distinct values must give a length of 3, and the
    values must come back in the order ``a``, ``b``, ``c``.  The second
    argument to ``put`` is presumably the score -- ``'a'`` is put twice (4,
    then 1) and is expected first.

    Declared ``async`` because the body awaits; the test runner must be able
    to execute coroutine tests.  Needs a Redis server on ``localhost:6379``.

    Args:
        loop: event loop passed to the pool and to the queue.
        mocker: pytest-mock fixture used to replace ``asyncio.sleep``.
    """
    config = MergeDict(
        key=str(uuid.uuid4()),  # random key for every run
        format='str',
        timeout=0,
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping serves as both the queue config and its context.
    context = config
    q = RedisZQueue(config, context=context, loop=loop)
    await q.init()

    await q.put('a', 4)
    await q.put('c', 3)
    await q.put('b', 2)
    await q.put('a', 1)
    assert 3 == await q.length()
    assert ['a', 'b', 'c'] == await q.list()
    # list() must not consume anything.
    assert 3 == await q.length()

    assert 'a' == await q.get()
    assert ['b', 'c'] == await q.list()
    assert 2 == await q.length()

    assert 'b' == await q.get()
    assert ['c'] == await q.list()
    assert 1 == await q.length()

    assert 'c' == await q.get()
    assert [] == await q.list()
    assert not await q.length()

    # mocker.patch() is active until the end of the test and is undone by the
    # fixture, so it is not wrapped in ``with``.  With asyncio.sleep replaced
    # by a plain mock, get() on the empty queue is expected to raise
    # TypeError (presumably when the mock's return value is awaited).
    mocker.patch('asyncio.sleep')
    with pytest.raises(TypeError):
        await q.get()
async def test_ts_zqueue(loop, mocker):
    """Exercise ``TimestampZQueue`` against a local Redis.

    ``asyncio.sleep`` is replaced by ``breaker`` whenever ``get()`` is
    expected to wait, so the wait surfaces as ``InterruptedError`` instead
    of blocking the test.

    Declared ``async`` because the body awaits; the test runner must be able
    to execute coroutine tests.  Needs a Redis server on ``localhost:6379``.

    Args:
        loop: event loop passed to the pool and to the queue.
        mocker: pytest-mock fixture; requested but not used in the body.
    """
    config = MergeDict(
        key=str(uuid.uuid4()),  # random key for every run
        format='str',
        timeout=10,
    )
    config['app.redis_pool'] = await aioredis.create_pool(
        ('localhost', 6379), loop=loop)
    # The same mapping serves as both the queue config and its context.
    context = config
    q = TimestampZQueue(config, context=context, loop=loop)
    await q.init()

    async def breaker(*args, **kwargs):
        """Stand-in for asyncio.sleep: release the queue lock and abort."""
        q._lock.release()
        raise InterruptedError

    # Empty queue: get() is expected to reach asyncio.sleep.
    with pytest.raises(InterruptedError), mock.patch('asyncio.sleep', breaker):
        await q.get()

    await q.put('c', time.time() + 4)  # a few seconds in the future
    await q.put('a', 4)
    assert 2 == await q.length()
    assert ['a', 'c'] == await q.list()
    assert 'a' == await q.get()
    assert 1 == await q.length()
    assert ['c'] == await q.list()

    # Only 'c' (future timestamp) is left: get() is expected to sleep again.
    with pytest.raises(InterruptedError), mock.patch('asyncio.sleep', breaker):
        await q.get()
async def startup_redis(self):
    """Create the two Redis pools described by ``self.config.redis``.

    ``self['redis']`` is built from ``self.config.redis.default`` and
    ``self['sessions']`` from ``self.config.redis.sessions``.  Each section
    must provide ``host``, ``port``, ``db``, ``minsize`` and ``maxsize``.
    """
    # (key under which the pool is stored, config section it is built from)
    pools = (('redis', 'default'), ('sessions', 'sessions'))
    for key, section in pools:
        config = getattr(self.config.redis, section)
        self[key] = await aioredis.create_pool(
            (config.host, config.port),
            db=config.db,
            minsize=config.minsize,
            maxsize=config.maxsize,
            loop=self.loop)
async def connect_to_cache(host: str, port: int, password: str,
                           poolsize: int, db: int) -> aioredis.pool:
    """Create the cache pool, store it in the global ``cachepool``, return it.

    Args:
        host: Redis host name.
        port: Redis port.
        password: password passed to ``create_pool``.
        poolsize: passed as the pool's ``maxsize``.
        db: Redis database index.

    Returns:
        The newly created pool (also bound to the module-level ``cachepool``).
        NOTE(review): the return annotation names ``aioredis.pool``, which
        looks like the module rather than a pool class -- confirm.
    """
    global cachepool
    cachepool = await aioredis.create_pool((host, port), password=password,
                                           db=db, maxsize=poolsize,
                                           encoding="utf-8")
    return cachepool
async def connect(self):
    """
    Get a connection for this instance.

    The pool is created on first use.  ``self._pool`` is checked before and
    after taking ``self._lock`` so that concurrent first callers create only
    one pool.  The result is whatever awaiting the pool yields.
    """
    if self._pool is None:
        async with self._lock:
            # Re-check: another caller may have created the pool while this
            # one was waiting for the lock.
            if self._pool is None:
                self._pool = await aioredis.create_pool(
                    (self.host, self.port), minsize=1, maxsize=100)
    return await self._pool
async def _create_pool_connection(self):
    """Create and return a pool limited to ``self.pool_size`` connections.

    Entries of ``self._params`` are passed through with lower-cased keys.
    They are merged last, so they override ``db``, ``maxsize`` and ``loop``
    when the names collide.
    """
    connection_kwargs = {
        'db': self._db,
        'maxsize': self.pool_size,
        'loop': self.loop,
        **{k.lower(): v for k, v in self._params.items()}
    }
    return await create_pool(
        (self._host, self._port),
        **connection_kwargs
    )
async def _create_single_connection(self):
    """Create and return a pool without an explicit ``maxsize``.

    Entries of ``self._params`` are passed through with lower-cased keys.
    They are merged last, so they override ``db`` and ``loop`` when the
    names collide.

    NOTE(review): despite the name this calls ``create_pool`` and therefore
    returns a pool, not a single connection -- confirm that is intended.
    """
    connection_kwargs = {
        'db': self._db,
        'loop': self.loop,
        **{k.lower(): v for k, v in self._params.items()}
    }
    return await create_pool(
        (self._host, self._port),
        **connection_kwargs
    )
async def _start(self):
    """(Re)create the Redis pool and the dispatch task where needed.

    Clears the shutdown flag, creates the pool if it is missing or closed,
    and schedules ``_task_dispatch_loop`` if no dispatch task exists or the
    previous one is done.  Failures are logged and not re-raised.
    """
    self._shutdown_task_dispatch = False
    try:
        # NOTE(review): ``closed`` is called as a method here; confirm this
        # matches the pool class in use.
        if not hasattr(self, 'aio_redis_connection_pool') or self.aio_redis_connection_pool.closed():
            self.aio_redis_connection_pool = await aioredis.create_pool(**self._get_redis_config())
        if not hasattr(self, '_disp_task') or self._disp_task.done():
            self._disp_task = asyncio.ensure_future(self._task_dispatch_loop())
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that task
        # cancellation is not swallowed together with real start-up errors.
        log.exception("failed to start")
def aioredis_pool(event_loop):
    """Return a Redis pool for 127.0.0.1:6379 capped at one connection.

    The pool is created by running ``create_pool`` to completion on
    *event_loop*.
    """
    address = ("127.0.0.1", 6379)
    pending_pool = aioredis.create_pool(address, maxsize=1)
    return event_loop.run_until_complete(pending_pool)
async def _get_pool(self):
    """Return the Redis pool, creating it on first use.

    Creation happens while holding ``self._pool_lock``, so concurrent first
    callers end up sharing one pool.
    """
    async with self._pool_lock:
        if self._pool is None:
            self._pool = await aioredis.create_pool(
                (self.endpoint, self.port),
                db=self.db,
                password=self.password,
                loop=self._loop,
                encoding="utf-8",
                minsize=self.pool_min_size,
                maxsize=self.pool_max_size)
    return self._pool
async def connect_to_redis(loop):
    """Create and return a Redis pool for the address in ``REDIS_URL``.

    ``REDIS_URL`` defaults to ``redis://localhost:6379``; it is converted by
    ``get_redis_info`` into the address argument for ``create_pool``.

    Args:
        loop: event loop passed to the pool.
    """
    redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379')
    return await aioredis.create_pool(get_redis_info(redis_url), loop=loop)
async def on_startup(app):
    """Open the database engine and the Redis client and attach them to *app*.

    ``app['db']`` is created from ``config.SA_URL`` and ``app['redis']``
    from ``config.REDIS_URL``.
    """
    app['db'] = await sa.create_engine(config.SA_URL)
    app['redis'] = await create_redis(config.REDIS_URL)
async def async_init(self):
    """
    Handle the asynchronous part of the initialisation.

    Creates the Redis pool from ``self.host``, ``self.port``, ``self.db_id``,
    ``self.min_pool_size`` and ``self.max_pool_size`` and stores it in
    ``self.pool``.  The pool is bound to the loop returned by
    ``asyncio.get_event_loop()``.
    """
    self.pool = await aioredis.create_pool(
        (self.host, self.port),
        db=self.db_id,
        minsize=self.min_pool_size,
        maxsize=self.max_pool_size,
        loop=asyncio.get_event_loop(),
    )
async def init(loop):
    """Build the web application, seed Redis defaults and start the server.

    Steps: session/error/maintenance middlewares, application object, Redis
    client wrapped in ``RedisFilter``, first-run creation of the admin
    ``User`` and the ``Profile`` records, security policies, routes (via
    ``compass``), templates, and finally the listening server.

    Args:
        loop: event loop used for the application, the Redis client and the
            server.

    Returns:
        Tuple ``(server, handler, app)``.
    """
    # Middlewares
    # NOTE(review): the session pool uses the hard-coded port 6379 while the
    # main client below uses config.redis['port'] -- confirm this is intended.
    session_pool = await aioredis.create_pool((config.redis_ip, 6379))
    middlewares = [
        session_middleware(RedisStorage(session_pool, cookie_name='w')),
        error_middleware,
        maintain_middleware,
    ]
    # init server
    app = web.Application(loop=loop,
                          middlewares=middlewares)
    redis = await aioredis.create_redis((config.redis_ip, config.redis['port']), loop=loop)
    app.redis = RedisFilter(redis)
    # Register admin account (only when no 'User' record exists yet)
    if await app.redis.get('User') is None:
        await app.redis.set('SecretKey', os.urandom(16), many=False)
        # The SecretKey is stored before encrypt() is called on the password.
        config.admin['password'] = await encrypt(app.redis, config.admin['password'])
        config.admin['permission'] = 0x0f
        await app.redis.set('User', config.admin, many=False)
    # Init Profile
    if await app.redis.get('Profile') is None:
        await app.redis.set('Profile', {
            'name': config.rss['author'],
            'link_desc': '',
            'text': ''
        }, many=False)
    # Security
    setup_security(app,
                   SessionIdentityPolicy(),
                   RedisAuthorizationPolicy(redis))
    await compass(app.router)
    CONST.CATEGORY = await app.redis.get('Categories') or []
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(config.template_addr))
    _handler = app.make_handler(
        access_log=logger,
        access_log_format=formatters
    )
    _srv = await loop.create_server(_handler, config.server['host'], config.server['port'])
    print('Server started at http://%s:%s...' % (config.server['host'], config.server['port']))
    return _srv, _handler, app