def main(port="5556"):
    """Run a sample learner that talks to its peer over a zmq PAIR socket.

    Connects to ``tcp://localhost:<port>``, sends the ``'hello'`` handshake
    and then loops forever: receive a reward frame, receive an input frame,
    print both, and reply with a single ``'0'`` or ``'1'`` character.

    Args:
        port: TCP port to connect to. Defaults to ``"5556"``.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    try:
        socket.connect("tcp://localhost:%s" % port)
        socket.send_string(str('hello'))
        message = '00101110'
        cnt = 0
        while True:
            reward = socket.recv()  # 1 or 0, or '-1' for None
            print(reward)
            msg_in = socket.recv()
            print(msg_in)
            # think...
            # Default reply: a random bit on every 7th step, otherwise '1'.
            msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
            if cnt % 2 == 0:
                # Even steps replay the fixed bit pattern instead.
                msg_out = message[cnt % 8]
            # send_string encodes the text before sending; a bare send() of a
            # str raises TypeError on Python 3.
            socket.send_string(msg_out)
            cnt += 1
    finally:
        # The loop only ends through an exception (e.g. KeyboardInterrupt);
        # release the socket and context without waiting on unsent frames.
        socket.close(linger=0)
        context.term()
# python类PAIR的实例源码 (example source code using the Python zmq PAIR socket type)
def __init__(self, cmd, port, address=None):
    """Bind a zmq PAIR socket, optionally launch the learner, and handshake.

    Args:
        cmd: Command line used to start the learner process; the port is
            appended as a final argument. ``None`` launches nothing.
        port: TCP port to bind. ``None`` selects 5556.
        address: Interface to bind to. ``None`` selects ``'*'``.

    Raises:
        ImportError: if the ``zmq`` module cannot be imported.
        ValueError: if ``port`` is outside 1..65535.
        AssertionError: if the first message received is not ``'hello'``.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    if address is None:
        address = '*'

    if port is None:
        port = 5556
    elif not 1 <= int(port) <= 65535:
        raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    self.socket.bind("tcp://%s:%s" % (address, port))

    # launch learner
    if cmd is not None:
        subprocess.Popen((cmd + ' ' + str(port)).split())

    # handshake: raised explicitly (rather than via `assert`) so the check
    # still runs when Python is started with -O.
    handshake_in = self.socket.recv().decode('utf-8')
    if handshake_in != 'hello':
        raise AssertionError(
            "Unexpected handshake from learner: %r" % (handshake_in,))
# send to learner, and get response;
def test_send_unicode(self):
    """Unicode text is rejected by send() and round-trips via send_unicode()."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([sender, receiver])

    text = "ç?§"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')

    # Plain send() must refuse the text whatever the copy flag is.
    for copy_flag in (False, True):
        self.assertRaises(TypeError, sender.send, text, copy=copy_flag)

    # Without an explicit encoding the received bytes must equal the
    # UTF-8 encoding of the text.
    sender.send_unicode(text)
    raw = receiver.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # Round trip with an explicit encoding on both ends.
    sender.send_unicode(text, encoding='utf16')
    decoded = receiver.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue connected to three locally bound sockets.

    Two PAIR sockets and one SUB socket are bound to random ports on
    127.0.0.1; the device's in, out and monitor sides connect to them.
    The SUB socket subscribes to ``mon_sub``.

    Returns:
        The tuple ``(alice, bob, mon)`` of bound sockets.
    """
    bind_addr = 'tcp://127.0.0.1'
    endpoint = "tcp://127.0.0.1:%i"

    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    in_port = alice.bind_to_random_port(bind_addr)
    out_port = bob.bind_to_random_port(bind_addr)
    mon_port = mon.bind_to_random_port(bind_addr)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in(endpoint % in_port)
    self.device.connect_out(endpoint % out_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # A non-blocking receive on the monitor socket is currently needed
        # so that no monitor messages are dropped; see LIBZMQ-248.
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    created = [alice, bob, mon]
    self.sockets.extend(created)
    return alice, bob, mon
def test_multisend(self):
    """A Frame's payload must stay intact when it is sent several times."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # Two non-copying sends followed by two copying sends of the same frame.
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # Each of the four sends must arrive with the original payload.
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber earlier buffers."""
    zeros = b'\0'*64
    receiver, sender = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # Repeat the scenario a few times.
        sender.send(zeros, copy=False)
        frame = receiver.recv(copy=False)
        frame_bytes = frame.bytes
        # Hold on to the frame's buffer only, then drop the frame itself.
        held_buffer = frame.buffer
        del frame
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sender.send(filler, copy=False)
            later_frame = receiver.recv(copy=False)
            # How the held buffer is turned into bytes depends on the view type.
            snapshot = (bytes(held_buffer) if view.__name__ == 'buffer'
                        else held_buffer.tobytes())
            self.assertEqual(snapshot, zeros)
            self.assertEqual(frame_bytes, zeros)
            self.assertEqual(later_frame.bytes, filler)
def test_timeout(self):
    """Verify that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # Wall-clock seconds spent inside a single poll() call.
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then check they arrive in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    endpoints = (s1, s2)

    # Send phase: s1 sends all ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            sock.send(count*x)

    # Receive phase: s1 drains its ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
async def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one multipart message each way between a zmq and an azmq PAIR.

    A PAIR socket from ``socket_factory`` answers from a background runner
    while an azmq PAIR socket drives the exchange on ``event_loop``.

    The body uses ``async with`` and ``await``, so it must be a coroutine
    function; a plain ``def`` here is a SyntaxError.
    NOTE(review): presumably collected by an asyncio-aware test runner
    (e.g. a pytest asyncio marker) - confirm against the test setup.
    """
    endpoint = 'tcp://127.0.0.1:3333'
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, endpoint, reverse=True)

    def run():
        # Peer side: wait up to 1000 ms for the greeting, then reply.
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, endpoint)
            await socket.send_multipart([b'hello', b'world'])
            # Give the peer at most one second to answer.
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
def test_send_unicode(self):
    """Unicode text is rejected by send() and round-trips via send_unicode()."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([sender, receiver])

    text = "ç?§"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')

    # Plain send() must refuse the text whatever the copy flag is.
    for copy_flag in (False, True):
        self.assertRaises(TypeError, sender.send, text, copy=copy_flag)

    # Without an explicit encoding the received bytes must equal the
    # UTF-8 encoding of the text.
    sender.send_unicode(text)
    raw = receiver.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # Round trip with an explicit encoding on both ends.
    sender.send_unicode(text, encoding='utf16')
    decoded = receiver.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def test_multisend(self):
    """A Frame's payload must stay intact when it is sent several times."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # Two non-copying sends followed by two copying sends of the same frame.
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # Each of the four sends must arrive with the original payload.
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber earlier buffers."""
    zeros = b'\0'*64
    receiver, sender = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # Repeat the scenario a few times.
        sender.send(zeros, copy=False)
        frame = receiver.recv(copy=False)
        frame_bytes = frame.bytes
        # Hold on to the frame's buffer only, then drop the frame itself.
        held_buffer = frame.buffer
        del frame
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sender.send(filler, copy=False)
            later_frame = receiver.recv(copy=False)
            # How the held buffer is turned into bytes depends on the view type.
            snapshot = (bytes(held_buffer) if view.__name__ == 'buffer'
                        else held_buffer.tobytes())
            self.assertEqual(snapshot, zeros)
            self.assertEqual(frame_bytes, zeros)
            self.assertEqual(later_frame.bytes, filler)
def test_timeout(self):
    """Verify that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # Wall-clock seconds spent inside a single poll() call.
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then check they arrive in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    endpoints = (s1, s2)

    # Send phase: s1 sends all ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            sock.send(count*x)

    # Receive phase: s1 drains its ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Unicode text is rejected by send() and round-trips via send_unicode()."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([sender, receiver])

    text = "ç?§"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')

    # Plain send() must refuse the text whatever the copy flag is.
    for copy_flag in (False, True):
        self.assertRaises(TypeError, sender.send, text, copy=copy_flag)

    # Without an explicit encoding the received bytes must equal the
    # UTF-8 encoding of the text.
    sender.send_unicode(text)
    raw = receiver.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # Round trip with an explicit encoding on both ends.
    sender.send_unicode(text, encoding='utf16')
    decoded = receiver.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue connected to three locally bound sockets.

    Two PAIR sockets and one SUB socket are bound to random ports on
    127.0.0.1; the device's in, out and monitor sides connect to them.
    The SUB socket subscribes to ``mon_sub``.

    Returns:
        The tuple ``(alice, bob, mon)`` of bound sockets.
    """
    bind_addr = 'tcp://127.0.0.1'
    endpoint = "tcp://127.0.0.1:%i"

    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    in_port = alice.bind_to_random_port(bind_addr)
    out_port = bob.bind_to_random_port(bind_addr)
    mon_port = mon.bind_to_random_port(bind_addr)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in(endpoint % in_port)
    self.device.connect_out(endpoint % out_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # A non-blocking receive on the monitor socket is currently needed
        # so that no monitor messages are dropped; see LIBZMQ-248.
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    created = [alice, bob, mon]
    self.sockets.extend(created)
    return alice, bob, mon
def test_multisend(self):
    """A Frame's payload must stay intact when it is sent several times."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # Two non-copying sends followed by two copying sends of the same frame.
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # Each of the four sends must arrive with the original payload.
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber earlier buffers."""
    zeros = b'\0'*64
    receiver, sender = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # Repeat the scenario a few times.
        sender.send(zeros, copy=False)
        frame = receiver.recv(copy=False)
        frame_bytes = frame.bytes
        # Hold on to the frame's buffer only, then drop the frame itself.
        held_buffer = frame.buffer
        del frame
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sender.send(filler, copy=False)
            later_frame = receiver.recv(copy=False)
            # How the held buffer is turned into bytes depends on the view type.
            snapshot = (bytes(held_buffer) if view.__name__ == 'buffer'
                        else held_buffer.tobytes())
            self.assertEqual(snapshot, zeros)
            self.assertEqual(frame_bytes, zeros)
            self.assertEqual(later_frame.bytes, filler)
def test_timeout(self):
    """Verify that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # Wall-clock seconds spent inside a single poll() call.
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then check they arrive in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    endpoints = (s1, s2)

    # Send phase: s1 sends all ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            sock.send(count*x)

    # Receive phase: s1 drains its ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Unicode text is rejected by send() and round-trips via send_unicode()."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([sender, receiver])

    text = "ç?§"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')

    # Plain send() must refuse the text whatever the copy flag is.
    for copy_flag in (False, True):
        self.assertRaises(TypeError, sender.send, text, copy=copy_flag)

    # Without an explicit encoding the received bytes must equal the
    # UTF-8 encoding of the text.
    sender.send_unicode(text)
    raw = receiver.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # Round trip with an explicit encoding on both ends.
    sender.send_unicode(text, encoding='utf16')
    decoded = receiver.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue connected to three locally bound sockets.

    Two PAIR sockets and one SUB socket are bound to random ports on
    127.0.0.1; the device's in, out and monitor sides connect to them.
    The SUB socket subscribes to ``mon_sub``.

    Returns:
        The tuple ``(alice, bob, mon)`` of bound sockets.
    """
    bind_addr = 'tcp://127.0.0.1'
    endpoint = "tcp://127.0.0.1:%i"

    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    in_port = alice.bind_to_random_port(bind_addr)
    out_port = bob.bind_to_random_port(bind_addr)
    mon_port = mon.bind_to_random_port(bind_addr)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in(endpoint % in_port)
    self.device.connect_out(endpoint % out_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # A non-blocking receive on the monitor socket is currently needed
        # so that no monitor messages are dropped; see LIBZMQ-248.
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    created = [alice, bob, mon]
    self.sockets.extend(created)
    return alice, bob, mon
def test_multisend(self):
    """A Frame's payload must stay intact when it is sent several times."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # Two non-copying sends followed by two copying sends of the same frame.
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # Each of the four sends must arrive with the original payload.
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_timeout(self):
    """Verify that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # Wall-clock seconds spent inside a single poll() call.
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then check they arrive in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    endpoints = (s1, s2)

    # Send phase: s1 sends all ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            sock.send(count*x)

    # Receive phase: s1 drains its ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)
def test_send_unicode(self):
    """Unicode text is rejected by send() and round-trips via send_unicode()."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([sender, receiver])

    text = "ç?§"
    if str is not unicode:
        # str and unicode are distinct types here, so decode the literal.
        text = text.decode('utf8')

    # Plain send() must refuse the text whatever the copy flag is.
    for copy_flag in (False, True):
        self.assertRaises(TypeError, sender.send, text, copy=copy_flag)

    # Without an explicit encoding the received bytes must equal the
    # UTF-8 encoding of the text.
    sender.send_unicode(text)
    raw = receiver.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # Round trip with an explicit encoding on both ends.
    sender.send_unicode(text, encoding='utf16')
    decoded = receiver.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def test_multisend(self):
    """A Frame's payload must stay intact when it is sent several times."""
    sender, receiver = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # Two non-copying sends followed by two copying sends of the same frame.
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        sender.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # Each of the four sends must arrive with the original payload.
    for _ in range(4):
        received = receiver.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Check that later non-copying receives do not clobber earlier buffers."""
    zeros = b'\0'*64
    receiver, sender = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    for _attempt in range(32):
        # Repeat the scenario a few times.
        sender.send(zeros, copy=False)
        frame = receiver.recv(copy=False)
        frame_bytes = frame.bytes
        # Hold on to the frame's buffer only, then drop the frame itself.
        held_buffer = frame.buffer
        del frame
        for step in range(5):
            filler = b'\xff'*(40 + step*10)
            sender.send(filler, copy=False)
            later_frame = receiver.recv(copy=False)
            # How the held buffer is turned into bytes depends on the view type.
            snapshot = (bytes(held_buffer) if view.__name__ == 'buffer'
                        else held_buffer.tobytes())
            self.assertEqual(snapshot, zeros)
            self.assertEqual(frame_bytes, zeros)
            self.assertEqual(later_frame.bytes, filler)
def test_timeout(self):
    """Verify that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def timed_poll(timeout):
        # Wall-clock seconds spent inside a single poll() call.
        started = time.time()
        poller.poll(timeout)
        return time.time() - started

    elapsed = timed_poll(.005)
    self.assertTrue(elapsed < 0.1)

    elapsed = timed_poll(5)
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    elapsed = timed_poll(500)
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then check they arrive in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    endpoints = (s1, s2)

    # Send phase: s1 sends all ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            sock.send(count*x)

    # Receive phase: s1 drains its ten messages first, then s2.
    for sock in endpoints:
        for count in range(10):
            received = sock.recv()
            self.assertEqual(received, count*x)