def close(self):
"""Close pipe.
"""
self.first = None
self.last = None
python类close()的实例源码
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def close(self):
"""'close' must be called when done with socket.
"""
self._unregister()
if self._rsock:
self._rsock.close()
self._rsock = None
self._read_fn = self._write_fn = None
self._read_task = self._write_task = None
def terminate(self):
if self.iocp:
self.async_poller.terminate()
self.cmd_rsock.close()
self.cmd_wsock.close()
self.cmd_rsock_buf = None
iocp, self.iocp = self.iocp, None
win32file.CloseHandle(iocp)
self._timeouts = []
self.cmd_rsock = self.cmd_wsock = None
self.__class__._instance = None
def terminate(self):
"""Close pipe and terminate child process.
"""
self.close()
super(Popen, self).terminate()
def close(self):
"""Similar to 'close' of file descriptor.
"""
if self._handle:
try:
flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
except:
flags = 0
if flags & win32con.PIPE_SERVER_END:
win32pipe.DisconnectNamedPipe(self._handle)
# TODO: if pipe, need to call FlushFileBuffers?
def _close_(rc, n):
win32file.CloseHandle(self._handle)
self._overlap = None
if self._notifier:
self._notifier.unregister(self._handle)
self._handle = None
self._read_result = self._write_result = None
self._read_task = self._write_task = None
self._buflist = []
if self._overlap.object:
self._overlap.object = _close_
win32file.CancelIo(self._handle)
else:
_close_(0, 0)
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def close(self):
"""Close pipe.
"""
self.first = None
self.last = None
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_value, trace):
self.close()
return True
def __exit__(self, exc_type, exc_value, trace):
self.close()
def terminate(self):
if self.iocp:
self.async_poller.terminate()
self.cmd_rsock.close()
self.cmd_wsock.close()
self.cmd_rsock_buf = None
iocp, self.iocp = self.iocp, None
win32file.CloseHandle(iocp)
self._timeouts = []
self.cmd_rsock = self.cmd_wsock = None
self.__class__._instance = None
def close(self):
if not self._location:
self.unregister()
self._subscribers = set()
self._scheduler._lock.acquire()
self._scheduler._channels.pop(self._name, None)
self._scheduler._lock.release()
def tmpfilepath():
fd, fname = tempfile.mkstemp(prefix='djpt-test-')
os.close(fd)
yield fname
os.remove(fname)
def __init__(self, content):
if not isinstance(content, str):
raise Exception("Note content must be an instance "
"of string, '%s' given." % type(content))
(tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown")
os.write(tempfileHandler, self.ENMLtoText(content))
os.close(tempfileHandler)
self.content = content
self.tempfile = tempfileName
def close(self):
"""Closes any open connections"""
pass
def close(self):
"""Close the ssh connection"""
ftp = self.client.open_sftp()
ftp.remove(self.script_filename)
ftp.close()
self.client.close()
def upload_file(self, filename, content, mode=None):
ftp = self.client.open_sftp()
file = ftp.file(filename, 'w', -1)
file.write(content)
file.flush()
if mode:
ftp.chmod(self.script_filename, 0o744)
ftp.close()
def get_tmp_script_filename(self):
channel = self.transport.open_session()
channel.get_pty()
out = channel.makefile()
channel.exec_command('mktemp')
tmpfilename = out.readline().strip()
channel.recv_exit_status()
channel.close()
return tmpfilename