def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
python类REQ的实例源码
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def test_hwm(self):
zmq3 = zmq.zmq_version_info()[0] >= 3
for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
s = self.context.socket(stype)
s.hwm = 100
self.assertEqual(s.hwm, 100)
if zmq3:
try:
self.assertEqual(s.sndhwm, 100)
except AttributeError:
pass
try:
self.assertEqual(s.rcvhwm, 100)
except AttributeError:
pass
s.close()
def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)
poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)
poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)
def test_monitor_connected(self):
"""Test connected monitoring socket."""
s_rep = self.context.socket(zmq.REP)
s_req = self.context.socket(zmq.REQ)
self.sockets.extend([s_rep, s_req])
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
self.sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
# test receive event for connected event
m = recv_monitor_message(s_event)
self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
def test_single_socket_forwarder_connect(self):
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_in('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
req = self.context.socket(zmq.REQ)
port = req.bind_to_random_port('tcp://127.0.0.1')
dev.connect_out('tcp://127.0.0.1:%i'%port)
dev.start()
time.sleep(.25)
msg = b'hello again'
req.send(msg)
self.assertEqual(msg, self.recv(req))
del dev
req.close()
def __init__(self, server_ip, server_port, task_id='', debug=False):
if debug:
l.setLevel(logging.DEBUG)
l.debug("Hydra Analyser initiated...")
self.server_ip = server_ip
self.port = server_port
self.task_id = task_id
self.data = {} # This is where all received data will be stored
self.context = zmq.Context.instance()
self.poller = zmq.Poller()
self.req_msg = hdaemon_pb2.CommandMessage()
self.resp_msg = hdaemon_pb2.ResponseMessage()
l.debug("Connecting to server at [%s:%s]", self.server_ip, self.port)
self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://%s:%s" % (self.server_ip, self.port))
l.debug("Connected...")
def test_app_communication(self):
tapp = 'testapp2'
# clean up any previous app by this name
self.rt.delete_app(tapp)
self.rt.create_hydra_app(name=tapp, app_path='hydra.selftest.agents.Test',
app_args='5598 0',
cpus=0.01, mem=32)
taskip = self.rt.find_ip_uniqueapp(tapp)
tasks = self.rt.get_app_tasks(tapp)
self.assertTrue(len(tasks) == 1)
self.assertTrue(len(tasks[0].ports) == 1)
taskport = str(tasks[0].ports[0])
pprint('task is launched at ip=' + taskip + ":" + taskport)
# now send a message to this app to find out how it's doing
zctx = zmq.Context()
zsocket = zctx.socket(zmq.REQ)
zsocket.connect("tcp://%s:%s" % (taskip, taskport))
zsocket.send_string('ping')
message = zsocket.recv().decode("utf-8")
# stop and clean up
self.rt.delete_app(tapp)
self.assertEqual(message, 'pong')
def __init__(self, worker_id, outside_ros=False):
self.worker_id = worker_id
self.outside_ros = outside_ros
if self.outside_ros:
rospy.logwarn('Controller is using ZMQ to get work')
self.context = Context()
self.socket = self.context.socket(REQ)
self.socket.connect('tcp://127.0.0.1:33589')
else:
rospy.logwarn('Controller is using ROS to get work')
self.services = {'get': {'name': '/work/get', 'type': GetWork},
'update': {'name': '/work/update', 'type': UpdateWorkStatus}}
for service_name, service in self.services.items():
rospy.loginfo("Controller is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
def __init__(self, url, pattern=ZmqfPattern.MPUP):
'''
'''
protocol, host, port, uri = zmqf_utils.parse_url(url)
self.context = zmq.Context()
self.pattern = pattern
if self.pattern == ZmqfPattern.MPBS:
self._socket = self.context.socket(zmq.PUB) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
time.sleep(0.25)
elif self.pattern == ZmqfPattern.MPUP:
self._socket = self.context.socket(zmq.PUSH) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
elif self.pattern == ZmqfPattern.MRER:
self._socket = self.context.socket(zmq.REQ) # @UndefinedVariable
self._socket.connect('%s://%s:%s'% (protocol, host, port))
def __init__(self, host=None, req_port=None, use_security=False):
if host is None:
host = env.get_master_host()
context = zmq.Context()
self._socket = context.socket(zmq.REQ)
self._auth = None
if use_security:
self._auth = Authenticator.instance(
env.get_server_public_key_dir())
self._auth.set_client_key(self._socket, env.get_client_secret_key_path(),
env.get_server_public_key_path())
if req_port is None:
req_port = env.get_req_port()
self._socket.connect(
'tcp://{host}:{port}'.format(host=host, port=req_port))
def __init__(self, name, actor_context = None, endpoints = None):
"""
Create a client
Keyword arguments:
name - Name of the timer
actor_context - ZMQ context of the actor process
endpoints - A list of endpoint strings
"""
self.name = name
self.endpoints = None
self.context = actor_context
self.client_socket = None
if not (endpoints == None):
self.endpoints = endpoints
self.context = zmq.Context()
self.client_socket = self.context.socket(zmq.REQ)
for endpoint in self.endpoints:
self.client_socket.connect(endpoint)
def worker_thread(_url, context, i):
master = context.socket(zmq.REQ)
master.identity = ("Worker-%d" % i).encode('ascii')
master.connect(_url)
# [performance, status]
master.send_multipart([i.to_bytes(1, 'little'), b"", b'READY'])
print("[%s] I'm ready..." % (master.identity.decode('ascii')))
while True:
[client_addr, empty, request] = master.recv_multipart()
assert empty == b""
print("[%s] Processing task... %s / %s" % (master.identity.decode('ascii'),
client_addr.decode('ascii'),
request.decode('ascii')))
time.sleep(randrange(1, 10))
print("[%s] finish task... %s / %s" % (master.identity.decode('ascii'),
client_addr.decode('ascii'),
request.decode('ascii')))
master.send_multipart([i.to_bytes(1, 'little'), b"", client_addr, b"", b"FINISH"])
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
"""Ensure that a socket exists, that is *binded* to the given address
and that is registered with the given polling mechanism.
This method is a handy replacement for calling
``.get_or_create()``, ``.bind()`` and then ``.engage()``.
returns the socket itself.
:param socket_name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param address: a valid zeromq address (i.e: inproc://whatevs)
:param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
"""
self.get_or_create(socket_name, socket_type, polling_mechanism)
socket = self.bind(socket_name, address, polling_mechanism)
self.engage()
return socket
def get_or_create(self, name, socket_type, polling_mechanism):
"""ensure that a socket exists and is registered with a given
polling_mechanism (POLLIN, POLLOUT or both)
returns the socket itself.
:param name: the socket name
:param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
:param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
"""
if name not in self.sockets:
self.create(name, socket_type)
socket = self.get_by_name(name)
self.register_socket(socket, polling_mechanism)
return socket
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT):
# new socket to talk to server
self.__socket = zmq.Context().socket(zmq.REQ)
self.__socket.connect("tcp://localhost:" + ZMQPort.RQ)
# init poller and register to socket that web can poll socket to check is it has messages
poller = zmq.Poller()
poller.register(self.__socket, zmq.POLLIN)
send_flatbuf_msg(self.__socket, msg_type, msg_content)
reqs = 0
while reqs * self.__POLL_INTERVAL <= timeout:
socks = dict(poller.poll(self.__POLL_INTERVAL))
if self.__socket in socks and socks[self.__socket] == zmq.POLLIN:
msg = self.__socket.recv()
msgObj = TransMsg.GetRootAsTransMsg(msg, 0)
return msgObj.Content()
reqs = reqs + 1
return False
def __zmq_init(self):
"""
Initializes ZMQ.
"""
config = Config.get()
self.__zmq_context = zmq.Context()
# Create socket for communicating with the controller.
self.__zmq_controller = self.__zmq_context.socket(zmq.REQ)
self.__zmq_controller.connect(config.get_controller_lockstep_end_point())
# ----------------------------------------------------------------------------------------------------------------------
def __zmq_init(self):
"""
Initializes ZMQ.
"""
config = Config.get()
self.__zmq_context = zmq.Context()
# Create socket for communicating with the controller.
self.__zmq_controller = self.__zmq_context.socket(zmq.REQ)
self.__zmq_controller.connect(config.get_controller_lockstep_end_point())
# ----------------------------------------------------------------------------------------------------------------------