util.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:django-redis-pubsub 作者: andrewyoung1991 项目源码 文件源码
def websocket_pubsub(route, authenticate=False):
    """Turn a coroutine into a websocket handler backed by a pubsub manager.

    The decorated coroutine is invoked as
    `func(ws: WebSocketResponse, params: MultiDict, manager: SubscriptionManager)`.
    When `authenticate=True` an additional keyword argument, `user`, is
    supplied: the value returned by `handle_auth` for the request's `token`
    query parameter (`None` is passed to `handle_auth` when the parameter is
    absent).

    :param route: route the handler is registered under; it is normalised
        with `_clean_route` before being stored.
    :param authenticate: when true, authenticate the request through
        `handle_auth` and pass the result to the coroutine as `user`.
    :returns: a decorator. The handler it produces carries a `route`
        attribute of the form `("GET", cleaned_route, handler)`.

    Exceptions raised while preparing the websocket or while running the
    decorated coroutine are logged and swallowed; the subscription manager is
    stopped in every case and the `WebSocketResponse` is returned.
    """
    def inner(func):
        func = asyncio.coroutine(func)

        @ft.wraps(func)
        @asyncio.coroutine
        def wrapper(request):
            params = request.GET
            kwargs = {}

            if authenticate:
                # The token is only needed for authentication, so look it up
                # once here instead of on every request.
                token = params.get("token", None)
                kwargs["user"] = handle_auth(token)

            redis_ = yield from get_async_redis()
            manager = SubscriptionManager(redis_)
            kwargs["manager"] = manager

            ws = WebSocketResponse()
            try:
                yield from ws.prepare(request)
                yield from func(ws, params, **kwargs)
            except Exception as err:  # pragma: no cover
                # Failures are logged rather than re-raised so the response
                # object is still returned to the caller.
                logger.error(str(err))
            finally:
                # Stop the manager whether or not the handler succeeded.
                yield from manager.stop()

            return ws

        # cleanup the route
        route_ = _clean_route(route)
        wrapper.route = ("GET", route_, wrapper)
        return wrapper
    return inner
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号