def _open_ipipes(wds, fifos, input_pipes):
"""
This will attempt to open the named pipes in the set of ``fifos`` for
writing, which will only succeed if the subprocess has opened them for
reading already. This modifies and returns the list of write descriptors,
the list of waiting fifo names, and the mapping back to input adapters.
"""
for fifo in fifos.copy():
try:
fd = os.open(fifo, os.O_WRONLY | os.O_NONBLOCK)
input_pipes[fd] = fifos.pop(fifo)
wds.append(fd)
except OSError as e:
if e.errno != errno.ENXIO:
raise e
return wds, fifos, input_pipes
python类ENXIO的实例源码
def is_sparse(path):
# LP: #1656371 - Looking at stat().st_blocks won't work on ZFS file
# systems, since that seems to return 1, whereas on EXT4 it returns 0.
# Rather than hard code the value based on file system type (which could
# be different even on other file systems), a more reliable way seems to
# be to use SEEK_DATA with an offset of 0 to find the first block of data
# after position 0. If there is no data, an ENXIO will get raised, at
# least on any modern Linux kernels we care about. See lseek(2) for
# details.
with open(path, 'r') as fp:
try:
os.lseek(fp.fileno(), 0, os.SEEK_DATA)
except OSError as error:
# There is no OSError subclass for ENXIO.
if error.errno != errno.ENXIO:
raise
# The expected exception occurred, meaning, there is no data in
# the file, so it's entirely sparse.
return True
# The expected exception did not occur, so there is data in the file.
return False
def myconnect(sock, host, port):
'''myconnect(sock, host, port) -- attempt to make a network connection
with the given socket and host-port pair; raises our new NetworkError
exception and collates error number and reason.'''
try:
sock.connect(host, port)
except socket.error, args:
myargs = updArgs(args) # convert inst to tuple
if len(myargs) == 1: # no #s on some errors
myargs = (errno.ENXIO, myargs[0])
raise NetworkError, \
updArgs(myargs, host + ':' + str(port))
# myopen() --> file object
def open(self, flags):
if self._fd is None:
if not os.path.exists(self.path):
raise Exception('Input pipe does not exist: %s' % self._path)
if not stat.S_ISFIFO(os.stat(self.path).st_mode):
raise Exception('Input pipe must be a fifo object: %s' % self._path)
try:
self._fd = os.open(self.path, flags)
except OSError as e:
if e.errno != errno.ENXIO:
raise e
def pty_make_controlling_tty(tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def pty_make_controlling_tty(tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def __pty_make_controlling_tty(self, tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def pty_make_controlling_tty(tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def pty_make_controlling_tty(tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def __pty_make_controlling_tty(self, tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def myconnect(sock, host, port):
try:
sock.connect((host, port))
except socket.error, args:
myargs = updArgs(args) # convert inst to tuple
if len(myargs) == 1: # no #s on some errors
myargs = (errno.ENXIO, myargs[0])
raise NetworkError, \
updArgs(myargs, host + ':' + str(port))
def run(self):
try:
# Try to open the FIFO nonblocking, so we can periodically check
# if the main thread wants us to wind down. If it does, there's no
# more need for the FIFO, so stop the thread.
while True:
try:
fd = os.open(self.fifo_path, os.O_WRONLY | os.O_NONBLOCK)
except OSError as error:
if error.errno != errno.ENXIO:
raise
elif self._want_join.is_set():
return
else:
break
# Now clear the fd's nonblocking flag to let writes block normally.
fcntl.fcntl(fd, fcntl.F_SETFL, 0)
with open(fd, 'wb') as fifo:
# The queue works around a unified diff limitation: if there's
# no newlines in both don't make it a difference
end_nl = self.feeder(fifo)
self.end_nl_q.put(end_nl)
except Exception as error:
self._exception = error
def myconnect(sock, host, port):
try:
sock.connect((host, port))
except socket.error, args:
myargs = updArgs(args) # conv inst2tuple
if len(myargs) == 1: # no #s on some errs
myargs = (errno.ENXIO, myargs[0])
raise NetworkError, \
updArgs(myargs, host + ':' + str(port))
def recvfd(self):
"""Receive a file descriptor via the pipe."""
if self.closed:
self.debug_msg("recvfd", "failed (closed)")
raise IOError(
"sendfd() called for closed {0}".format(
type(self).__name__))
try:
fcntl_args = struct.pack('i', -1)
fcntl_rv = fcntl.ioctl(self.__pipefd,
fcntl.I_RECVFD, fcntl_args)
fd = struct.unpack('i', fcntl_rv)[0]
except IOError as e:
if e.errno == errno.ENXIO:
# other end of the connection was closed
return -1
self.debug_msg("recvfd", "failed")
raise e
assert fd != -1
# debugging
self.debug_dumpfd("recvfd", fd)
# reset the current file pointer
si = os.fstat(fd)
if stat.S_ISREG(si.st_mode):
os.lseek(fd, os.SEEK_SET, 0)
return fd
def pty_make_controlling_tty(tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)
def __pty_make_controlling_tty(self, tty_fd):
'''This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
work on Solaris. '''
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
os.close(fd)