websocket.py source code

python

Project: uvicorn  Author: encode
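import websockets  # provides websockets.handshake and websockets.InvalidHandshake


def websocket_upgrade(http):
    # Header names and values arrive as bytes; index them by name for lookup.
    request_headers = dict(http.headers)
    response_headers = []

    def get_header(key):
        # Look up a request header by lowercased name; '' if it is absent.
        key = key.lower().encode('utf-8')
        return request_headers.get(key, b'').decode('utf-8')

    def set_header(key, val):
        # Collect handshake response headers as (bytes, bytes) pairs.
        response_headers.append((key.encode('utf-8'), val.encode('utf-8')))

    try:
        # Callback-style handshake helpers: validate the upgrade request,
        # then emit the matching response headers through set_header.
        key = websockets.handshake.check_request(get_header)
        websockets.handshake.build_response(set_header, key)
    except websockets.InvalidHandshake:
        # Reject with a plain-text 400 and leave the connection on HTTP.
        http.loop.create_task(http.channels['reply'].send({
            'status': 400,
            'headers': [[b'content-type', b'text/plain']],
            'content': b'Invalid WebSocket handshake'
        }))
        return

    # Handshake accepted: hand the transport over to the WebSocket protocol.
    # WebSocketProtocol is not shown in this excerpt.
    protocol = WebSocketProtocol(http, response_headers)
    protocol.connection_made(http.transport, http.message)
    http.transport.set_protocol(protocol)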
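
The handshake step above is delegated to the websockets library, so the excerpt does not show what is actually checked. As a rough illustration only, the following standard-library sketch shows the core of an RFC 6455 opening handshake: validating the upgrade request and deriving the Sec-WebSocket-Accept value. The names compute_accept and build_upgrade_headers are hypothetical and belong to neither uvicorn nor websockets; the sketch also skips checks a real implementation makes, such as inspecting the Connection header and verifying that the key decodes to 16 bytes.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for deriving Sec-WebSocket-Accept.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept(sec_websocket_key):
    """Hypothetical helper: base64(SHA-1(key + GUID)), per RFC 6455."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def build_upgrade_headers(request_headers):
    """Hypothetical helper: return response headers, or None if invalid.

    request_headers is assumed to map lowercase bytes names to bytes
    values, the same shape as dict(http.headers) in the function above.
    """
    def get_header(name):
        return request_headers.get(name.lower().encode("utf-8"), b"").decode("utf-8")

    key = get_header("Sec-WebSocket-Key")
    # Simplified validation: the request must ask for a websocket upgrade,
    # carry a key, and speak protocol version 13.
    if get_header("Upgrade").lower() != "websocket" or not key:
        return None
    if get_header("Sec-WebSocket-Version") != "13":
        return None

    return [
        (b"upgrade", b"websocket"),
        (b"connection", b"Upgrade"),
        (b"sec-websocket-accept", compute_accept(key).encode("ascii")),
    ]


if __name__ == "__main__":
    sample = {
        b"upgrade": b"websocket",
        b"connection": b"Upgrade",
        b"sec-websocket-key": b"dGhlIHNhbXBsZSBub25jZQ==",
        b"sec-websocket-version": b"13",
    }
    print(build_upgrade_headers(sample))
```

The sample key is the one used in RFC 6455's own example, so the derived accept value can be compared against the specification. Returning None for a bad request plays the role that raising InvalidHandshake plays in the original function.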