python类O_TRUNC的实例源码

mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
crypto.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it will
    be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BIN", 0)
    fd = os.open(filename, flags, 0400)
    os.write(fd, key)
    os.fsync(fd)
    os.close(fd)
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    if sys.hexversion > 0x3000000 and not 'b' in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    f = os.fdopen(fd, m)
    try:
        f.write(data)
    finally:
        f.close()
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    if sys.hexversion > 0x3000000 and not 'b' in m:
        data = data.encode(encoding)
        m += 'b'
    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR
    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)
    f = os.fdopen(fd, m)
    try:
        f.write(data)
    finally:
        f.close()
sftp_server.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        "convert SFTP-style open() flags to python's os.open() flags"
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
mailbox.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
sftp_server.py 文件源码 项目:SublimeRemoteGDB 作者: summerwinter 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        """convert SFTP-style open() flags to Python's os.open() flags"""
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
elfinfo.py 文件源码 项目:build 作者: fuchsia-mirror 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def strip(self, stripped_filename):
        with mmapper(self.filename) as mapped:
            fd, file = mapped
            ehdr = self.elf.Ehdr.read(file)

            stripped_ehdr = ehdr._replace(e_shoff=0, e_shnum=0, e_shstrndx=0)
            stripped_size = max(phdr.p_offset + phdr.p_filesz
                                for phdr in gen_phdrs(file, self.elf, ehdr)
                                if phdr.p_type == PT_LOAD)
            assert ehdr.e_phoff + (ehdr.e_phnum *
                                   ehdr.e_phentsize) <= stripped_size

            # Create the new file with the same mode as the original.
            with os.fdopen(os.open(stripped_filename,
                                   os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                                   os.fstat(fd).st_mode & 0777),
                           'wb') as stripped_file:
                stripped_file.write(self.elf.Ehdr.pack(stripped_ehdr))
                stripped_file.write(file[self.elf.Ehdr.size:stripped_size])
mailbox.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
mailbox.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    # Open file to save stream to
    cap_fd = os.open(self._filename,
                     os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                     0600)

    # Send stream to this file
    self._stream.flush()
    os.dup2(cap_fd, self._fd)
    os.close(cap_fd)
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    # Open file to save stream to
    cap_fd = os.open(self._filename,
                     os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                     0600)

    # Send stream to this file
    self._stream.flush()
    os.dup2(cap_fd, self._fd)
    os.close(cap_fd)
cached_file.py 文件源码 项目:baiji 作者: bodylabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def flags(self):
        '''
        Adapted from http://hg.python.org/cpython/file/84cf25da86e8/Lib/_pyio.py#l154

        See also open(2) which explains the modes

        os.O_BINARY and os.O_TEXT are only available on Windows.
        '''
        return (
            ((self.reading and not self.updating) and os.O_RDONLY or 0) |
            ((self.writing and not self.updating) and os.O_WRONLY or 0) |
            ((self.creating_exclusively and not self.updating) and os.O_EXCL or 0) |
            (self.updating and os.O_RDWR or 0) |
            (self.appending and os.O_APPEND or 0) |
            ((self.writing or self.creating_exclusively) and os.O_CREAT or 0) |
            (self.writing and os.O_TRUNC or 0) |
            ((self.binary and hasattr(os, 'O_BINARY')) and os.O_BINARY or 0) |
            ((self.text and hasattr(os, 'O_TEXT')) and os.O_TEXT or 0)
        )
bootstrap.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _write_file(path, data, mode, owner='root'):
    dirpath = os.path.dirname(os.path.abspath(path))
    log.info('Opening {} for locking'.format(dirpath))
    with utils.Directory(dirpath) as d:
        log.info('Taking exclusive lock on {}'.format(dirpath))
        with d.lock():
            umask_original = os.umask(0)
            try:
                flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
                log.info('Writing {} with mode {:o}'.format(path, mode))
                tmppath = path + '.tmp'
                with os.fdopen(os.open(tmppath, flags, mode), 'wb') as f:
                    f.write(data)
                os.rename(tmppath, path)
                user = pwd.getpwnam(owner)
                os.chown(path, user.pw_uid, user.pw_gid)
            finally:
                os.umask(umask_original)
sftp_server.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        """convert SFTP-style open() flags to Python's os.open() flags"""
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
custom.py 文件源码 项目:azure-cli 作者: Azure 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def store_acs_service_principal(subscription_id, client_secret, service_principal,
                                config_path=os.path.join(get_config_dir(),
                                                         'acsServicePrincipal.json')):
    obj = {}
    if client_secret:
        obj['client_secret'] = client_secret
    if service_principal:
        obj['service_principal'] = service_principal

    fullConfig = load_acs_service_principals(config_path=config_path)
    if not fullConfig:
        fullConfig = {}
    fullConfig[subscription_id] = obj

    with os.fdopen(os.open(config_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o600),
                   'w+') as spFile:
        json.dump(fullConfig, spFile)
sftp_server.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        """convert SFTP-style open() flags to Python's os.open() flags"""
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
dbm.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def open(filename, flag='r', mode=0666):
    "open a DBM database"
    if not isinstance(filename, str):
        raise TypeError("expected string")

    openflag = 0

    try:
        openflag = {
            'r': os.O_RDONLY,
            'rw': os.O_RDWR,
            'w': os.O_RDWR | os.O_CREAT,
            'c': os.O_RDWR | os.O_CREAT,
            'n': os.O_RDWR | os.O_CREAT | os.O_TRUNC,
            }[flag]
    except KeyError:
        raise error("arg 2 to open should be 'r', 'w', 'c', or 'n'")

    a_db = getattr(lib, funcs['open'])(filename, openflag, mode)
    if a_db == 0:
        raise error("Could not open file %s.db" % filename)
    return dbm(a_db)
mailbox.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
mailbox.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
mailbox.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
mailbox.py 文件源码 项目:empyrion-python-api 作者: huhlig 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
sftp_server.py 文件源码 项目:wetland 作者: ohmyadd 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        """convert SFTP-style open() flags to Python's os.open() flags"""
        if (pflags & SFTP_FLAG_READ) and (pflags & SFTP_FLAG_WRITE):
            flags = os.O_RDWR
        elif pflags & SFTP_FLAG_WRITE:
            flags = os.O_WRONLY
        else:
            flags = os.O_RDONLY
        if pflags & SFTP_FLAG_APPEND:
            flags |= os.O_APPEND
        if pflags & SFTP_FLAG_CREATE:
            flags |= os.O_CREAT
        if pflags & SFTP_FLAG_TRUNC:
            flags |= os.O_TRUNC
        if pflags & SFTP_FLAG_EXCL:
            flags |= os.O_EXCL
        return flags
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
tarfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0666)
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def set_sequences(self, sequences):
        """Set sequences using the given name-to-key-list dictionary."""
        f = open(os.path.join(self._path, '.mh_sequences'), 'r+')
        try:
            os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
            for name, keys in sequences.iteritems():
                if len(keys) == 0:
                    continue
                f.write('%s:' % name)
                prev = None
                completing = False
                for key in sorted(set(keys)):
                    if key - 1 == prev:
                        if not completing:
                            completing = True
                            f.write('-')
                    elif completing:
                        completing = False
                        f.write('%s %s' % (prev, key))
                    else:
                        f.write(' %s' % key)
                    prev = key
                if completing:
                    f.write(str(prev) + '\n')
                else:
                    f.write('\n')
        finally:
            _sync_close(f)
tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, name, mode):
        mode = {
            "r": os.O_RDONLY,
            "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        }[mode]
        if hasattr(os, "O_BINARY"):
            mode |= os.O_BINARY
        self.fd = os.open(name, mode, 0o666)


问题


面经


文章

微信
公众号

扫码关注公众号