def test_event(self):
"""????event???????????"""
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
_log.info('A: Hey wait for me, I have to do something')
gevent.sleep(3)
_log.info("Ok, I'm done")
evt.set()
def waiter():
'''After 3 seconds the get call will unblock'''
_log.info("I'll wait for you")
evt.wait() # blocking
_log.info("It's about time")
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
python类event()的实例源码
def enable(self, app=None):
""" Enable event loop integration with gevent.
Args:
app: Ignored, it's only a placeholder to keep the call signature of all
gui activation methods consistent, which simplifies the logic of
supporting magics.
Notes:
This methods sets the PyOS_InputHook for gevent, which allows
gevent greenlets to run in the background while interactively using
IPython.
"""
self.manager.set_inputhook(inputhook_gevent)
self._current_gui = GUI_GEVENT
return app
def send_and_wait(self, recipient, message, timeout):
""" Send `message` to `recipient` and wait for the response or `timeout`.
Args:
recipient (address): The address of the node that will receive the
message.
message: The transfer message.
timeout (float): How long should we wait for a response from `recipient`.
Returns:
None: If the wait timed out
object: The result from the event
"""
if not isaddress(recipient):
raise ValueError('recipient is not a valid address.')
self.protocol.send_and_wait(recipient, message, timeout)
def synchronous():
# ??????
from gevent.event import Event
evt = Event()
def setter():
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print('Ok, I\'m done')
evt.set()
def waiter():
print('I\'ll wait for you')
evt.wait()
print('It\'s about time')
gevent.joinall([gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
def read_event(self):
'''
Reads one Event from socket until EOL.
Returns Event instance.
Raises LimitExceededError if MAXLINES_PER_EVENT is reached.
'''
buff = ''
for x in range(MAXLINES_PER_EVENT):
line = self.transport.read_line()
if line == '':
self.trace("no more data in read_event !")
raise ConnectError("connection closed")
elif line == EOL:
# When matches EOL, creates Event and returns it.
return Event(buff)
else:
# Else appends line to current buffer.
buff = "%s%s" % (buff, line)
raise LimitExceededError("max lines per event (%d) reached" % MAXLINES_PER_EVENT)
def _api_response(self, event):
'''
Receives api/response callback.
'''
# Gets raw data for this event.
raw = self.read_raw(event)
# If raw was found, this is our Event body.
if raw:
event.set_body(raw)
# Wake up waiting command.
try:
_cmd_uuid, _async_res = self._commands_pool.pop(0)
except (IndexError, ValueError):
raise InternalSyncError("Cannot wakeup command !")
_async_res.set((_cmd_uuid, event))
return None
def _event_plain(self, event):
'''
Receives text/event-plain callback.
'''
# Gets raw data for this event
raw = self.read_raw(event)
# If raw was found drops current event
# and replaces with Event created from raw
if raw:
event = Event(raw)
# Gets raw response from Event Content-Length header
# and raw buffer
raw_response = self.read_raw_response(event, raw)
# If rawresponse was found, this is our Event body
if raw_response:
event.set_body(raw_response)
# Returns Event
return event
def _disconnect_notice(self, event):
'''
Receives text/disconnect-notice callback.
'''
self._closing_state = True
# Gets raw data for this event
raw = self.read_raw(event)
if raw:
event = Event(raw)
# Gets raw response from Event Content-Length header
# and raw buffer
raw_response = self.read_raw_response(event, raw)
# If rawresponse was found, this is our Event body
if raw_response:
event.set_body(raw_response)
return None
def wait_for_event(self, event_name, conditional=None, **kwargs):
result = AsyncResult()
listener = None
def _event_callback(event):
for k, v in kwargs.items():
obj = event
for inst in k.split('__'):
obj = getattr(obj, inst)
if obj != v:
break
else:
if conditional and not conditional(event):
return
listener.remove()
return result.set(event)
listener = self.bot.client.events.on(event_name, _event_callback)
return result
def receive_events(self):
buf = ''
while self._run:
try:
data = self.sock_file.readline()
except Exception:
self._run = False
self.connected = False
self.sock.close()
# logging.exception("Error reading from socket.")
break
if not data:
if self.connected:
logging.error("Error receiving data, is FreeSWITCH running?")
self.connected = False
break
# Empty line
if data == self._EOL:
event = ESLEvent(buf)
buf = ''
self.handle_event(event)
continue
buf += data
def __init__(self):
# number of connected senders that are not closed. incremented by
# OutputPort.open()
self._sender_count = 0
# the connected InputPort
# type: rill.engine.outputport.InputPort
self.inport = None
# the outport currently sending
# type: rill.engine.outputport.OutputPort
self.outport = None
# all connected OutputPorts
# type: Set[rill.engine.outputport.OutputPort]
self.outports = set()
# packet queue and blocking events
self._queue = None
self._not_empty = gevent.event.Event()
self._not_full = gevent.event.Event()
# properties
self.drop_oldest = False
self.count_packets = False
self.metadata = {}
def initialize_reactor(cls):
if not cls._timers:
cls._timers = TimerManager()
cls._timeout_watcher = gevent.spawn(cls.service_timeouts)
cls._new_timer = gevent.event.Event()
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()
def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__writable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
tic = time.time()
dt = self._gevent_bug_timeout
if dt:
timeout = gevent.Timeout(seconds=dt)
else:
timeout = None
try:
if timeout:
timeout.start()
self.__readable.get(block=True)
except gevent.Timeout as t:
if t is not timeout:
raise
toc = time.time()
# gevent bug: get can raise timeout even on clean return
# don't display zmq bug warning for gevent bug (this is getting ridiculous)
if self._debug_gevent and timeout and toc-tic > dt and \
self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
if timeout:
timeout.cancel()
self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()
def start(self):
# Start grabbing SIGCHLD within libev event loop.
gevent.get_hub().loop.install_sigchld()
# Run new process (based on `fork()` on POSIX-compliant systems).
super(_GProcess, self).start()
# The occurrence of SIGCHLD is recorded asynchronously in libev.
# This guarantees proper behavior even if the child watcher is
# started after the child exits. Start child watcher now.
self._sigchld_watcher = gevent.get_hub().loop.child(self.pid)
self._returnevent = gevent.event.Event()
self._sigchld_watcher.start(
self._on_sigchld, self._sigchld_watcher)
log.debug("SIGCHLD watcher for %s started.", self.pid)
def _on_sigchld(self, watcher):
"""Callback of libev child watcher. Called when libev event loop
catches corresponding SIGCHLD signal.
"""
watcher.stop()
# Status evaluation copied from `multiprocessing.forking` in Py2.7.
if os.WIFSIGNALED(watcher.rstatus):
self._popen.returncode = -os.WTERMSIG(watcher.rstatus)
else:
assert os.WIFEXITED(watcher.rstatus)
self._popen.returncode = os.WEXITSTATUS(watcher.rstatus)
self._returnevent.set()
log.debug("SIGCHLD watcher callback for %s invoked. Exitcode "
"stored: %s", self.pid, self._popen.returncode)
def get(self, timeout=None):
"""Receive, decode and return data from the pipe. Block
gevent-cooperatively until data is available or timeout expires. The
default decoder is ``pickle.loads``.
:arg timeout: ``None`` (default) or a ``gevent.Timeout``
instance. The timeout must be started to take effect and is
canceled when the first byte of a new message arrives (i.e.
providing a timeout does not guarantee that the method completes
within the timeout interval).
:returns: a Python object.
Raises:
- :exc:`gevent.Timeout` (if provided)
- :exc:`GIPCError`
- :exc:`GIPCClosed`
- :exc:`pickle.UnpicklingError`
Recommended usage for silent timeout control::
with gevent.Timeout(TIME_SECONDS, False) as t:
reader.get(timeout=t)
.. warning::
The timeout control is currently not available on Windows,
because Windows can't apply select() to pipe handles.
An ``OSError`` is expected to be raised in case you set a
timeout.
"""
self._validate()
with self._lock:
if timeout:
# Wait for ready-to-read event.
h = gevent.get_hub()
h.wait(h.loop.io(self._fd, 1))
timeout.cancel()
msize, = struct.unpack("!i", self._recv_in_buffer(4).getvalue())
bindata = self._recv_in_buffer(msize).getvalue()
return self._decoder(bindata)
def disable(self):
""" Disable event loop integration with gevent.
This merely sets PyOS_InputHook to NULL.
"""
self.manager.clear_inputhook()
def __init__(self, event):
self.event = event
self.installed = None
self.installed_force = None
self.install_handler()
def event_first_of(*events):
""" Waits until one of `events` is set.
The event returned is /not/ cleared with any of the `events`, this value
must not be reused if the clearing behavior is used.
"""
first_finished = Event()
if not all(isinstance(e, _AbstractLinkable) for e in events):
raise ValueError('all events must be linkable')
for event in events:
event.rawlink(lambda _: first_finished.set())
return first_finished
def get_channel_events(self, channel_address, from_block, to_block=None):
if not isaddress(channel_address):
raise InvalidAddress(
'Expected binary address format for channel in get_channel_events'
)
returned_events = get_all_netting_channel_events(
self.raiden.chain,
channel_address,
events=ALL_EVENTS,
from_block=from_block,
to_block=to_block,
)
raiden_events = self.raiden.transaction_log.get_events_in_block_range(
from_block=from_block,
to_block=to_block
)
# Here choose which raiden internal events we want to expose to the end user
for event in raiden_events:
is_user_transfer_event = isinstance(event.event_object, (
EventTransferSentSuccess,
EventTransferSentFailed,
EventTransferReceivedSuccess
))
if is_user_transfer_event:
new_event = {
'block_number': event.block_number,
'_event_type': type(event.event_object).__name__,
}
new_event.update(event.event_object.__dict__)
returned_events.append(new_event)
return returned_events
def asynchronous():
# ??????
from gevent.event import AsyncResult
# ??????????AsyncRresult?????????????????????
# ???future?defered???????????????????????????
a = AsyncResult()
def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(1)
a.set('Hello!')
def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print(a.get())
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
def __init__(self, msg):
gevent.event.Timeout.__init__(self, exception=msg)
def event_object(self):
"""Create an appropriate Event object"""
return gevent.event.Event()
def __call__(self, environ, start_response):
self.environ = environ
uwsgi.websocket_handshake()
self._req_ctx = None
if hasattr(uwsgi, 'request_context'):
# uWSGI >= 2.1.x with support for api access across-greenlets
self._req_ctx = uwsgi.request_context()
else:
# use event and queue for sending messages
from gevent.event import Event
from gevent.queue import Queue
from gevent.select import select
self._event = Event()
self._send_queue = Queue()
# spawn a select greenlet
def select_greenlet_runner(fd, event):
"""Sets event when data becomes available to read on fd."""
while True:
event.set()
try:
select([fd], [], [])[0]
except ValueError:
break
self._select_greenlet = gevent.spawn(
select_greenlet_runner,
uwsgi.connection_fd(),
self._event)
self.app(self)
def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if getattr(self, '_state_event', None):
_stop(self._state_event)
self._state_event = None
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
def __state_changed(self, event=None, _evtype=None):
if self.closed:
self.__cleanup_events()
return
try:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
if events & zmq.POLLOUT:
self.__writable.set()
if events & zmq.POLLIN:
self.__readable.set()