def __init__(self, callback, host=None, res_port=None, use_security=False):
if host is None:
host = env.get_master_host()
context = zmq.Context()
self._socket = context.socket(zmq.REP)
self._auth = None
if use_security:
self._auth = Authenticator.instance(
env.get_server_public_key_dir())
self._auth.set_server_key(
self._socket, env.get_server_secret_key_path())
if res_port is None:
res_port = env.get_res_port()
self._socket.connect(
'tcp://{host}:{port}'.format(host=host, port=res_port))
self._callback = callback
self._thread = None
self._lock = threading.Lock()
python类REP的实例源码
def __init__(self, name, priority, actor_context, endpoints,
operation_function, operation_queue):
"""
Create a server
Keyword arguments:
name - Name of the timer
priority - Priority of the subscriber
actor_context - ZMQ context of the actor process
endpoints - A list of endpoint strings
operation_function - Operation function of the subscriber
operation_queue - The operation queue object
"""
self.name = name
self.priority = priority
self.endpoints = endpoints
self.operation_function = operation_function
self.operation_queue = operation_queue
self.context = actor_context
self.server_socket = self.context.socket(zmq.REP)
for endpoint in self.endpoints:
self.server_socket.bind(endpoint)
self.ready = True
self.func_mutex = Lock()
def run():
print("Getting ready for hello world client. Ctrl-C to exit.\n")
socket = Ctx.socket(zmq.REP)
socket.bind(Url)
while True:
# Wait for next request from client
message = await socket.recv()
print("Received request: {}".format(message))
# Do some "work"
await asyncio.sleep(1)
# Send reply back to client
message = message.decode('utf-8')
message = '{}, world'.format(message)
message = message.encode('utf-8')
print("Sending reply: {}".format(message))
await socket.send(message)
def start(self):
"""Create and bind the ZAP socket"""
self.zap_socket = self.context.socket(zmq.REP)
self.zap_socket.linger = 1
self.zap_socket.bind("inproc://zeromq.zap.01")
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def test_close_after_destroy(self):
"""s.close() after ctx.destroy() should be fine"""
ctx = self.Context()
s = ctx.socket(zmq.REP)
ctx.destroy()
# reaper is not instantaneous
time.sleep(1e-2)
s.close()
self.assertTrue(s.closed)
def test_many_sockets(self):
"""opening and closing many sockets shouldn't cause problems"""
ctx = self.Context()
for i in range(16):
sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
[ s.close() for s in sockets ]
# give the reaper a chance
time.sleep(1e-2)
ctx.term()
def test_destroy(self):
"""Context.destroy should close sockets"""
ctx = self.Context()
sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
# close half of the sockets
[ s.close() for s in sockets[::2] ]
ctx.destroy()
# reaper is not instantaneous
time.sleep(1e-2)
for s in sockets:
self.assertTrue(s.closed)
def test_term_thread(self):
"""ctx.term should not crash active threads (#139)"""
ctx = self.Context()
evt = Event()
evt.clear()
def block():
s = ctx.socket(zmq.REP)
s.bind_to_random_port('tcp://127.0.0.1')
evt.set()
try:
s.recv()
except zmq.ZMQError as e:
self.assertEqual(e.errno, zmq.ETERM)
return
finally:
s.close()
self.fail("recv should have been interrupted with ETERM")
t = Thread(target=block)
t.start()
evt.wait(1)
self.assertTrue(evt.is_set(), "sync event never fired")
time.sleep(0.01)
ctx.term()
t.join(timeout=1)
self.assertFalse(t.is_alive(), "term should have interrupted s.recv()")
def test_basic(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
msg1 = b'message 1'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_multiple(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
for i in range(10):
msg1 = i*b' '
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_bad_send_recv(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
if zmq.zmq_version() != '2.1.8':
# this doesn't work on 2.1.8
for copy in (True,False):
self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
# I have to have this or we die on an Abort trap.
msg1 = b'asdf'
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def test_json(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
o = dict(a=10,b=list(range(10)))
o2 = self.ping_pong_json(s1, s2, o)
def test_large_msg(self):
s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
msg1 = 10000*b'X'
for i in range(10):
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEqual(msg1, msg2)
def setUp(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.loop = ioloop.IOLoop.instance()
self.stream = zmqstream.ZMQStream(self.socket)
def test_again(self):
s = self.context.socket(zmq.REP)
self.assertRaises(Again, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
s.close()
def atest_ctxterm(self):
s = self.context.socket(zmq.REP)
t = Thread(target=self.context.term)
t.start()
self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
self.assertRaisesErrno(zmq.TERM, s.recv, zmq.NOBLOCK)
s.close()
t.join()
def zap_handler(self):
socket = self.context.socket(zmq.REP)
socket.bind("inproc://zeromq.zap.01")
try:
msg = self.recv_multipart(socket)
version, sequence, domain, address, identity, mechanism = msg[:6]
if mechanism == b'PLAIN':
username, password = msg[6:]
elif mechanism == b'CURVE':
key = msg[6]
self.assertEqual(version, b"1.0")
self.assertEqual(identity, b"IDENT")
reply = [version, sequence]
if mechanism == b'CURVE' or \
(mechanism == b'PLAIN' and username == USER and password == PASS) or \
(mechanism == b'NULL'):
reply.extend([
b"200",
b"OK",
b"anonymous",
b"\5Hello\0\0\0\5World",
])
else:
reply.extend([
b"400",
b"Invalid username or password",
b"",
b"",
])
socket.send_multipart(reply)
finally:
socket.close()
def test_monitor(self):
"""Test monitoring interface for sockets."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6666")
# try monitoring the REP socket
s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
# create listening socket for monitor
s_event = self.context.socket(zmq.PAIR)
self.sockets.append(s_event)
s_event.connect("inproc://monitor.rep")
s_event.linger = 0
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6666")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
# test monitor can be disabled.
s_rep.disable_monitor()
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
def test_single_socket_forwarder_bind(self):
if zmq.zmq_version() in ('4.1.1', '4.0.6'):
raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
# select random port:
binder = self.context.socket(zmq.REQ)
port = binder.bind_to_random_port('tcp://127.0.0.1')
binder.close()
time.sleep(0.1)
req = self.context.socket(zmq.REQ)
req.connect('tcp://127.0.0.1:%i'%port)
dev.bind_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()