def test_message_is_not_thread_safe(self):
    """Check that concurrent RPC calls each see their own original message.

    Two RPC methods (``ping2`` and ``ping3``) echo back the ``payload`` of
    the ``original_message`` they receive. The test thread calls ``ping2``
    while a worker thread calls ``ping3``, each 10000 times with a distinct
    payload, and every response must carry the payload of the message that
    was passed in for that particular call.

    Failures raised inside the worker thread are captured and re-raised in
    the test thread after the worker has been joined; otherwise they would
    only be reported by the thread machinery and the test would still pass.
    """

    class SpoofMessage:
        """Bare stand-in message exposing only ``channel.name`` and ``payload``."""

        class Channel:
            def __init__(self):
                self.name = None

        def __init__(self):
            self.channel = SpoofMessage.Channel()
            self.payload = None

    # The decorated functions are never called directly below; they are
    # reached through the "method" field of the request, so the decorator
    # presumably registers them under their function names -- verify against
    # rpc_method() if this test is changed.
    @MyJsonRpcWebsocketConsumerTest.rpc_method()
    def ping2(**kwargs):
        original_message = kwargs["original_message"]
        return original_message.payload

    @MyJsonRpcWebsocketConsumerTest.rpc_method()
    def ping3(**kwargs):
        original_message = kwargs["original_message"]
        return original_message.payload

    def call_and_check(method_name, index):
        """Dispatch one request to *method_name* and assert the echoed payload."""
        message = SpoofMessage()
        message.channel.name = "websocket.test"
        message.payload = "test%s" % index
        # Name-mangled access to the consumer's private ``__process`` method.
        res = MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process(
            {"id": 1, "jsonrpc": "2.0", "method": method_name, "params": []}, message)
        self.assertEqual(res['result'], "test%s" % index)

    # Exceptions (including AssertionError) raised in the worker thread do
    # not propagate to the test thread on their own, so collect them here.
    worker_errors = []

    def thread_test():
        try:
            for _i in range(10000):
                call_and_check("ping3", _i)
        except Exception as exc:  # re-raised in the test thread below
            worker_errors.append(exc)

    import threading
    worker = threading.Thread(target=thread_test)
    worker.start()
    try:
        for i in range(10000):
            call_and_check("ping2", i)
    finally:
        # Always wait for the worker so it cannot outlive the test and so
        # that its result is known before the test reports success.
        worker.join()

    if worker_errors:
        raise worker_errors[0]
评论列表
文章目录