def _send_raw(self, serialized):
self.create_socket()
self._socket.send_string(serialized, zmq.NOBLOCK)
poller = zmq.Poller()
poller.register(self._socket, zmq.POLLIN)
if poller.poll(self._timeout * 1000):
msg = self._socket.recv()
self.on_message(msg)
self.cleanup_socket()
else:
self._transport.log("Peer " + self._address + " timed out.")
self.cleanup_socket()
self._transport.remove_peer(self._address)
python类Poller()的实例源码
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT):
# new socket to talk to server
self.__socket = zmq.Context().socket(zmq.REQ)
self.__socket.connect("tcp://localhost:" + ZMQPort.RQ)
# init poller and register to socket that web can poll socket to check is it has messages
poller = zmq.Poller()
poller.register(self.__socket, zmq.POLLIN)
send_flatbuf_msg(self.__socket, msg_type, msg_content)
reqs = 0
while reqs * self.__POLL_INTERVAL <= timeout:
socks = dict(poller.poll(self.__POLL_INTERVAL))
if self.__socket in socks and socks[self.__socket] == zmq.POLLIN:
msg = self.__socket.recv()
msgObj = TransMsg.GetRootAsTransMsg(msg, 0)
return msgObj.Content()
reqs = reqs + 1
return False
def test_getsockopt_events(self):
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
eventlet.sleep()
poll_out = zmq.Poller()
poll_out.register(sock1, zmq.POLLOUT)
sock_map = poll_out.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock1.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
sock1.send(b'')
poll_in = zmq.Poller()
poll_in.register(sock2, zmq.POLLIN)
sock_map = poll_in.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock2.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def __init__(self, *args, **kwargs):
super(ZMQPlaybook, self).__init__(*args, **kwargs)
self._sockets = []
self._hooks = {}
self.context = kwargs.get("context", zmq.Context.instance())
self.socket = self.context.socket(zmq.SUB)
self.socket_dir = tempfile.mkdtemp()
self._env['DAUBER_SOCKET_URI'] = "ipc://{}/dauber.socket".format(self.socket_dir)
self.socket.connect(self._env['DAUBER_SOCKET_URI'])
self._env['DAUBER_CONTROL_SOCKET_URI'] = "ipc://{}/control.socket".format(self.socket_dir)
self.poller = zmq.Poller()
self._register_socket(self.socket, self.__class__._zmq_socket_handler)
self.add_callback_plugin_dir(
pr.resource_filename(__name__, 'ansible/callback_plugins'))
self.logger.setLevel(logging.WARNING)
self.verbosity = 4
def _wait_for_goahead(self):
control_socket = self.context.socket(zmq.REP)
control_socket.bind(os.environ['DAUBER_CONTROL_SOCKET_URI'])
poller = zmq.Poller()
poller.register(control_socket)
timeout = 500
t_last = time.time()
while (time.time() - t_last) < timeout:
ready = dict(poller.poll(10))
if ready.get(control_socket):
control_socket.recv()
control_socket.send(b'')
break
self.socket.send_multipart(['hello', b''])
t_last = time.time()
assert (time.time() - t_last) < timeout, \
"Timed out before recieving a signal to continue"
del control_socket
def monitored_queue(in_socket, out_socket, mon_socket,
in_prefix=b'in', out_prefix=b'out'):
swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
poller = zmq.Poller()
poller.register(in_socket, zmq.POLLIN)
poller.register(out_socket, zmq.POLLIN)
while True:
events = dict(poller.poll())
if in_socket in events:
_relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
if out_socket in events:
_relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_retry_poll(self):
x, y = self.create_bound_pair()
poller = zmq.Poller()
poller.register(x, zmq.POLLIN)
self.alarm()
def send():
time.sleep(2 * self.signal_delay)
y.send(b('ping'))
t = Thread(target=send)
t.start()
evts = dict(poller.poll(2 * self.timeout_ms))
t.join()
assert x in evts
assert self.timer_fired
x.recv()
def test_pair(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
# Poll result should contain both sockets
socks = dict(poller.poll())
# Now make sure that both are send ready.
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
s1.send(b'msg1')
s2.send(b'msg2')
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
# Make sure that both are in POLLOUT after recv.
s1.recv()
s2.recv()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_wakeup(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s2, zmq.POLLIN)
tic = time.time()
r = gevent.spawn(lambda: poller.poll(10000))
s = gevent.spawn(lambda: s1.send(b'msg1'))
r.join()
toc = time.time()
self.assertTrue(toc-tic < 1)
def __init__(self):
super().__init__()
# this maps file descriptors to keys
self._fd_to_key = {}
# read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
self._zmq_poller = _zmq.Poller()
def thread_loop(self, context, pipe):
poller = zmq.Poller()
ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
poller.register(pipe, zmq.POLLIN)
remote_socket = None
while True:
items = dict(poller.poll())
if pipe in items:
cmd = pipe.recv_string()
if cmd == 'Exit':
break
elif cmd == 'Bind':
new_url = pipe.recv_string()
if remote_socket:
poller.unregister(remote_socket)
remote_socket.close(linger=0)
try:
remote_socket = context.socket(zmq.REP)
remote_socket.bind(new_url)
except zmq.ZMQError as e:
remote_socket = None
pipe.send_string("Error", flags=zmq.SNDMORE)
pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
else:
pipe.send_string("Bind OK", flags=zmq.SNDMORE)
# `.last_endpoint` is already of type `bytes`
pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
poller.register(remote_socket, zmq.POLLIN)
if remote_socket in items:
self.on_recv(remote_socket, ipc_pub)
self.thread_pipe = None
def __init__(self, host, port=7772):
QtCore.QObject.__init__(self)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect("tcp://{}:{}".format(host, port))
self.socket.setsockopt_string(zmq.SUBSCRIBE, "data")
self.socket.setsockopt_string(zmq.SUBSCRIBE, "done")
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.running = True
def open_connection(self, address, status_port, data_port):
self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000)
socket = self.context.socket(zmq.DEALER)
socket.identity = "Matplotlib_Qt_Client".encode()
socket.connect("tcp://{}:{}".format(address, status_port))
socket.send(b"WHATSUP")
poller = zmq.Poller()
poller.register(socket, zmq.POLLOUT)
time.sleep(0.1)
evts = dict(poller.poll(100))
if socket in evts:
try:
reply, desc = [e.decode() for e in socket.recv_multipart()]
desc = json.loads(desc)
self.statusBar().showMessage("Connection established. Pulling plot information.", 2000)
except:
self.statusBar().showMessage("Could not connect to server.", 2000)
return
else:
self.statusBar().showMessage("Server did not respond.", 2000)
socket.close()
self.construct_plots(desc)
# Actual data listener
if self.listener_thread:
self.Datalistener.running = False
self.listener_thread.quit()
self.listener_thread.wait()
self.listener_thread = QtCore.QThread()
self.Datalistener = DataListener(address, data_port)
self.Datalistener.moveToThread(self.listener_thread)
self.listener_thread.started.connect(self.Datalistener.loop)
self.Datalistener.message.connect(self.data_signal_received)
self.Datalistener.finished.connect(self.stop_listening)
QtCore.QTimer.singleShot(0, self.listener_thread.start)
def run(self):
""" Main loop of the thread. """
# create event socket
self.subscriber = EventSubscriber(self.zmq_context,
self.event_port,
self.logger)
self.configure()
# create poller and register event subscriber
poller = zmq.Poller()
poller.register(self.subscriber.socket, zmq.POLLIN)
# poll events every seconds
self.logger.info('entering main loop')
while not self.stop_event.is_set():
socks = dict(poller.poll(self._Poll_timeout))
# check if something happened on the socket
if self.subscriber.socket in socks and \
socks[self.subscriber.socket] == zmq.POLLIN:
self.logger.debug('got message on subscriber')
try:
message = self.subscriber.receive()
except Exception, e:
self.logger.error(
'failed to get data from subscriber: {}'.format(
e.message))
else:
if message[0] == EventHeaders.SUPVISORS:
self.on_supvisors_status(message[1])
elif message[0] == EventHeaders.ADDRESS:
self.on_address_status(message[1])
elif message[0] == EventHeaders.APPLICATION:
self.on_application_status(message[1])
elif message[0] == EventHeaders.PROCESS_EVENT:
self.on_process_event(message[1])
elif message[0] == EventHeaders.PROCESS_STATUS:
self.on_process_status(message[1])
self.logger.warn('exiting main loop')
self.subscriber.close()
def _receive_with_timeout(self, socket, timeout_s, use_multipart=False):
"""Check for socket activity and either return what's
received on the socket or time out if timeout_s expires
without anything on the socket.
This is implemented in loops of self.try_length_ms milliseconds
to allow Ctrl-C handling to take place.
"""
if timeout_s is config.FOREVER:
timeout_ms = config.FOREVER
else:
timeout_ms = int(1000 * timeout_s)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
ms_so_far = 0
try:
for interval_ms in self.intervals_ms(timeout_ms):
sockets = dict(poller.poll(interval_ms))
ms_so_far += interval_ms
if socket in sockets:
if use_multipart:
return socket.recv_multipart()
else:
return socket.recv()
else:
raise core.SocketTimedOutError(timeout_s)
except KeyboardInterrupt:
raise core.SocketInterruptedError(ms_so_far / 1000.0)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
self.redis_url = redis_url
self.redis_server = redis.from_url(redis_url)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
self.socket.setsockopt(zmq.LINGER, 500)
self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes
self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
self.node_name = socket.gethostname()
self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399,
max_tries=100)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F:
F.write(self.address)
with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F:
F.write(str(os.getpid()))
self.logger = bqueryd.logger.getChild('controller').getChild(self.address)
self.logger.setLevel(loglevel)
self.msg_count_in = 0
self.rpc_results = [] # buffer of results that are ready to be returned to callers
self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs
self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones...
self.files_map = {} # shows on which workers a file is available on
self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity
self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages
self.is_running = True
self.last_heartbeat = 0
self.others = {} # A dict of other Controllers running on other DQE nodes
self.start_time = time.time()
def test_pair(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
# Poll result should contain both sockets
socks = dict(poller.poll())
# Now make sure that both are send ready.
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
# Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
s1.send(b'msg1')
s2.send(b'msg2')
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
# Make sure that both are in POLLOUT after recv.
s1.recv()
s2.recv()
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(socks[s2], zmq.POLLOUT)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()
def test_no_events(self):
s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, 0)
self.assertTrue(s1 in poller)
self.assertFalse(s2 in poller)
poller.register(s1, 0)
self.assertFalse(s1 in poller)
def test_pubsub(self):
s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
s2.setsockopt(zmq.SUBSCRIBE, b'')
# Sleep to allow sockets to connect.
wait()
poller = self.Poller()
poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
poller.register(s2, zmq.POLLIN)
# Now make sure that both are send ready.
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
self.assertEqual(s2 in socks, 0)
# Make sure that s1 stays in POLLOUT after a send.
s1.send(b'msg1')
socks = dict(poller.poll())
self.assertEqual(socks[s1], zmq.POLLOUT)
# Make sure that s2 is POLLIN after waiting.
wait()
socks = dict(poller.poll())
self.assertEqual(socks[s2], zmq.POLLIN)
# Make sure that s2 goes into 0 after recv.
s2.recv()
socks = dict(poller.poll())
self.assertEqual(s2 in socks, 0)
poller.unregister(s1)
poller.unregister(s2)
# Wait for everything to finish.
wait()