def thread_loop(self, context, pipe):
poller = zmq.Poller()
ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
poller.register(pipe, zmq.POLLIN)
remote_socket = None
while True:
items = dict(poller.poll())
if pipe in items:
cmd = pipe.recv_string()
if cmd == 'Exit':
break
elif cmd == 'Bind':
new_url = pipe.recv_string()
if remote_socket:
poller.unregister(remote_socket)
remote_socket.close(linger=0)
try:
remote_socket = context.socket(zmq.REP)
remote_socket.bind(new_url)
except zmq.ZMQError as e:
remote_socket = None
pipe.send_string("Error", flags=zmq.SNDMORE)
pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
else:
pipe.send_string("Bind OK", flags=zmq.SNDMORE)
# `.last_endpoint` is already of type `bytes`
pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
poller.register(remote_socket, zmq.POLLIN)
if remote_socket in items:
self.on_recv(remote_socket, ipc_pub)
self.thread_pipe = None
评论列表
文章目录