def wait_for_exit(self, raise_error=True):
"""Returns a `.Future` which resolves when the process exits.
Usage::
ret = yield proc.wait_for_exit()
This is a coroutine-friendly alternative to `set_exit_callback`
(and a replacement for the blocking `subprocess.Popen.wait`).
By default, raises `subprocess.CalledProcessError` if the process
has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
to suppress this behavior and return the exit status without raising.
.. versionadded:: 4.2
"""
future = Future()
def callback(ret):
if ret != 0 and raise_error:
# Unfortunately we don't have the original args any more.
future.set_exception(CalledProcessError(ret, None))
else:
future.set_result(ret)
self.set_exit_callback(callback)
return future
python类wait()的实例源码
def wait_for_exit(self, raise_error=True):
"""Returns a `.Future` which resolves when the process exits.
Usage::
ret = yield proc.wait_for_exit()
This is a coroutine-friendly alternative to `set_exit_callback`
(and a replacement for the blocking `subprocess.Popen.wait`).
By default, raises `subprocess.CalledProcessError` if the process
has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
to suppress this behavior and return the exit status without raising.
.. versionadded:: 4.2
"""
future = Future()
def callback(ret):
if ret != 0 and raise_error:
# Unfortunately we don't have the original args any more.
future.set_exception(CalledProcessError(ret, None))
else:
future.set_result(ret)
self.set_exit_callback(callback)
return future
def wait_for_exit(self, raise_error=True):
"""Returns a `.Future` which resolves when the process exits.
Usage::
ret = yield proc.wait_for_exit()
This is a coroutine-friendly alternative to `set_exit_callback`
(and a replacement for the blocking `subprocess.Popen.wait`).
By default, raises `subprocess.CalledProcessError` if the process
has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
to suppress this behavior and return the exit status without raising.
.. versionadded:: 4.2
"""
future = Future()
def callback(ret):
if ret != 0 and raise_error:
# Unfortunately we don't have the original args any more.
future.set_exception(CalledProcessError(ret, None))
else:
future.set_result(ret)
self.set_exit_callback(callback)
return future
def chunkify(self, gen):
bs = self.bs
chunks = self.chunks()
chunk = chunks.next()
pos = 0
for src in gen:
srctype = type(src)
src = memoryview(src) if srctype in (str, buffer, bytearray, memoryview) else memoryview(str(src))
slen = len(src)
try:
# fast append
chunk.payload[pos:pos + slen] = src
pos += slen
except ValueError:
# oops - too big - slice & dice
soff = bs - pos
# pad buffer out to end using first n bytes from src
chunk.payload[pos:bs] = src[0:soff]
yield chunk
chunk = chunks.next()
pos = 0
# then carve off full blocks directly from src
while soff + bs <= slen:
chunk.payload[0:bs] = src[soff:soff+bs]
yield chunk
chunk = chunks.next()
soff += bs
# and stash the remainder
pos = slen - soff
chunk.payload[0:pos] = src[soff:soff+pos]
if pos:
yield chunk(pos)
# because every multiprocessing.Process().start() very helpfully
# does a waitpid(WNOHANG) across all known children, and I want
# to use os.wait() to catch exiting children
def test_negotiate(self, group=14):
server = socket.socket()
server.bind(('',0))
server.listen(1)
port = server.getsockname()[1]
pid = os.fork()
# child process - aka, the server
if pid == 0:
sock, _ = server.accept()
server.close()
# parent - aka, the client
else:
sock = socket.socket()
sock.connect(('', port))
server.close()
alice = pyDHE.new(group)
local_key = alice.negotiate(sock)
#sock.close()
if pid == 0:
sock.send(long_to_bytes(local_key))
sock.close()
else:
os.wait()
remote_key = bytes_to_long(sock.recv(1024))
sock.close()
self.assertEqual(local_key, remote_key, "keys do not match")
def send_await(self, msg, deadline=None):
"""Send `msg` and wait for a response with an optional timeout."""
receiver = self.send_async(msg)
response = receiver.get_data(deadline)
IOLOG.debug('%r._send_await() -> %r', self, response)
return response
def _setup_master(self, profiling, parent_id, context_id, in_fd, out_fd):
if profiling:
enable_profiling()
self.broker = Broker()
self.router = Router(self.broker)
self.router.add_handler(self._on_shutdown_msg, SHUTDOWN)
self.master = Context(self.router, 0, 'master')
if parent_id == 0:
self.parent = self.master
else:
self.parent = Context(self.router, parent_id, 'parent')
self.channel = Receiver(self.router, CALL_FUNCTION)
self.stream = Stream(self.router, parent_id)
self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd)
self.stream.receive_side.keep_alive = False
listen(self.broker, 'shutdown', self._on_broker_shutdown)
listen(self.broker, 'exit', self._on_broker_exit)
os.close(in_fd)
try:
os.wait() # Reap first stage.
except OSError:
pass # No first stage exists (e.g. fakessh)
def wait(self):
"""Wait until all servers have completed running."""
try:
if self.children:
self.wait_on_children()
else:
self.pool.waitall()
except KeyboardInterrupt:
pass
def __init__(self, job_state=None):
Job.__init__(self)
self.job_state = job_state
self.procs = []
self.pids = [] # pids in order
self.pipe_status = [] # status in order
self.status = -1 # for 'wait' jobs
def __init__(self):
# pid -> Job instance
# A pipeline that is backgrounded is always run in a SubProgramThunk? So
# you can wait for it once?
self.jobs = {}
def AllDone(self):
"""Test if all jobs are done. Used by 'wait' builtin."""
for job in self.jobs.itervalues():
if job.State() != ProcessState.Done:
return False
return True
def __init__(self):
self.callbacks = {} # pid -> callback
self.last_status = 127 # wait -n error code
def Wait(self):
# This is a list of async jobs
try:
pid, status = os.wait()
except OSError as e:
if e.errno == errno.ECHILD:
#log('WAIT ECHILD')
return False # nothing to wait for caller should stop
else:
# What else can go wrong?
raise
#log('WAIT got %s %s', pid, status)
# TODO: change status in more cases.
if os.WIFSIGNALED(status):
pass
elif os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
#log('exit status: %s', status)
# This could happen via coding error. But this may legitimately happen
# if a grandchild outlives the child (its parent). Then it is reparented
# under this process, so we might receive notification of its exit, even
# though we didn't start it. We can't have any knowledge of such
# processes, so print a warning.
if pid not in self.callbacks:
util.warn("PID %d stopped, but osh didn't start it", pid)
return True # caller should keep waiting
callback = self.callbacks.pop(pid)
callback(pid, status)
self.last_status = status # for wait -n
return True # caller should keep waiting
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
script = """if 1:
import os
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
time.sleep(0.1)
print 'end of main'
"""
self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().decode().replace('\r', '')
self.assertEqual(rc, 0, "Unexpected error")
self.assertEqual(data, expected_output)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
script = """if 1:
import os
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
time.sleep(0.1)
print 'end of main'
"""
self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().decode().replace('\r', '')
self.assertEqual(rc, 0, "Unexpected error")
self.assertEqual(data, expected_output)
def get_fs_type(path):
cmd = ['/usr/bin/stat', '-f', '-L', '-c', '%T', path]
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
universal_newlines=True)
p.wait()
with p.stdout as f:
return f.readline().strip()
def update_config_from_file(config_opts, config_file, uid_manager):
config_file = os.path.realpath(config_file)
r_pipe, w_pipe = os.pipe()
if os.fork() == 0:
try:
os.close(r_pipe)
if uid_manager and not all(getresuid()):
uid_manager.dropPrivsForever()
include(config_file, config_opts)
with os.fdopen(w_pipe, 'wb') as writer:
pickle.dump(config_opts, writer)
except:
import traceback
etype, evalue, raw_tb = sys.exc_info()
tb = traceback.extract_tb(raw_tb)
tb = [entry for entry in tb if entry[0] == config_file]
print('\n'.join(traceback.format_list(tb)), file=sys.stderr)
print('\n'.join(traceback.format_exception_only(etype, evalue)),
file=sys.stderr)
sys.exit(1)
sys.exit(0)
else:
os.close(w_pipe)
with os.fdopen(r_pipe, 'rb') as reader:
while True:
try:
new_config = reader.read()
break
except OSError as e:
if e.errno != errno.EINTR:
raise
_, ret = os.wait()
if ret != 0:
raise exception.ConfigError('Error in configuration')
if new_config:
config_opts.update(pickle.loads(new_config))