python类ENXIO的实例源码

utils.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _open_ipipes(wds, fifos, input_pipes):
    """
    Try to open every still-pending named pipe in ``fifos`` for writing.

    The pipes are opened write-only and non-blocking.  Such an open fails
    with ``ENXIO`` for as long as nothing has the pipe open for reading, so a
    pipe in that state is simply left in ``fifos`` for a later attempt.

    :param wds: list of write descriptors; every descriptor opened here is
        appended to it.
    :param fifos: mapping of fifo name to its input adapter; an entry is
        removed once its pipe has been opened.
    :param input_pipes: mapping of open descriptor back to its input adapter;
        an entry is added for every pipe opened here.
    :returns: the same three objects, ``(wds, fifos, input_pipes)``, after
        they have been updated in place.
    :raises OSError: for any open failure other than ``ENXIO``.
    """
    # Walk a snapshot of the names because entries are removed along the way.
    for pipe_path in list(fifos):
        try:
            handle = os.open(pipe_path, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                # Nobody is reading from this pipe yet; keep it pending.
                continue
            raise exc

        input_pipes[handle] = fifos.pop(pipe_path)
        wds.append(handle)

    return wds, fifos, input_pipes
test_helpers.py 文件源码 项目:ubuntu-image 作者: CanonicalLtd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_sparse(path):
    """Return True when the file at ``path`` holds no data at all.

    LP: #1656371 - stat().st_blocks is not a usable signal: on ZFS it seems
    to report 1 where EXT4 reports 0, and other file systems may differ
    again.  Instead of hard coding per-file-system values, ask the kernel for
    the first block of data at or after offset 0 with SEEK_DATA.  When there
    is none, lseek fails with ENXIO (at least on the modern Linux kernels we
    care about; see lseek(2)).

    Any OSError other than ENXIO is propagated unchanged.
    """
    with open(path, 'r') as stream:
        try:
            os.lseek(stream.fileno(), 0, os.SEEK_DATA)
        except OSError as exc:
            # ENXIO has no dedicated OSError subclass, so compare the errno.
            if exc.errno == errno.ENXIO:
                # No data anywhere in the file: it is entirely sparse.
                return True
            raise
        else:
            # The seek found data, so the file is not entirely sparse.
            return False
myexc.py 文件源码 项目:python 作者: hienha 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def myconnect(sock, host, port):
    '''myconnect(sock, host, port) -- attempt to make a network connection
    with the given socket and host-port pair; raises our new NetworkError
    exception and collates error number and reason.

    sock -- socket object whose connect() method is called
    host -- host part of the address; concatenated with ':' and the port to
            label the error, so it is expected to be a string
    port -- port part of the address; converted with str() for the label

    Any socket.error raised by connect() is re-raised as NetworkError.'''

    try:
        # connect() takes a single address argument, so host and port have
        # to be passed together as one (host, port) tuple.
        sock.connect((host, port))

    except socket.error, args:
        myargs = updArgs(args)        # convert inst to tuple
        if len(myargs) == 1:        # no #s on some errors
            # Only a reason is present: put ENXIO in front as the number.
            myargs = (errno.ENXIO, myargs[0])

        # Add "host:port" to the arguments carried by NetworkError.
        raise NetworkError, \
            updArgs(myargs, host + ':' + str(port))


# myopen() --> file object
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def open(self, flags):
        """Open the input pipe at ``self.path`` unless it is already open.

        Nothing happens when ``self._fd`` is already set.  Otherwise the path
        has to exist and be a fifo, and it is opened with ``flags``; the
        resulting descriptor is stored in ``self._fd``.  An ``ENXIO`` failure
        from the open is ignored, which leaves ``self._fd`` as ``None``.

        :param flags: flags handed to ``os.open``.
        :raises Exception: when the path does not exist or is not a fifo.
        :raises OSError: for any open failure other than ``ENXIO``.
        """
        if self._fd is not None:
            return

        if not os.path.exists(self.path):
            raise Exception('Input pipe does not exist: %s' % self._path)

        mode = os.stat(self.path).st_mode
        if not stat.S_ISFIFO(mode):
            raise Exception('Input pipe must be a fifo object: %s' % self._path)

        try:
            self._fd = os.open(self.path, flags)
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise exc
_fork_pty.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Meant as a more portable alternative to the pty.fork() function; in
    particular it is intended to work on Solaris.

    Raises ExceptionPexpect when /dev/tty can still be opened after the
    os.setsid() call.  An OSError other than ENXIO from either /dev/tty
    probe, and any error from the two final opens, is passed on to the
    caller.
    '''

    slave_path = os.ttyname(tty_fd)
    ctty_path = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Open and close the current controlling tty, if there is one, before
    # detaching.  With no controlling tty to begin with (for example when
    # run from a cron(1) job) the open fails with ENXIO, which is tolerated.
    try:
        os.close(os.open(ctty_path, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Confirm the detach: opening /dev/tty again is expected to fail with
    # ENXIO, so an open that succeeds is reported as an error.
    try:
        os.close(os.open(ctty_path, probe_flags))
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    # The child pty has to be openable.
    os.close(os.open(slave_path, os.O_RDWR))

    # By now /dev/tty has to be openable again, i.e. a controlling tty exists.
    os.close(os.open(ctty_path, os.O_WRONLY))
_fork_pty.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Meant as a more portable alternative to the pty.fork() function; in
    particular it is intended to work on Solaris.

    Raises ExceptionPexpect when /dev/tty can still be opened after the
    os.setsid() call.  An OSError other than ENXIO from either /dev/tty
    probe, and any error from the two final opens, is passed on to the
    caller.
    '''

    slave_path = os.ttyname(tty_fd)
    ctty_path = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Open and close the current controlling tty, if there is one, before
    # detaching.  With no controlling tty to begin with (for example when
    # run from a cron(1) job) the open fails with ENXIO, which is tolerated.
    try:
        os.close(os.open(ctty_path, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Confirm the detach: opening /dev/tty again is expected to fail with
    # ENXIO, so an open that succeeds is reported as an error.
    try:
        os.close(os.open(ctty_path, probe_flags))
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    # The child pty has to be openable.
    os.close(os.open(slave_path, os.O_RDWR))

    # By now /dev/tty has to be openable again, i.e. a controlling tty exists.
    os.close(os.open(ctty_path, os.O_WRONLY))
__init__.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        Meant as a more portable alternative to the pty.fork() function; in
        particular it is intended to work on Solaris.

        Raises ExceptionPexpect when /dev/tty can still be opened after the
        os.setsid() call.  An OSError other than ENXIO from either /dev/tty
        probe, and any error from the two final opens, is passed on to the
        caller.
        '''

        slave_path = os.ttyname(tty_fd)
        ctty_path = "/dev/tty"
        probe_flags = os.O_RDWR | os.O_NOCTTY

        # Open and close the current controlling tty, if there is one, before
        # detaching.  With no controlling tty to begin with (for example when
        # run from a cron(1) job) the open fails with ENXIO, which is
        # tolerated.
        try:
            os.close(os.open(ctty_path, probe_flags))
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        os.setsid()

        # Confirm the detach: opening /dev/tty again is expected to fail with
        # ENXIO, so an open that succeeds is reported as an error.
        try:
            os.close(os.open(ctty_path, probe_flags))
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        # The child pty has to be openable.
        os.close(os.open(slave_path, os.O_RDWR))

        # By now /dev/tty has to be openable again, i.e. a controlling tty
        # exists.
        os.close(os.open(ctty_path, os.O_WRONLY))
_fork_pty.py 文件源码 项目:Repobot 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Meant as a more portable alternative to the pty.fork() function; in
    particular it is intended to work on Solaris.

    Raises ExceptionPexpect when /dev/tty can still be opened after the
    os.setsid() call.  An OSError other than ENXIO from either /dev/tty
    probe, and any error from the two final opens, is passed on to the
    caller.
    '''

    slave_path = os.ttyname(tty_fd)
    ctty_path = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Open and close the current controlling tty, if there is one, before
    # detaching.  With no controlling tty to begin with (for example when
    # run from a cron(1) job) the open fails with ENXIO, which is tolerated.
    try:
        os.close(os.open(ctty_path, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Confirm the detach: opening /dev/tty again is expected to fail with
    # ENXIO, so an open that succeeds is reported as an error.
    try:
        os.close(os.open(ctty_path, probe_flags))
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    # The child pty has to be openable.
    os.close(os.open(slave_path, os.O_RDWR))

    # By now /dev/tty has to be openable again, i.e. a controlling tty exists.
    os.close(os.open(ctty_path, os.O_WRONLY))
_fork_pty.py 文件源码 项目:pipenv 作者: pypa 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Meant as a more portable alternative to the pty.fork() function; in
    particular it is intended to work on Solaris.

    Raises ExceptionPexpect when /dev/tty can still be opened after the
    os.setsid() call.  An OSError other than ENXIO from either /dev/tty
    probe, and any error from the two final opens, is passed on to the
    caller.
    '''

    slave_path = os.ttyname(tty_fd)
    ctty_path = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Open and close the current controlling tty, if there is one, before
    # detaching.  With no controlling tty to begin with (for example when
    # run from a cron(1) job) the open fails with ENXIO, which is tolerated.
    try:
        os.close(os.open(ctty_path, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Confirm the detach: opening /dev/tty again is expected to fail with
    # ENXIO, so an open that succeeds is reported as an error.
    try:
        os.close(os.open(ctty_path, probe_flags))
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    # The child pty has to be openable.
    os.close(os.open(slave_path, os.O_RDWR))

    # By now /dev/tty has to be openable again, i.e. a controlling tty exists.
    os.close(os.open(ctty_path, os.O_WRONLY))
__init__.py 文件源码 项目:ssh-tunnel 作者: aalku 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        Meant as a more portable alternative to the pty.fork() function; in
        particular it is intended to work on Solaris.

        Raises ExceptionPexpect when /dev/tty can still be opened after the
        os.setsid() call.  An OSError other than ENXIO from either /dev/tty
        probe, and any error from the two final opens, is passed on to the
        caller.
        '''

        slave_path = os.ttyname(tty_fd)
        ctty_path = "/dev/tty"
        probe_flags = os.O_RDWR | os.O_NOCTTY

        # Open and close the current controlling tty, if there is one, before
        # detaching.  With no controlling tty to begin with (for example when
        # run from a cron(1) job) the open fails with ENXIO, which is
        # tolerated.
        try:
            os.close(os.open(ctty_path, probe_flags))
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        os.setsid()

        # Confirm the detach: opening /dev/tty again is expected to fail with
        # ENXIO, so an open that succeeds is reported as an error.
        try:
            os.close(os.open(ctty_path, probe_flags))
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        # The child pty has to be openable.
        os.close(os.open(slave_path, os.O_RDWR))

        # By now /dev/tty has to be openable again, i.e. a controlling tty
        # exists.
        os.close(os.open(ctty_path, os.O_WRONLY))
myexc.py 文件源码 项目:python 作者: hienha 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def myconnect(sock, host, port):
    try:
        sock.connect((host, port))

    except socket.error, args:
        myargs = updArgs(args)        # convert inst to tuple
        if len(myargs) == 1:        # no #s on some errors
            myargs = (errno.ENXIO, myargs[0])

        raise NetworkError, \
        updArgs(myargs, host + ':' + str(port))
diff.py 文件源码 项目:diffoscope 作者: ReproducibleBuilds 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """Open the FIFO at ``self.fifo_path`` for writing and feed it.

        The open is retried for as long as it fails with ENXIO, and the
        method returns early once ``self._want_join`` is set.  After a
        successful open, ``self.feeder`` is called with the binary file
        object and its return value is put on ``self.end_nl_q``.  Any
        exception raised along the way is stored in ``self._exception``
        instead of being propagated.
        """
        try:
            # Open the FIFO nonblocking so that we can periodically check
            # whether the main thread wants us to wind down.  If it does, the
            # FIFO is no longer needed and the thread can stop.
            fd = None
            while fd is None:
                try:
                    fd = os.open(self.fifo_path, os.O_WRONLY | os.O_NONBLOCK)
                except OSError as exc:
                    if exc.errno != errno.ENXIO:
                        raise
                    if self._want_join.is_set():
                        return

            # Clear the nonblocking flag again so that writes block normally.
            fcntl.fcntl(fd, fcntl.F_SETFL, 0)

            with open(fd, 'wb') as stream:
                # The queue works around a unified diff limitation: if there
                # are no newlines in both, don't make it a difference.
                trailing_nl = self.feeder(stream)
                self.end_nl_q.put(trailing_nl)
        except Exception as exc:
            self._exception = exc
myexc.py 文件源码 项目:Core-Python-Programming 作者: gdouchufu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def myconnect(sock, host, port):
    try:
        sock.connect((host, port))

    except socket.error, args:
        myargs = updArgs(args)  # conv inst2tuple
        if len(myargs) == 1:  # no #s on some errs
            myargs = (errno.ENXIO, myargs[0])

        raise NetworkError, \
            updArgs(myargs, host + ':' + str(port))
pipeutils.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def recvfd(self):
                """Receive a file descriptor via the pipe.

                Returns the received descriptor, or -1 when the ioctl fails
                with ENXIO (the other end of the connection was closed).
                When the descriptor refers to a regular file, its file
                pointer is reset to the start before it is returned.

                Raises IOError when this object is already closed, and
                re-raises any ioctl failure other than ENXIO."""

                if self.closed:
                        self.debug_msg("recvfd", "failed (closed)")
                        # Name the method that was actually called.
                        raise IOError(
                            "recvfd() called for closed {0}".format(
                            type(self).__name__))

                try:
                        # The packed -1 is the buffer the ioctl fills in;
                        # the received descriptor is unpacked from the
                        # returned bytes.
                        fcntl_args = struct.pack('i', -1)
                        fcntl_rv = fcntl.ioctl(self.__pipefd,
                            fcntl.I_RECVFD, fcntl_args)
                        fd = struct.unpack('i', fcntl_rv)[0]
                except IOError as e:
                        if e.errno == errno.ENXIO:
                                # other end of the connection was closed
                                return -1
                        self.debug_msg("recvfd", "failed")
                        # Bare raise keeps the original traceback.
                        raise
                assert fd != -1

                # debugging
                self.debug_dumpfd("recvfd", fd)

                # reset the current file pointer
                si = os.fstat(fd)
                if stat.S_ISREG(si.st_mode):
                        # os.lseek takes (fd, offset, whence): seek to
                        # offset 0 measured from the start of the file.
                        os.lseek(fd, 0, os.SEEK_SET)

                return fd
_fork_pty.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pty_make_controlling_tty(tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Meant as a more portable alternative to the pty.fork() function; in
    particular it is intended to work on Solaris.

    Raises ExceptionPexpect when /dev/tty can still be opened after the
    os.setsid() call.  An OSError other than ENXIO from either /dev/tty
    probe, and any error from the two final opens, is passed on to the
    caller.
    '''

    slave_path = os.ttyname(tty_fd)
    ctty_path = "/dev/tty"
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Open and close the current controlling tty, if there is one, before
    # detaching.  With no controlling tty to begin with (for example when
    # run from a cron(1) job) the open fails with ENXIO, which is tolerated.
    try:
        os.close(os.open(ctty_path, probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # Confirm the detach: opening /dev/tty again is expected to fail with
    # ENXIO, so an open that succeeds is reported as an error.
    try:
        os.close(os.open(ctty_path, probe_flags))
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    # The child pty has to be openable.
    os.close(os.open(slave_path, os.O_RDWR))

    # By now /dev/tty has to be openable again, i.e. a controlling tty exists.
    os.close(os.open(ctty_path, os.O_WRONLY))
__init__.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        Meant as a more portable alternative to the pty.fork() function; in
        particular it is intended to work on Solaris.

        Raises ExceptionPexpect when /dev/tty can still be opened after the
        os.setsid() call.  An OSError other than ENXIO from either /dev/tty
        probe, and any error from the two final opens, is passed on to the
        caller.
        '''

        slave_path = os.ttyname(tty_fd)
        ctty_path = "/dev/tty"
        probe_flags = os.O_RDWR | os.O_NOCTTY

        # Open and close the current controlling tty, if there is one, before
        # detaching.  With no controlling tty to begin with (for example when
        # run from a cron(1) job) the open fails with ENXIO, which is
        # tolerated.
        try:
            os.close(os.open(ctty_path, probe_flags))
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        os.setsid()

        # Confirm the detach: opening /dev/tty again is expected to fail with
        # ENXIO, so an open that succeeds is reported as an error.
        try:
            os.close(os.open(ctty_path, probe_flags))
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as exc:
            if exc.errno != errno.ENXIO:
                raise

        # The child pty has to be openable.
        os.close(os.open(slave_path, os.O_RDWR))

        # By now /dev/tty has to be openable again, i.e. a controlling tty
        # exists.
        os.close(os.open(ctty_path, os.O_WRONLY))


问题


面经


文章

微信
公众号

扫码关注公众号