def test_forwarder(forwarder, tcp_sender, context):
    """Monitor should correctly send data.

    Sends a payload through the TCP sender to localhost:6001 and checks that
    a message can be received on each of two SUB sockets: the one connected
    to tcp://localhost:6500 (called the monitor here) and the one connected
    to tcp://localhost:6002.

    Args:
        forwarder: fixture exposing ``terminate()``; presumably the running
            forwarder under test -- confirm against the fixture definition.
        tcp_sender: fixture providing a socket-like object; this test calls
            ``connect``, ``sendall`` and ``close`` on it.
        context: fixture providing the ZeroMQ context used to create the
            SUB sockets.

    Raises:
        zmq.Again: if a SUB socket receives nothing within the timeout.
    """
    # Bound every receive so a missing message fails the test instead of
    # blocking the whole test run forever.
    recv_timeout_ms = 5000
    sender = tcp_sender
    subscribers = []
    try:
        for endpoint in ("tcp://localhost:6500", "tcp://localhost:6002"):
            sub = context.socket(zmq.SUB)
            # Track the socket immediately so it is closed even if one of
            # the following setup calls raises.
            subscribers.append(sub)
            sub.setsockopt_string(zmq.SUBSCRIBE, "")
            sub.setsockopt(zmq.LINGER, 0)
            sub.setsockopt(zmq.RCVTIMEO, recv_timeout_ms)
            sub.connect(endpoint)
        mon, recv = subscribers

        server_address = ('localhost', 6001)
        sender.connect(server_address)
        # waiting for warmup (presumably so the SUB connections are
        # established before anything is published -- TODO confirm)
        time.sleep(1)

        sender.sendall(b"test test")
        data = mon.recv()
        assert data is not None

        sender.sendall(b"test test")
        data = recv.recv()
        assert data is not None
    finally:
        # Always release resources, even when an assertion or receive
        # fails; each step is guarded so one failing cleanup call does not
        # prevent the remaining ones from running.
        try:
            sender.close()
        finally:
            try:
                forwarder.terminate()
            finally:
                for sub in subscribers:
                    sub.close()
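# NOTE(review): stray non-code text followed the test here -- it looks like
# web-page navigation labels ("Comment list", "Article table of contents").
# Kept as a comment so the module still imports; confirm it can be removed.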