def receive_message(self, event, event_data, listener_data):
    """
    Receives a message from another process.

    Polls every end point whose socket type is PULL or REP and passes each end point that has a pending event
    to ``_receive_message``.

    :param * event: Not used.
    :param * event_data: Not used.
    :param * listener_data: Not used.
    """
    del event, event_data, listener_data

    # Build a poller that watches the incoming (PULL and REP) end points only.
    poller = zmq.Poller()
    for end_point in self.__end_points.values():
        if end_point.type not in [zmq.PULL, zmq.REP]:
            continue
        poller.register(end_point, zmq.POLLIN)

    # poll() is called without a timeout; its result maps sockets to their pending events.
    ready = dict(poller.poll())

    # Dispatch in end point order, skipping end points without an event.
    for name, end_point in self.__end_points.items():
        if end_point not in ready:
            continue
        self._receive_message(name, end_point)
# ------------------------------------------------------------------------------------------------------------------
python类POLLIN的实例源码
def no_barking(self, seconds):
    """
    Wait for ZMQ to start up and then poll the incoming sockets a few times.

    During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on
    the socket. This method prevents the incoming sockets from reporting that they are ready for reading.

    :param int seconds: The number of seconds to give the other ZMQ thread to start up.
    """
    sleep(seconds)

    # One polling round less than there are end points.
    rounds = len(self.end_points) - 1
    for _ in range(rounds):
        # A fresh poller is built for every round; only PULL and REP end points are watched.
        poller = zmq.Poller()
        for end_point in self.end_points.values():
            if end_point.type not in [zmq.PULL, zmq.REP]:
                continue
            poller.register(end_point, zmq.POLLIN)
        poller.poll(1)
# ----------------------------------------------------------------------------------------------------------------------
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_poll(self):
    """Exercise the future-returning Socket.poll with a zero, a short and a long timeout."""
    @gen.coroutine
    def test():
        pusher, puller = self.create_bound_pair(zmq.PUSH, zmq.PULL)

        # Zero timeout: the result is read straight away, without yielding.
        pending = puller.poll(timeout=0)
        self.assertEqual(pending.result(), 0)

        # Short timeout with nothing sent: not done yet, then resolves to 0.
        pending = puller.poll(timeout=1)
        assert not pending.done()
        events = yield pending
        self.assertEqual(events, 0)

        # Long timeout with a message sent meanwhile: resolves to POLLIN.
        pending = puller.poll(timeout=1000)
        assert not pending.done()
        yield pusher.send_multipart([b'hi', b'there'])
        events = yield pending
        self.assertEqual(events, zmq.POLLIN)

        received = yield puller.recv_multipart()
        self.assertEqual(received, [b'hi', b'there'])
    self.loop.run_sync(test)
def test_timeout(self):
    """Check that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    # poll(.005) must return in well under 0.1 seconds.
    started = time.time()
    poller.poll(.005)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)

    # poll(5) must take more than 1 ms but less than 0.1 seconds.
    started = time.time()
    poller.poll(5)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    # poll(500) must take more than 0.1 seconds but less than 1 second.
    started = time.time()
    poller.poll(500)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def poll(self, timeout=None, flags=_zmq.POLLIN):
    """poll the socket for events

    returns a Future for the poll results.

    :param timeout: passed unchanged to the underlying poller's ``poll``.
    :param flags: event flags this socket is registered with on the poller.
    :returns: a Future that resolves to the event mask reported for this
        socket, or 0 when the poll result does not mention this socket.
    :raises ZMQError: with ``ENOTSUP`` when the socket is closed.
    """
    if self.closed:
        raise _zmq.ZMQError(_zmq.ENOTSUP)

    p = self._poller_class()
    p.register(self, flags)
    f = p.poll(timeout)

    future = self._Future()

    def unwrap_result(f):
        # The outer future may already have been resolved or cancelled.
        if future.done():
            return
        error = f.exception()
        if error is not None:
            # Propagate the poll failure to the caller's future.
            future.set_exception(error)
        else:
            evts = dict(f.result())
            future.set_result(evts.get(self, 0))

    f.add_done_callback(unwrap_result)
    return future
def register(self, socket, address, alias=None, handler=None):
    """
    Record a socket under its alias, its address and itself.

    When a handler is given, the socket is also registered with the poller
    for incoming events and the handler is attached: through ``subscribe``
    for 'SUB' and 'SYNC_SUB' address kinds, through ``_set_handler``
    otherwise.

    :param socket: the socket to record.
    :param address: the address of the socket; must not be registered yet.
    :param alias: optional alternative key; defaults to the address.
    :param handler: optional handler for incoming messages.
    """
    assert not self.registered(address), \
        'Socket is already registered!'
    alias = alias or address

    # The socket can be looked up by alias, by address and by itself.
    for key in (alias, address, socket):
        self.socket[key] = socket
    # The same holds for the address.
    for key in (alias, socket, address):
        self.address[key] = address

    if handler is None:
        return

    self.poller.register(socket, zmq.POLLIN)
    if address.kind in ('SUB', 'SYNC_SUB'):
        self.subscribe(socket, handler)
    else:
        self._set_handler(socket, handler)
controller_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create the ZMQ context, a REQ socket connected to ``self.endpoint``
    and a poller watching that socket, then mark the handler as connected.

    :raises RuntimeError: when the created context or socket is falsy.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = request_socket = self.context.socket(zmq.REQ)
    if not request_socket:
        raise RuntimeError('Failed to create ZMQ socket!')
    request_socket.connect(self.endpoint)

    # The poller is stored before the socket is registered with it.
    self.poller = poller = zmq.Poller()
    poller.register(request_socket, zmq.POLLIN)

    self.is_connected = True
database_proxy_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create the ZMQ context, a PULL socket bound to ``self.endpoint``
    and a poller watching that socket, then mark the handler as connected.

    :raises RuntimeError: when the created context or socket is falsy.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = pull_socket = self.context.socket(zmq.PULL)
    if not pull_socket:
        raise RuntimeError('Failed to create ZMQ socket!')
    pull_socket.bind(self.endpoint)

    # The poller is stored before the socket is registered with it.
    self.poller = poller = zmq.Poller()
    poller.register(pull_socket, zmq.POLLIN)

    self.is_connected = True
master_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create the ZMQ context, a REP socket bound to ``self.endpoint``
    and a poller watching that socket, then mark the handler as connected.

    :raises RuntimeError: when the created context or socket is falsy.
    """
    self.context = zmq.Context()
    if not self.context:
        raise RuntimeError('Failed to create ZMQ context!')

    self.socket = reply_socket = self.context.socket(zmq.REP)
    if not reply_socket:
        raise RuntimeError('Failed to create ZMQ socket!')
    reply_socket.bind(self.endpoint)

    # The poller is stored before the socket is registered with it.
    self.poller = poller = zmq.Poller()
    poller.register(reply_socket, zmq.POLLIN)

    self.is_connected = True
def run(self):
    """
    Set up the ZMQ asyncio event loop, bind the status (ROUTER) and data
    (PUB) sockets, and run ``poll_sockets`` on the loop until it stops.

    Both sockets are closed and the context destroyed when the loop ends.
    """
    self._loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(self._loop)

    self.context = zmq.asyncio.Context()
    self.status_sock = self.context.socket(zmq.ROUTER)
    self.data_sock = self.context.socket(zmq.PUB)
    bindings = (
        (self.status_sock, self.status_port),
        (self.data_sock, self.data_port),
    )
    for sock, port in bindings:
        sock.bind("tcp://*:%s" % port)

    # Only the status socket is polled for incoming messages.
    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.status_sock, zmq.POLLIN)

    self._loop.create_task(self.poll_sockets())
    try:
        self._loop.run_forever()
    finally:
        self.status_sock.close()
        self.data_sock.close()
        self.context.destroy()
def loop(self):
    """
    Poll the socket while ``self.running`` is true and emit signals for
    the multipart messages received, then close the socket.

    The first frame is the message type and the second a name. A "done"
    message emits ``finished``; a "data" message carries (metadata, data)
    frame pairs that are decoded into arrays and emitted through
    ``message`` as a tuple starting with the name.
    """
    while self.running:
        events = dict(self.poller.poll(50))
        if self.socket not in events or events[self.socket] != zmq.POLLIN:
            continue

        frames = self.socket.recv_multipart()
        msg_type = frames[0].decode()
        name = frames[1].decode()

        if msg_type == "done":
            self.finished.emit(True)
        elif msg_type == "data":
            # Frames after the header alternate between JSON metadata and
            # raw data; a trailing unpaired frame is ignored.
            payload = frames[2:]
            arrays = [
                np.frombuffer(data, dtype=json.loads(md.decode())['dtype'])
                for md, data in zip(payload[0::2], payload[1::2])
            ]
            self.message.emit(tuple([name] + arrays))

    self.socket.close()
def run(self):
    """ Contents of the infinite loop. """
    # Create the zmq sockets and a poller watching the internal subscriber
    # and the puller.
    sockets = SupvisorsZmq(self.supvisors)
    poller = zmq.Poller()
    for source in (sockets.internal_subscriber, sockets.puller):
        poller.register(source.socket, zmq.POLLIN)

    # Poll events until asked to stop.
    while not self.stopping():
        ready = dict(poller.poll(500))
        # Test the stop condition again: if Supervisor is stopping,
        # any XML-RPC call would block this thread, and the other
        # because of the join.
        if self.stopping():
            continue
        self.check_requests(sockets, ready)
        self.check_events(sockets.internal_subscriber, ready)

    # Close resources gracefully, unregistering in reverse order.
    for source in (sockets.puller, sockets.internal_subscriber):
        poller.unregister(source.socket)
    sockets.close()
def check_events(self, subscriber, socks):
    """
    Forward external Supervisor events to main thread.

    :param subscriber: object exposing the polled ``socket`` and a
        ``receive`` method returning the event message.
    :param socks: mapping of sockets to the events reported by the poller.
    """
    # Nothing to do unless the subscriber socket reported exactly POLLIN.
    if subscriber.socket not in socks:
        return
    if socks[subscriber.socket] != zmq.POLLIN:
        return

    try:
        message = subscriber.receive()
    except Exception:
        # Written through stderr.write so that the error report itself works
        # on both Python 2 and Python 3 (the 'print >>' form fails on 3).
        stderr.write('[ERROR] failed to get data from subscriber\n')
    else:
        # The events received are not processed directly in this thread
        # because it would conflict with the processing in the
        # Supervisor thread, as they use the same data.
        # That's why a RemoteCommunicationEvent is used to push the
        # event in the Supervisor thread.
        self.send_remote_comm_event(
            RemoteCommEvents.SUPVISORS_EVENT,
            json.dumps(message))
def support_test_send_to_multiple_addresses(self, address1, address2):
    """
    Listen on two addresses and answer the first one that receives a message.

    The reply is the serialised address the message arrived on. Both sockets
    are closed afterwards, whatever happens.

    :raises RuntimeError: when neither socket has a pending event after the
        2000 poll timeout.
    """
    poller = zmq.Poller()
    socket1 = self.context.socket(roles['listener'])
    socket2 = self.context.socket(roles['listener'])
    listeners = ((socket1, address1), (socket2, address2))
    try:
        for listener, address in listeners:
            listener.bind("tcp://%s" % address)
        for listener, _ in listeners:
            poller.register(listener, zmq.POLLIN)

        polled = dict(poller.poll(2000))

        # Only the first listener with a pending event is served.
        for listener, address in listeners:
            if listener in polled:
                listener.recv()
                listener.send(nw0.sockets._serialise(address))
                break
        else:
            raise RuntimeError("Nothing found")
    finally:
        socket1.close()
        socket2.close()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """
    Set up the worker: identity, ROUTER socket, poller, redis connection,
    logger and SIGTERM handler.

    :param data_dir: directory holding the data files; must be an existing directory.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: level set on the worker's logger.
    :raises Exception: when ``data_dir`` is not an existing directory.
    """
    # isdir() is false for paths that do not exist, so one check covers both cases.
    if not os.path.isdir(data_dir):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    # Identity of this worker.
    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    # ROUTER socket identified by the worker id, polled for both directions.
    context = zmq.Context()
    self.socket = router = context.socket(zmq.ROUTER)
    router.setsockopt(zmq.LINGER, 500)
    router.identity = self.worker_id
    self.poller = zmq.Poller()
    self.poller.register(router, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    self.controllers = {}  # Keep a dict of timestamps when you last spoke to controllers
    self.check_controllers()

    # Bookkeeping.
    self.last_wrm = 0
    self.start_time = time.time()
    self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger.setLevel(loglevel)
    self.msg_count = 0

    # term_signal() is called here; what it returns is installed as the handler.
    signal.signal(signal.SIGTERM, self.term_signal())
def go(self):
    """
    Run the main loop while ``self.is_running`` is true, then remove the
    controller's pid and address run files.

    Each round sends a heartbeat, frees dead workers, handles the polled
    socket events and processes sink results. A keyboard interrupt calls
    ``kill``; any other error is logged and the loop carries on.
    """
    self.logger.info('[#############################>. Starting .<#############################]')
    while self.is_running:
        try:
            time.sleep(0.001)
            self.heartbeat()
            self.free_dead_workers()
            for _sock, event in self.poller.poll(timeout=POLLING_TIMEOUT):
                # One poll result may carry both flags; handle input first.
                if event & zmq.POLLIN:
                    self.handle_in()
                if event & zmq.POLLOUT:
                    self.handle_out()
            self.process_sink_results()
        except KeyboardInterrupt:
            self.logger.debug('Keyboard Interrupt')
            self.kill()
        except:
            self.logger.error("Exception %s" % traceback.format_exc())

    self.logger.info('Stopping')
    # Remove the run files left behind by this controller, if present.
    for run_file in ('bqueryd_controller.pid', 'bqueryd_controller.address'):
        stale_path = os.path.join(RUNFILES_LOCATION, run_file)
        if os.path.exists(stale_path):
            os.remove(stale_path)
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check that an azmq SUB socket subscribes to and receives from an XPUB peer.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    NOTE(review): presumably collected by an asyncio-aware test runner
    (the decorator is not visible here) - confirm.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check that an azmq XSUB socket subscribes to and receives from an XPUB peer.

    The subscription is sent explicitly as the frame ``b'\\x01a'``.
    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    NOTE(review): presumably collected by an asyncio-aware test runner
    (the decorator is not visible here) - confirm.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check that a message sent by an azmq PUSH socket reaches a PULL peer.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    NOTE(review): presumably collected by an asyncio-aware test runner
    (the decorator is not visible here) - confirm.
    """
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])

            # Keep the context open until the background side sets the event.
            while not event.is_set():
                await asyncio.sleep(0.1)
async def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check a message round trip between an azmq PAIR socket and a PAIR peer.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    NOTE(review): presumably collected by an asyncio-aware test runner
    (the decorator is not visible here) - confirm.
    """
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """Check that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    # poll(.005) must return in well under 0.1 seconds.
    started = time.time()
    poller.poll(.005)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)

    # poll(5) must take more than 1 ms but less than 0.1 seconds.
    started = time.time()
    poller.poll(5)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    # poll(500) must take more than 0.1 seconds but less than 1 second.
    started = time.time()
    poller.poll(500)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """Check that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    # poll(.005) must return in well under 0.1 seconds.
    started = time.time()
    poller.poll(.005)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)

    # poll(5) must take more than 1 ms but less than 0.1 seconds.
    started = time.time()
    poller.poll(5)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    # poll(500) must take more than 0.1 seconds but less than 1 second.
    started = time.time()
    poller.poll(500)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """Check that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    # poll(.005) must return in well under 0.1 seconds.
    started = time.time()
    poller.poll(.005)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)

    # poll(5) must take more than 1 ms but less than 0.1 seconds.
    started = time.time()
    poller.poll(5)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    # poll(500) must take more than 0.1 seconds but less than 1 second.
    started = time.time()
    poller.poll(500)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)
def run(self):
    """ Run the Authentication Agent thread task until it is told to terminate. """
    self.authenticator.start()
    zap_socket = self.authenticator.zap_socket

    # Watch the control pipe and the ZAP socket for incoming messages.
    poller = zmq.Poller()
    for watched in (self.pipe, zap_socket):
        poller.register(watched, zmq.POLLIN)

    while True:
        try:
            events = dict(poller.poll())
        except zmq.ZMQError:
            # A failing poll is treated as an interruption: stop serving.
            break

        # Pipe messages are handled first; a true result asks this task to stop.
        if events.get(self.pipe) == zmq.POLLIN and self._handle_pipe():
            break

        if events.get(zap_socket) == zmq.POLLIN:
            self._handle_zap()

    self.pipe.close()
    self.authenticator.stop()
def test_timeout(self):
    """Check that Poller.poll interprets its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    # poll(.005) must return in well under 0.1 seconds.
    started = time.time()
    poller.poll(.005)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)

    # poll(5) must take more than 1 ms but less than 0.1 seconds.
    started = time.time()
    poller.poll(5)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 0.1)
    self.assertTrue(elapsed > .001)

    # poll(500) must take more than 0.1 seconds but less than 1 second.
    started = time.time()
    poller.poll(500)
    elapsed = time.time() - started
    self.assertTrue(elapsed < 1)
    self.assertTrue(elapsed > 0.1)