def _processStreams(self):
    """Run one poll pass over the registered fds and move any ready data.

    Data queued in ``self._stdin`` is written to an fd that is reported
    writable, data read from a readable fd is appended to the buffer that
    ``self._fdMap`` associates with it, and an fd reporting hang-up or
    error is unregistered from the poller and recorded in
    ``self._closedfds``.

    Only one caller does the work at a time: a caller that finds
    ``self._streamLock`` already held waits for the holder to finish and
    then returns without polling itself.
    """
    # Three closed fds means there is nothing left to service
    # (presumably stdin, stdout and stderr -- confirm against the class).
    if len(self._closedfds) == 3:
        return

    if not self._streamLock.acquire(False):
        # Somebody else is in the middle of a pass. Block until they are
        # done so the buffers are up to date when we return, but do not
        # run a second pass ourselves.
        self._streamLock.acquire()
        self._streamLock.release()
        return

    try:
        if self._stdin.len > 0 and self._stdin.pos == 0:
            # Watching stdin for writability is pointless while there is
            # nothing to send, so it is enabled only once data is queued.
            # NOTE(review): nothing here checks that self._fdin is still
            # registered with the poller; confirm data cannot be queued
            # after the fd has been unregistered below.
            self._poller.modify(self._fdin, select.EPOLLOUT)

        inputMask = select.EPOLLIN | select.EPOLLPRI
        closedMask = select.EPOLLHUP | select.EPOLLERR

        # NoIntrPoll is defined elsewhere; presumably it retries the poll
        # when interrupted and 1 is the poll timeout -- confirm.
        for fd, event in NoIntrPoll(self._poller.poll, 1):
            stream = self._fdMap[fd]

            if event & select.EPOLLOUT and self._stdin.len > 0:
                # `stream` is expected to be self._stdin for this fd.
                chunk = self._stdin.read(BUFFSIZE)
                sent = os.write(fd, chunk)
                # On a short write, step the position back by the unsent
                # amount so those bytes are read again on the next pass.
                unsent = len(chunk) - sent
                stream.pos -= unsent
                if stream.pos == stream.len:
                    # Everything queued was sent: empty the buffer and
                    # stop asking for writable events.
                    stream.truncate(0)
                    self._poller.modify(fd, 0)

            elif event & inputMask:
                incoming = os.read(fd, BUFFSIZE)
                # Append at the end of the buffer, then restore the
                # position so the reader's place is left untouched.
                savedPos = stream.pos
                stream.pos = stream.len
                stream.write(incoming)
                stream.pos = savedPos

            elif event & closedMask:
                self._poller.unregister(fd)
                self._closedfds.append(fd)
                # The fd itself is not closed here because the original
                # Popen will do it.

        # The caller closed the stdin wrapper: stop polling the fd and
        # close the child's stdin pipe, once only.
        if self.stdin.closed and self._fdin not in self._closedfds:
            self._poller.unregister(self._fdin)
            self._closedfds.append(self._fdin)
            self._proc.stdin.close()

    finally:
        self._streamLock.release()
评论列表
文章目录