def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
python类event()的实例源码
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()