def close(self):
with self.lock:
if self.is_closed:
return
self.is_closed = True
log.debug("Closing connection (%s) to %s" % (id(self), self.host))
cur_gthread = eventlet.getcurrent()
if self._read_watcher and self._read_watcher != cur_gthread:
self._read_watcher.kill()
if self._write_watcher and self._write_watcher != cur_gthread:
self._write_watcher.kill()
if self._socket:
self._socket.close()
log.debug("Closed socket to %s" % (self.host,))
if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.host))
# don't leave in-progress operations hanging
self.connected_event.set()
python类getcurrent()的实例源码
def test_calls_init(self):
init_args = []
class Init(corolocal.local):
def __init__(self, *args):
init_args.append((args, eventlet.getcurrent()))
my_local = Init(1, 2, 3)
self.assertEqual(init_args[0][0], (1, 2, 3))
self.assertEqual(init_args[0][1], eventlet.getcurrent())
def do_something():
my_local.foo = 'bar'
self.assertEqual(len(init_args), 2, init_args)
self.assertEqual(init_args[1][0], (1, 2, 3))
self.assertEqual(init_args[1][1], eventlet.getcurrent())
eventlet.spawn(do_something).wait()
def spawn_n(self, function, *args, **kwargs):
"""Create a greenthread to run the *function*, the same as
:meth:`spawn`. The difference is that :meth:`spawn_n` returns
None; the results of *function* are not retrievable.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
g = eventlet.spawn_n(
self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(g)
def avoid_blocking_call(f, *args, **kwargs):
"""Ensure that the method "f" will not block other greenthreads.
Performs the call to the function "f" received as parameter in a
different thread using tpool.execute when called from a greenthread.
This will ensure that the function "f" will not block other greenthreads.
If not called from a greenthread, it will invoke the function "f" directly.
The function "f" will receive as parameters the arguments "args" and
keyword arguments "kwargs". If eventlet is not installed on the system
then this will call directly the function "f".
"""
if eventlet is None:
return f(*args, **kwargs)
# Note that eventlet.getcurrent will always return a greenlet object.
# In case of a greenthread, the parent greenlet will always be the hub
# loop greenlet.
if eventlet.getcurrent().parent:
return tpool.execute(f, *args, **kwargs)
else:
return f(*args, **kwargs)
def close(self):
with self.lock:
if self.is_closed:
return
self.is_closed = True
log.debug("Closing connection (%s) to %s" % (id(self), self.host))
cur_gthread = eventlet.getcurrent()
if self._read_watcher and self._read_watcher != cur_gthread:
self._read_watcher.kill()
if self._write_watcher and self._write_watcher != cur_gthread:
self._write_watcher.kill()
if self._socket:
self._socket.close()
log.debug("Closed socket to %s" % (self.host,))
if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.host))
# don't leave in-progress operations hanging
self.connected_event.set()
def _fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
# that will not be recycled; the usage of threading.current_thread()
# doesn't appear to currently be monkey patched and therefore isn't
# reliable to use (and breaks badly when used as all threads share
# the same current_thread() object)...
if eventlet is not None and eventlet_patcher is not None:
if eventlet_patcher.is_monkey_patched('thread'):
return eventlet.getcurrent
return threading.current_thread
def _fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
# that will not be recycled; the usage of threading.current_thread()
# doesn't appear to currently be monkey patched and therefore isn't
# reliable to use (and breaks badly when used as all threads share
# the same current_thread() object)...
if eventlet is not None and eventlet_patcher is not None:
if eventlet_patcher.is_monkey_patched('thread'):
return eventlet.getcurrent
return threading.current_thread
def _fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
# that will not be recycled; the usage of threading.current_thread()
# doesn't appear to currently be monkey patched and therefore isn't
# reliable to use (and breaks badly when used as all threads share
# the same current_thread() object)...
if eventlet is not None and eventlet_patcher is not None:
if eventlet_patcher.is_monkey_patched('thread'):
return eventlet.getcurrent
return threading.current_thread
def test_wrap_greenlet_running(self):
event = eventlet.event.Event()
def func():
try:
gt = eventlet.getcurrent()
fut = aioeventlet.wrap_greenthread(gt)
except Exception as exc:
event.send_exception(exc)
else:
event.send(fut)
eventlet.spawn_n(func)
msg = "wrap_greenthread: the greenthread is running"
self.assertRaisesRegex(RuntimeError, msg, event.wait)
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
If the pool is currently at capacity, ``spawn`` will block until one of
the running greenthreads completes its task and frees up a slot.
This function is reentrant; *function* can call ``spawn`` on the same
pool without risk of deadlocking the whole thing.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
gt = eventlet.greenthread.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
gt = eventlet.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
def _spawn_n_impl(self, func, args, kwargs, coro):
try:
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
raise
except:
if DEBUG:
traceback.print_exc()
finally:
if coro is None:
return
else:
coro = eventlet.getcurrent()
self._spawn_done(coro)
def wait(self):
if self.value is PENDING:
if self.thread not in (None, eventlet.getcurrent()):
self.thread.wait()
for ch in self.children:
ch.wait()
if self.child_errors > 0 and self.value is not ERROR:
errors = [e for e in self.leaf_errors if not e._recovered]
if errors:
self.value = ERROR
self.exception = (ChildError, ChildError(self, self.leaf_errors), None)
def select(read_list, write_list, error_list, timeout=None):
# error checking like this is required by the stdlib unit tests
if timeout is not None:
try:
timeout = float(timeout)
except ValueError:
raise TypeError("Expected number for timeout")
hub = get_hub()
timers = []
current = eventlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
ds = {}
for r in read_list:
ds[get_fileno(r)] = {'read': r}
for w in write_list:
ds.setdefault(get_fileno(w), {})['write'] = w
for e in error_list:
ds.setdefault(get_fileno(e), {})['error'] = e
listeners = []
def on_read(d):
original = ds[get_fileno(d)]['read']
current.switch(([original], [], []))
def on_write(d):
original = ds[get_fileno(d)]['write']
current.switch(([], [original], []))
def on_timeout2():
current.switch(([], [], []))
def on_timeout():
# ensure that BaseHub.run() has a chance to call self.wait()
# at least once before timed out. otherwise the following code
# can time out erroneously.
#
# s1, s2 = socket.socketpair()
# print(select.select([], [s1], [], 0))
timers.append(hub.schedule_call_global(0, on_timeout2))
if timeout is not None:
timers.append(hub.schedule_call_global(timeout, on_timeout))
try:
for k, v in six.iteritems(ds):
if v.get('read'):
listeners.append(hub.add(hub.READ, k, on_read, current.throw, lambda: None))
if v.get('write'):
listeners.append(hub.add(hub.WRITE, k, on_write, current.throw, lambda: None))
try:
return hub.switch()
finally:
for l in listeners:
hub.remove(l)
finally:
for t in timers:
t.cancel()