def test_getsockopt_events(self):
sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
eventlet.sleep()
poll_out = zmq.Poller()
poll_out.register(sock1, zmq.POLLOUT)
sock_map = poll_out.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock1.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT)
sock1.send(b'')
poll_in = zmq.Poller()
poll_in.register(sock2, zmq.POLLIN)
sock_map = poll_in.poll(100)
self.assertEqual(len(sock_map), 1)
events = sock2.getsockopt(zmq.EVENTS)
self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
python类EVENTS的实例源码
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def new_data(self):
return self.socket.get(zmq.EVENTS)
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def get(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
if opt in TIMEOS:
warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning)
optval = super(_Socket, self).get(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval
def checkForMessage(self, socket):
""" Check on socket activity if there is a complete ZMQ message.
@param socket: ZMQ socket
"""
logging.debug( "Check: {0!s}".format(self.readnotifier.socket()))
self.readnotifier.setEnabled(False)
check = True
try:
while check:
events = self.socket.get(zmq.EVENTS)
check = events & zmq.POLLIN
logging.debug( "EVENTS: {0!s}".format(events))
if check:
try:
msg = self.socket.recv_multipart(zmq.NOBLOCK)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# state changed since poll event
pass
else:
logging.info( "RECV Error: {0!s}".format(zmq.strerror(e.errno)))
else:
logging.debug( "MSG: {0!s} {1!s}".format(self.readnotifier.socket(), msg))
self.sigMsgRecvd.emit(msg)
except:
pass
else:
self.readnotifier.setEnabled(True)
def test_int_sockopts(self):
"test integer sockopts"
v = zmq.zmq_version_info()
if v < (3,0):
default_hwm = 0
else:
default_hwm = 1000
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEqual(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEqual(p.getsockopt(zmq.LINGER), -1)
self.assertEqual(p.hwm, default_hwm)
p.hwm = 11
self.assertEqual(p.hwm, 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
sopt = backref[opt]
if sopt.startswith((
'ROUTER', 'XPUB', 'TCP', 'FAIL',
'REQ_', 'CURVE_', 'PROBE_ROUTER',
'IPC_FILTER', 'GSSAPI', 'STREAM_',
)):
# some sockopts are write-only
continue
try:
n = p.getsockopt(opt)
except zmq.ZMQError as e:
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
else:
if n > 2**31:
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
" It is probably the wrong type."%sopt)
if errors:
self.fail('\n'.join([''] + errors))
def test_int_sockopts(self):
"test integer sockopts"
v = zmq.zmq_version_info()
if v < (3,0):
default_hwm = 0
else:
default_hwm = 1000
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEqual(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEqual(p.getsockopt(zmq.LINGER), -1)
self.assertEqual(p.hwm, default_hwm)
p.hwm = 11
self.assertEqual(p.hwm, 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
sopt = backref[opt]
if sopt.startswith((
'ROUTER', 'XPUB', 'TCP', 'FAIL',
'REQ_', 'CURVE_', 'PROBE_ROUTER',
'IPC_FILTER', 'GSSAPI',
)):
# some sockopts are write-only
continue
try:
n = p.getsockopt(opt)
except zmq.ZMQError as e:
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
else:
if n > 2**31:
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
" It is probably the wrong type."%sopt)
if errors:
self.fail('\n'.join([''] + errors))
def test_int_sockopts(self):
"test integer sockopts"
v = zmq.zmq_version_info()
if v < (3,0):
default_hwm = 0
else:
default_hwm = 1000
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
p.setsockopt(zmq.LINGER, 0)
self.assertEqual(p.getsockopt(zmq.LINGER), 0)
p.setsockopt(zmq.LINGER, -1)
self.assertEqual(p.getsockopt(zmq.LINGER), -1)
self.assertEqual(p.hwm, default_hwm)
p.hwm = 11
self.assertEqual(p.hwm, 11)
# p.setsockopt(zmq.EVENTS, zmq.POLLIN)
self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
# check for overflow / wrong type:
errors = []
backref = {}
constants = zmq.constants
for name in constants.__all__:
value = getattr(constants, name)
if isinstance(value, int):
backref[value] = name
for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
sopt = backref[opt]
if sopt.startswith((
'ROUTER', 'XPUB', 'TCP', 'FAIL',
'REQ_', 'CURVE_', 'PROBE_ROUTER',
'IPC_FILTER', 'GSSAPI',
)):
# some sockopts are write-only
continue
try:
n = p.getsockopt(opt)
except zmq.ZMQError as e:
errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
else:
if n > 2**31:
errors.append("getsockopt(zmq.%s) returned a ridiculous value."
" It is probably the wrong type."%sopt)
if errors:
self.fail('\n'.join([''] + errors))