def receive_message(self, event, event_data, listener_data):
"""
Receives a messages from another processes.
:param * event: Not used.
:param * event_data: Not used.
:param * listener_data: Not used.
"""
del event, event_data, listener_data
# Make a poller for all incoming sockets.
poller = zmq.Poller()
for socket in self.__end_points.values():
if socket.type in [zmq.PULL, zmq.REP]:
poller.register(socket, zmq.POLLIN)
# Wait for socket is ready for reading.
socks = dict(poller.poll())
for name, socket in self.__end_points.items():
if socket in socks:
self._receive_message(name, socket)
# ------------------------------------------------------------------------------------------------------------------
python类REP的实例源码
def __register_sockets(self):
"""
Registers ZMQ sockets for communication with other processes in Enarksh.
"""
config = Config.get()
# Register socket for receiving asynchronous incoming messages.
self.message_controller.register_end_point('pull', zmq.PULL, config.get_controller_pull_end_point())
# Create socket for lockstep incoming messages.
self.message_controller.register_end_point('lockstep', zmq.REP, config.get_controller_lockstep_end_point())
# Create socket for sending asynchronous messages to the spanner.
self.message_controller.register_end_point('spawner', zmq.PUSH, config.get_spawner_pull_end_point())
# Create socket for sending asynchronous messages to the logger.
self.message_controller.register_end_point('logger', zmq.PUSH, config.get_logger_pull_end_point())
# ------------------------------------------------------------------------------------------------------------------
def register_end_point(self, name, socket_type, end_point):
"""
Registers an end point.
:param str name: The name of the end point.
:param int socket_type: The socket type, one of
- zmq.PULL for asynchronous incoming messages
- zmq.REP for lockstep incoming messages
- zmq.PUSH for asynchronous outgoing messages
:param str end_point: The end point.
"""
socket = self.__zmq_context.socket(socket_type)
self.__end_points[name] = socket
if socket_type in [zmq.PULL, zmq.REP]:
socket.bind(end_point)
elif socket_type == zmq.PUSH:
socket.connect(end_point)
else:
raise ValueError("Unknown socket type {0}".format(socket_type))
# ------------------------------------------------------------------------------------------------------------------
def no_barking(self, seconds):
"""
During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on
the socket. This method prevent incoming sockets barking that the are ready the for reading.
:param int seconds: The number of seconds the give the other ZMQ thread to start up.
"""
sleep(seconds)
for _ in range(1, len(self.end_points)):
poller = zmq.Poller()
for socket in self.end_points.values():
if socket.type in [zmq.PULL, zmq.REP]:
poller.register(socket, zmq.POLLIN)
poller.poll(1)
# ----------------------------------------------------------------------------------------------------------------------
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
if zmq.zmq_version() in ('4.1.1', '4.0.6'):
raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
master_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def connect(self):
self.context = zmq.Context()
if not self.context:
raise RuntimeError('Failed to create ZMQ context!')
self.socket = self.context.socket(zmq.REP)
if not self.socket:
raise RuntimeError('Failed to create ZMQ socket!')
self.socket.bind(self.endpoint)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.is_connected = True
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(RpcServer, self).__init__()
# ??????????key?????value?????
self.__functions = {}
# zmq????
self.__context = zmq.Context()
self.__socketREP = self.__context.socket(zmq.REP) # ????socket
self.__socketREP.bind(repAddress)
self.__socketPUB = self.__context.socket(zmq.PUB) # ????socket
self.__socketPUB.bind(pubAddress)
# ??????
self.__active = False # ????????
self.__thread = threading.Thread(target=self.run) # ????????
#----------------------------------------------------------------------
def create_server():
context = zmq.Context()
try:
main_socket = context.socket(zmq.REP)
main_socket.bind("tcp://*:5555")
freeze_socket = context.socket(zmq.REP)
freeze_socket.bind("tcp://*:6666")
except zmq.error.ZMQError:
print("JustDB already running, this is no error.")
sys.exit()
print("Successfully started \033[92mjustdb\033[0m")
while True: # pragma: no cover
_ = main_socket.recv()
main_socket.send(b"")
_ = freeze_socket.recv()
freeze_socket.send(b"")
def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'my', b'question']
rep_socket.send_multipart([b'your', b'answer'])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'my', b'question']),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'your', b'answer']
def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
req_socket = socket_factory.create(zmq.REQ)
connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
req_socket.send_multipart([b'my', b'question'])
frames = req_socket.recv_multipart()
assert frames == [b'your', b'answer']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REP)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'my', b'question']
await asyncio.wait_for(
socket.send_multipart([b'your', b'answer']),
1,
)
def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'my', b'question']
rep_socket.send_multipart([b'your', b'answer'])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.DEALER)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'', b'my', b'question']),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'', b'your', b'answer']
def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'1' * 500, b'2' * 100000]
rep_socket.send_multipart([b'3' * 500, b'4' * 100000])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'1' * 500, b'2' * 100000]),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'3' * 500, b'4' * 100000]
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def listen_thread():
ctx = zmq.Context()
# Reply - act as server
socket = ctx.socket(zmq.REP)
socket.bind('tcp://*:%s' % config.ZEROMQ_PORT)
while True:
request = socket.recv()
print('ZEROMQ request -', request)
socket.send('OK')
if request == 'KILL':
socket.close()
print('Socket closed.')
break
with webapp.app_context():
data = json.loads(request)
print('Sending to SocketIO ...')
socketio.emit(
'new_update',
data['msg'] + ' at ' + data['timestamp'],
namespace='/browser',
)