def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session (``os.setsid``),
    with stdout and stderr captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
        of the tty-pair. It is the responsibility of the caller
        to close the master fd, and to perform any cleanup (including
        waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    """
    import pty
    master, slave = pty.openpty()
    proc = None
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    finally:
        # The child holds its own copy of the slave end; the parent never
        # needs it, whether or not the spawn succeeded.
        os.close(slave)
        if proc is None:
            # Spawn failed: nobody will receive the master fd, so release it
            # here instead of leaking it.
            os.close(master)
    return (proc, master)
# Example source code using Python's pty.openpty() (python类openpty()的实例源码)
def test_signal_failure(monkeypatch):
    """Exercise UnixConsole.prepare()/restore() while signal.signal is replaced.

    The console is first prepared and restored with the real ``signal.signal``.
    ``prepare()`` is then called with a replacement that raises ValueError, and
    ``restore()`` with a replacement that raises AssertionError.
    """
    import os
    import pty
    import signal
    from pyrepl.unix_console import UnixConsole

    def raising(exc_type):
        # Build a stand-in for signal.signal that raises ``exc_type``.
        def handler(a, b):
            raise exc_type
        return handler

    failing_signal = raising(ValueError)
    really_failing_signal = raising(AssertionError)

    master_fd, slave_fd = pty.openpty()
    try:
        console = UnixConsole(slave_fd, slave_fd)
        # Baseline round trip with the genuine signal.signal.
        console.prepare()
        console.restore()

        monkeypatch.setattr(signal, 'signal', failing_signal)
        console.prepare()

        monkeypatch.setattr(signal, 'signal', really_failing_signal)
        console.restore()
    finally:
        os.close(master_fd)
        os.close(slave_fd)
def test_ioctl_signed_unsigned_code_param(self):
    """Issue TIOCSWINSZ on a pty master with a positive and a signed opcode.

    Skipped when the ``pty`` module is unavailable.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    master_fd, slave_fd = pty.openpty()
    try:
        opcode = termios.TIOCSWINSZ
        if opcode >= 0:
            positive_code = opcode
            # Reinterpret the unsigned 32-bit value as a signed int.
            (signed_code,) = struct.unpack("i", struct.pack("I", opcode))
        else:
            signed_code = opcode
            positive_code = opcode & 0xffffffff
        winsize = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        for code in (positive_code, signed_code):
            fcntl.ioctl(master_fd, code, winsize)
    finally:
        os.close(master_fd)
        os.close(slave_fd)
def __init__(self):
    """Create the ``hdlc_to_ax25`` block and publish a PTY at /tmp/kiss_pty.

    A pseudo-terminal pair is opened and a symlink ``/tmp/kiss_pty`` is
    pointed at the slave device (replacing an existing symlink of that
    name). The ``in`` message port is registered with ``handle_msg`` as its
    handler, and the ``count``/``dropped`` counters start at zero.
    """
    gr.sync_block.__init__(self,
                           name="hdlc_to_ax25",
                           in_sig=None,
                           out_sig=None)
    (self.pty_master, self.pty_slave) = pty.openpty()
    tty_name = os.ttyname(self.pty_slave)
    # Only a stale symlink is removed; any other kind of file at this path
    # is left alone and will make os.symlink() raise.
    if os.path.islink('/tmp/kiss_pty'):
        os.unlink('/tmp/kiss_pty')
    os.symlink(tty_name, '/tmp/kiss_pty')
    # Parenthesised single-argument form prints identically on Python 2
    # and is valid syntax on Python 3.
    print('KISS PTY is: /tmp/kiss_pty (%s)' % tty_name)
    self.message_port_register_in(pmt.intern('in'))
    self.set_msg_handler(pmt.intern('in'), self.handle_msg)
    self.count = 0
    self.dropped = 0
def test_ioctl_signed_unsigned_code_param(self):
    """Issue TIOCSWINSZ on a pty master with a positive and a signed opcode.

    Skipped when the ``pty`` module is unavailable.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    mfd, sfd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            # Plain int literal: same value on Python 2, and valid syntax
            # on Python 3 (which has no ``L`` suffix).
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Reinterpret the unsigned 32-bit value as a signed int.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))
        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
    finally:
        os.close(mfd)
        os.close(sfd)
def test_ioctl_signed_unsigned_code_param(self):
    """Call fcntl.ioctl(TIOCSWINSZ) with both opcode representations.

    The window-size ioctl is sent to the master side of a fresh pty once
    with a non-negative opcode and once with the opcode as a signed 32-bit
    value. Raises ``unittest.SkipTest`` if ``pty`` is falsy.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    mfd, sfd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            # No ``L`` suffix: it is a SyntaxError on Python 3 and
            # unnecessary on Python 2.
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Pack as unsigned, unpack as signed, to get the signed form.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))
        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
    finally:
        os.close(mfd)
        os.close(sfd)
def _handles(stdin, stdout, stderr):
    """Resolve the ``PTY`` sentinel for stdout into a real pty slave fd.

    :returns: ``(stdin, stdout, stderr, master)``. When ``stdout`` is the
        ``PTY`` sentinel, it is replaced by the slave fd of a new pty and
        ``master`` is the matching master fd; otherwise the arguments are
        returned unchanged and ``master`` is ``None``.
    """
    if stdout is not PTY:
        return stdin, stdout, stderr, None

    # A pipe would make libc routines such as printf() buffer their
    # output; with a pty on STDOUT they do not buffer.
    pty_master, pty_slave = pty.openpty()

    # Put both ends in raw mode so that terminal control codes are not
    # interpreted and data passes through unmodified.
    for fd in (pty_master, pty_slave):
        tty.setraw(fd)

    # The child receives the slave side; the master is handed back.
    return stdin, pty_slave, stderr, pty_master
def test_ioctl_signed_unsigned_code_param(self):
    """Send TIOCSWINSZ to a pty master using positive and signed opcodes.

    Raises ``unittest.SkipTest`` when ``pty`` is falsy.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    master_fd, slave_fd = pty.openpty()
    try:
        request = termios.TIOCSWINSZ
        if request >= 0:
            unsigned_request = request
            # Same 32 bits, read back as a signed int.
            (signed_request,) = struct.unpack("i", struct.pack("I", request))
        else:
            signed_request = request
            unsigned_request = request & 0xffffffff
        packed_size = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        for request_code in (unsigned_request, signed_request):
            fcntl.ioctl(master_fd, request_code, packed_size)
    finally:
        os.close(master_fd)
        os.close(slave_fd)
def command_background_start(self, cmd):
    """Run a background command: run it in a new thread and resume execution immediately.

    The command is started through the shell with stdout and stderr
    attached to the slave side of a new pty. The calling thread sleeps for
    two seconds after starting the worker thread and then returns.

    :param cmd: shell command line to run
    """
    self.printer.debug('[LOCAL CMD] Local Background Command: %s' % cmd)

    def daemon(cmd):
        """Daemon used to run the command so to avoid blocking the UI"""
        # Run command
        master, slave = pty.openpty()
        proc = None
        try:
            proc = subprocess.Popen(cmd, shell=True, stdout=slave, stderr=slave, close_fds=True)
        finally:
            # The child has its own copy of the slave end; closing ours
            # stops one fd from leaking on every call.
            os.close(slave)
            if proc is None:
                os.close(master)
        # NOTE(review): the master side is wrapped but never read here;
        # confirm whether the command's output is meant to be consumed.
        stdout = os.fdopen(master)
        self.printer.info("Monitoring in background...Kill this process when you want to see the dumped content")

    # Run command in a thread
    d = threading.Thread(name='daemon', target=daemon, args=(cmd,))
    d.setDaemon(True)
    d.start()
    time.sleep(2)
def test_ioctl_signed_unsigned_code_param(self):
    """Verify fcntl.ioctl accepts TIOCSWINSZ as a positive and a signed code.

    Skipped (``unittest.SkipTest``) when the ``pty`` module is unavailable.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    mfd, sfd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            # Literal written without the Python 2-only ``L`` suffix so the
            # test also parses on Python 3; the value is unchanged.
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Signed 32-bit view of the same opcode.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))
        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
    finally:
        os.close(mfd)
        os.close(sfd)
def test_ioctl_signed_unsigned_code_param(self):
    """Run the TIOCSWINSZ ioctl twice: positive opcode, then signed opcode.

    The ioctl targets the master fd of a pty opened for this test; both
    fds are closed afterwards. Skipped if ``pty`` is falsy.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    fd_master, fd_slave = pty.openpty()
    try:
        raw_code = termios.TIOCSWINSZ
        if raw_code >= 0:
            code_pos = raw_code
            # Round-trip through struct to obtain the signed 32-bit form.
            (code_maybe_neg,) = struct.unpack("i", struct.pack("I", raw_code))
        else:
            code_maybe_neg = raw_code
            code_pos = raw_code & 0xffffffff
        size_struct = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        for ioctl_code in (code_pos, code_maybe_neg):
            fcntl.ioctl(fd_master, ioctl_code, size_struct)
    finally:
        os.close(fd_master)
        os.close(fd_slave)
def test_ioctl_signed_unsigned_code_param(self):
    """Exercise fcntl.ioctl with the TIOCSWINSZ opcode in two encodings.

    One call uses the non-negative opcode, the other the opcode as a
    signed 32-bit integer. Skipped when ``pty`` is falsy.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    mfd, sfd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            # ``0xffffffffL`` only parses on Python 2; the unsuffixed
            # literal has the same value there and also works on Python 3.
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Derive the signed form by repacking the unsigned value.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))
        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
    finally:
        os.close(mfd)
        os.close(sfd)
def test_ioctl_signed_unsigned_code_param(self):
    """Set the window size on a pty master via positive and signed opcodes.

    Raises ``unittest.SkipTest`` if the ``pty`` module is not available.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    master_end, slave_end = pty.openpty()
    try:
        tiocswinsz = termios.TIOCSWINSZ
        if tiocswinsz >= 0:
            opcode_pos = tiocswinsz
            # Unsigned-to-signed reinterpretation of the 32-bit opcode.
            (opcode_maybe_neg,) = struct.unpack(
                "i", struct.pack("I", tiocswinsz))
        else:
            opcode_maybe_neg = tiocswinsz
            opcode_pos = tiocswinsz & 0xffffffff
        requested_size = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        for opcode in (opcode_pos, opcode_maybe_neg):
            fcntl.ioctl(master_end, opcode, requested_size)
    finally:
        os.close(master_end)
        os.close(slave_end)
def _do_it(self, action):
    """Run ``ansible-connection`` and feed it the play context and *action*.

    The child's stdin is the slave side of a pty. The pickled play context,
    an ``#END_INIT#`` marker and the action are written to the master side,
    and the child's stdout and stderr are collected.

    :param action: payload written after the init marker (passed through
        ``to_bytes``)
    :returns: ``(returncode, stdout, stderr)`` of the child process
    """
    master, slave = pty.openpty()
    p = None
    try:
        p = subprocess.Popen(["ansible-connection"], stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        # The child owns its own copy of the slave end.
        os.close(slave)
        if p is None:
            # Spawn failed: do not leak the master end either.
            os.close(master)
    stdin = os.fdopen(master, 'wb', 0)
    try:
        # Need to force a protocol that is compatible with both py2 and py3.
        # That would be protocol=2 or less.
        # Also need to force a protocol that excludes certain control chars as
        # stdin in this case is a pty and control chars will cause problems.
        # that means only protocol=0 will work.
        src = cPickle.dumps(self._play_context.serialize(), protocol=0)
        stdin.write(src)
        stdin.write(b'\n#END_INIT#\n')
        stdin.write(to_bytes(action))
        stdin.write(b'\n\n')
        (stdout, stderr) = p.communicate()
    finally:
        # Close the master end even if pickling or a write raised.
        stdin.close()
    return (p.returncode, stdout, stderr)
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session (``os.setsid``),
    with stdout and stderr captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
        of the tty-pair. It is the responsibility of the caller
        to close the master fd, and to perform any cleanup (including
        waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    """
    master, slave = pty.openpty()
    proc = None
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    finally:
        # The parent never uses the slave end; close it on every path.
        os.close(slave)
        if proc is None:
            # Popen raised, so the caller will never get the master fd.
            os.close(master)
    return (proc, master)
def _handles(stdin, stdout, stderr):
    """Swap the ``PTY`` sentinel on stdout for the slave fd of a new pty.

    :returns: a 4-tuple ``(stdin, stdout, stderr, master)``; ``master`` is
        the pty master fd when a pty was opened, else ``None``.
    """
    master_fd = None
    if stdout is PTY:
        # With a plain pipe, printf() and friends buffer their output
        # rather than sending it straight away; a pty on STDOUT avoids
        # that buffering.
        master_fd, slave_fd = pty.openpty()

        # Raw mode on both ends keeps terminal control codes from being
        # interpreted, so the bytes pass through exactly.
        tty.setraw(master_fd)
        tty.setraw(slave_fd)

        # The slave end is what the child gets as its stdout.
        stdout = slave_fd
    return (stdin, stdout, stderr, master_fd)
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session (``os.setsid``),
    with stdout and stderr captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
        of the tty-pair. It is the responsibility of the caller
        to close the master fd, and to perform any cleanup (including
        waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    """
    master, slave = pty.openpty()
    proc = None
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    finally:
        # Release the parent's slave fd whether or not the spawn worked.
        os.close(slave)
        if proc is None:
            # On failure the master would otherwise be leaked as well.
            os.close(master)
    return (proc, master)
def __init__(self, message_q):
    """Initialise the thread, store the message queue and open a pty pair.

    :param message_q: stored as ``self.messages``
    """
    QThread.__init__(self)
    self.messages = message_q
    # Only the inferior process need use the slave file descriptor
    master_fd, slave_fd = pty.openpty()
    self.master = master_fd
    self.slave = slave_fd
    # Device path of the slave end.
    self.tty = os.ttyname(slave_fd)
def getPty(self, term, windowSize, modes):
    """Record the terminal settings and allocate a pty for the session.

    Stores ``windowSize`` and ``modes`` on the instance, sets ``TERM`` and
    ``SSH_TTY`` in ``self.environ``, and saves ``(master, slave, ttyname)``
    as ``self.ptyTuple``.
    """
    self.environ['TERM'] = term
    self.winSize = windowSize
    self.modes = modes
    pty_pair = pty.openpty()
    slave_name = os.ttyname(pty_pair[1])
    self.environ['SSH_TTY'] = slave_name
    self.ptyTuple = (pty_pair[0], pty_pair[1], slave_name)
def __init__(self):
    """Open a pty pair and prepare (without starting) the worker thread.

    The slave's device name is kept in ``_s_name``, a ``Faker`` instance in
    ``_fake``, and a thread targeting ``self.__run`` in ``_fake_device``.
    """
    master_fd, slave_fd = pty.openpty()
    self._master = master_fd
    self._slave = slave_fd
    self._s_name = os.ttyname(slave_fd)
    self._fake = Faker()
    # The thread is only constructed here; it is not started.
    self._fake_device = threading.Thread(target=self.__run)
def test_readline():
    """readline() returns the typed line as bytes without the newline."""
    master_fd, slave_fd = pty.openpty()
    wrapper = _ReadlineWrapper(slave_fd, slave_fd)
    # Simulate a user typing "input" followed by Enter on the terminal.
    os.write(master_fd, b'input\n')
    reader = wrapper.get_reader()
    line = reader.readline()
    assert line == b'input'
    assert isinstance(line, bytes_type)
def test_readline_returns_unicode():
    """readline(returns_unicode=True) returns the typed line as text."""
    master_fd, slave_fd = pty.openpty()
    wrapper = _ReadlineWrapper(slave_fd, slave_fd)
    # Simulate a user typing "input" followed by Enter on the terminal.
    os.write(master_fd, b'input\n')
    reader = wrapper.get_reader()
    line = reader.readline(returns_unicode=True)
    assert line == 'input'
    assert isinstance(line, unicode_type)
def test_raw_input():
    """raw_input() returns the typed line as bytes without the newline."""
    master_fd, slave_fd = pty.openpty()
    wrapper = _ReadlineWrapper(slave_fd, slave_fd)
    # Simulate a user typing "input" followed by Enter on the terminal.
    os.write(master_fd, b'input\n')
    answer = wrapper.raw_input('prompt:')
    assert answer == b'input'
    assert isinstance(answer, bytes_type)
def spawn(self, argv=None, term=None):
    """Start a program attached to a new pty.

    :param argv: argument list of the program to run. When ``None``, a
        shell is chosen: ``$SHELL`` if set, otherwise the first of
        bash/sh/ksh/zsh/csh/ash found in ``$PATH``, otherwise ``/bin/sh``.
    :param term: if not ``None``, stored in ``os.environ['TERM']`` before
        the program is started (this changes the current process's
        environment).

    After the call ``self.master`` is an unbuffered, non-blocking file
    object on the pty master, ``self.slave`` is the slave fd and
    ``self.prog`` is the ``subprocess.Popen`` object.
    """
    if argv is None:
        if 'SHELL' in os.environ:
            argv = [os.environ['SHELL']]
        elif 'PATH' in os.environ:
            # Search PATH for a shell: it can be somewhere unusual, such as
            # /system/bin/sh on Android.
            search_dirs = [d.strip() for d in os.environ['PATH'].split(':')]
            for shell in ["bash", "sh", "ksh", "zsh", "csh", "ash"]:
                for directory in search_dirs:
                    fullpath = os.path.join(directory, shell)
                    if os.path.isfile(fullpath):
                        argv = [fullpath]
                        break
                if argv:
                    break
    if not argv:
        argv = ['/bin/sh']
    if term is not None:
        os.environ['TERM'] = term

    master, slave = pty.openpty()
    self.slave = slave
    # 'r+b' is the well-formed spelling of read/write binary; the previous
    # 'rb+wb' is rejected with ValueError by Python 3's io.open.
    self.master = os.fdopen(master, 'r+b', 0)  # unbuffered
    # fcntl raises on failure, so no extra check of the return values is
    # needed.
    flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
    fcntl.fcntl(self.master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    self.prog = subprocess.Popen(
        shell=False,
        args=argv,
        stdin=slave,
        stdout=slave,
        stderr=subprocess.STDOUT,
        preexec_fn=prepare
    )
def __init__(self, command=None, return_url=None, restart=False):
    """Store the terminal settings, open a pty and log the new terminal id.

    :param command: stored as ``self.command``
    :param return_url: stored as ``self.return_url``
    :param restart: stored as ``self.restart``
    """
    self.command = command
    # Eight-character random identifier for this terminal.
    self.terminal_id = prism.generate_random_string(8)
    self.return_url = return_url
    self.restart = restart
    master_fd, slave_fd = pty.openpty()
    self.process = TerminalProcess(master_fd, slave_fd)
    logging.info('Terminal Opened: %s' % self.terminal_id)
def spawn(self, argv=None, term=None):
    """Launch a program (by default a shell) on a freshly opened pty.

    :param argv: argument list of the program to run. When ``None``, a
        shell is chosen: ``$SHELL`` if set, otherwise the first of
        bash/sh/ksh/zsh/csh/ash found in ``$PATH``, otherwise ``/bin/sh``.
    :param term: if not ``None``, written to ``os.environ['TERM']`` before
        the program is started (this changes the current process's
        environment).

    Sets ``self.master`` (unbuffered, non-blocking file object on the pty
    master), ``self.slave`` (slave fd) and ``self.prog`` (the
    ``subprocess.Popen`` object).
    """
    if argv is None:
        if 'SHELL' in os.environ:
            argv = [os.environ['SHELL']]
        elif 'PATH' in os.environ:
            # Look the shell up in PATH: it may live in an unusual place,
            # e.g. /system/bin/sh on Android.
            path_dirs = [p.strip() for p in os.environ['PATH'].split(':')]
            for shell in ["bash", "sh", "ksh", "zsh", "csh", "ash"]:
                for path_dir in path_dirs:
                    fullpath = os.path.join(path_dir, shell)
                    if os.path.isfile(fullpath):
                        argv = [fullpath]
                        break
                if argv:
                    break
    if not argv:
        argv = ['/bin/sh']
    if term is not None:
        os.environ['TERM'] = term

    master, slave = pty.openpty()
    self.slave = slave
    # Use a valid read/write binary mode: 'rb+wb' raises ValueError under
    # Python 3's io.open, while 'r+b' works on both major versions.
    self.master = os.fdopen(master, 'r+b', 0)  # unbuffered
    # Errors from fcntl surface as exceptions, so the results are not
    # re-checked here.
    flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
    fcntl.fcntl(self.master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    self.prog = subprocess.Popen(
        shell=False,
        args=argv,
        stdin=slave,
        stdout=slave,
        stderr=subprocess.STDOUT,
        preexec_fn=prepare
    )
def __init__(self, message_q):
    """Set up the QThread, remember the message queue and allocate a pty.

    :param message_q: stored as ``self.messages``
    """
    QThread.__init__(self)
    self.messages = message_q
    # Only the inferior process need use the slave file descriptor
    pty_master, pty_slave = pty.openpty()
    self.master = pty_master
    self.slave = pty_slave
    # Path of the slave terminal device.
    self.tty = os.ttyname(pty_slave)
def getPty(self, term, windowSize, modes):
    """Open a pty for the session and remember the requested settings.

    ``TERM`` and ``SSH_TTY`` are written into ``self.environ``;
    ``windowSize`` and ``modes`` are kept on the instance; the opened pty
    is stored in ``self.ptyTuple`` as ``(master, slave, ttyname)``.
    """
    self.environ['TERM'] = term
    self.winSize = windowSize
    self.modes = modes
    master_fd, slave_fd = pty.openpty()
    tty_path = os.ttyname(slave_fd)
    self.environ['SSH_TTY'] = tty_path
    self.ptyTuple = (master_fd, slave_fd, tty_path)
def __init__(self, name):
    """Connect to the serial port *name*, or to a virtual one as fallback.

    A 9600 baud connection to *name* is attempted first. If that raises
    ``serial.SerialException``, a pty pair is opened and the connection is
    made to its slave device instead, with a warning logged. The trigger
    and program-end events and the timing state are then created, and the
    ``threading.Thread`` base initialiser is called last.

    :param name: serial device name passed to ``serial.Serial``
    """
    # Try to establish connection to serial port
    try:
        self.serial_connection = serial.Serial(name, 9600)
        self.serial_connection_established = True
    # If it fails because there is no device, use virtual serial port
    except serial.SerialException:
        # Create virtual serial port
        self.master, self.slave = pty.openpty()
        self.vPort = os.ttyname(self.slave)
        # Create instance
        self.serial_connection = serial.Serial(self.vPort, 9600)
        self.serial_connection_established = True
        # logging.warn is a deprecated alias; logging.warning is the
        # supported name and emits the same record.
        logging.warning("Trigger device not found -> Using virtual device")
    # Create events
    self.trigger_event = threading.Event()
    self.eventProgramEnd = threading.Event()
    # Store current time
    self.last_trigger_time = time.time()
    self.firstRun = True
    # Call initialization of thread class
    threading.Thread.__init__(self)
def make_slave_pty():
    """Yield the slave fd of a new pty, closing both ends afterwards.

    This is a generator: it yields exactly once (the slave file
    descriptor). Both descriptors are closed when the generator is resumed,
    closed, or has an exception thrown into it at the ``yield``.
    """
    master_pty, slave_pty = pty.openpty()
    try:
        yield slave_pty
    finally:
        # Runs on normal resumption and also on GeneratorExit or a thrown
        # exception, so the descriptors are not leaked.
        os.close(slave_pty)
        os.close(master_pty)