def frontendClient(context=None):
#reuse context if it exists, otherwise make a new one
context = context or zmq.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")
socket.RCVTIMEO = 2000 #we will only wait 2s for a reply
while True:
#randomly request either service A or service B
serviceRequest = random.choice([b'Service A',b'Service B'])
with myLock:
print "client wants %s" % serviceRequest
socket.send(serviceRequest)
try:
reply = socket.recv()
except Exception as e:
print "client timed out"
break
if not reply:
break
with myLock:
print "Client got reply: "
print reply
print
#take a nap
time.sleep(1)
python类REQ的实例源码
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
controller_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def connect(self):
self.context = zmq.Context()
if not self.context:
raise RuntimeError('Failed to create ZMQ context!')
self.socket = self.context.socket(zmq.REQ)
if not self.socket:
raise RuntimeError('Failed to create ZMQ socket!')
self.socket.connect(self.endpoint)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.is_connected = True
def __init__(self, reqAddress, subAddress):
"""Constructor"""
super(RpcClient, self).__init__()
# zmq????
self.__reqAddress = reqAddress
self.__subAddress = subAddress
self.__context = zmq.Context()
self.__socketREQ = self.__context.socket(zmq.REQ) # ????socket
self.__socketSUB = self.__context.socket(zmq.SUB) # ????socket
# ???????????????????
self.__active = False # ????????
self.__thread = threading.Thread(target=self.run) # ????????
#----------------------------------------------------------------------
def __init__(self):
# if not exist server, spawn server, try except around
context = zmq.Context()
# try to start server in background
os.system("justdb serve &")
main_socket = context.socket(zmq.REQ)
main_socket.connect("tcp://localhost:5555")
# print("Connecting to write server")
freeze_socket = context.socket(zmq.REQ)
freeze_socket.connect("tcp://localhost:6666")
self.main_socket = main_socket
self.freeze_socket = freeze_socket
def flash(self):
if self.pid != str(os.getpid()):
# reset process pid
self.pid = str(os.getpid())
# update zmq sockets
# (couldnt share socket in differenet process)
self.zmq_socket = zmq.Context().socket(zmq.REQ)
self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)
# update context
ctx = main_context(self.main_file, self.main_folder)
if self.main_param is not None:
main_config_path = os.path.join(self.main_folder, self.main_param)
params = yaml.load(open(main_config_path, 'r'))
ctx.params = params
self.context = ctx
def main():
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))
while True:
command = input("Command: ")
socket.send(command.encode(config.CODEC))
response = socket.recv().decode(config.CODEC)
print(" ... %s" % response)
words = shlex.split(response.lower())
status = words[0]
if len(words) > 1:
info = words[1:]
if status == "finished":
print("Finished status received from robot")
break
def handle_in(self):
self.msg_count_in += 1
data = self.socket.recv_multipart()
binary, sender = None, None # initialise outside for edge cases
if len(data) == 3:
if data[1] == '': # This is a RPC call from a zmq.REQ socket
sender, _blank, msg_buf = data
self.handle_rpc(sender, msg_factory(msg_buf))
return
sender, msg_buf, binary = data
elif len(data) == 2: # This is an internode call from another zmq.ROUTER, a Controller or Worker
sender, msg_buf = data
msg = msg_factory(msg_buf)
if binary:
msg['data'] = binary
if sender in self.others:
self.handle_peer(sender, msg)
else:
self.handle_worker(sender, msg)
def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'my', b'question']
rep_socket.send_multipart([b'your', b'answer'])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'my', b'question']),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'your', b'answer']
def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
req_socket = socket_factory.create(zmq.REQ)
connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
req_socket.send_multipart([b'my', b'question'])
frames = req_socket.recv_multipart()
assert frames == [b'your', b'answer']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REP)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'my', b'question']
await asyncio.wait_for(
socket.send_multipart([b'your', b'answer']),
1,
)
def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
req_socket = socket_factory.create(zmq.REQ)
req_socket.identity = b'abcd'
connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
req_socket.send_multipart([b'my', b'question'])
frames = req_socket.recv_multipart()
assert frames == [b'your', b'answer']
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.ROUTER)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
identity = frames.pop(0)
assert identity == req_socket.identity
assert frames == [b'', b'my', b'question']
await asyncio.wait_for(
socket.send_multipart([identity, b'', b'your', b'answer']),
1,
)
def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
rep_socket = socket_factory.create(zmq.REP)
connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)
def run():
frames = rep_socket.recv_multipart()
assert frames == [b'1' * 500, b'2' * 100000]
rep_socket.send_multipart([b'3' * 500, b'4' * 100000])
with run_in_background(run):
async with azmq.Context(loop=event_loop) as context:
socket = context.socket(azmq.REQ)
connect_or_bind(socket, 'tcp://127.0.0.1:3333')
await asyncio.wait_for(
socket.send_multipart([b'1' * 500, b'2' * 100000]),
1,
)
frames = await asyncio.wait_for(socket.recv_multipart(), 1)
assert frames == [b'3' * 500, b'4' * 100000]
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()