def test_msgs_recv():
    """Check the list returned by ``Msg().recv()`` for a stubbed REQ socket.

    The socket's ``recv_multipart`` attribute is replaced with a stub that
    returns the ``to_list()`` form of a PING message, and the socket is never
    bound or connected. Presumably ``Msg.recv`` obtains its frames through
    ``recv_multipart`` -- confirm against the ``Msg`` implementation.

    The assertions pin the four leading entries of the result: the
    msgpack-packed id, the token, the string ``'ping'`` and the string
    ``'[]'``.
    """
    def _recv_multipart():
        # Canned reply standing in for frames read from the socket.
        return Msg(
            id=msgpack.packb(1234),
            mtype=Msg.PING,
            token='token1234',
            data=[],
        ).to_list()

    ctx = zmq.Context()
    s = ctx.socket(zmq.REQ)
    try:
        s.recv_multipart = _recv_multipart
        m = Msg().recv(s)
    finally:
        # Release the socket first, then the context: terminating a context
        # that still owns an open socket can block.
        s.close()
        ctx.term()

    assert msgpack.unpackb(m[0]) == 1234
    assert m[1] == 'token1234'
    assert m[2] == 'ping'
    assert m[3] == '[]'
评论列表
文章目录