def __init__(self, f, mode='r', bufsize=-1):
if not isinstance(f, six.string_types + (int, file)):
raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
if isinstance(f, six.string_types):
f = open(f, mode, 0)
if isinstance(f, int):
fileno = f
self._name = "<fd:%d>" % fileno
else:
fileno = os.dup(f.fileno())
self._name = f.name
if f.mode != mode:
raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
self._name = f.name
f.close()
super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
set_nonblocking(self)
self.softspace = 0
python类dup()的实例源码
def __init__(self, name, mode='r', closefd=True, opener=None):
if isinstance(name, int):
fileno = name
self._name = "<fd:%d>" % fileno
else:
assert isinstance(name, six.string_types)
with open(name, mode) as fd:
self._name = fd.name
fileno = _original_os.dup(fd.fileno())
notify_opened(fileno)
self._fileno = fileno
self._mode = mode
self._closed = False
set_nonblocking(self)
self._seekable = None
def test_closerange(self):
first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
# We must allocate two consecutive file descriptors, otherwise
# it will mess up other file descriptors (perhaps even the three
# standard ones).
second = os.dup(first)
try:
retries = 0
while second != first + 1:
os.close(first)
retries += 1
if retries > 10:
# XXX test skipped
self.skipTest("couldn't allocate two consecutive fds")
first, second = second, os.dup(second)
finally:
os.close(second)
# close a fd that is open, and one that isn't
os.closerange(first, first + 2)
self.assertRaises(OSError, os.write, first, "a")
def test_closerange(self):
first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
# We must allocate two consecutive file descriptors, otherwise
# it will mess up other file descriptors (perhaps even the three
# standard ones).
second = os.dup(first)
try:
retries = 0
while second != first + 1:
os.close(first)
retries += 1
if retries > 10:
# XXX test skipped
self.skipTest("couldn't allocate two consecutive fds")
first, second = second, os.dup(second)
finally:
os.close(second)
# close a fd that is open, and one that isn't
os.closerange(first, first + 2)
self.assertRaises(OSError, os.write, first, b"a")
def test_small_errpipe_write_fd(self):
"""Issue #15798: Popen should work when stdio fds are available."""
new_stdin = os.dup(0)
new_stdout = os.dup(1)
try:
os.close(0)
os.close(1)
# Side test: if errpipe_write fails to have its CLOEXEC
# flag set this should cause the parent to think the exec
# failed. Extremely unlikely: everyone supports CLOEXEC.
subprocess.Popen([
sys.executable, "-c",
"print('AssertionError:0:CLOEXEC failure.')"]).wait()
finally:
# Restore original stdin and stdout
os.dup2(new_stdin, 0)
os.dup2(new_stdout, 1)
os.close(new_stdin)
os.close(new_stdout)
def test_close_fds_after_preexec(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
# this FD is used as dup2() target by preexec_fn, and should be closed
# in the child process
fd = os.dup(1)
self.addCleanup(os.close, fd)
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True,
preexec_fn=lambda: os.dup2(1, fd))
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertNotIn(fd, remaining_fds)
def get_high_socket_fd(self):
if WIN32:
# The child process will not have any socket handles, so
# calling socket.fromfd() should produce WSAENOTSOCK even
# if there is a handle of the same number.
return socket.socket().detach()
else:
# We want to produce a socket with an fd high enough that a
# freshly created child process will not have any fds as high.
fd = socket.socket().detach()
to_close = []
while fd < 50:
to_close.append(fd)
fd = os.dup(fd)
for x in to_close:
os.close(x)
return fd
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
if duplex:
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
s1.close()
s2.close()
else:
fd1, fd2 = os.pipe()
c1 = _multiprocessing.Connection(fd1, writable=False)
c2 = _multiprocessing.Connection(fd2, readable=False)
return c1, c2
def test_closerange(self):
first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
# We must allocate two consecutive file descriptors, otherwise
# it will mess up other file descriptors (perhaps even the three
# standard ones).
second = os.dup(first)
try:
retries = 0
while second != first + 1:
os.close(first)
retries += 1
if retries > 10:
# XXX test skipped
self.skipTest("couldn't allocate two consecutive fds")
first, second = second, os.dup(second)
finally:
os.close(second)
# close a fd that is open, and one that isn't
os.closerange(first, first + 2)
self.assertRaises(OSError, os.write, first, "a")
def redirect_stdout_to(stream_with_fd):
if stream_with_fd is None:
# no redirect when stream is 'None'
yield
else:
# do not redirect, if streams do not have file descriptors!
try:
_sys_stdout_fileno = sys.stdout.fileno()
_out_stream_fileno = stream_with_fd.fileno()
except:
yield
else:
# save the old stdout stream
old_out_stream = os.dup(sys.stdout.fileno())
os.dup2(stream_with_fd.fileno(), sys.stdout.fileno())
# if all OK, yield back to the caller, catching exceptions
try:
yield
finally:
# restore the previous output stream
os.dup2(old_out_stream, sys.stdout.fileno())
os.close(old_out_stream)
def pya():
global global_pya
import pyaudio
if global_pya == None:
# suppress Jack and ALSA error messages on Linux.
nullfd = os.open("/dev/null", 1)
oerr = os.dup(2)
os.dup2(nullfd, 2)
global_pya = pyaudio.PyAudio()
os.dup2(oerr, 2)
os.close(oerr)
os.close(nullfd)
return global_pya
# find the lowest supported input rate >= rate.
# needed on Linux but not the Mac (which converts as needed).
def _run_pcap_parser(self, rx_tcpdump, interface):
rx, tx = pipe()
# dupe pids for the child process
tcpdump, out = dup(rx_tcpdump), dup(tx)
close(tx)
close(rx_tcpdump)
pid = fork()
if pid:
close(out)
close(tcpdump)
self.child_parser = pid
self.rx = rx
else:
# get rid of unnecessary privileges
drop_privileges()
# start parsing packets
start_pcap_parser(tcpdump, out, interface)
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
if duplex:
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
s1.close()
s2.close()
else:
fd1, fd2 = os.pipe()
c1 = _multiprocessing.Connection(fd1, writable=False)
c2 = _multiprocessing.Connection(fd2, readable=False)
return c1, c2
def generateHits(self, cont):
""" generates the hits.csv file """
hitf = self.outdir + "/hits.csv"
old = os.dup(1)
sys.stdout.flush()
os.close(1)
os.open(hitf, os.O_WRONLY | os.O_CREAT)
cont.printallHits()
sys.stdout.flush()
os.close(1)
os.dup(old)
os.close(old)
lines = open(hitf).readlines()
random.shuffle(lines)
open(hitf, 'w').writelines(lines)
def readStdin(self):
app.log.info('reading from stdin')
# Create a new input stream for the file data.
# Fd is short for file descriptor. os.dup and os.dup2 will duplicate file
# descriptors.
stdinFd = sys.stdin.fileno()
newFd = os.dup(stdinFd)
newStdin = open("/dev/tty")
os.dup2(newStdin.fileno(), stdinFd)
# Create a text buffer to read from alternate stream.
textBuffer = self.newTextBuffer()
try:
with io.open(newFd, "r") as fileInput:
textBuffer.fileFilter(fileInput.read())
except Exception as e:
app.log.exception(e)
app.log.info('finished reading from stdin')
return textBuffer
def test_closerange(self):
first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
# We must allocate two consecutive file descriptors, otherwise
# it will mess up other file descriptors (perhaps even the three
# standard ones).
second = os.dup(first)
try:
retries = 0
while second != first + 1:
os.close(first)
retries += 1
if retries > 10:
# XXX test skipped
self.skipTest("couldn't allocate two consecutive fds")
first, second = second, os.dup(second)
finally:
os.close(second)
# close a fd that is open, and one that isn't
os.closerange(first, first + 2)
self.assertRaises(OSError, os.write, first, b"a")
def test_small_errpipe_write_fd(self):
"""Issue #15798: Popen should work when stdio fds are available."""
new_stdin = os.dup(0)
new_stdout = os.dup(1)
try:
os.close(0)
os.close(1)
# Side test: if errpipe_write fails to have its CLOEXEC
# flag set this should cause the parent to think the exec
# failed. Extremely unlikely: everyone supports CLOEXEC.
subprocess.Popen([
sys.executable, "-c",
"print('AssertionError:0:CLOEXEC failure.')"]).wait()
finally:
# Restore original stdin and stdout
os.dup2(new_stdin, 0)
os.dup2(new_stdout, 1)
os.close(new_stdin)
os.close(new_stdout)
def test_close_fds_after_preexec(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
# this FD is used as dup2() target by preexec_fn, and should be closed
# in the child process
fd = os.dup(1)
self.addCleanup(os.close, fd)
p = subprocess.Popen([sys.executable, fd_status],
stdout=subprocess.PIPE, close_fds=True,
preexec_fn=lambda: os.dup2(1, fd))
output, ignored = p.communicate()
remaining_fds = set(map(int, output.split(b',')))
self.assertNotIn(fd, remaining_fds)
def get_high_socket_fd(self):
if WIN32:
# The child process will not have any socket handles, so
# calling socket.fromfd() should produce WSAENOTSOCK even
# if there is a handle of the same number.
return socket.socket().detach()
else:
# We want to produce a socket with an fd high enough that a
# freshly created child process will not have any fds as high.
fd = socket.socket().detach()
to_close = []
while fd < 50:
to_close.append(fd)
fd = os.dup(fd)
for x in to_close:
os.close(x)
return fd
def stdchannel_redirected(stdchannel, dest_filename):
""" A context manager to temporarily redirect stdout or stderr
:param stdchannel: the channel to redirect
:type stdchannel: sys.std
:param dest_filename: the filename to redirect output to
:type dest_filename: os.devnull
:return: nothing
:rtype: None
"""
try:
oldstdchannel = os.dup(stdchannel.fileno())
dest_file = open(dest_filename, 'w')
os.dup2(dest_file.fileno(), stdchannel.fileno())
yield
finally:
if oldstdchannel is not None:
os.dup2(oldstdchannel, stdchannel.fileno())
if dest_file is not None:
dest_file.close()