def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
# Example source code using the Python zmq HWM option
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3, this sets both SNDHWM and RCVHWM.  Both assignments
    are always attempted; if either fails, the most recent failure is
    re-raised once both attempts are done.  On older libzmq the single
    HWM option is set instead.

    :param value: the high water mark to apply.
    :raises Exception: whatever setting ``sndhwm`` / ``rcvhwm`` raised.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in *this* clause: on Python 3 the
        # name bound by the previous ``except ... as e`` is deleted when
        # that clause ends, so reusing it here would raise NameError.
        raised = e
    if raised is not None:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3, this sets both SNDHWM and RCVHWM.  Both assignments
    are always attempted; if either fails, the most recent failure is
    re-raised once both attempts are done.  On older libzmq the single
    HWM option is set instead.

    :param value: the high water mark to apply.
    :raises Exception: whatever setting ``sndhwm`` / ``rcvhwm`` raised.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in *this* clause: on Python 3 the
        # name bound by the previous ``except ... as e`` is deleted when
        # that clause ends, so reusing it here would raise NameError.
        raised = e
    if raised is not None:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3, this sets both SNDHWM and RCVHWM.  Both assignments
    are always attempted; if either fails, the most recent failure is
    re-raised once both attempts are done.  On older libzmq the single
    HWM option is set instead.

    :param value: the high water mark to apply.
    :raises Exception: whatever setting ``sndhwm`` / ``rcvhwm`` raised.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in *this* clause: on Python 3 the
        # name bound by the previous ``except ... as e`` is deleted when
        # that clause ends, so reusing it here would raise NameError.
        raised = e
    if raised is not None:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3, this sets both SNDHWM and RCVHWM.  Both assignments
    are always attempted; if either fails, the most recent failure is
    re-raised once both attempts are done.  On older libzmq the single
    HWM option is set instead.

    :param value: the high water mark to apply.
    :raises Exception: whatever setting ``sndhwm`` / ``rcvhwm`` raised.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in *this* clause: on Python 3 the
        # name bound by the previous ``except ... as e`` is deleted when
        # that clause ends, so reusing it here would raise NameError.
        raised = e
    if raised is not None:
        raise raised
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the single HWM option is split in two: this returns
    SNDHWM when it can be read, and RCVHWM when reading SNDHWM raises
    ``zmq.ZMQError``.  On older libzmq the HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def _setup_ipc(self):
    '''
    Set up the PULL socket bound to the publisher IPC.

    Creates a new ZeroMQ context (``self.ctx``) and a PULL socket
    (``self.sub``), applies the high water mark from ``self.opts['hwm']``
    and then binds the socket to ``PUB_IPC_URL``.
    '''
    self.ctx = zmq.Context()
    log.debug('Setting up the publisher puller')
    self.sub = self.ctx.socket(zmq.PULL)
    # The HWM must be configured *before* bind: libzmq only applies new
    # values to subsequent bind/connect calls.
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(PUB_IPC_URL)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3, this sets both SNDHWM and RCVHWM.  Both are always
    attempted; when either assignment fails, the most recent failure is
    re-raised after both attempts.  On older libzmq the single HWM
    option is set.

    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.setsockopt(zmq.HWM, value)

    failure = None
    # Same order as before: send side first, then receive side.
    for option in ('sndhwm', 'rcvhwm'):
        try:
            setattr(self, option, value)
        except Exception as exc:
            failure = exc
    if failure:
        raise failure
def _setup_ipc(self):
    '''
    Set up the listener IPC pusher.

    Creates a new ZeroMQ context (``self.ctx``) and a PUSH socket
    (``self.pub``), applies the high water mark from ``self.opts['hwm']``
    and then connects the socket to ``LST_IPC_URL``.
    '''
    log.debug('Setting up the listener IPC pusher')
    self.ctx = zmq.Context()
    self.pub = self.ctx.socket(zmq.PUSH)
    log.debug('Setting HWM for the listener: %d', self.opts['hwm'])
    # The HWM must be configured *before* connect: libzmq only applies
    # new values to subsequent bind/connect calls.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(LST_IPC_URL)
def _setup_ipc(self):
    '''
    Set up the device IPC dealer and the publisher IPC pusher.

    Creates a new ZeroMQ context (``self.ctx``), a DEALER socket
    (``self.sub``) identified by ``self._name`` and connected to
    ``DEV_IPC_URL``, and a PUSH socket (``self.pub``) connected to
    ``PUB_IPC_URL``.  The high water mark from ``self.opts['hwm']`` is
    applied to each socket before it connects.
    '''
    self.ctx = zmq.Context()
    # subscribe to device IPC
    log.debug('Creating the dealer IPC for %s', self._name)
    self.sub = self.ctx.socket(zmq.DEALER)
    # The identity has to be bytes on Python 3.
    if six.PY2:
        self.sub.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    # subscribe to the corresponding IPC pipe
    self.sub.connect(DEV_IPC_URL)
    # publish to the publisher IPC
    self.pub = self.ctx.socket(zmq.PUSH)
    # As for the dealer above, set the HWM *before* connect: libzmq only
    # applies new values to subsequent bind/connect calls.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(PUB_IPC_URL)
def start(self):
    '''
    Start up the zmq consumer.

    Builds the endpoint URI from ``self.protocol``, ``self.address`` and
    (when set) ``self.port``, creates a socket of the type named by
    ``self.type`` (``zmq.PULL`` when zmq has no such attribute), applies
    every configured socket option and finally connects ``self.sub``.
    '''
    if self.port:
        zmq_uri = '{protocol}://{address}:{port}'.format(
            protocol=self.protocol,
            address=self.address,
            port=self.port
        )
    else:
        zmq_uri = '{protocol}://{address}'.format(
            protocol=self.protocol,
            address=self.address
        )
    log.debug('ZMQ URI: %s', zmq_uri)
    self.ctx = zmq.Context()
    skt_type = getattr(zmq, self.type, zmq.PULL)
    self.sub = self.ctx.socket(skt_type)
    # All options are set *before* connect: libzmq only applies most
    # socket options to subsequent bind/connect calls.
    if self.hwm is not None:
        try:
            self.sub.setsockopt(zmq.HWM, self.hwm)
        except AttributeError:
            self.sub.setsockopt(zmq.RCVHWM, self.hwm)
    if self.recvtimeout is not None:
        log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
        self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
    if self.keepalive is not None:
        log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
    if self.keepalive_idle is not None:
        log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
    if self.keepalive_interval is not None:
        log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
    self.sub.connect(zmq_uri)
def _setup_ipc(self):
    '''
    Set up the server IPC sockets.

    Creates a new ZeroMQ context (``self.ctx``), a PULL socket
    (``self.sub``) bound to ``LST_IPC_URL`` and a ROUTER socket
    (``self.pub``) bound to ``DEV_IPC_URL``.  The high water mark from
    ``self.opts['hwm']`` is applied to each socket before it binds.
    '''
    log.debug('Setting up the server IPC puller to receive from the listener')
    self.ctx = zmq.Context()
    # subscribe to listener
    self.sub = self.ctx.socket(zmq.PULL)
    # The HWM must be configured *before* bind: libzmq only applies new
    # values to subsequent bind/connect calls.
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(LST_IPC_URL)
    # device publishers
    log.debug('Creating the router ICP on the server')
    self.pub = self.ctx.socket(zmq.ROUTER)
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.bind(DEV_IPC_URL)
def set_socket_option(self, name, option, value):
    """Call ``setsockopt`` on the socket registered under ``name``.

    The socket is looked up with ``self.get_by_name(name)``.

    :param name: the name of the socket where data will pass through
    :param option: the option from the ``zmq`` module
    :param value: the value to set for ``option``

    Here are some examples of options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
    * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    self.get_by_name(name).setsockopt(option, value)
def test_recv_during_send(self):
    """Send 1001 messages from a spawned greenthread and await completion.

    The sender gets a send high water mark of 10 and both sockets get
    10-byte kernel buffers; the test passes when the sending greenthread
    finishes and delivers ``0`` through the ``done`` event.
    """
    sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
    eventlet.sleep()
    done = eventlet.Event()

    try:
        send_hwm_option = zmq.SNDHWM
    except AttributeError:
        # ZeroMQ <3.0
        send_hwm_option = zmq.HWM

    sender.setsockopt(send_hwm_option, 10)
    sender.setsockopt(zmq.SNDBUF, 10)
    receiver.setsockopt(zmq.RCVBUF, 10)

    def tx():
        # Messages are the encoded decimal numbers 0..1000 inclusive.
        for seq in range(1001):
            sender.send(str(seq).encode())
        done.send(0)

    eventlet.spawn(tx)
    self.assertEqual(done.wait(), 0)