def __init__(self, ctx, url):
self.socket = zmq.Socket(ctx, zmq.PUB)
self.socket.connect(url)
python类Socket()的实例源码
def _bind_socket(self, socket, addr=None, transport=None):
"""
Bind a socket using the corresponding transport and address.
Parameters
----------
socket : zmq.Socket
Socket to bind.
addr : str, default is None
The address to bind to.
transport : str, AgentAddressTransport, default is None
Transport protocol.
Returns
-------
addr : str
The address where the socket binded to.
"""
if transport == 'tcp':
host, port = address_to_host_port(addr)
if not port:
uri = 'tcp://%s' % self.host
port = socket.bind_to_random_port(uri)
addr = self.host + ':' + str(port)
else:
socket.bind('tcp://%s' % (addr))
else:
if not addr:
addr = str(unique_identifier())
if transport == 'ipc':
addr = config['IPC_DIR'] / addr
socket.bind('%s://%s' % (transport, addr))
return addr
def _process_async_rep_event(self, socket, channel, data):
"""
Process a ASYNC_REP socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data, address = message
client_address = address.twin()
if not self.registered(client_address):
self.connect(address)
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
self.send(client_address, (address_uuid, request_uuid, reply))
if is_generator:
execute_code_after_yield(generator)
def _process_sync_pub_event(self, socket, channel, data):
"""
Process a SYNC_PUB socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
channel : AgentChannel
AgentChannel associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
message = deserialize_message(message=data,
serializer=channel.serializer)
address_uuid, request_uuid, data = message
handler = self.handler[socket]
is_generator = inspect.isgeneratorfunction(handler)
if is_generator:
generator = handler(self, data)
reply = next(generator)
else:
reply = handler(self, data)
message = (address_uuid, request_uuid, reply)
self._send_channel_sync_pub(channel=channel,
message=message,
topic=address_uuid,
general=False)
if is_generator:
execute_code_after_yield(generator)
def _process_sub_event(self, socket, addr, data):
"""
Process a SUB socket's event.
Parameters
----------
socket : zmq.Socket
Socket that generated the event.
addr : AgentAddress
AgentAddress associated with the socket that generated the event.
data : bytes
Data received on the socket.
"""
handlers = self.handler[socket]
message = self._process_sub_message(addr.serializer, data)
for topic in handlers:
if not data.startswith(topic):
continue
# Call the handler (with or without the topic)
handler = handlers[topic]
nparams = len(inspect.signature(handler).parameters)
if nparams == 2:
handler(self, message)
elif nparams == 3:
handler(self, message, topic)
def get_unique_external_zmq_sockets(self):
"""
Return an iterable containing all the zmq.Socket objects from
`self.socket` which are not internal, without repetition.
Originally, a socket was internal if its alias was one of the
following:
- loopback
- _loopback_safe
- inproc://loopback
- inproc://_loopback_safe
However, since we are storing more than one entry in the `self.socket`
dictionary per zmq.socket (by storing its AgentAddress, for example),
we need a way to simply get all non-internal zmq.socket objects, and
this is precisely what this function does.
"""
reserved = ('loopback', '_loopback_safe', 'inproc://loopback',
'inproc://_loopback_safe')
external_sockets = []
for k, v in self.socket.items():
if isinstance(k, zmq.sugar.socket.Socket):
continue
if isinstance(k, AgentAddress) and k.address in reserved:
continue
if k in reserved:
continue
external_sockets.append(v)
return set(external_sockets)
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def recv_messages(zmq_subscriber, timeout_count, message_count):
"""Test utility function.
Subscriber thread that receives and counts ZMQ messages.
Args:
zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
timeout_count (int): No. of failed receives until exit.
message_count (int): No. of messages expected to be received.
Returns:
(int) Number of messages received.
"""
# pylint: disable=E1101
fails = 0 # No. of receives that didn't return a message.
receive_count = 0 # Total number of messages received.
while fails < timeout_count:
try:
_ = zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
fails = 0
receive_count += 1
if receive_count == message_count:
break
except zmq.ZMQError as error:
if error.errno == zmq.EAGAIN:
pass
else:
raise
fails += 1
time.sleep(1e-6)
return receive_count
def __init__(self, *args, **kwargs):
zmq.Socket.__init__(self, *args, **kwargs)
#
# Keep track of which thread this socket was created in
#
self.__dict__['_thread'] = threading.current_thread()
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def test_shadow(self):
p = self.socket(zmq.PUSH)
p.bind("tcp://127.0.0.1:5555")
p2 = zmq.Socket.shadow(p.underlying)
self.assertEqual(p.underlying, p2.underlying)
s = self.socket(zmq.PULL)
s2 = zmq.Socket.shadow(s.underlying)
self.assertNotEqual(s.underlying, p.underlying)
self.assertEqual(s.underlying, s2.underlying)
s2.connect("tcp://127.0.0.1:5555")
sent = b'hi'
p2.send(sent)
rcvd = self.recv(s2)
self.assertEqual(rcvd, sent)
def unregister(self, socket):
"""Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
idx = self._map.pop(socket)
self.sockets.pop(idx)
# shift indices after deletion
for socket, flags in self.sockets[idx:]:
self._map[socket] -= 1
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def _get_descriptors(self):
"""Returns three elements tuple with socket descriptors ready
for gevent.select.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets:
if isinstance(socket, zmq.Socket):
rlist.append(socket.getsockopt(zmq.FD))
continue
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError('Socket must be a 0MQ socket, an integer fd '
'or have a fileno() method: %r' % socket)
if flags & zmq.POLLIN:
rlist.append(fd)
if flags & zmq.POLLOUT:
wlist.append(fd)
if flags & zmq.POLLERR:
xlist.append(fd)
return (rlist, wlist, xlist)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)
def register(self, socket, flags=POLLIN|POLLOUT):
"""p.register(socket, flags=POLLIN|POLLOUT)
Register a 0MQ socket or native fd for I/O monitoring.
register(s,0) is equivalent to unregister(s).
Parameters
----------
socket : zmq.Socket or native socket
A zmq.Socket or any Python object having a ``fileno()``
method that returns a valid file descriptor.
flags : int
The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
If `flags=0`, socket will be unregistered.
"""
if flags:
if socket in self._map:
idx = self._map[socket]
self.sockets[idx] = (socket, flags)
else:
idx = len(self.sockets)
self.sockets.append((socket, flags))
self._map[socket] = idx
elif socket in self._map:
# uregister sockets registered with no events
self.unregister(socket)
else:
# ignore new sockets with no events
pass
def unregister(self, socket):
"""Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
idx = self._map.pop(socket)
self.sockets.pop(idx)
# shift indices after deletion
for socket, flags in self.sockets[idx:]:
self._map[socket] -= 1
def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
if isinstance(interface_or_socket, zmq.Socket):
self.socket = interface_or_socket
self.ctx = self.socket.context
else:
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)
def test_subclass(self):
"""subclasses can assign attributes"""
class S(zmq.Socket):
a = None
def __init__(self, *a, **kw):
self.a=-1
super(S, self).__init__(*a, **kw)
s = S(self.context, zmq.REP)
self.sockets.append(s)
self.assertEqual(s.a, -1)
s.a=1
self.assertEqual(s.a, 1)
a=s.a
self.assertEqual(a, 1)