redis_connection.py source code

python

Project: python-zentropi  Author: zentropi
async def connect(self, endpoint: str, auth: Optional[str] = None) -> None:  # type: ignore
        endpoint = validate_endpoint(endpoint)
        auth = validate_auth(auth)
        self._auth = auth
        # print('*** redis connecting to ', endpoint, flush=True)
        if self._connected:
            raise ConnectionError('Already connected.')
        if not endpoint.startswith('redis://'):
            raise ValueError('Expected endpoint to begin with "redis://". '
                             'Got: {!r}'.format(endpoint))
        host, port = endpoint.replace('redis://', '').split(':')  # todo: handle exception
        port = int(port)  # pass the port as an integer rather than a string
        self._subscriber = await aioredis.create_redis((host, port))
        self._publisher = await aioredis.create_redis((host, port))
        if auth:
            await self._subscriber.auth(auth)
            await self._publisher.auth(auth)
        else:
            print('*** WARNING: Redis connection has no password.')
        self._connected = True
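
The `split(':')` line above is marked "todo: handle exception": it fails with an unhelpful unpacking error when the endpoint has no port or contains extra colons. One possible way to make that step more robust is sketched below using the standard library's `urllib.parse.urlsplit`. The helper name `parse_redis_endpoint` and the fallback to port 6379 (Redis's default port) are assumptions made for illustration; they are not part of python-zentropi.

```python
from typing import Tuple
from urllib.parse import urlsplit

# Assumption: fall back to Redis's default port when the endpoint omits one.
DEFAULT_REDIS_PORT = 6379


def parse_redis_endpoint(endpoint: str) -> Tuple[str, int]:
    """Hypothetical helper: split 'redis://host[:port]' into (host, port)."""
    parts = urlsplit(endpoint)
    if parts.scheme != 'redis':
        raise ValueError('Expected endpoint to begin with "redis://". '
                         'Got: {!r}'.format(endpoint))
    if not parts.hostname:
        raise ValueError('Endpoint has no host: {!r}'.format(endpoint))
    try:
        # Accessing .port raises ValueError for a non-numeric or out-of-range port.
        port = parts.port
    except ValueError:
        raise ValueError('Invalid port in endpoint: {!r}'.format(endpoint)) from None
    return parts.hostname, DEFAULT_REDIS_PORT if port is None else port
```

With a helper like this, `connect` could call `host, port = parse_redis_endpoint(endpoint)` and hand the resulting tuple to `aioredis.create_redis`, so malformed endpoints surface as a descriptive `ValueError`.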