def main():
'''
Run code specifed by data received over pipe
'''
assert is_forking(sys.argv)
handle = int(sys.argv[-1])
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
from_parent = os.fdopen(fd, 'rb')
process.current_process()._inheriting = True
preparation_data = load(from_parent)
prepare(preparation_data)
self = load(from_parent)
process.current_process()._inheriting = False
from_parent.close()
exitcode = self._bootstrap()
exit(exitcode)
python类pipe()的实例源码
def shell(cmd):
"""Run each line of a shell script; raise an exception if any line returns
a nonzero value.
"""
pin, pout = os.pipe()
proc = sp.Popen('/bin/bash', stdin=sp.PIPE)
for line in cmd.split('\n'):
line = line.strip()
if line.startswith('#'):
print('\033[33m> ' + line + '\033[0m')
else:
print('\033[32m> ' + line + '\033[0m')
if line.startswith('cd '):
os.chdir(line[3:])
proc.stdin.write((line + '\n').encode('utf-8'))
proc.stdin.write(('echo $? 1>&%d\n' % pout).encode('utf-8'))
ret = ""
while not ret.endswith('\n'):
ret += os.read(pin, 1)
ret = int(ret.strip())
if ret != 0:
print("\033[31mLast command returned %d; bailing out.\033[0m" % ret)
sys.exit(-1)
proc.stdin.close()
proc.wait()
def shell(cmd):
"""Run each line of a shell script; raise an exception if any line returns
a nonzero value.
"""
pin, pout = os.pipe()
proc = sp.Popen('/bin/bash', stdin=sp.PIPE)
for line in cmd.split('\n'):
line = line.strip()
if line.startswith('#'):
print('\033[33m> ' + line + '\033[0m')
else:
print('\033[32m> ' + line + '\033[0m')
if line.startswith('cd '):
os.chdir(line[3:])
proc.stdin.write((line + '\n').encode('utf-8'))
proc.stdin.write(('echo $? 1>&%d\n' % pout).encode('utf-8'))
ret = ""
while not ret.endswith('\n'):
ret += os.read(pin, 1)
ret = int(ret.strip())
if ret != 0:
print("\033[31mLast command returned %d; bailing out.\033[0m" % ret)
sys.exit(-1)
proc.stdin.close()
proc.wait()
def close(self):
"""Close the pipe and calls the _obs_notify() method."""
if self.__filehandle:
try:
try:
file_dispatcher.close(self)
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe dispatcher")
else: print_exc(file=stderr)
try:
self.__filehandle.close()
except OSError, oe:
if oe.errno not in self.__ignore_errno:
if self.__logger: self.__logger.exception("Unusual error closing pipe filehandle")
else: print_exc(file=stderr)
finally:
self.__filehandle = None
self._obs_notify(event=self.PIPE_CLOSED)
def fetch_data(self, clear=False):
"""Return all the accumulated data from the pipe as a string.
If `clear` is `True`, clear the accumulated data.
"""
if self.__data:
datastr = ''.join(self.__data)
if clear:
self.__data[:] = []
if datastr and self._universal_newlines:
# Take care of a newline split across cleared reads.
stripnl = self.__endedcr
if clear:
self.__endedcr = (datastr[-1] == '\r')
if stripnl and datastr[0] == '\n':
return self._translate_newlines(datastr[1:])
else:
return self._translate_newlines(datastr)
else:
return datastr
else:
return ''
def test_pipe_iostream(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
ws.write(b"hel")
ws.write(b"lo world")
rs.read_until(b' ', callback=self.stop)
data = self.wait()
self.assertEqual(data, b"hello ")
rs.read_bytes(3, self.stop)
data = self.wait()
self.assertEqual(data, b"wor")
ws.close()
rs.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b"ld")
rs.close()
def test_pipe_iostream_big_write(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
NUM_BYTES = 1048576
# Write 1MB of data, which should fill the buffer
ws.write(b"1" * NUM_BYTES)
rs.read_bytes(NUM_BYTES, self.stop)
data = self.wait()
self.assertEqual(data, b"1" * NUM_BYTES)
ws.close()
rs.close()
def read_from_fd(self):
try:
chunk = os.read(self.fd, self.read_chunk_size)
except (IOError, OSError) as e:
if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
return None
elif errno_from_exception(e) == errno.EBADF:
# If the writing half of a pipe is closed, select will
# report it as readable but reads will fail with EBADF.
self.close(exc_info=True)
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
def test_pipe_iostream(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
ws.write(b"hel")
ws.write(b"lo world")
rs.read_until(b' ', callback=self.stop)
data = self.wait()
self.assertEqual(data, b"hello ")
rs.read_bytes(3, self.stop)
data = self.wait()
self.assertEqual(data, b"wor")
ws.close()
rs.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b"ld")
rs.close()
def test_pipe_iostream_big_write(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
NUM_BYTES = 1048576
# Write 1MB of data, which should fill the buffer
ws.write(b"1" * NUM_BYTES)
rs.read_bytes(NUM_BYTES, self.stop)
data = self.wait()
self.assertEqual(data, b"1" * NUM_BYTES)
ws.close()
rs.close()
def test_pipe_iostream(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
ws.write(b"hel")
ws.write(b"lo world")
rs.read_until(b' ', callback=self.stop)
data = self.wait()
self.assertEqual(data, b"hello ")
rs.read_bytes(3, self.stop)
data = self.wait()
self.assertEqual(data, b"wor")
ws.close()
rs.read_until_close(self.stop)
data = self.wait()
self.assertEqual(data, b"ld")
rs.close()
def test_pipe_iostream_big_write(self):
r, w = os.pipe()
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
NUM_BYTES = 1048576
# Write 1MB of data, which should fill the buffer
ws.write(b"1" * NUM_BYTES)
rs.read_bytes(NUM_BYTES, self.stop)
data = self.wait()
self.assertEqual(data, b"1" * NUM_BYTES)
ws.close()
rs.close()
def read_from_fd(self):
try:
chunk = os.read(self.fd, self.read_chunk_size)
except (IOError, OSError) as e:
if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
return None
elif errno_from_exception(e) == errno.EBADF:
# If the writing half of a pipe is closed, select will
# report it as readable but reads will fail with EBADF.
self.close(exc_info=True)
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
def test_ThreadLineReader():
def sync_write(data):
reader.clear_processed()
os.write(wp, data)
reader.wait_processed()
rp, wp = os.pipe()
reader = ThreadLineReader(rp)
reader.start()
assert reader.readline() is None
sync_write('foo\n')
assert reader.readline() is None
reader.set_next_flag()
sync_write('bar\n')
assert reader.readline() == 'bar'
reader.terminate()
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.launch_service(service)
return launcher
def __init__(self, reactor, proc, name, fileno, forceReadHack=False):
"""Initialize, specifying a Process instance to connect to.
"""
abstract.FileDescriptor.__init__(self, reactor)
fdesc.setNonBlocking(fileno)
self.proc = proc
self.name = name
self.fd = fileno
if forceReadHack:
self.enableReadHack = True
else:
# Detect if this fd is actually a write-only fd. If it's
# valid to read, don't try to detect closing via read.
# This really only means that we cannot detect a TTY's write
# pipe being closed.
try:
os.read(self.fileno(), 0)
except OSError:
# It's a write-only pipe end, enable hack
self.enableReadHack = True
if self.enableReadHack:
self.startReading()
def _testmain():
s = PeriodicSource("hello", 1, name="src")
d1 = Drain(name="d1")
c = ConsoleSink(name="c")
tf = TransformDrain(lambda x:"Got %r" % x)
t = TermSink(name="t", keepterm=False)
s > d1 > c
d1 > tf > t
p = PipeEngine(s)
p.graph(type="png",target="> /tmp/pipe.png")
p.start()
print p.threadid
time.sleep(5)
p.stop()
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
if duplex:
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
s1.close()
s2.close()
else:
fd1, fd2 = os.pipe()
c1 = _multiprocessing.Connection(fd1, writable=False)
c2 = _multiprocessing.Connection(fd2, readable=False)
return c1, c2
def main():
'''
Run code specified by data received over pipe
'''
assert is_forking(sys.argv)
handle = int(sys.argv[-1])
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
from_parent = os.fdopen(fd, 'rb')
process.current_process()._inheriting = True
preparation_data = load(from_parent)
prepare(preparation_data)
self = load(from_parent)
process.current_process()._inheriting = False
from_parent.close()
exitcode = self._bootstrap()
exit(exitcode)
def __init__(self, cmd, bufsize=-1):
_cleanup()
self.cmd = cmd
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
os.dup2(c2pwrite, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
def test_FILE_stored_explicitly():
ffi = FFI()
ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
lib = ffi.verify("""
#include <stdio.h>
FILE *myfile;
int myprintf11(const char *out, int value) {
return fprintf(myfile, out, value);
}
""")
import os
fdr, fdw = os.pipe()
fw1 = os.fdopen(fdw, 'wb', 256)
lib.myfile = ffi.cast("FILE *", fw1)
#
fw1.write(b"X")
r = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
fw1.close()
assert r == len("hello, 42!\n")
#
result = os.read(fdr, 256)
os.close(fdr)
# the 'X' might remain in the user-level buffer of 'fw1' and
# end up showing up after the 'hello, 42!\n'
assert result == b"Xhello, 42!\n" or result == b"hello, 42!\nX"
def test_FILE_stored_explicitly():
ffi = FFI()
ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
lib = ffi.verify("""
#include <stdio.h>
FILE *myfile;
int myprintf11(const char *out, int value) {
return fprintf(myfile, out, value);
}
""")
import os
fdr, fdw = os.pipe()
fw1 = os.fdopen(fdw, 'wb', 256)
lib.myfile = ffi.cast("FILE *", fw1)
#
fw1.write(b"X")
r = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
fw1.close()
assert r == len("hello, 42!\n")
#
result = os.read(fdr, 256)
os.close(fdr)
# the 'X' might remain in the user-level buffer of 'fw1' and
# end up showing up after the 'hello, 42!\n'
assert result == b"Xhello, 42!\n" or result == b"hello, 42!\nX"
subprocess_attach_write_pipe.py 文件源码
项目:annotated-py-asyncio
作者: hhstore
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def task():
rfd, wfd = os.pipe()
args = [sys.executable, '-c', code, str(rfd)]
proc = yield from asyncio.create_subprocess_exec(
*args,
pass_fds={rfd},
stdout=subprocess.PIPE)
pipe = open(wfd, 'wb', 0)
transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol,
pipe)
transport.write(b'data')
stdout, stderr = yield from proc.communicate()
print("stdout = %r" % stdout.decode())
transport.close()
def _newpipe(encoder, decoder):
"""Create new pipe via `os.pipe()` and return `(_GIPCReader, _GIPCWriter)`
tuple.
os.pipe() implementation on Windows (https://goo.gl/CiIWvo):
- CreatePipe(&read, &write, NULL, 0)
- anonymous pipe, system handles buffer size
- anonymous pipes are implemented using named pipes with unique names
- asynchronous (overlapped) read and write operations not supported
os.pipe() implementation on Unix (http://linux.die.net/man/7/pipe):
- based on pipe()
- common Linux: pipe buffer is 4096 bytes, pipe capacity is 65536 bytes
"""
r, w = os.pipe()
return (_GIPCReader(r, decoder), _GIPCWriter(w, encoder))
# Define default encoder and decoder functions for pipe data serialization.
def _write(self, bindata):
"""Write `bindata` to pipe in a gevent-cooperative manner.
POSIX-compliant system notes (http://linux.die.net/man/7/pipe:):
- Since Linux 2.6.11, the pipe capacity is 65536 bytes
- Relevant for large messages (O_NONBLOCK enabled,
n > PIPE_BUF (4096 Byte, usually)):
"If the pipe is full, then write(2) fails, with errno set
to EAGAIN. Otherwise, from 1 to n bytes may be written (i.e.,
a "partial write" may occur; the caller should check the
return value from write(2) to see how many bytes were
actually written), and these bytes may be interleaved with
writes by other processes."
EAGAIN is handled within _write_nonblocking; partial writes here.
"""
bindata = memoryview(bindata)
while True:
# Causes OSError when read end is closed (broken pipe).
bytes_written = _write_nonblocking(self._fd, bindata)
if bytes_written == len(bindata):
break
bindata = bindata[bytes_written:]
def __init__(self, conf, wait_interval=0.01):
"""Constructor.
:param conf: an instance of ConfigOpts
:param wait_interval: The interval to sleep for between checks
of child process exit.
"""
self.conf = conf
# conf.register_opts(_options.service_opts)
self.children = {}
self.sigcaught = None
self.running = True
self.wait_interval = wait_interval
self.launcher = None
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.signal_handler = SignalHandler()
self.handle_signal()
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher(self.conf)
launcher.launch_service(service)
return launcher
def _first_stage():
import os,sys,zlib
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
for f in R,r,W,w:os.close(f)
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
os.write(1,'EC0\n')
C=zlib.decompress(sys.stdin.read(input()))
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
os.write(1,'EC1\n')
sys.exit(0)
def get_os_language(self):
try:
lang_code, code_page = locale.getdefaultlocale()
#('en_GB', 'cp1252'), en_US,
self.lang_code = lang_code
return lang_code
except:
#Mac fail to run this
pass
if sys.platform == "darwin":
try:
oot = os.pipe()
p = subprocess.Popen(["/usr/bin/defaults", 'read', 'NSGlobalDomain', 'AppleLanguages'], stdout=oot[1])
p.communicate()
lang_code = self.get_default_language_code_for_mac(os.read(oot[0], 10000))
self.lang_code = lang_code
return lang_code
except:
pass
lang_code = 'Unknown'
return lang_code
def init_signals(self):
"""\
Initialize master signal handling. Most of the signals
are queued. Child signals only wake up the master.
"""
# close old PIPE
if self.PIPE:
[os.close(p) for p in self.PIPE]
# initialize the pipe
self.PIPE = pair = os.pipe()
for p in pair:
util.set_non_blocking(p)
util.close_on_exec(p)
self.log.close_on_exec()
# initialize all signals
[signal.signal(s, self.signal) for s in self.SIGNALS]
signal.signal(signal.SIGCHLD, self.handle_chld)