tests.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:django-channels-jsonrpc 作者: millerf 项目源码 文件源码
def test_message_is_not_thread_safe(self):
        """Check that concurrent RPC calls each see their own message.

        Two threads (a background worker and the main thread) each dispatch
        10000 JSON-RPC requests through
        ``MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process``. Every
        request carries a fresh spoofed message with a unique payload, and the
        RPC method echoes back the payload of the ``original_message`` it
        received. If the message handed to one call leaked into another, the
        echoed payload would not match the one that was sent.

        Assertion failures raised in the worker thread are captured and
        re-raised on the main thread after the worker is joined; otherwise
        they would never reach the test runner.
        """
        import threading

        class SpoofMessage:
            """Minimal stand-in for a channel message: a channel name and a payload."""

            class Channel:
                def __init__(self):
                    self.name = None

            def __init__(self):
                self.channel = SpoofMessage.Channel()
                self.payload = None

        # NOTE(review): rpc_method() presumably registers these functions under
        # their own names ("ping2" / "ping3") -- confirm against the consumer.
        @MyJsonRpcWebsocketConsumerTest.rpc_method()
        def ping2(**kwargs):
            original_message = kwargs["original_message"]
            return original_message.payload

        @MyJsonRpcWebsocketConsumerTest.rpc_method()
        def ping3(**kwargs):
            original_message = kwargs["original_message"]
            return original_message.payload

        def call_and_check(method_name, index):
            """Send one request with a unique payload and assert it is echoed back."""
            message = SpoofMessage()
            message.channel.name = "websocket.test"
            message.payload = "test%s" % index
            response = MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process(
                {"id": 1, "jsonrpc": "2.0", "method": method_name, "params": []}, message)
            self.assertEqual(response['result'], "test%s" % index)

        # Exceptions raised in a non-main thread do not fail the test on their
        # own, so the worker records them here for the main thread to re-raise.
        worker_errors = []

        def thread_test():
            try:
                for _i in range(0, 10000):
                    call_and_check("ping3", _i)
            except Exception as exc:  # recorded and re-raised on the main thread
                worker_errors.append(exc)

        worker = threading.Thread(target=thread_test)
        worker.start()
        try:
            for i in range(0, 10000):
                call_and_check("ping2", i)
        finally:
            # Always wait for the worker so it cannot outlive the test, even
            # when the main-thread loop fails first.
            worker.join()

        if worker_errors:
            raise worker_errors[0]
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号