def spawn(func, *args, **kwargs):
    """Start ``func`` in a new green thread through ``eventlet.spawn``.

    Keeping a local passthrough means tests can stub this function without
    disturbing the spawns performed by the service itself.

    The context returned by ``common_context.get_current()`` at call time
    is captured and, when it is not ``None``, re-registered with
    ``update_store()`` inside the new green thread before ``func`` runs, so
    that logging done there keeps the same context.

    :param func: callable to run in the new green thread
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    :returns: the value returned by ``eventlet.spawn``
    """
    captured_context = common_context.get_current()

    @functools.wraps(func)
    def run_with_context(*call_args, **call_kwargs):
        # NOTE: the store must be refreshed from inside the spawned green
        # thread; otherwise the logger cannot pull the context from
        # threadlocal storage.
        if captured_context is not None:
            captured_context.update_store()
        return func(*call_args, **call_kwargs)

    return eventlet.spawn(run_with_context, *args, **kwargs)
# Example source code for Python's spawn()
def spawn_n(func, *args, **kwargs):
    """Start ``func`` in a new green thread through ``eventlet.spawn_n``.

    Keeping a local passthrough means tests can stub this function without
    disturbing the spawns performed by the service itself.

    The context returned by ``common_context.get_current()`` at call time
    is captured and, when it is not ``None``, re-registered with
    ``update_store()`` inside the new green thread before ``func`` runs, so
    that logging done there keeps the same context.

    Nothing is returned: the result of ``func`` is discarded by the wrapper
    and the result of ``eventlet.spawn_n`` is not propagated.

    :param func: callable to run in the new green thread
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    """
    captured_context = common_context.get_current()

    @functools.wraps(func)
    def run_with_context(*call_args, **call_kwargs):
        # NOTE: the store must be refreshed from inside the spawned green
        # thread; otherwise the logger cannot pull the context from
        # threadlocal storage.
        if captured_context is not None:
            captured_context.update_store()
        func(*call_args, **call_kwargs)

    eventlet.spawn_n(run_with_context, *args, **kwargs)
def coro_wrap_greenthread():
    """Generator-based coroutine that awaits green threads one at a time.

    Two ``eventlet_slow_append`` green threads are spawned and awaited in
    turn through ``aioeventlet.wrap_greenthread``; the value each one
    yields back is appended to the shared list. A third green thread runs
    ``eventlet_slow_error``; if awaiting it raises ``ValueError``, the
    exception text is appended. Finally ``4`` is appended and the list is
    handed back through ``Return``.
    """
    collected = []

    # Each green thread is fully awaited before the next one is spawned.
    for item, delay in ((1, 0.020), (2, 0.010)):
        green = eventlet.spawn(eventlet_slow_append, collected, item, delay)
        outcome = yield From(aioeventlet.wrap_greenthread(green))
        collected.append(outcome)

    failing = eventlet.spawn(eventlet_slow_error)
    try:
        yield From(aioeventlet.wrap_greenthread(failing))
    except ValueError as exc:
        collected.append(str(exc))

    collected.append(4)
    raise Return(collected)
def test_soon_spawn(self):
    """Green threads spawned from a ``call_soon`` callback run in order.

    The callback starts one green thread immediately and one after a
    10 ms delay; the delayed one stops the loop. The recorded markers
    must appear in that order.
    """
    events = []

    def record_spawn():
        events.append("spawn")

    def record_spawn_after():
        events.append("spawn_after")
        self.loop.stop()

    def start_greenthreads():
        eventlet.spawn(record_spawn)
        eventlet.spawn_after(0.010, record_spawn_after)

    self.loop.call_soon(start_greenthreads)
    self.loop.run_forever()

    expected = ["spawn", "spawn_after"]
    self.assertEqual(events, expected)
def test_yield_future_not_running(self):
    """``yield_future`` called before the loop runs still gets the result.

    A green thread signals an event, then blocks in
    ``aioeventlet.yield_future``. Only afterwards is the future's result
    scheduled and the loop started; the green thread must record ``21``.
    """
    received = []

    def waiter(ready, future):
        ready.send('link')
        received.append(aioeventlet.yield_future(future))
        self.loop.stop()

    ready = eventlet.event.Event()
    future = asyncio.Future(loop=self.loop)
    eventlet.spawn(waiter, ready, future)
    # Block until the green thread has signalled the event.
    ready.wait()

    self.loop.call_soon(future.set_result, 21)
    self.loop.run_forever()
    self.assertEqual(received, [21])
def test_yield_future_invalid_type(self):
    """``yield_future`` must raise ``TypeError`` for non-future objects.

    Both a coroutine *function* (not a coroutine object) and a plain
    function are passed to ``aioeventlet.yield_future`` from a green
    thread; waiting on that green thread is expected to raise
    ``TypeError`` in each case.
    """
    def func(obj):
        return aioeventlet.yield_future(obj)

    @asyncio.coroutine
    def coro_func():
        print("do something")

    def regular_func():
        return 3

    for obj in (coro_func, regular_func):
        # Pass the loop variable itself so that every candidate is
        # exercised, not only ``coro_func``.
        gt = eventlet.spawn(func, obj)
        # ignore logged traceback
        with tests.mock.patch('traceback.print_exception'):
            self.assertRaises(TypeError, gt.wait)
def test_hub_exceptions(self):
    """With hub exception debugging on, a green thread error hits stderr.

    A green thread blocked in ``recv`` is woken by one byte from the
    client and then raises ``KeyError``. The test asserts that waiting on
    the green thread re-raises the ``KeyError`` and that the captured
    stderr contains ``'KeyError: 1'``.
    """
    debug.hub_exceptions(True)
    try:
        server = eventlet.listen(('0.0.0.0', 0))
        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        client_2, addr = server.accept()

        def hurl(s):
            s.recv(1)
            {}[1]  # keyerror

        with capture_stderr() as fake:
            gt = eventlet.spawn(hurl, client_2)
            eventlet.sleep(0)
            client.send(b' ')
            eventlet.sleep(0)
            # allow the "hurl" greenlet to trigger the KeyError
            # not sure why the extra context switch is needed
            eventlet.sleep(0)
        self.assertRaises(KeyError, gt.wait)
    finally:
        # Always switch the debug flag back off, so a failure above does
        # not leave it enabled for whatever runs next.
        debug.hub_exceptions(False)
    # look for the KeyError exception in the traceback
    assert 'KeyError: 1' in fake.getvalue(), "Traceback not in:\n" + fake.getvalue()
def test_exceptionleaks(self):
    """Exception state of one green thread must not leak into another.

    A green thread switches to the hub from inside an ``except`` block;
    the main green thread then checks that its own ``sys.exc_info()`` is
    empty.
    """
    # tests expected behaviour with all versions of greenlet
    def switch_inside_handler(gate):
        try:
            raise KeyError()
        except KeyError:
            gate.release()
            hubs.get_hub().switch()

    # semaphore used to control the execution order
    gate = eventlet.Semaphore()
    gate.acquire()
    worker = eventlet.spawn(switch_inside_handler, gate)
    try:
        # Blocks until the worker releases the semaphore from its handler.
        gate.acquire()
        assert sys.exc_info()[0] is None
    finally:
        worker.kill()
def test_kill(self):
    """ Verifies that killing a greenthread once the hub runloop has died
    neither returns straight to the hub greenlet's parent nor schedules
    a redundant timer. """
    hub = hubs.get_hub()

    def park_in_hub():
        hub.switch()

    victim = eventlet.spawn(park_in_hub)
    eventlet.sleep(0)  # give park_in_hub a chance to run
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    interrupt = KeyboardInterrupt()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

    # Killing the greenthread schedules a timer that hands execution back
    # to this greenlet before the exception is thrown into it. Control
    # must come back through that timer, not by propagation from the
    # terminating greenlet.
    victim.kill()
    with eventlet.Timeout(0.5, self.CustomException()):
        # Switching to the hub now: no timer should remain that switches
        # back to this greenlet, so hub.switch() is expected to block
        # until the timeout fires.
        self.assertRaises(self.CustomException, hub.switch)
def test_parent(self):
    """ Verifies that a greenthread finishing under a previous, now-defunct
    hub greenlet hands execution back to the hub runloop rather than to
    the hub greenlet's parent. """
    hub = hubs.get_hub()

    def noop():
        pass

    worker = eventlet.spawn(noop)
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    interrupt = KeyboardInterrupt()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw, interrupt)

    assert not worker.dead  # noop must not have completed yet
    with eventlet.Timeout(0.5, self.CustomException()):
        # Switching to the hub lets noop run to completion. Execution
        # should then go back to the runloop and not to this greenlet,
        # so hub.switch() is expected to block until the timeout fires.
        self.assertRaises(self.CustomException, hub.switch)
    assert worker.dead  # sanity check: noop has completed by now
def test_raised_multiple_readers(self):
    """A second reader on the same socket must raise ``RuntimeError``.

    With ``hub_prevent_multiple_readers`` enabled, one green thread is
    already blocked in ``recv`` on the client socket when the main green
    thread calls ``recv`` on it too.
    """
    debug.hub_prevent_multiple_readers(True)

    def handle(sock, addr):
        sock.recv(1)
        sock.sendall(b"a")
        raise eventlet.StopServe()

    listener = eventlet.listen(('127.0.0.1', 0))
    eventlet.spawn(eventlet.serve, listener, handle)

    def blocking_read(conn):
        conn.recv(1)

    port = listener.getsockname()[1]
    client = eventlet.connect(('127.0.0.1', port))
    first_reader = eventlet.spawn(blocking_read, client)
    # Yield once so the spawned reader gets to call recv first.
    eventlet.sleep(0)
    self.assertRaises(RuntimeError, client.recv, 1)
    client.sendall(b'b')
    first_reader.wait()
def test_zero_timeout_and_back(self):
    """Switching a socket between timeout and non-blocking mode works.

    ``recv`` must raise ``socket.timeout`` with a 0.05 s timeout, a
    ``socket.error`` carrying ``EAGAIN`` with a zero timeout, and
    ``socket.timeout`` again once the 0.05 s timeout is restored.
    """
    listener = eventlet.listen(('', 0))
    # Keep a reference to the server side of the socket
    acceptor = eventlet.spawn(listener.accept)
    conn = eventlet.connect(listener.getsockname())

    # A positive timeout must raise socket.timeout
    conn.settimeout(0.05)
    self.assertRaises(socket.timeout, conn.recv, 1)

    # A zero timeout must raise socket.error with EAGAIN
    conn.settimeout(0)
    try:
        conn.recv(1)
    except socket.error as err:
        assert get_errno(err) == errno.EAGAIN
    else:
        assert False

    # Back to a positive timeout: socket.timeout again
    conn.settimeout(0.05)
    self.assertRaises(socket.timeout, conn.recv, 1)

    acceptor.wait()
def test_pipe_writes_large_messages(self):
    """A 65 KiB message written to a ``GreenPipe`` is read back intact.

    The message consists of 65 runs of 1024 identical bytes (byte values
    0 through 64). A green thread writes it in one call while this green
    thread reads it back 1024 bytes at a time and checks each run.
    """
    read_fd, write_fd = os.pipe()
    pipe_in = greenio.GreenPipe(read_fd, 'rb')
    pipe_out = greenio.GreenPipe(write_fd, 'wb')

    payload = b"".join(six.int2byte(value) * 1024 for value in range(65))

    def write_payload():
        pipe_out.write(payload)
        pipe_out.close()

    writer_gt = eventlet.spawn(write_payload)

    for idx in range(65):
        chunk = pipe_in.read(1024)
        wanted = six.int2byte(idx) * 1024
        failure_message = (
            "expected=%r..%r, found=%r..%r iter=%d"
            % (wanted[:4], wanted[-4:], chunk[:4], chunk[-4:], idx))
        self.assertEqual(chunk, wanted, failure_message)

    writer_gt.wait()
def test_socket_file_read_non_int():
    """Reading a socket file with a non-integer size raises ``TypeError``.

    A small server green thread accepts one connection, reads one byte
    and answers; the client then calls ``read`` with a string argument
    under a one-second timeout.
    """
    listen_socket = eventlet.listen(('localhost', 0))

    def serve_once():
        peer, _ = listen_socket.accept()
        peer.recv(1)
        peer.sendall(b'response')
        peer.close()

    eventlet.spawn(serve_once)
    client = eventlet.connect(listen_socket.getsockname())

    stream = client.makefile('rwb')
    stream.write(b'?')
    stream.flush()

    with eventlet.Timeout(1):
        try:
            stream.read("This shouldn't work")
        except TypeError:
            pass
        else:
            assert False
def test_putting_to_queue(self):
    """Items returned to the pool by green threads can be fetched again.

    The pool holds at most ``size`` items but ``size + 1`` are requested;
    each fetched item is handed to a green thread that puts it back and
    reports its index on a queue. All indices must arrive before the
    0.1 s timeout fires.
    """
    deadline = eventlet.Timeout(0.1)
    try:
        size = 2
        attempts = size + 1
        self.pool = IntPool(min_size=0, max_size=size)
        queue = Queue()

        def just_put(pool_item, index):
            self.pool.put(pool_item)
            queue.put(index)

        for index in six.moves.range(attempts):
            pool_item = self.pool.get()
            eventlet.spawn(just_put, pool_item, index)

        results = [queue.get() for _ in six.moves.range(attempts)]
        self.assertEqual(sorted(results), list(six.moves.range(attempts)))
    finally:
        deadline.cancel()
def test_calls_init(self):
    """``__init__`` of a ``corolocal.local`` subclass runs per green thread.

    It is expected to run once at construction in the current green
    thread, and a second time, with the same arguments, when the object
    is first touched from another green thread.
    """
    init_calls = []

    class Init(corolocal.local):
        def __init__(self, *args):
            init_calls.append((args, eventlet.getcurrent()))

    my_local = Init(1, 2, 3)
    first_args, first_greenlet = init_calls[0][0], init_calls[0][1]
    self.assertEqual(first_args, (1, 2, 3))
    self.assertEqual(first_greenlet, eventlet.getcurrent())

    def touch_from_other_greenthread():
        my_local.foo = 'bar'
        self.assertEqual(len(init_calls), 2, init_calls)
        self.assertEqual(init_calls[1][0], (1, 2, 3))
        self.assertEqual(init_calls[1][1], eventlet.getcurrent())

    eventlet.spawn(touch_from_other_greenthread).wait()
def test_calling_methods(self):
    """Methods of a ``corolocal.local`` subclass see the caller's data.

    The attribute ``foo`` is set and read back through ``callme`` in the
    main green thread, in a spawned green thread, and in the main green
    thread again.
    """
    class Caller(corolocal.local):
        def callme(self):
            return self.foo

    my_local = Caller()

    my_local.foo = "foo1"
    self.assertEqual("foo1", my_local.callme())

    def check_in_other_greenthread():
        my_local.foo = "foo2"
        self.assertEqual("foo2", my_local.callme())

    worker = eventlet.spawn(check_in_other_greenthread)
    worker.wait()

    my_local.foo = "foo3"
    self.assertEqual("foo3", my_local.callme())
def test_select_mark_file_as_reopened():
    """A pending ``select`` raises ``IOClosed`` when its fd is reopened.

    A green thread blocks in ``select.select`` on a listening socket; the
    hub is then told that the file descriptor was reopened, and waiting
    on the green thread must raise ``hubs.IOClosed`` within 0.5 s.
    """
    # https://github.com/eventlet/eventlet/pull/294
    # Fix API inconsistency in select and Hub.
    # mark_as_closed takes one argument, but called without arguments.
    # on_error takes file descriptor, but called with an exception object.
    listener = original_socket.socket()
    listener.setblocking(0)
    listener.bind(('127.0.0.1', 0))
    listener.listen(5)

    selecting = eventlet.spawn(select.select, [listener], [listener], [listener])
    # Give the green thread time to start waiting in select.
    eventlet.sleep(0.01)

    with eventlet.Timeout(0.5) as guard:
        with tests.assert_raises(hubs.IOClosed):
            hubs.get_hub().mark_as_reopened(listener.fileno())
            selecting.wait()
        guard.cancel()
def test_ssl_close(self):
    """Closing an SSL client is seen by the server as end of stream.

    The server reads once, then expects the next ``recv`` to return
    ``b''``; a ``greenio.SSL.ZeroReturnError`` raised instead is also
    tolerated.
    """
    def serve(listener):
        conn, addr = listener.accept()
        conn.recv(8192)
        try:
            self.assertEqual(b'', conn.recv(8192))
        except greenio.SSL.ZeroReturnError:
            pass

    ssl_listener = listen_ssl_socket()
    server_gt = eventlet.spawn(serve, ssl_listener)

    plain_client = eventlet.connect(ssl_listener.getsockname())
    secure_client = ssl.wrap_socket(plain_client)
    secure_client.sendall(b'X')
    greenio.shutdown_safe(secure_client)
    secure_client.close()

    server_gt.wait()
def test_ssl_unwrap(self):
    """A connection can go plaintext -> SSL -> plaintext via ``unwrap``.

    The client sends ``b'before'`` in the clear, ``b'during'`` over SSL
    and ``b'after'`` on the unwrapped socket; the server checks each
    stage with the matching socket object.
    """
    def serve():
        conn, addr = plain_listener.accept()
        self.assertEqual(conn.recv(6), b'before')

        secure_conn = ssl.wrap_socket(
            conn, tests.private_key_file, tests.certificate_file,
            server_side=True)
        secure_conn.do_handshake()
        self.assertEqual(secure_conn.recv(6), b'during')

        unwrapped_conn = secure_conn.unwrap()
        self.assertEqual(unwrapped_conn.recv(5), b'after')
        unwrapped_conn.close()

    plain_listener = eventlet.listen(('127.0.0.1', 0))
    server_gt = eventlet.spawn(serve)

    client = eventlet.connect(plain_listener.getsockname())
    client.sendall(b'before')

    secure_client = ssl.wrap_socket(client)
    secure_client.do_handshake()
    secure_client.sendall(b'during')

    unwrapped_client = secure_client.unwrap()
    unwrapped_client.sendall(b'after')

    server_gt.wait()