def __fork_pty(self):
'''This implements a substitute for the forkpty system call. This
should be more portable than the pty.fork() function. Specifically,
this should work on Solaris.
Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
resolve the issue with Python's pty.fork() not supporting Solaris,
particularly ssh. Based on patch to posixmodule.c authored by Noah
Spurrier::
http://mail.python.org/pipermail/python-dev/2003-May/035281.html
'''
parent_fd, child_fd = os.openpty()
if parent_fd < 0 or child_fd < 0:
raise ExceptionPexpect("Could not open with os.openpty().")
pid = os.fork()
if pid == pty.CHILD:
# Child.
os.close(parent_fd)
self.__pty_make_controlling_tty(child_fd)
os.dup2(child_fd, self.STDIN_FILENO)
os.dup2(child_fd, self.STDOUT_FILENO)
os.dup2(child_fd, self.STDERR_FILENO)
else:
# Parent.
os.close(child_fd)
return pid, parent_fd
python类STDIN_FILENO的实例源码
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
if self.logfile is not None:
self.logfile.write(data)
self.logfile.flush()
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
self.__interact_writen(self.child_fd, data)
break
self.__interact_writen(self.child_fd, data)
def interact(self, escape_character=chr(29),
input_filter=None, output_filter=None):
'''This gives control of the child process to the interactive user (the
human at the keyboard). Keystrokes are sent to the child process, and
the stdout and stderr output of the child process is printed. This
simply echos the child stdout and child stderr to the real stdout and
it echos the real stdin to the child stdin. When the user types the
escape_character this method will return None. The escape_character
will not be transmitted. The default for escape_character is
entered as ``Ctrl - ]``, the very same as BSD telnet. To prevent
escaping, escape_character may be set to None.
If a logfile is specified, then the data sent and received from the
child process in interact mode is duplicated to the given log.
You may pass in optional input and output filter functions. These
functions should take a string and return a string. The output_filter
will be passed all the output from the child process. The input_filter
will be passed all the keyboard input from the user. The input_filter
is run BEFORE the check for the escape_character.
Note that if you change the window size of the parent the SIGWINCH
signal will not be passed through to the child. If you want the child
window size to change when the parent's window size changes then do
something like the following example::
import pexpect, struct, fcntl, termios, signal, sys
def sigwinch_passthrough (sig, data):
s = struct.pack("HHHH", 0, 0, 0, 0)
a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
termios.TIOCGWINSZ , s))
global p
p.setwinsize(a[0],a[1])
# Note this 'p' global and used in sigwinch_passthrough.
p = pexpect.spawn('/bin/bash')
signal.signal(signal.SIGWINCH, sigwinch_passthrough)
p.interact()
'''
# Flush the buffer.
self.write_to_stdout(self.buffer)
self.stdout.flush()
self.buffer = self.string_type()
mode = tty.tcgetattr(self.STDIN_FILENO)
tty.setraw(self.STDIN_FILENO)
if escape_character is not None and PY3:
escape_character = escape_character.encode('latin-1')
try:
self.__interact_copy(escape_character, input_filter, output_filter)
finally:
tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
self._log(data, 'read')
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = -1
if escape_character is not None:
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
if data:
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
break
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for fd in self.fds:
try:
os.close(fd)
except:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
self._log(data, 'read')
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = -1
if escape_character is not None:
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
if data:
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
break
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.files = []
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for file in self.files:
try:
file.close()
except OSError:
pass
for fd in self.fds:
try:
os.close(fd)
except OSError:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for fd in self.fds:
try:
os.close(fd)
except:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def __fork_pty(self):
'''This implements a substitute for the forkpty system call. This
should be more portable than the pty.fork() function. Specifically,
this should work on Solaris.
Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
resolve the issue with Python's pty.fork() not supporting Solaris,
particularly ssh. Based on patch to posixmodule.c authored by Noah
Spurrier::
http://mail.python.org/pipermail/python-dev/2003-May/035281.html
'''
parent_fd, child_fd = os.openpty()
if parent_fd < 0 or child_fd < 0:
raise ExceptionPexpect("Could not open with os.openpty().")
pid = os.fork()
if pid == pty.CHILD:
# Child.
os.close(parent_fd)
self.__pty_make_controlling_tty(child_fd)
os.dup2(child_fd, self.STDIN_FILENO)
os.dup2(child_fd, self.STDOUT_FILENO)
os.dup2(child_fd, self.STDERR_FILENO)
else:
# Parent.
os.close(child_fd)
return pid, parent_fd
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
if self.logfile is not None:
self.logfile.write(data)
self.logfile.flush()
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
self.__interact_writen(self.child_fd, data)
break
self.__interact_writen(self.child_fd, data)
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.files = []
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for file in self.files:
try:
file.close()
except OSError:
pass
for fd in self.fds:
try:
os.close(fd)
except OSError:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
self._log(data, 'read')
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = -1
if escape_character is not None:
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
if data:
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
break
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)