def run_bg(cmd, debug=False, cwd=''):
    """Start ``cmd`` in the background on a new pseudo-terminal.

    Args:
        cmd: Command to run, either a list of argument strings or a single
            string, which is split on whitespace (no shell quoting rules).
        debug: When true, echo the command line to stderr before forking.
        cwd: Directory the child changes into before exec; '' leaves the
            working directory untouched.

    Returns:
        The ``(child_pid, child_fd)`` pair produced by ``pty.fork()`` in the
        parent process.

    Raises:
        RunError: If ``pty.fork()`` itself fails.
    """
    # 'unicode' only exists on Python 2; fall back to plain 'str' elsewhere.
    try:
        string_types = (str, unicode)
    except NameError:
        string_types = (str,)
    # make sure 'cmd' is a list of strings
    if isinstance(cmd, string_types):
        # split() without arguments never yields empty strings.
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd) + '\n')
        sys.stderr.flush()
    try:
        child_pid, child_fd = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        # Child: it must never return into the caller's code.  Raising here
        # would unwind a second copy of the parent's stack, so report the
        # failure on stderr and terminate immediately instead.
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            try:
                sys.stderr.write('os.execvp() failed: %s\n' % str(e))
                sys.stderr.flush()
            finally:
                # 127 is the conventional "command could not be run" status.
                os._exit(127)
    return child_pid, child_fd
# Example source code using Python's fork()
def run_bg(cmd, debug=False, cwd=''):
    """Start ``cmd`` in the background on a new pseudo-terminal.

    Args:
        cmd: Command to run, either a list of argument strings or a single
            string, which is split on whitespace (no shell quoting rules).
        debug: When true, echo the command line to stderr before forking.
        cwd: Directory the child changes into before exec; '' leaves the
            working directory untouched.

    Returns:
        The ``(child_pid, child_fd)`` pair produced by ``pty.fork()`` in the
        parent process.

    Raises:
        RunError: If ``pty.fork()`` itself fails.
    """
    # 'unicode' only exists on Python 2; fall back to plain 'str' elsewhere.
    try:
        string_types = (str, unicode)
    except NameError:
        string_types = (str,)
    # make sure 'cmd' is a list of strings
    if isinstance(cmd, string_types):
        # split() without arguments never yields empty strings.
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd) + '\n')
        sys.stderr.flush()
    try:
        child_pid, child_fd = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        # Child: it must never return into the caller's code.  Raising here
        # would unwind a second copy of the parent's stack, so report the
        # failure on stderr and terminate immediately instead.
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            try:
                sys.stderr.write('os.execvp() failed: %s\n' % str(e))
                sys.stderr.flush()
            finally:
                # 127 is the conventional "command could not be run" status.
                os._exit(127)
    return child_pid, child_fd
def open_terminal(command="bash", columns=80, lines=24):
    """Run ``command`` on a new pseudo-terminal.

    Args:
        command: Command line to execute; split with ``shlex.split``.
        columns: Value exported to the child as ``COLUMNS`` and passed to
            ``Terminal``.
        lines: Value exported to the child as ``LINES`` and passed to
            ``Terminal``.

    Returns:
        A ``(Terminal(columns, lines), child_pid, p_out)`` tuple, where
        ``p_out`` is an unbuffered binary file object wrapping the pty
        master, switched to non-blocking mode.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            print("Could not find the executable in %s. Press any key to exit." % path,
                  flush=True)
            # Leave the forked child without unwinding the parent's stack or
            # running its exit handlers.
            os._exit(1)

    # set non blocking read
    # F_SETFL operates on the file *status* flags, so they must be read with
    # F_GETFL (F_GETFD returns the unrelated descriptor flags).
    flag = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
def spawn(self, argv):
    '''
    Create a spawned process.
    Based on the code for pty.spawn().

    ``argv`` is the argument list of the program to run; ``argv[0]`` is
    looked up on PATH.  The parent calls ``self._init()``, runs
    ``self._copy()`` and always finishes with ``self._del()``, then returns
    ``self.log`` decoded to text.
    '''
    assert self.master_fd is None
    assert isinstance(argv, list)
    pid, master_fd = pty.fork()
    self.master_fd = master_fd
    if pid == pty.CHILD:
        try:
            os.execlp(argv[0], *argv)  # and not ever returned
        except Exception as exc:
            # exec failed: report on the pty and leave the child right here,
            # otherwise it would fall through into the parent-only code.
            message = "spawn: cannot execute %r: %s\r\n" % (argv[0], exc)
            os.write(2, message.encode("utf-8", "replace"))
        finally:
            os._exit(127)
    self._init()
    try:
        self._copy()  # start communication
    finally:
        # Runs for every kind of interruption (including KeyboardInterrupt
        # and SystemExit), not only for Exception subclasses.
        self._del()
    return self.log.decode()
def main():
    """Ad-hoc experiment: SIGKILL a pty child and report what is observable.

    Installs ``signal_handler`` for SIGCHLD, forks a child on a new pty that
    writes two lines and then idles, kills it with SIGKILL, probes it with
    signal 0 and finally prints whatever can be read from the pty master.
    Written so that it runs unchanged on Python 2 and Python 3.
    """
    signal.signal(signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        # Child: produce some output, then wait to be killed.
        os.write(sys.stdout.fileno(), b'This is a test.\nThis is a test.')
        time.sleep(10000)
        # Never fall through into the parent-only code below; with pid == 0
        # the os.kill() there would target the whole process group.
        os._exit(0)
    nonblock(fd)
    tty.setraw(fd)  # STDIN_FILENO)
    print('Sending SIGKILL to child pid: %s' % (pid,))
    time.sleep(2)
    os.kill(pid, signal.SIGKILL)

    print('Entering to sleep...')
    try:
        time.sleep(2)
    except Exception:
        # The sleep may be cut short when SIGCHLD arrives.
        print('Sleep interrupted')
    try:
        # Signal 0 only checks whether the pid can still be signalled.
        os.kill(pid, 0)
        print('\tChild is alive. This is ambiguous because it may be a Zombie.')
    except OSError:
        print('\tChild appears to be dead.')
    print('')
    print('Reading from master fd: %s' % (os.read(fd, 1000),))
def start_new_process(args, nice_value=0):
    """Launch ``args`` on a new pty, optionally reniced, and describe it.

    The child resets its signals via ``default_signals()``, applies
    ``nice_value`` when it is non-zero and execs ``args[0]`` with every
    argument encoded in the configured charset.  The parent returns a dict
    with the bookkeeping fields of the new process (start time, pid, pty
    master fd and file object, command, and zeroed progress fields).
    """
    launched_at = time.time()
    pid, master_fd = pty.fork()
    if pid == CHILD:
        default_signals()
        if nice_value:
            os.nice(nice_value)
        encoded_args = [a.encode(cf['_charset'], "replace") for a in args]
        os.execvp(args[0], encoded_args)

    # Parent only from here on: exec never returns on success.
    if os.uname()[0] == "Linux":
        fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
    return {
        'start_time': launched_at,
        'pid': pid,
        'fd': master_fd,
        'file': os.fdopen(master_fd),
        'cmd': args,
        'buf': "",
        'otf': 0,
        'percent': 0,
        'elapsed': 0,
    }
def spawn(self):
    """Fork a pty child that runs ``self.command``.

    A callable command is invoked inside the child, which always ends with
    ``os._exit(0)``; any exception is written to stderr as a traceback.
    Otherwise ``self.command`` is treated as an argv list and exec'd with
    ``self.env`` (with TERM forced to 'linux').  The parent makes the master
    non-blocking when no main loop is attached and registers
    ``self.terminate`` to run at interpreter exit.
    """
    child_env = self.env
    child_env['TERM'] = 'linux'

    self.pid, self.master = pty.fork()
    in_child = self.pid == 0

    if in_child and callable(self.command):
        try:
            try:
                self.command()
            except:
                # Deliberately catch everything: the child must not unwind.
                sys.stderr.write(traceback.format_exc())
                sys.stderr.flush()
        finally:
            os._exit(0)
    elif in_child:
        os.execvpe(self.command[0], self.command, child_env)

    if self.main_loop is None:
        fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK)
    atexit.register(self.terminate)
def main():
    """Ad-hoc experiment: SIGKILL a pty child and report what is observable.

    Installs ``signal_handler`` for SIGCHLD, forks a child on a new pty that
    writes two lines and then idles, kills it with SIGKILL, probes it with
    signal 0 and finally prints whatever can be read from the pty master.
    Written so that it runs unchanged on Python 2 and Python 3.
    """
    signal.signal(signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        # Child: produce some output, then wait to be killed.
        os.write(sys.stdout.fileno(), b'This is a test.\nThis is a test.')
        time.sleep(10000)
        # Never fall through into the parent-only code below; with pid == 0
        # the os.kill() there would target the whole process group.
        os._exit(0)
    nonblock(fd)
    tty.setraw(fd)  # STDIN_FILENO)
    print('Sending SIGKILL to child pid: %s' % (pid,))
    time.sleep(2)
    os.kill(pid, signal.SIGKILL)

    print('Entering to sleep...')
    try:
        time.sleep(2)
    except Exception:
        # The sleep may be cut short when SIGCHLD arrives.
        print('Sleep interrupted')
    try:
        # Signal 0 only checks whether the pid can still be signalled.
        os.kill(pid, 0)
        print('\tChild is alive. This is ambiguous because it may be a Zombie.')
    except OSError:
        print('\tChild appears to be dead.')
    print('')
    print('Reading from master fd: %s' % (os.read(fd, 1000),))
def __init__(self, cmd, cwd):
    """Fork a pty child that execs ``cmd`` from directory ``cwd``.

    ``cmd`` is an argv list whose first element is the path handed to
    ``os.execv``.  The parent records the child's pid and the pty master fd
    and starts with a zero return code and kill signal.
    """
    self._cmd_return_code = 0
    self._cmd_kill_signal = 0

    pid, master = pty.fork()
    self._shell_pid = pid
    self._master_fd = master
    if pid != pty.CHILD:
        # Parent: nothing more to set up.
        return

    # Child: prepare the environment and replace the process image.
    os.environ["TERM"] = "linux"
    os.chdir(cwd)
    os.execv(cmd[0], cmd)
def restart(self, cmd):
    """Stop the running process (if any) and start ``cmd`` on a fresh pty.

    A process that has not completed is sent SIGHUP and its update thread is
    stopped, pumping Qt events until the thread reports it is no longer
    alive.  A new child then runs ``cmd`` through ``subprocess.call`` and
    reports success ("t") or failure ("f") over a pipe before exiting.  The
    parent records the new pid and pty fd, applies the window size and
    starts a new ``TerminalUpdateThread``.
    """
    if not self.completed:
        self.process_input("\n\033[01;31mProcess killed.\033[00m\r\n")
        os.kill(self.pid, signal.SIGHUP)
        previous = self.thread
        previous.stop()
        while previous.alive:
            QCoreApplication.processEvents()

    self.exit_pipe, child_pipe = os.pipe()
    pid, fd = pty.fork()
    if pid == 0:
        # Child: run the command, then signal the outcome over the pipe.
        try:
            os.environ["TERM"] = "xterm-256color"
            retval = subprocess.call(cmd, close_fds=True)
            os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
            os.write(child_pipe, "t")
            os._exit(retval)
        except:
            # Any failure above falls through to the error report below.
            pass
        failure_note = "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n"
        os.write(2, failure_note)
        os.write(child_pipe, "f")
        os._exit(1)

    # Parent: the write end of the pipe belongs to the child only.
    os.close(child_pipe)
    self.process_input("\033[01;34mStarted process with PID %d.\033[00m\r\n" % pid)
    self.pid = pid
    self.data_pipe = fd
    self.completed = False

    # Initialize terminal settings
    window_size = struct.pack("hhhh", self.rows, self.cols, 0, 0)
    fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, window_size)
    attribute = termios.tcgetattr(self.data_pipe)
    termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)

    self.thread = TerminalUpdateThread(self, self.data_pipe, self.exit_pipe)
    self.thread.start()
def spawn(self, argv=None):
    '''
    Create a spawned process.
    Based on the code for pty.spawn().

    The child optionally switches to ``self.group`` and ``self.user`` and
    then execs ``argv`` (or ``$SHELL`` when ``argv`` is empty).  The parent
    calls ``self._init_fd()``, runs ``self._copy()`` and closes the pty
    master afterwards.
    '''
    assert self.master_fd is None
    pid, master_fd = pty.fork()
    self.master_fd = master_fd
    self.pid = pid
    if pid == pty.CHILD:
        # Resolve both ids before changing any credentials.
        # NOTE(review): an unknown user/group is only logged and the command
        # still runs with the current credentials - confirm this is intended.
        uid = gid = None
        if self.user is not None:
            try:
                uid = pwd.getpwnam(self.user).pw_uid
            except KeyError:
                log.error("No such user: %s", self.user)
        if self.group is not None:
            try:
                gid = grp.getgrnam(self.group).gr_gid
            except KeyError:
                log.error("No such group: %s", self.group)
        # Switch the group first: once setuid() has given up root the
        # process is no longer allowed to call setgid().
        if gid is not None:
            os.setgid(gid)
            log.debug('switched group for remote process to %s(%s)', self.group, gid)
        if uid is not None:
            os.setuid(uid)
            log.debug('switched user for remote process to %s(%s)', self.user, uid)
        if not argv:
            argv = [os.environ['SHELL']]
        os.execlp(argv[0], *argv)
        # Previous command replaces the process
        return
    self._init_fd()
    try:
        self._copy()
    except (IOError, OSError):
        # I/O errors from the copy loop are treated as end of session.
        pass
    finally:
        # Release the master even when _copy() fails in an unexpected way.
        os.close(master_fd)
        self.master_fd = None
def __fork_pty(self):
    '''This implements a substitute for the forkpty system call. This
    should be more portable than the pty.fork() function. Specifically,
    this should work on Solaris.
    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::
    http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)``; ``pid`` is ``pty.CHILD`` in the child.
    Raises OSError when the pty cannot be opened or the fork fails.
    '''
    # os.openpty() reports failure by raising OSError, never by returning
    # negative descriptors, so no sanity check on the fds is needed.
    parent_fd, child_fd = os.openpty()
    try:
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when the fork fails.
        os.close(parent_fd)
        os.close(child_fd)
        raise
    if pid == pty.CHILD:
        # Child.
        os.close(parent_fd)
        self.__pty_make_controlling_tty(child_fd)
        os.dup2(child_fd, self.STDIN_FILENO)
        os.dup2(child_fd, self.STDOUT_FILENO)
        os.dup2(child_fd, self.STDERR_FILENO)
        # The duplicates are all the child needs; drop the extra descriptor
        # unless it already is one of the standard streams.
        if child_fd not in (self.STDIN_FILENO, self.STDOUT_FILENO,
                            self.STDERR_FILENO):
            os.close(child_fd)
    else:
        # Parent.
        os.close(child_fd)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Intended as a more portable alternative to what pty.fork() does;
    specifically this should work on Solaris.  Raises ExceptionPexpect if
    /dev/tty can still be opened after starting a new session, and lets
    unexpected OSErrors propagate.
    '''
    slave_path = os.ttyname(tty_fd)
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Touch the current controlling tty, if any.  ENXIO means there was none
    # to begin with, such as when executed by a cron(1) job.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # After setsid() opening /dev/tty is expected to fail with ENXIO;
    # success means we are still attached to a controlling tty.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise
    else:
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")

    # Verify we can open child pty.
    os.close(os.open(slave_path, os.O_RDWR))

    # Verify we now have a controlling tty.
    os.close(os.open("/dev/tty", os.O_WRONLY))
def sig_test(sig_handler_type, fork_type, child_output):
    """Kill a forked child and report whether it still appears alive.

    Args:
        sig_handler_type: 'SIG_IGN' ignores SIGCHLD; any other value installs
            ``signal_handler`` for it.
        fork_type: 'ptyfork' forks with ``pty.fork()``; any other value uses
            ``os.fork()``.
        child_output: 'yes' makes the child write two lines to its stdout
            before idling.

    Written so that it runs unchanged on Python 2 and Python 3.
    """
    print('Testing with:')
    print('\tsig_handler_type: %s' % (sig_handler_type,))
    print('\tfork_type: %s' % (fork_type,))
    print('\tchild_output: %s' % (child_output,))

    if sig_handler_type == 'SIG_IGN':
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal(signal.SIGCHLD, signal_handler)

    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        # Child: optionally produce output, then wait to be killed.
        if child_output == 'yes':
            os.write(sys.stdout.fileno(), b'This is a test.\nThis is a test.')
        time.sleep(10000)
        # Never fall through into the parent-only code below; with pid == 0
        # the os.kill() there would target the whole process group.
        os._exit(0)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill(pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except Exception:
        # The sleep may be cut short when SIGCHLD arrives.
        pass

    try:
        # Signal 0 only checks whether the pid can still be signalled.
        os.kill(pid, 0)
        print('\tChild is alive. This is ambiguous because it may be a Zombie.')
    except OSError:
        print('\tChild appears to be dead.')
    print('')

    # Release the pty master so repeated runs do not accumulate descriptors.
    if fd != -1:
        os.close(fd)
def __fork_pty(self):
    '''This implements a substitute for the forkpty system call. This
    should be more portable than the pty.fork() function. Specifically,
    this should work on Solaris.
    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::
    http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)``; ``pid`` is ``pty.CHILD`` in the child.
    Raises OSError when the pty cannot be opened or the fork fails.
    '''
    # os.openpty() reports failure by raising OSError, never by returning
    # negative descriptors, so no sanity check on the fds is needed.
    parent_fd, child_fd = os.openpty()
    try:
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when the fork fails.
        os.close(parent_fd)
        os.close(child_fd)
        raise
    if pid == pty.CHILD:
        # Child.
        os.close(parent_fd)
        self.__pty_make_controlling_tty(child_fd)
        os.dup2(child_fd, self.STDIN_FILENO)
        os.dup2(child_fd, self.STDOUT_FILENO)
        os.dup2(child_fd, self.STDERR_FILENO)
        # The duplicates are all the child needs; drop the extra descriptor
        # unless it already is one of the standard streams.
        if child_fd not in (self.STDIN_FILENO, self.STDOUT_FILENO,
                            self.STDERR_FILENO):
            os.close(child_fd)
    else:
        # Parent.
        os.close(child_fd)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Intended as a more portable alternative to what pty.fork() does;
    specifically this should work on Solaris.  Raises ExceptionPexpect if
    /dev/tty can still be opened after starting a new session, and lets
    unexpected OSErrors propagate.
    '''
    slave_path = os.ttyname(tty_fd)
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Touch the current controlling tty, if any.  ENXIO means there was none
    # to begin with, such as when executed by a cron(1) job.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # After setsid() opening /dev/tty is expected to fail with ENXIO;
    # success means we are still attached to a controlling tty.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise
    else:
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")

    # Verify we can open child pty.
    os.close(os.open(slave_path, os.O_RDWR))

    # Verify we now have a controlling tty.
    os.close(os.open("/dev/tty", os.O_WRONLY))
def _create(self, rows=24, cols=80):
    """Fork a login session on a new pty and register it as a client.

    In the child, root execs ``/bin/login``; any other user is prompted for
    a login name and execs ``ssh`` to localhost with it.  The parent makes
    the pty master non-blocking, sets its window size to ``rows`` x ``cols``,
    records the client in ``TermSocketHandler.clients`` and returns the
    master fd.
    """
    pid, fd = pty.fork()

    if pid != 0:
        # Parent: configure the master side and remember the session.
        fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
        window_size = struct.pack('HHHH', rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, window_size)
        TermSocketHandler.clients[fd] = {
            'client': self,
            'pid': pid,
            'terminal': Terminal(rows, cols)
        }
        return fd

    # Child: pick the login command, then replace the process image.
    if os.getuid() == 0:
        cmd = ['/bin/login']
    else:
        # The prompt has to end with a newline character.
        sys.stdout.write(socket.gethostname() + ' login: \n')
        login = sys.stdin.readline().strip()
        cmd = [
            'ssh',
            '-oPreferredAuthentications=keyboard-interactive,password',
            '-oNoHostAuthenticationForLocalhost=yes',
            '-oLogLevel=FATAL',
            '-F/dev/null',
            '-l', login, 'localhost',
        ]
    env = {
        'COLUMNS': str(cols),
        'LINES': str(rows),
        'PATH': os.environ['PATH'],
        'TERM': 'linux',
    }
    os.execvpe(cmd[0], cmd, env)
def __fork_pty(self):
    '''This implements a substitute for the forkpty system call. This
    should be more portable than the pty.fork() function. Specifically,
    this should work on Solaris.
    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::
    http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)``; ``pid`` is ``pty.CHILD`` in the child.
    Raises OSError when the pty cannot be opened or the fork fails.
    '''
    # os.openpty() reports failure by raising OSError, never by returning
    # negative descriptors, so no sanity check on the fds is needed.
    parent_fd, child_fd = os.openpty()
    try:
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when the fork fails.
        os.close(parent_fd)
        os.close(child_fd)
        raise
    if pid == pty.CHILD:
        # Child.
        os.close(parent_fd)
        self.__pty_make_controlling_tty(child_fd)
        os.dup2(child_fd, self.STDIN_FILENO)
        os.dup2(child_fd, self.STDOUT_FILENO)
        os.dup2(child_fd, self.STDERR_FILENO)
        # The duplicates are all the child needs; drop the extra descriptor
        # unless it already is one of the standard streams.
        if child_fd not in (self.STDIN_FILENO, self.STDOUT_FILENO,
                            self.STDERR_FILENO):
            os.close(child_fd)
    else:
        # Parent.
        os.close(child_fd)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Intended as a more portable alternative to what pty.fork() does;
    specifically this should work on Solaris.  Raises ExceptionPexpect if
    /dev/tty can still be opened after starting a new session, and lets
    unexpected OSErrors propagate.
    '''
    slave_path = os.ttyname(tty_fd)
    probe_flags = os.O_RDWR | os.O_NOCTTY

    # Touch the current controlling tty, if any.  ENXIO means there was none
    # to begin with, such as when executed by a cron(1) job.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise

    os.setsid()

    # After setsid() opening /dev/tty is expected to fail with ENXIO;
    # success means we are still attached to a controlling tty.
    try:
        os.close(os.open("/dev/tty", probe_flags))
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise
    else:
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")

    # Verify we can open child pty.
    os.close(os.open(slave_path, os.O_RDWR))

    # Verify we now have a controlling tty.
    os.close(os.open("/dev/tty", os.O_WRONLY))
def sig_test(sig_handler_type, fork_type, child_output):
    """Kill a forked child and report whether it still appears alive.

    Args:
        sig_handler_type: 'SIG_IGN' ignores SIGCHLD; any other value installs
            ``signal_handler`` for it.
        fork_type: 'ptyfork' forks with ``pty.fork()``; any other value uses
            ``os.fork()``.
        child_output: 'yes' makes the child write two lines to its stdout
            before idling.

    Written so that it runs unchanged on Python 2 and Python 3.
    """
    print('Testing with:')
    print('\tsig_handler_type: %s' % (sig_handler_type,))
    print('\tfork_type: %s' % (fork_type,))
    print('\tchild_output: %s' % (child_output,))

    if sig_handler_type == 'SIG_IGN':
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal(signal.SIGCHLD, signal_handler)

    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        # Child: optionally produce output, then wait to be killed.
        if child_output == 'yes':
            os.write(sys.stdout.fileno(), b'This is a test.\nThis is a test.')
        time.sleep(10000)
        # Never fall through into the parent-only code below; with pid == 0
        # the os.kill() there would target the whole process group.
        os._exit(0)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill(pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except Exception:
        # The sleep may be cut short when SIGCHLD arrives.
        pass

    try:
        # Signal 0 only checks whether the pid can still be signalled.
        os.kill(pid, 0)
        print('\tChild is alive. This is ambiguous because it may be a Zombie.')
    except OSError:
        print('\tChild appears to be dead.')
    print('')

    # Release the pty master so repeated runs do not accumulate descriptors.
    if fd != -1:
        os.close(fd)
def __fork_pty(self):
    """This implements a substitute for the forkpty system call. This
    should be more portable than the pty.fork() function. Specifically,
    this should work on Solaris.
    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::
    http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)``; ``pid`` is 0 in the child.
    Raises OSError when the pty cannot be opened or the fork fails.
    """
    # os.openpty() and os.fork() report failure by raising OSError, never by
    # returning negative values, so no checks on their results are needed.
    parent_fd, child_fd = os.openpty()
    try:
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when the fork fails.
        os.close(parent_fd)
        os.close(child_fd)
        raise
    if pid == 0:
        # Child.
        os.close(parent_fd)
        self.__pty_make_controlling_tty(child_fd)
        os.dup2(child_fd, 0)
        os.dup2(child_fd, 1)
        os.dup2(child_fd, 2)
        # Keep the descriptor only if it already is a standard stream.
        if child_fd > 2:
            os.close(child_fd)
    else:
        # Parent.
        os.close(child_fd)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    """This makes the pseudo-terminal the controlling tty. This should be
    more portable than the pty.fork() function. Specifically, this should
    work on Solaris.

    Raises ExceptionPexpect if /dev/tty can still be opened after starting
    a new session, and OSError if the child pty or the new controlling tty
    cannot be opened.
    """
    child_name = os.ttyname(tty_fd)

    # Touch the current controlling tty, if any.  The open fails with
    # OSError when there is none (for example under cron); that is harmless.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError:
        pass
    else:
        os.close(fd)

    os.setsid()

    # Verify we are disconnected from controlling tty
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError:
        # Good! We are disconnected from a controlling tty.
        pass
    else:
        # Raised outside the try block so that the failure is actually
        # reported instead of being swallowed by the handler above.
        os.close(fd)
        raise ExceptionPexpect("Error! We are not disconnected from a controlling tty.")

    # Verify we can open child pty.  os.open() raises OSError on failure.
    fd = os.open(child_name, os.O_RDWR)
    os.close(fd)

    # Verify we now have a controlling tty.
    fd = os.open("/dev/tty", os.O_WRONLY)
    os.close(fd)
def commit(self):
    """Install ``self.debfile`` with dpkg on a pty and mirror its output.

    The child runs ``/usr/bin/dpkg --auto-deconfigure -i`` with a
    non-interactive environment and exits with dpkg's status.  The parent
    copies pty output into the konsole widget while pumping GUI events,
    until the pty reports an error or end of file, then stores the exit
    status in ``self.exitstatus`` and updates the dialog widgets.
    """
    # ui
    self.status.setText(_("Installing '%s'...") % os.path.basename(self.debfile))
    # the command
    cmd = "/usr/bin/dpkg"
    argv = [cmd, "--auto-deconfigure", "-i", self.debfile]
    (self.child_pid, self.master_fd) = pty.fork()
    if self.child_pid == 0:
        os.environ["TERM"] = "dumb"
        if "DEBIAN_FRONTEND" not in os.environ:
            os.environ["DEBIAN_FRONTEND"] = "noninteractive"
        os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
        # The child must always end here, even if dpkg could not be started;
        # otherwise it would continue into the GUI code below.
        exitstatus = 1
        try:
            exitstatus = subprocess.call(argv)
        finally:
            os._exit(exitstatus)

    from errno import EAGAIN
    while True:
        #Read from pty and write to DumbTerminal
        try:
            (rlist, wlist, xlist) = select.select([self.master_fd], [], [], 0.001)
            if rlist:
                line = os.read(self.master_fd, 255)
                if not line:
                    # End of file: an empty read would otherwise make this
                    # loop spin forever.
                    break
                self.parent.konsole.insertWithTermCodes(utf8(line))
        except Exception as e:
            #print e
            if getattr(e, "errno", None) == EAGAIN:
                continue
            break
        KApplication.kApplication().processEvents()
    # at this point we got a read error from the pty, that most
    # likely means that the client is dead
    (pid, status) = os.waitpid(self.child_pid, 0)
    self.exitstatus = os.WEXITSTATUS(status)

    self.progress.setValue(100)
    self.parent.closeButton.setEnabled(True)
    self.parent.closeButton.setVisible(True)
    self.parent.installationProgress.setVisible(False)
    QTimer.singleShot(1, self.parent.changeSize)
def fork(self):
    """Fork onto a new pty and prepare the child's environment.

    Stores the child's pid and the pty master fd on the instance.  In the
    child, TERM becomes "dumb", DEBIAN_FRONTEND defaults to
    "noninteractive" unless already set, and APT_LISTCHANGES_FRONTEND is
    set to "none".  Returns the pid (0 in the child).
    """
    child_pid, master_fd = pty.fork()
    self.child_pid = child_pid
    self.master_fd = master_fd
    if child_pid == 0:
        os.environ["TERM"] = "dumb"
        # Keep a frontend chosen by the caller, if there is one.
        os.environ.setdefault("DEBIAN_FRONTEND", "noninteractive")
        os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
    return self.child_pid
def __init__(self, cmd, raw_debug = None):
    """Set up the terminal and, when ``cmd`` is given, run it on a new pty.

    Args:
        cmd: Argument list of the command to run; a false value creates an
            idle terminal that only shows a hint message.
        raw_debug: Stored unchanged on the instance as ``self.raw_debug``.

    Raises:
        RuntimeError: On Windows, which has no pseudo-terminal devices.
    """
    if os.name == "nt":
        raise RuntimeError("Windows does not support pseudo-terminal devices")

    if cmd:
        # Spawn the new process in a new PTY
        self.exit_pipe, child_pipe = os.pipe()
        self.exit_callback = None

        pid, fd = pty.fork()
        if pid == 0:
            # Child: run the command, then signal the outcome over the pipe.
            try:
                os.environ["TERM"] = "xterm-256color"
                # Remove library and module search overrides before running
                # the command.
                scrubbed = ["PYTHONPATH", "LD_LIBRARY_PATH"]
                if sys.platform == "darwin":
                    scrubbed.append("DYLD_LIBRARY_PATH")
                for name in scrubbed:
                    if name in os.environ:
                        del os.environ[name]
                retval = subprocess.call(cmd, close_fds=True)
                os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                os.write(child_pipe, "t")
                os._exit(retval)
            except:
                # Any failure above falls through to the error report below.
                pass
            os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
            os.write(child_pipe, "f")
            os._exit(1)

        # Parent: the write end of the pipe belongs to the child only.
        os.close(child_pipe)
        self.pid = pid
        self.data_pipe = fd
    else:
        self.exit_pipe = -1
        self.pid = -1
        self.data_pipe = -1

    self.rows = 25
    self.cols = 80
    self.term = TerminalEmulator(self.rows, self.cols)
    self.raw_debug = raw_debug
    self.completed = False

    self.term.response_callback = self.send_input

    if cmd:
        # Initialize terminal settings
        fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, struct.pack("hhhh", self.rows, self.cols, 0, 0))
        attribute = termios.tcgetattr(self.data_pipe)
        termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)
    else:
        self.process_input("\033[01;33mEnter desired command arguments, then run again.\033[00m\r\n")
        self.completed = True
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check ``input(prompt)`` in a child whose stdio is a pty.

    ``terminal_input`` (bytes) is written to the pty master followed by
    CR LF.  The child reports over a pipe whether its stdio is a tty and the
    ``ascii()`` of what ``input()`` returned; the parent compares that with
    ``terminal_input`` decoded using ``stdio_encoding`` (or the encoding of
    ``sys.stdin``).  The test is skipped when stdio is not a tty or
    ``pty.fork()`` is unavailable.
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        self.skipTest("stdin and stdout must be ttys")
    r, w = os.pipe()
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {}".format(e))
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            # Check the error handlers are accounted for
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(w, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except BaseException:
            # Report anything at all; the child must not unwind further.
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input + b"\r\n")
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr
        # Beware of Linux raising EIO when the slave is closed
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        # Reap the child so it does not linger as a zombie.
        os.waitpid(pid, 0)
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output.decode("ascii", "ignore")))
    os.close(fd)
    # Reap the child so it does not linger as a zombie.
    os.waitpid(pid, 0)
    # Check we did exercise the GNU readline path
    self.assertIn(lines[0], {'tty = True', 'tty = False'})
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")
    # lines[1] is the child's own ascii() output, not external input.
    input_result = eval(lines[1])   # ascii() -> eval() roundtrip
    if stdio_encoding:
        expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
    else:
        expected = terminal_input.decode(sys.stdin.encoding)  # what else?
    self.assertEqual(input_result, expected)
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check ``input(prompt)`` in a child whose stdio is a pty.

    ``terminal_input`` (bytes) is written to the pty master followed by
    CR LF.  The child reports over a pipe whether its stdio is a tty and the
    ``ascii()`` of what ``input()`` returned; the parent compares that with
    ``terminal_input`` decoded using ``stdio_encoding`` (or the encoding of
    ``sys.stdin``).  The test is skipped when stdio is not a tty or
    ``pty.fork()`` is unavailable.
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        self.skipTest("stdin and stdout must be ttys")
    r, w = os.pipe()
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {0}".format(e))
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            # Check the error handlers are accounted for
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(w, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except BaseException:
            # Report anything at all; the child must not unwind further.
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input + b"\r\n")
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr
        # Beware of Linux raising EIO when the slave is closed
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        # Reap the child so it does not linger as a zombie.
        os.waitpid(pid, 0)
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output.decode("ascii", "ignore")))
    os.close(fd)
    # Reap the child so it does not linger as a zombie.
    os.waitpid(pid, 0)
    # Check we did exercise the GNU readline path
    self.assertIn(lines[0], set(['tty = True', 'tty = False']))
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")
    # lines[1] is the child's own ascii() output, not external input.
    input_result = eval(lines[1])   # ascii() -> eval() roundtrip
    if stdio_encoding:
        expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
    else:
        expected = terminal_input.decode(sys.stdin.encoding)  # what else?
    self.assertEqual(input_result, expected)
def __pty_make_controlling_tty(self, tty_fd):
    '''This makes the pseudo-terminal the controlling tty. This should be
    more portable than the pty.fork() function. Specifically, this should
    work on Solaris.

    Raises Exception if /dev/tty can still be opened after starting a new
    session, and OSError if the child pty or the new controlling tty cannot
    be opened.
    '''
    child_name = os.ttyname(tty_fd)

    # Touch the current controlling tty, if any. Harmless if not connected.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        os.close(fd)
    except OSError:
        # Already disconnected. This happens if running inside cron.
        pass

    os.setsid()

    # Verify we are disconnected from controlling tty
    # by attempting to open it again.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        os.close(fd)
        raise Exception('Failed to disconnect from ' +
                        'controlling tty. It is still possible to open /dev/tty.')
    except OSError:
        # Good! We are disconnected from a controlling tty.
        pass

    # os.open() signals failure by raising OSError and never returns a
    # negative descriptor, so the results below need no further checks.

    # Verify we can open child pty.
    fd = os.open(child_name, os.O_RDWR)
    os.close(fd)

    # Verify we now have a controlling tty.
    fd = os.open("/dev/tty", os.O_WRONLY)
    os.close(fd)
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check ``input(prompt)`` in a child whose stdio is a pty.

    ``terminal_input`` (bytes) is written to the pty master followed by
    CR LF.  The child reports over a pipe whether its stdio is a tty and the
    ``ascii()`` of what ``input()`` returned; the parent compares that with
    ``terminal_input`` decoded using ``stdio_encoding`` (or the encoding of
    ``sys.stdin``).  The test is skipped when stdio is not a tty or
    ``pty.fork()`` is unavailable.
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        self.skipTest("stdin and stdout must be ttys")
    r, w = os.pipe()
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {}".format(e))
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            # Check the error handlers are accounted for
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(w, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except BaseException:
            # Report anything at all; the child must not unwind further.
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input + b"\r\n")
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr
        # Beware of Linux raising EIO when the slave is closed
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        # Reap the child so it does not linger as a zombie.
        os.waitpid(pid, 0)
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output.decode("ascii", "ignore")))
    os.close(fd)
    # Reap the child so it does not linger as a zombie.
    os.waitpid(pid, 0)
    # Check we did exercise the GNU readline path
    self.assertIn(lines[0], {'tty = True', 'tty = False'})
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")
    # lines[1] is the child's own ascii() output, not external input.
    input_result = eval(lines[1])   # ascii() -> eval() roundtrip
    if stdio_encoding:
        expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
    else:
        expected = terminal_input.decode(sys.stdin.encoding)  # what else?
    self.assertEqual(input_result, expected)
def run_child(self, child, terminal_input):
    """Run ``child(wpipe)`` in a forked pty child and collect its report.

    ``child`` is called in the forked process with a text-mode file object
    for the write end of a pipe.  ``terminal_input`` (bytes) is written to
    the pty master.  Returns the two stripped lines the child wrote to the
    pipe, and fails the test with the child's pty output when the number of
    lines differs.  The test is skipped when ``pty.fork()`` is unavailable.
    """
    r, w = os.pipe()  # Pipe test results from child back to parent
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {}".format(e))
        raise
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            with open(w, "w") as wpipe:
                child(wpipe)
        except BaseException:
            # Report anything at all; the child must not unwind further.
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input)
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr
        # Beware of Linux raising EIO when the slave is closed
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        # Reap the child so it does not linger as a zombie.
        os.waitpid(pid, 0)
        child_output = child_output.decode("ascii", "ignore")
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output))
    os.close(fd)
    # Reap the child so it does not linger as a zombie.
    os.waitpid(pid, 0)
    return lines