python类fdopen()的实例源码

pidlockfile.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
pidlockfile.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
test_verify1.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_FILE_stored_explicitly():
    ffi = FFI()
    ffi.cdef("int myprintf11(const char *, int); FILE *myfile;")
    lib = ffi.verify("""
        #include <stdio.h>
        FILE *myfile;
        int myprintf11(const char *out, int value) {
            return fprintf(myfile, out, value);
        }
    """)
    import os
    fdr, fdw = os.pipe()
    fw1 = os.fdopen(fdw, 'wb', 256)
    lib.myfile = ffi.cast("FILE *", fw1)
    #
    fw1.write(b"X")
    r = lib.myprintf11(b"hello, %d!\n", ffi.cast("int", 42))
    fw1.close()
    assert r == len("hello, 42!\n")
    #
    result = os.read(fdr, 256)
    os.close(fdr)
    # the 'X' might remain in the user-level buffer of 'fw1' and
    # end up showing up after the 'hello, 42!\n'
    assert result == b"Xhello, 42!\n" or result == b"hello, 42!\nX"
test_new_ffi_1.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_ffi_buffer_with_file(self):
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
backend_tests.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ffi_buffer_with_file(self):
        ffi = FFI(backend=self.Backend())
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        assert f.read() == array.array('i', range(1000)).tostring()
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
        os.unlink(filename)
cache.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
cache.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
cache.py 文件源码 项目:conda-tools 作者: groutr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _decompress_bz2(filename, blocksize=900*1024):
    """
    Decompress .tar.bz2 to .tar on disk (for faster access)

    Use TemporaryFile to guarentee write access.
    """
    if not filename.endswith('.tar.bz2'):
        return filename

    fd, path = mkstemp()
    with os.fdopen(fd, 'wb') as fo:
        with open(filename, 'rb') as fi:
            z = bz2.BZ2Decompressor()

            for block in iter(lambda: fi.read(blocksize), b''):
                fo.write(z.decompress(block))
    return path
pidlockfile.py 文件源码 项目:ascii-art-py 作者: blinglnav 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def write_pid_to_pidfile(pidfile_path):
    """ Write the PID in the named PID file.

        Get the numeric process ID (“PID”) of the current process
        and write it to the named file as a line of text.

        """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)
    pidfile = os.fdopen(pidfile_fd, 'w')

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.

    pid = os.getpid()
    pidfile.write("%s\n" % pid)
    pidfile.close()
gnome.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def spawn(argv, stdout=False):
    """Asynchronously run a program. argv[0] is the executable name, which
    must be fully qualified or in the path. If stdout is True, return
    a file object corresponding to the child's standard output; otherwise,
    return the child's process ID.

    argv must be strictly str objects to avoid encoding confusion.
    """

    types = map(type, argv)
    if not (min(types) == max(types) == str):
        raise TypeError("executables and arguments must be str objects")
    logger.debug("Running %r" % " ".join(argv))
    args = GLib.spawn_async(argv=argv, flags=GLib.SpawnFlags.SEARCH_PATH,
                            standard_output=stdout)

    if stdout:
        return os.fdopen(args[2])
    else:
        return args[0]
cache.py 文件源码 项目:Texty 作者: sarthfrey 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
update.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main():
  if sys.platform in ['win32', 'cygwin']:
    return 0

  # This script is called by gclient. gclient opens its hooks subprocesses with
  # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
  # output processing that breaks printing '\r' characters for single-line
  # updating status messages as printed by curl and wget.
  # Work around this by setting stderr of the update.sh process to stdin (!):
  # gclient doesn't redirect stdin, and while stdin itself is read-only, a
  # dup()ed sys.stdin is writable, try
  #   fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
  # TODO: Fix gclient instead, http://crbug.com/95350
  return subprocess.call(
      [os.path.join(os.path.dirname(__file__), 'update.sh')] +  sys.argv[1:],
      stderr=os.fdopen(
        os.dup(
            sys.stdin.fileno())))
graphdoc.py 文件源码 项目:gprime 作者: GenealogyCollective 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def close(self):
        """ Implements GVSvgDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".svg":
            self._filename += ".svg"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the SVG file.
        os.system( 'dot -Tsvg:cairo -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVSvgzDoc
#
#-------------------------------------------------------------------------------
graphdoc.py 文件源码 项目:gprime 作者: GenealogyCollective 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def close(self):
        """ Implements GVSvgzDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-5:] != ".svgz":
            self._filename += ".svgz"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the SVGZ file.
        os.system( 'dot -Tsvgz -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVPngDoc
#
#-------------------------------------------------------------------------------
graphdoc.py 文件源码 项目:gprime 作者: GenealogyCollective 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def close(self):
        """ Implements GVPngDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".png":
            self._filename += ".png"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the PNG file.
        os.system( 'dot -Tpng -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVJpegDoc
#
#-------------------------------------------------------------------------------
graphdoc.py 文件源码 项目:gprime 作者: GenealogyCollective 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def close(self):
        """ Implements GVGifDoc.close() """
        GVDocBase.close(self)

        # Make sure the extension is correct
        if self._filename[-4:] != ".gif":
            self._filename += ".gif"

        # Create a temporary dot file
        (handle, tmp_dot) = tempfile.mkstemp(".gv" )
        dotfile = os.fdopen(handle, "wb")
        dotfile.write(self._dot.getvalue())
        dotfile.close()
        # Generate the GIF file.
        os.system( 'dot -Tgif -o"%s" "%s"' % (self._filename, tmp_dot) )

        # Delete the temporary dot file
        os.remove(tmp_dot)

#-------------------------------------------------------------------------------
#
# GVPdfGvDoc
#
#-------------------------------------------------------------------------------
cache.py 文件源码 项目:arithmancer 作者: google 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = self.default_timeout
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            f = os.fdopen(fd, 'wb')
            try:
                pickle.dump(int(time() + timeout), f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            finally:
                f.close()
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            pass
util.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    :param str path: Path to a file.
    :param str mode: Same os `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    return os.fdopen(
        os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
        mode, *fdopen_args)
test_vyper.py 文件源码 项目:vyper 作者: admiralobvious 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _init_dirs(self):
        test_dirs = ['a a', 'b', 'c\c', 'D_']
        config = 'improbable'

        root = tempfile.mkdtemp()

        def cleanup():
            try:
                os.removedirs(root)
            except (FileNotFoundError, OSError):
                pass

        os.chdir(root)

        for dir_ in test_dirs:
            os.mkdir(dir_, 0o0750)

            f = '{0}.toml'.format(config)
            flags = os.O_WRONLY | os.O_CREAT
            rel_path = '{0}/{1}'.format(dir_, f)
            abs_file_path = os.path.join(root, rel_path)
            with os.fdopen(os.open(abs_file_path, flags, 0o0640), 'w') as fp:
                fp.write("key = \"value is {0}\"\n".format(dir_))

        return root, config, cleanup
cache.py 文件源码 项目:tesismometro 作者: joapaspe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = self.default_timeout
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(int(time() + timeout), f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True


问题


面经


文章

微信
公众号

扫码关注公众号