python类close()的实例源码

forking.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main():
        '''
        Run code specifed by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
Image.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def _dump(self, file=None, format=None):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        return file
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def close(self):
            """It is advised to call 'close' on the pipe so both
            handles of pipe are closed.
            """
            if isinstance(self.stdin, AsyncFile):
                self.stdin.close()
                self.stdin = None
            if self.stdin_rh:
                win32pipe.DisconnectNamedPipe(self.stdin_rh)
                win32file.CloseHandle(self.stdin_rh)
                self.stdin_rh = None
            if isinstance(self.stdout, AsyncFile):
                self.stdout.close()
                self.stdout = None
            if isinstance(self.stderr, AsyncFile):
                self.stderr.close()
                self.stderr = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _socketpair():
                if hasattr(socket, 'socketpair'):
                    return socket.socketpair()
                srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                srv_sock.bind(('127.0.0.1', 0))
                srv_sock.listen(1)
                write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    write_sock.setblocking(False)
                    try:
                        write_sock.connect(srv_sock.getsockname()[:2])
                    except socket.error as e:
                        if e.args[0] in (EINPROGRESS, EWOULDBLOCK):
                            pass
                        else:
                            raise
                    write_sock.setblocking(True)
                    read_sock = srv_sock.accept()[0]
                except:
                    write_sock.close()
                    raise
                finally:
                    srv_sock.close()
                return (read_sock, write_sock)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def terminate(self):
            if hasattr(self.cmd_write, 'getsockname'):
                self.cmd_write.close()
            self.cmd_read.close()
            for fd in self._fds.itervalues():
                try:
                    self._poller.unregister(fd._fileno)
                except:
                    logger.warning('unregister of %s failed with %s',
                                   fd._fileno, traceback.format_exc())
                fd._notifier = None
            self._fds.clear()
            self._timeouts = []
            if hasattr(self._poller, 'terminate'):
                self._poller.terminate()
            self._poller = None
            self.cmd_read = self.cmd_write = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _terminate_task(self, task):
        """Internal use only.
        """
        self._lock.acquire()
        tid = task._id
        task = self._tasks.get(tid, None)
        if task is None:
            logger.warning('invalid task %s to terminate', tid)
            self._lock.release()
            return -1
        # TODO: if currently waiting I/O or holding locks, warn?
        if task._state == Pycos._Running:
            logger.warning('task to terminate %s/%s is running', task._name, tid)
        else:
            self._suspended.discard(tid)
            self._scheduled.add(tid)
            task._state = Pycos._Scheduled
            task._timeout = None
            task._callers = []
            if self._polling and len(self._scheduled) == 1:
                self._notifier.interrupt()
        task._exceptions.append((GeneratorExit, GeneratorExit('close')))
        self._lock.release()
        return 0
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def close(self):
            """It is advised to call 'close' on the pipe so both
            handles of pipe are closed.
            """
            if isinstance(self.stdin, AsyncFile):
                self.stdin.close()
                self.stdin = None
            if self.stdin_rh:
                win32pipe.DisconnectNamedPipe(self.stdin_rh)
                win32file.CloseHandle(self.stdin_rh)
                self.stdin_rh = None
            if isinstance(self.stdout, AsyncFile):
                self.stdout.close()
                self.stdout = None
            if isinstance(self.stderr, AsyncFile):
                self.stderr.close()
                self.stderr = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def _socketpair():
                if hasattr(socket, 'socketpair'):
                    return socket.socketpair()
                srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                srv_sock.bind(('127.0.0.1', 0))
                srv_sock.listen(1)
                write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    write_sock.setblocking(False)
                    try:
                        write_sock.connect(srv_sock.getsockname()[:2])
                    except socket.error as e:
                        if e.args[0] in (EINPROGRESS, EWOULDBLOCK):
                            pass
                        else:
                            raise
                    write_sock.setblocking(True)
                    read_sock = srv_sock.accept()[0]
                except:
                    write_sock.close()
                    raise
                finally:
                    srv_sock.close()
                return (read_sock, write_sock)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def terminate(self):
            if hasattr(self.cmd_write, 'getsockname'):
                self.cmd_write.close()
            self.cmd_read.close()
            for fd in self._fds.values():
                try:
                    self._poller.unregister(fd._fileno)
                except:
                    logger.warning('unregister of %s failed with %s',
                                   fd._fileno, traceback.format_exc())
                fd._notifier = None
            self._fds.clear()
            self._timeouts = []
            if hasattr(self._poller, 'terminate'):
                self._poller.terminate()
            self._poller = None
            self.cmd_read = self.cmd_write = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _terminate_task(self, task):
        """Internal use only.
        """
        self._lock.acquire()
        tid = task._id
        task = self._tasks.get(tid, None)
        if task is None:
            logger.warning('invalid task %s to terminate', tid)
            self._lock.release()
            return -1
        # TODO: if currently waiting I/O or holding locks, warn?
        if task._state == Pycos._Running:
            logger.warning('task to terminate %s/%s is running', task._name, tid)
        else:
            self._suspended.discard(tid)
            self._scheduled.add(tid)
            task._state = Pycos._Scheduled
            task._timeout = None
            task._callers = []
            if self._polling and len(self._scheduled) == 1:
                self._notifier.interrupt()
        task._exceptions.append((GeneratorExit, GeneratorExit('close')))
        self._lock.release()
        return 0
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.
        """
        if self.closed:
            return

        if self.mode in "aw":
            self.fileobj.write(NUL * (BLOCKSIZE * 2))
            self.offset += (BLOCKSIZE * 2)
            # fill up the end with zero-blocks
            # (like option -b20 for tar does)
            blocks, remainder = divmod(self.offset, RECORDSIZE)
            if remainder > 0:
                self.fileobj.write(NUL * (RECORDSIZE - remainder))

        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def close(self):
        """Close the _Stream object. No operation should be
           done on it afterwards.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode == "w" and self.comptype != "tar":
                self.buf += self.cmp.flush()

            if self.mode == "w" and self.buf:
                self.fileobj.write(self.buf)
                self.buf = b""
                if self.comptype == "gz":
                    self.fileobj.write(struct.pack("<L", self.crc))
                    self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode in ("a", "w", "x"):
                self.fileobj.write(NUL * (BLOCKSIZE * 2))
                self.offset += (BLOCKSIZE * 2)
                # fill up the end with zero-blocks
                # (like option -b20 for tar does)
                blocks, remainder = divmod(self.offset, RECORDSIZE)
                if remainder > 0:
                    self.fileobj.write(NUL * (RECORDSIZE - remainder))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
app_test.py 文件源码 项目:service-fabric-cli 作者: Azure 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress"""
        import shutil
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
        temp_src_dir = os.path.dirname(temp_file.name)
        temp_dst_dir = tempfile.mkdtemp()
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
            shutil_mock.copyfile.assert_called_once()
        temp_file.close()
        shutil.rmtree(os.path.dirname(temp_file.name))
        shutil.rmtree(temp_dst_dir)
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.
        """
        if self.closed:
            return

        if self.mode in "aw":
            self.fileobj.write(NUL * (BLOCKSIZE * 2))
            self.offset += (BLOCKSIZE * 2)
            # fill up the end with zero-blocks
            # (like option -b20 for tar does)
            blocks, remainder = divmod(self.offset, RECORDSIZE)
            if remainder > 0:
                self.fileobj.write(NUL * (RECORDSIZE - remainder))

        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True
tarfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.
        """
        if self.closed:
            return

        if self.mode in "aw":
            self.fileobj.write(NUL * (BLOCKSIZE * 2))
            self.offset += (BLOCKSIZE * 2)
            # fill up the end with zero-blocks
            # (like option -b20 for tar does)
            blocks, remainder = divmod(self.offset, RECORDSIZE)
            if remainder > 0:
                self.fileobj.write(NUL * (RECORDSIZE - remainder))

        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True
popen2.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        _cleanup()
        self.cmd = cmd
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            os.dup2(p2cread, 0)
            os.dup2(c2pwrite, 1)
            os.dup2(c2pwrite, 2)
            self._run_child(cmd)
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
pydoc.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def importfile(path):
    """Import a Python source file or compiled file given its path."""
    magic = imp.get_magic()
    file = open(path, 'r')
    if file.read(len(magic)) == magic:
        kind = imp.PY_COMPILED
    else:
        kind = imp.PY_SOURCE
    file.close()
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    file = open(path, 'r')
    try:
        module = imp.load_module(name, file, path, (ext, 'r', kind))
    except:
        raise ErrorDuringImport(path, sys.exc_info())
    file.close()
    return module
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        subpath = self._lookup(key)
        f = open(os.path.join(self._path, subpath), 'r')
        try:
            if self._factory:
                msg = self._factory(f)
            else:
                msg = MaildirMessage(f)
        finally:
            f.close()
        subdir, name = os.path.split(subpath)
        msg.set_subdir(subdir)
        if self.colon in name:
            msg.set_info(name.split(self.colon)[-1])
        msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
        return msg
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def get_string(self, key):
        """Return a string representation or raise a KeyError."""
        try:
            if self._locked:
                f = open(os.path.join(self._path, str(key)), 'r+')
            else:
                f = open(os.path.join(self._path, str(key)), 'r')
        except IOError, e:
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                return f.read()
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            f.close()
util.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _get_soname(f):
            # assuming GNU binutils / ELF
            if not f:
                return None
            cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
                  "objdump -p -j .dynamic 2>/dev/null " + f
            f = os.popen(cmd)
            dump = f.read()
            rv = f.close()
            if rv == 10:
                raise OSError, 'objdump command not found'
            f = os.popen(cmd)
            try:
                data = f.read()
            finally:
                f.close()
            res = re.search(r'\sSONAME\s+([^\s]+)', data)
            if not res:
                return None
            return res.group(1)
util.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _findLib_ldconfig(name):
            # XXX assuming GLIBC's ldconfig (with option -p)
            expr = r'/[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
            f = os.popen('LC_ALL=C LANG=C /sbin/ldconfig -p 2>/dev/null')
            try:
                data = f.read()
            finally:
                f.close()
            res = re.search(expr, data)
            if not res:
                # Hm, this works only for libs needed by the python executable.
                cmd = 'ldd %s 2>/dev/null' % sys.executable
                f = os.popen(cmd)
                try:
                    data = f.read()
                finally:
                    f.close()
                res = re.search(expr, data)
                if not res:
                    return None
            return res.group(0)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if args.profile:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
        else:
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
            pass
        elif str(exc) == 'Stream closed':
            pass
        else:
            raise
        if not PY2:
            sys.stderr.close()
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret
utils.py 文件源码 项目:pocket-cli 作者: rakanalh 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_terminal_size():
        def ioctl_GWINSZ(fd):
            try:
                import fcntl
                import termios
                import struct
                cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                     '1234'))
            except:
                return None
            return cr
        cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                cr = ioctl_GWINSZ(fd)
                os.close(fd)
            except:
                pass
        if not cr:
            try:
                cr = (os.env['LINES'], os.env['COLUMNS'])
            except:
                cr = (25, 80)
        return int(cr[1]), int(cr[0])
ImageGrab.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
Image.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def close(self):
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is only required to close images that have not
        had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method.
        """
        try:
            self.fp.close()
        except Exception as msg:
            logger.debug("Error closing: %s", msg)

        # Instead of simply setting to None, we're setting up a
        # deferred error that will better explain that the core image
        # object is gone.
        self.im = deferred_error(ValueError("Operation on closed image"))
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def terminate(self):
            """Close pipe and terminate child process.
            """
            self.close()
            super(Popen, self).terminate()
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def close(self):
            """Similar to 'close' of file descriptor.
            """
            if self._handle:
                try:
                    flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
                except:
                    flags = 0

                if flags & win32con.PIPE_SERVER_END:
                    win32pipe.DisconnectNamedPipe(self._handle)
                # TODO: if pipe, need to call FlushFileBuffers?

                def _close_(rc, n):
                    win32file.CloseHandle(self._handle)
                    self._overlap = None
                    if self._notifier:
                        self._notifier.unregister(self._handle)
                    self._handle = None
                    self._read_result = self._write_result = None
                    self._read_task = self._write_task = None
                    self._buflist = []

                if self._overlap.object:
                    self._overlap.object = _close_
                    win32file.CancelIo(self._handle)
                else:
                    _close_(0, 0)
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def close(self):
            """Close file descriptor.
            """
            if self._fileno:
                self._notifier.unregister(self)
                if self._fd:
                    self._fd.close()
                self._fd = self._fileno = None
                self._read_task = self._write_task = None
                self._read_fn = self._write_fn = None
                self._buflist = []


问题


面经


文章

微信
公众号

扫码关注公众号