python类O_RDONLY的实例源码

forking.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main():
        '''
        Run code specifed by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
utils.py 文件源码 项目:pocket-cli 作者: rakanalh 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_terminal_size():
        def ioctl_GWINSZ(fd):
            try:
                import fcntl
                import termios
                import struct
                cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                     '1234'))
            except:
                return None
            return cr
        cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                cr = ioctl_GWINSZ(fd)
                os.close(fd)
            except:
                pass
        if not cr:
            try:
                cr = (os.env['LINES'], os.env['COLUMNS'])
            except:
                cr = (25, 80)
        return int(cr[1]), int(cr[0])
__init__.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerabity, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
stream_test.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testOutputStreams(self):
        output_spec = {
            'mode': 'http',
            'method': 'PUT',
            'url': 'http://localhost:%d' % _socket_port
        }

        fd = os.open(_pipepath, os.O_RDONLY | os.O_NONBLOCK)
        adapters = {
            fd: make_stream_push_adapter(output_spec)
        }
        cmd = [sys.executable, _oscript, _pipepath]

        try:
            with captureOutput() as stdpipes:
                run_process(cmd, adapters)
        except Exception:
            print('Stdout/stderr from exception: ')
            print(stdpipes)
            raise
        self.assertEqual(stdpipes, ['start\ndone\n', ''])
        self.assertEqual(len(_req_chunks), 1)
        self.assertEqual(_req_chunks[0], (9, 'a message'))
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def h_file_win32(fname):
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    f = os.fdopen(fd, 'rb')
    m = md5()
    try:
        while fname:
            fname = f.read(200000)
            m.update(fname)
    finally:
        f.close()
    return m.digest()

# always save these
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def h_file_win32(fname):
    try:
        fd = os.open(fname, os.O_BINARY | os.O_RDONLY | os.O_NOINHERIT)
    except OSError:
        raise IOError('Cannot read from %r' % fname)
    f = os.fdopen(fd, 'rb')
    m = md5()
    try:
        while fname:
            fname = f.read(200000)
            m.update(fname)
    finally:
        f.close()
    return m.digest()

# always save these
tailbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def follow_file(filename):
    while True:
        try:
            fd = os.open(filename, os.O_RDONLY | os.O_NONBLOCK)
        except OSError:
            yield None
            continue

        try:
            inode = os.fstat(fd).st_ino
            first = True

            while True:
                try:
                    stat = os.stat(filename)
                except OSError:
                    stat = None

                yield first, time.time(), fd
                if stat is None or inode != stat.st_ino:
                    break

                first = False
        finally:
            os.close(fd)
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start(self):
        try:
            os.fstat(self._savefd)
        except OSError:
            raise ValueError("saved filedescriptor not valid, "
                "did you call start() twice?")
        if self.targetfd == 0 and not self.tmpfile:
            fd = os.open(devnullpath, os.O_RDONLY)
            os.dup2(fd, 0)
            os.close(fd)
            if hasattr(self, '_oldsys'):
                setattr(sys, patchsysdict[self.targetfd], DontReadFromInput())
        else:
            os.dup2(self.tmpfile.fileno(), self.targetfd)
            if hasattr(self, '_oldsys'):
                setattr(sys, patchsysdict[self.targetfd], self.tmpfile)
__init__.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerabity, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
forking.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
patiencebar.py 文件源码 项目:patiencebar 作者: ceyzeriat 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_terminal_size():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl, termi_os, struct
            cr = struct.unpack('hh', fcntl.ioctl(fd, termi_os.TIOCGWINSZ,'1234'))
        except:
            return
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = _os.open(_os.ctermid(), _os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            _os.cl_ose(fd)
        except:
            pass
    if not cr: cr = (_os.environ.get('LINES', 25), _os.environ.get('COLUMNS', 80))
    return list(map(int, cr))
console.py 文件源码 项目:PGPool 作者: sLoPPydrive 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_terminal_size_linux():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
            return cr
        except:
            pass
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    return int(cr[1]), int(cr[0])
sftp_server.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        "convert SFTP-style open() flags to python's os.open() flags"
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
onewire.py 文件源码 项目:rpi3-webiopi 作者: thortex 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, slave=None, family=0, extra=None):
        Bus.__init__(self, "ONEWIRE", "/sys/bus/w1/devices/w1_bus_master1/w1_master_slaves", os.O_RDONLY)
        if self.fd > 0:
            os.close(self.fd)
            self.fd = 0

        self.family = family
        if  slave != None:
            addr = slave.split("-")
            if len(addr) == 1:
                self.slave = "%02x-%s" % (family, slave)
            elif len(addr) == 2:
                prefix = int(addr[0], 16)
                if family > 0 and family != prefix:
                    raise Exception("1-Wire slave address %s does not match family %02x" % (slave, family))
                self.slave = slave
        else:
            devices = self.deviceList()
            if len(devices) == 0:
                raise Exception("No device match family %02x" % family)
            self.slave = devices[0]

        loadExtraModule(extra)
__init__.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerabity, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
output.py 文件源码 项目:defcon-workshop 作者: devsecops 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _getTerminalSize_linux():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl, termios, struct, os
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,'1234'))
        except:
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            cr = (env['LINES'], env['COLUMNS'])
        except:
            return None
    return int(cr[1]), int(cr[0])
knotty.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getTerminalColumns(self):
        def ioctl_GWINSZ(fd):
            try:
                cr = struct.unpack('hh', fcntl.ioctl(fd, self.termios.TIOCGWINSZ, '1234'))
            except:
                return None
            return cr
        cr = ioctl_GWINSZ(sys.stdout.fileno())
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                cr = ioctl_GWINSZ(fd)
                os.close(fd)
            except:
                pass
        if not cr:
            try:
                cr = (env['LINES'], env['COLUMNS'])
            except:
                cr = (25, 80)
        return cr
term.py 文件源码 项目:pget 作者: halilozercan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _getTerminalSize_linux():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl, termios, struct, os
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    return int(cr[1]), int(cr[0])
forking.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
        '''
        Run code specifed by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
uuid.py 文件源码 项目:Solfege 作者: RannyeriDev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_random_reader():
    global _random_reader, _random_init
    if not _random_init:
        try:
            import random
            try:
                _random_reader = open("/dev/urandom","rb").read
            except:
                _random_reader = os.fdopen(os.open("/dev/random",os.O_RDONLY|os.O_NONBLOCK),"rb").read
            sec,usec = _gettimeofday()
            random.seed((os.getpid() << 16) ^ os.getuid() ^ sec ^ usec)
        except:
            pass
        _random_init = True
    sec,usec = _gettimeofday()
    i = (sec ^ usec) & 0x1F
    while i>0:
        _randint(0,_maxint)
        i -= 1
    return _random_reader
terminalsize.py 文件源码 项目:AsciiDots-Java 作者: LousyLynx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_terminal_size_linux():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            cr = struct.unpack('hh',
                               fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
            return cr
        except:
            pass
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    return int(cr[1]), int(cr[0])
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(x)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack(
                'hh',
                fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            )
        except:
            return None
        if cr == (0, 0):
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    return int(cr[1]), int(cr[0])
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
tempfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _stat(fn):
        fd = _os.open(fn, _os.O_RDONLY)
        _os.close(fd)
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        file_uid = os.fstat(fd).st_uid
        os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_terminal_size():
    """Returns a tuple (x, y) representing the width(x) and the height(x)
    in characters of the terminal window."""
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack(
                'hh',
                fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            )
        except:
            return None
        if cr == (0, 0):
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
    return int(cr[1]), int(cr[0])
file.py 文件源码 项目:transfert 作者: rbernand 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def open(self, flags):
        os_flags = {
            'r': os.O_RDONLY,
            'w': os.O_WRONLY,
            'w+': os.O_WRONLY | os.O_CREAT,
        }
        try:
            self._fd = os.open(self.url.path, os_flags[flags])
        except KeyError:
            raise ValueError('Unknow flags: \'%s\'' % flags)


问题


面经


文章

微信
公众号

扫码关注公众号