def _child_process(self, service):
    """Set up the freshly forked child process and launch *service* in it.

    Runs in the child immediately after fork: installs child-side signal
    handlers, detaches from parent-owned eventlet/pipe state, reseeds the
    RNG, then runs the service in a new Launcher.

    :param service: the service object handed to Launcher.launch_service
    :returns: the child's Launcher instance
    """
    self._child_process_handle_signal()
    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()
    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)
    # Reseed random number generator
    random.seed()
    launcher = Launcher()
    launcher.launch_service(service)
    return launcher
Python spawn_n() usage examples (source code)
def _child_process(self, service):
    """Set up the freshly forked child process and launch *service* in it.

    Variant that passes self.conf through to the Launcher.  Runs in the
    child immediately after fork: installs child-side signal handlers,
    detaches from parent-owned eventlet/pipe state, reseeds the RNG,
    then runs the service in a new Launcher.

    :param service: the service object handed to Launcher.launch_service
    :returns: the child's Launcher instance
    """
    self._child_process_handle_signal()
    # Reopen the eventlet hub to make sure we don't share an epoll
    # fd with parent and/or siblings, which would be bad
    eventlet.hubs.use_hub()
    # Close write to ensure only parent has it open
    os.close(self.writepipe)
    # Create greenthread to watch for parent to close pipe
    eventlet.spawn_n(self._pipe_watcher)
    # Reseed random number generator
    random.seed()
    launcher = Launcher(self.conf)
    launcher.launch_service(service)
    return launcher
def spawn_n(func, *args, **kwargs):
    """Wrapper around eventlet.spawn_n that propagates the request context.

    Exists as a seam that tests can stub without interfering with the
    service spawns.  The current threadlocal context (if any) is captured
    here and re-installed inside the new greenthread, so logging keeps
    the same context across the spawn boundary.
    """
    captured = common_context.get_current()

    @functools.wraps(func)
    def _run_with_context(*inner_args, **inner_kwargs):
        # Re-install the captured context so the logger can pull it from
        # threadlocal storage inside the new greenthread.
        if captured is not None:
            captured.update_store()
        func(*inner_args, **inner_kwargs)

    eventlet.spawn_n(_run_with_context, *args, **kwargs)
def spawn_n(func, *args, **kwargs):
    """Wrapper around eventlet.spawn_n that propagates context and profiling.

    Exists as a seam that tests can stub without interfering with the
    service spawns.  Captures the current threadlocal context plus any
    serialized profiler state and restores both inside the new
    greenthread, so logging and tracing stay continuous across the spawn.
    """
    saved_context = common_context.get_current()
    saved_profile = _serialize_profile_info()

    @functools.wraps(func)
    def _restore_and_call(*inner_args, **inner_kwargs):
        # Re-install the captured context so the logger can pull it from
        # threadlocal storage inside the new greenthread.
        if saved_context is not None:
            saved_context.update_store()
        # Re-init the profiler only when both state and library exist.
        if saved_profile and profiler:
            profiler.init(**saved_profile)
        func(*inner_args, **inner_kwargs)

    eventlet.spawn_n(_restore_and_call, *args, **kwargs)
def spawn_n(func, *args, **kwargs):
    """Context-propagating passthrough to eventlet.spawn_n.

    This indirection lets tests stub the spawn without touching the
    service spawn machinery.  It also carries the caller's threadlocal
    context into the new greenthread for logging continuity.
    """
    ctx = common_context.get_current()

    @functools.wraps(func)
    def wrapper(*a, **kw):
        # Without update_store() the logger cannot see the context in
        # the new greenthread's threadlocal storage.
        if ctx is not None:
            ctx.update_store()
        func(*a, **kw)

    eventlet.spawn_n(wrapper, *args, **kwargs)
def test_greenlet(self):
    # Run a throwaway module in a subprocess: after monkey_patch(), a
    # greenthread prints the "current thread" and we print the size of
    # threading._active.  Asserts the greenthread reports itself as the
    # main thread and that only one entry remains in _active afterwards
    # (i.e. the greenthread did not leak a fake thread record).
    new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import event
import threading
evt = event.Event()
def test():
    print(repr(threading.currentThread()))
    evt.send()
eventlet.spawn_n(test)
evt.wait()
print(len(threading._active))
"""
    self.write_to_tempfile("newmod", new_mod)
    output, lines = self.launch_subprocess('newmod.py')
    # 3 lines: the thread repr, the _active count, and a trailing blank
    # from the subprocess output split — TODO confirm against helper.
    self.assertEqual(len(lines), 3, "\n".join(lines))
    assert lines[0].startswith('<_MainThread'), lines[0]
    self.assertEqual(lines[1], "1", lines[1])
def _test_multiple_waiters(self, exception):
    """Send one event and verify every waiting greenthread wakes up.

    When *exception* is true, each waiter raises after reporting done,
    which must not prevent the other waiters from being woken.
    """
    trigger = event.Event()
    results = []

    def waiter_body(done_evt):
        trigger.wait()
        results.append(True)
        done_evt.send()
        if exception:
            raise Exception()

    count = 5
    done_events = []
    for _ in range(count):
        done_evt = event.Event()
        done_events.append(done_evt)
        eventlet.spawn_n(waiter_body, done_evt)

    eventlet.sleep()  # allow spawns to start executing
    trigger.send()
    for done_evt in done_events:
        done_evt.wait()
    self.assertEqual(len(results), count)
def test_connect_tcp(self):
    # Basic TCP round trip through eventlet's green sockets: a
    # greenthread accepts one connection, writes a line via a file
    # wrapper, and closes; the client reads the line back and sees EOF.
    def accept_once(listenfd):
        try:
            conn, addr = listenfd.accept()
            fd = conn.makefile(mode='wb')
            # The file object keeps the connection usable after the
            # socket object itself is closed.
            conn.close()
            fd.write(b'hello\n')
            fd.close()
        finally:
            listenfd.close()

    server = eventlet.listen(('0.0.0.0', 0))
    eventlet.spawn_n(accept_once, server)
    client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
    fd = client.makefile('rb')
    # Same trick on the client side: reading continues via the file
    # object after close().
    client.close()
    assert fd.readline() == b'hello\n'
    assert fd.read() == b''  # server side closed -> EOF
    fd.close()
    check_hub()
def spawn_n(self, function, *args, **kwargs):
    """Create a greenthread to run the *function*, the same as
    :meth:`spawn`. The difference is that :meth:`spawn_n` returns
    None; the results of *function* are not retrievable.
    """
    # if reentering an empty pool, don't try to wait on a coroutine freeing
    # itself -- instead, just execute in the current coroutine
    current = eventlet.getcurrent()
    if self.sem.locked() and current in self.coroutines_running:
        # Run synchronously in the caller; the None coro argument
        # tells the impl to skip greenthread bookkeeping.
        self._spawn_n_impl(function, args, kwargs, None)
    else:
        # Take a pool slot (may block) and run in a fresh greenthread.
        self.sem.acquire()
        g = eventlet.spawn_n(
            self._spawn_n_impl,
            function, args, kwargs, True)
        if not self.coroutines_running:
            # First coroutine entering an idle pool: (re)create the
            # event that waitall() blocks on.
            self.no_coros_running = eventlet.Event()
        self.coroutines_running.add(g)
def spawn_n(func, *args, **kwargs):
    """Spawn *func* in a new greenthread, carrying the current context.

    A stub-friendly passthrough to eventlet.spawn_n: tests can replace
    this function without disturbing service spawns.  The threadlocal
    context active at call time is copied into the new greenthread so
    log records keep their context after the spawn.
    """
    parent_ctx = common_context.get_current()

    @functools.wraps(func)
    def bootstrap(*fn_args, **fn_kwargs):
        # update_store() must run in the new greenthread, otherwise the
        # logger finds nothing in threadlocal storage there.
        if parent_ctx is not None:
            parent_ctx.update_store()
        func(*fn_args, **fn_kwargs)

    eventlet.spawn_n(bootstrap, *args, **kwargs)
def startloopreport(self):
    """Start the live loop-report greenthread, but only on terminals
    that support markup; plain terminals get no loop report."""
    if not self.toxsession.report.tw.hasmarkup:
        return
    eventlet.spawn_n(self.toxsession.report._loopreport)
def runtestsmulti(self, envlist):
    """Run each tox environment concurrently and report the summary.

    :param envlist: iterable of environment names to test
    :returns: the summary's exit code, or None in sdist-only mode
    """
    # One greenthread per env, bounded by the --numproc option.
    pool = GreenPool(size=self._toxconfig.option.numproc)
    for env in envlist:
        pool.spawn_n(self.runtests, env)
    pool.waitall()
    if not self.toxsession.config.option.sdistonly:
        retcode = self._toxsession._summary()
        return retcode
def start_ec2(ipaddr):
    # Fire-and-forget: kick off the EC2 start in a background
    # greenthread so the HTTP response is not blocked on the AWS call.
    eventlet.spawn_n(AppManager.start_ec2, ipaddr)
    return redirect(url_for('view.show_home'))
# Stop EC2 Instance
def stop_ec2(ipaddr):
    # Fire-and-forget: stop the EC2 instance in a background greenthread
    # and immediately redirect back to the home view.
    eventlet.spawn_n(AppManager.stop_ec2, ipaddr)
    return redirect(url_for('view.show_home'))
def __init__(self, log, vpp_cmd_queue_len=None):
    """Connect to the local VPP instance over its Python (papi) API.

    :param log: logger used by this wrapper
    :param vpp_cmd_queue_len: optional rx queue length forwarded to
        vpp_papi's connect() — TODO confirm semantics against vpp_papi
    """
    self.LOG = log
    # Load every *.api.json message definition shipped with VPP so the
    # papi client knows the full API surface.
    jsonfiles = []
    for root, dirnames, filenames in os.walk('/usr/share/vpp/api/'):
        for filename in fnmatch.filter(filenames, '*.api.json'):
            jsonfiles.append(os.path.join(root, filename))
    self._vpp = vpp_papi.VPP(jsonfiles)
    # Sometimes a callback fires unexpectedly. We need to catch them
    # because vpp_papi will traceback otherwise
    self._vpp.register_event_callback(self._queue_cb)
    # Per-event lists of subscriber callbacks, keyed by CallbackEvents.
    self.registered_callbacks = {}
    for event in self.CallbackEvents:
        self.registered_callbacks[event] = []
    # NB: a real threading lock — presumably because events arrive on a
    # non-eventlet thread; verify against _queue_cb's caller.
    self.event_q_lock = Lock()
    self.event_q = []
    if vpp_cmd_queue_len is not None:
        self._vpp.connect("python-VPPInterface",
                          rx_qlen=vpp_cmd_queue_len)
    else:
        self._vpp.connect("python-VPPInterface")
    # Background greenthread that drains the queued VPP events.
    eventlet.spawn_n(self.vpp_watcher_thread)
def test_wrap_greenlet(self):
    # A spawn_n greenlet's result can be awaited through
    # aioeventlet.wrap_greenthread on the asyncio loop.
    def func():
        eventlet.sleep(0.010)
        return "ok"

    gt = eventlet.spawn_n(func)
    fut = aioeventlet.wrap_greenthread(gt)
    result = self.loop.run_until_complete(fut)
    self.assertEqual(result, "ok")
def test_wrap_greenlet_exc(self):
    # An exception raised inside the greenlet propagates out of
    # run_until_complete via the wrapping future.
    self.loop.set_debug(True)

    def func():
        raise ValueError(7)

    gt = eventlet.spawn_n(func)
    fut = aioeventlet.wrap_greenthread(gt)
    self.assertRaises(ValueError, self.loop.run_until_complete, fut)
def test_wrap_greenlet_running(self):
    # Wrapping the greenthread from inside itself (i.e. while it is
    # running) must raise RuntimeError; the result or exception is
    # relayed back through an Event.
    event = eventlet.event.Event()

    def func():
        try:
            gt = eventlet.getcurrent()
            fut = aioeventlet.wrap_greenthread(gt)
        except Exception as exc:
            event.send_exception(exc)
        else:
            event.send(fut)

    eventlet.spawn_n(func)
    msg = "wrap_greenthread: the greenthread is running"
    self.assertRaisesRegex(RuntimeError, msg, event.wait)
def test_wrap_greenlet_dead(self):
    # Wrapping a greenthread that has already finished must raise
    # RuntimeError; the Event is used to wait for it to complete first.
    event = eventlet.event.Event()

    def func():
        event.send('done')

    gt = eventlet.spawn_n(func)
    event.wait()
    msg = "wrap_greenthread: the greenthread already finished"
    self.assertRaisesRegex(RuntimeError, msg, aioeventlet.wrap_greenthread, gt)
def test_closure(self):
    # A sender greenthread spams a socket while a reader greenthread
    # drains it; a third greenthread closes the client socket out from
    # under the reader.  Verifies both sides exit cleanly with the
    # expected errnos (EPIPE for the writer, EBADF for the reader).
    def spam_to_me(address):
        sock = eventlet.connect(address)
        while True:
            try:
                sock.sendall(b'hello world')
                # Arbitrary delay to not use all available CPU, keeps the test
                # running quickly and reliably under a second
                time.sleep(0.001)
            except socket.error as e:
                # Peer closed: the expected way out of the loop.
                if get_errno(e) == errno.EPIPE:
                    return
                raise

    server = eventlet.listen(('127.0.0.1', 0))
    sender = eventlet.spawn(spam_to_me, server.getsockname())
    client, address = server.accept()
    server.close()

    def reader():
        try:
            while True:
                data = client.recv(1024)
                assert data
                # Arbitrary delay to not use all available CPU, keeps the test
                # running quickly and reliably under a second
                time.sleep(0.001)
        except socket.error as e:
            # we get an EBADF because client is closed in the same process
            # (but a different greenthread)
            if get_errno(e) != errno.EBADF:
                raise

    def closer():
        client.close()

    # NOTE(review): rebinding `reader` to the GreenThread shadows the
    # function above — intentional here but easy to misread.
    reader = eventlet.spawn(reader)
    eventlet.spawn_n(closer)
    reader.wait()
    sender.wait()
def test_waiting_for_event(self):
    """A greenthread sending to an Event wakes the waiter with the value."""
    payload = 'some stuff'
    evt = event.Event()

    def sender():
        evt.send(payload)

    eventlet.spawn_n(sender)
    self.assertEqual(evt.wait(), payload)
def test_028_ssl_handshake_errors(self):
    # A failed SSL handshake (non-SSL request, or a premature client
    # close) must not kill the wsgi server; errored[0] records any
    # server exit so the outer assert can report it.
    errored = [False]

    def server(sock):
        try:
            wsgi.server(sock=sock, site=hello_world, log=self.logfile)
            errored[0] = 'SSL handshake error caused wsgi.server to exit.'
        except greenthread.greenlet.GreenletExit:
            # Expected: the test kills the server greenthread at the end.
            pass
        except Exception as e:
            errored[0] = 'SSL handshake error raised exception %s.' % e
            raise

    for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
        srv_sock = eventlet.wrap_ssl(
            eventlet.listen(('localhost', 0)),
            certfile=certificate_file, keyfile=private_key_file,
            server_side=True)
        addr = srv_sock.getsockname()
        g = eventlet.spawn_n(server, srv_sock)
        client = eventlet.connect(addr)
        if data:  # send non-ssl request
            client.sendall(data.encode())
        else:  # close sock prematurely
            client.close()
        eventlet.sleep(0)  # let context switch back to server
        assert not errored[0], errored[0]
        # make another request to ensure the server's still alive
        try:
            client = ssl.wrap_socket(eventlet.connect(addr))
            client.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
            result = recvall(client)
            assert result.startswith(b'HTTP'), result
            assert result.endswith(b'hello world')
        except ImportError:
            pass  # TODO(openssl): should test with OpenSSL
        greenthread.kill(g)
def test_connect_ssl(self):
    # SSL round trip: a greenthread accepts one TLS connection, writes
    # a line, and shuts down cleanly; the client reads the line and
    # then expects EOF (or ZeroReturnError from a GreenSSL object).
    def accept_once(listenfd):
        try:
            conn, addr = listenfd.accept()
            conn.write(b'hello\r\n')
            greenio.shutdown_safe(conn)
            conn.close()
        finally:
            greenio.shutdown_safe(listenfd)
            listenfd.close()

    server = eventlet.wrap_ssl(
        eventlet.listen(('0.0.0.0', 0)),
        tests.private_key_file,
        tests.certificate_file,
        server_side=True
    )
    eventlet.spawn_n(accept_once, server)
    raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
    client = ssl.wrap_socket(raw_client)
    fd = client.makefile('rb', 8192)
    assert fd.readline() == b'hello\r\n'
    try:
        self.assertEqual(b'', fd.read(10))
    except greenio.SSL.ZeroReturnError:
        # if it's a GreenSSL object it'll do this
        pass
    greenio.shutdown_safe(client)
    client.close()
    check_hub()
def free(self):
    """Number of greenthread slots currently available for use.

    When this is zero or negative, the next :meth:`spawn` or
    :meth:`spawn_n` call will block the calling greenthread until a
    slot becomes available.
    """
    return self.sem.counter
def run_spawn_n():
    # Benchmark helper: a single spawn_n call with a positional argument.
    eventlet.spawn_n(dummy, 1)
def run_spawn_n_kw():
    # Benchmark helper: a single spawn_n call with a keyword argument.
    eventlet.spawn_n(dummy, i=1)
def run_pool_spawn_n():
    # Benchmark helper: spawn_n through a GreenPool instead of eventlet.
    pool.spawn_n(dummy, 1)
def run_spawn_n():
    # Benchmark helper: a single spawn_n call with a positional argument.
    eventlet.spawn_n(dummy, 1)
def run_spawn_n_kw():
    # Benchmark helper: a single spawn_n call with a keyword argument.
    eventlet.spawn_n(dummy, i=1)
def step(debug):
    """Drive one wsgi-server timeout scenario and return its log output.

    Starts a wsgi server behind a NaughtySocketAcceptWrap, makes one
    normal request, arms the wrapper ("bomb"), then makes a second
    request on the same connection and expects it to be dropped.

    :param debug: forwarded to eventlet.wsgi.server's debug flag
    :returns: a copy of output_buffer collected during the run
    """
    output_buffer[:] = []
    server_sock = eventlet.listen(('localhost', 0))
    server_addr = server_sock.getsockname()
    sock_wrap = NaughtySocketAcceptWrap(server_sock)
    eventlet.spawn_n(
        eventlet.wsgi.server,
        debug=debug,
        log=BufferLog,
        max_size=128,
        site=tests.wsgi_test.Site(),
        sock=server_sock,
    )
    try:
        # req #1 - normal
        sock1 = eventlet.connect(server_addr)
        sock1.settimeout(0.1)
        fd1 = sock1.makefile('rwb')
        fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd1.flush()
        tests.wsgi_test.read_http(sock1)
        # let the server socket ops catch up, set bomb
        eventlet.sleep(0)
        output_buffer.append("arming...")
        sock_wrap.arm()
        # req #2 - old conn, post-arm - timeout
        fd1.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd1.flush()
        try:
            tests.wsgi_test.read_http(sock1)
            assert False, 'Expected ConnectionClosed exception'
        except tests.wsgi_test.ConnectionClosed:
            # Expected: the armed wrapper caused the connection to drop.
            pass
        fd1.close()
        sock1.close()
    finally:
        # reset streams, then output trapped tracebacks
        sock_wrap.unwrap()
    # check output asserts in tests.wsgi_test.TestHttpd
    # test_143_server_connection_timeout_exception
    return output_buffer[:]