如何在一个类中实现asyncio websockets?

发布于 2021-01-29 17:20:31

我想通过asyncio和连接到websocket websockets,格式如下所示。我将如何做到这一点?

from websockets import connect


class EchoWebsocket:

    def __init__(self):
        self.websocket = self._connect()

    def _connect(self):
        return connect("wss://echo.websocket.org")

    def send(self, message):
        self.websocket.send(message)

    def receive(self):
        return self.websocket.recv()

echo = EchoWebsocket()
echo.send("Hello!")
print(echo.receive())  # "Hello!"
关注者
0
被浏览
156
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    如何编写异步程序?

    1. 您应该使用定义异步功能 async
    2. 您应该使用调用异步函数 await
    3. 您需要事件循环才能启动异步程序

    所有其他内容与常规Python程序几乎相同。

    import asyncio
    from websockets import connect
    
    
    class EchoWebsocket:
        async def __aenter__(self):
            self._conn = connect("wss://echo.websocket.org")
            self.websocket = await self._conn.__aenter__()        
            return self
    
        async def __aexit__(self, *args, **kwargs):
            await self._conn.__aexit__(*args, **kwargs)
    
        async def send(self, message):
            await self.websocket.send(message)
    
        async def receive(self):
            return await self.websocket.recv()
    
    
    async def main():
        async with EchoWebsocket() as echo:
            await echo.send("Hello!")
            print(await echo.receive())  # "Hello!"
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    输出:

    Hello!
    

    如您所见,代码与您编写的几乎相同。

    唯一的区别是websockets.connect设计成异步上下文管理器(它使用__aenter____aexit__)。有必要释放连接,这也将帮助您在类初始化期间进行异步操作(因为我们没有的异步版本__init__)。

    我建议您以相同的方式组织课程。但是,如果由于某些原因您确实不想使用上下文管理器,则可以使用新__await__方法进行异步初始化,并使用其他一些异步函数来释放连接:

    import sys
    import asyncio
    from websockets import connect
    
    
    class EchoWebsocket:
        def __await__(self):
            # see: http://stackoverflow.com/a/33420721/1113207
            return self._async_init().__await__()
    
        async def _async_init(self):
            self._conn = connect("wss://echo.websocket.org")
            self.websocket = await self._conn.__aenter__()
            return self
    
        async def close(self):
            await self._conn.__aexit__(*sys.exc_info())
    
        async def send(self, message):
            await self.websocket.send(message)
    
        async def receive(self):
            return await self.websocket.recv()
    
    
    async def main():
        echo = await EchoWebsocket()
        try:
            await echo.send("Hello!")
            print(await echo.receive())  # "Hello!"
        finally:
            await echo.close()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    websockets您可以在docs中找到许多使用示例。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看