python类close()的实例源码

asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def close(self):
        """Close pipe.
        """
        self.first = None
        self.last = None
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __del__(self):
        self.close()
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, trace):
        self.close()
        return True
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self):
        """'close' must be called when done with socket.
        """
        self._unregister()
        if self._rsock:
            self._rsock.close()
            self._rsock = None
        self._read_fn = self._write_fn = None
        self._read_task = self._write_task = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def terminate(self):
                if self.iocp:
                    self.async_poller.terminate()
                    self.cmd_rsock.close()
                    self.cmd_wsock.close()
                    self.cmd_rsock_buf = None
                    iocp, self.iocp = self.iocp, None
                    win32file.CloseHandle(iocp)
                    self._timeouts = []
                    self.cmd_rsock = self.cmd_wsock = None
                    self.__class__._instance = None
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def terminate(self):
            """Close pipe and terminate child process.
            """
            self.close()
            super(Popen, self).terminate()
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def close(self):
            """Similar to 'close' of file descriptor.
            """
            if self._handle:
                try:
                    flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
                except:
                    flags = 0

                if flags & win32con.PIPE_SERVER_END:
                    win32pipe.DisconnectNamedPipe(self._handle)
                # TODO: if pipe, need to call FlushFileBuffers?

                def _close_(rc, n):
                    win32file.CloseHandle(self._handle)
                    self._overlap = None
                    if self._notifier:
                        self._notifier.unregister(self._handle)
                    self._handle = None
                    self._read_result = self._write_result = None
                    self._read_task = self._write_task = None
                    self._buflist = []

                if self._overlap.object:
                    self._overlap.object = _close_
                    win32file.CancelIo(self._handle)
                else:
                    _close_(0, 0)
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, trace):
        self.close()
        return True
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def close(self):
        """Close pipe.
        """
        self.first = None
        self.last = None
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __del__(self):
        self.close()
asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, trace):
        self.close()
        return True
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, trace):
        self.close()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def terminate(self):
                if self.iocp:
                    self.async_poller.terminate()
                    self.cmd_rsock.close()
                    self.cmd_wsock.close()
                    self.cmd_rsock_buf = None
                    iocp, self.iocp = self.iocp, None
                    win32file.CloseHandle(iocp)
                    self._timeouts = []
                    self.cmd_rsock = self.cmd_wsock = None
                    self.__class__._instance = None
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def close(self):
        if not self._location:
            self.unregister()
            self._subscribers = set()
            self._scheduler._lock.acquire()
            self._scheduler._channels.pop(self._name, None)
            self._scheduler._lock.release()
conftest.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def tmpfilepath():
    fd, fname = tempfile.mkstemp(prefix='djpt-test-')
    os.close(fd)
    yield fname
    os.remove(fname)
editor.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, content):
        if not isinstance(content, str):
            raise Exception("Note content must be an instance "
                            "of string, '%s' given." % type(content))

        (tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown")
        os.write(tempfileHandler, self.ENMLtoText(content))
        os.close(tempfileHandler)

        self.content = content
        self.tempfile = tempfileName
executors.py 文件源码 项目:spoonybard 作者: notnownikki 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def close(self):
        """Closes any open connections"""
        pass
executors.py 文件源码 项目:spoonybard 作者: notnownikki 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self):
        """Close the ssh connection"""
        ftp = self.client.open_sftp()
        ftp.remove(self.script_filename)
        ftp.close()
        self.client.close()
executors.py 文件源码 项目:spoonybard 作者: notnownikki 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def upload_file(self, filename, content, mode=None):
        ftp = self.client.open_sftp()
        file = ftp.file(filename, 'w', -1)
        file.write(content)
        file.flush()
        if mode:
            ftp.chmod(self.script_filename, 0o744)
        ftp.close()
executors.py 文件源码 项目:spoonybard 作者: notnownikki 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_tmp_script_filename(self):
        channel = self.transport.open_session()
        channel.get_pty()
        out = channel.makefile()
        channel.exec_command('mktemp')
        tmpfilename = out.readline().strip()
        channel.recv_exit_status()
        channel.close()
        return tmpfilename


问题


面经


文章

微信
公众号

扫码关注公众号