python类openpty()的实例源码

common.py 文件源码 项目:deb-python-dcos 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session
    (``os.setsid``), with stdout and stderr captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    :raises: whatever ``subprocess.Popen`` raises; in that case both
       ends of the tty-pair are closed before the exception propagates.

    """

    import pty
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # The caller never receives the master fd when spawning fails,
        # so it has to be released here or it would leak.
        os.close(master)
        raise
    finally:
        # The child has its own copy of the slave end as stdin; the
        # parent's copy is not needed whether or not the spawn succeeded.
        os.close(slave)

    return (proc, master)
test_bugs.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_signal_failure(monkeypatch):
    """Exercise UnixConsole.prepare()/restore() while signal.signal is broken.

    After one ordinary prepare/restore cycle, ``signal.signal`` is replaced
    by a stub raising ``ValueError`` before ``prepare()`` and by a stub
    raising ``AssertionError`` before ``restore()``.  The test passes only
    if neither exception escapes from the console.
    """
    import os
    import pty
    import signal
    from pyrepl.unix_console import UnixConsole

    def raise_value_error(signum, handler):
        raise ValueError

    def raise_assertion_error(signum, handler):
        raise AssertionError

    master_fd, slave_fd = pty.openpty()
    try:
        console = UnixConsole(slave_fd, slave_fd)

        # Baseline cycle with the real signal.signal in place.
        console.prepare()
        console.restore()

        # prepare() runs while signal.signal raises ValueError.
        monkeypatch.setattr(signal, 'signal', raise_value_error)
        console.prepare()

        # restore() runs while signal.signal raises AssertionError.
        monkeypatch.setattr(signal, 'signal', raise_assertion_error)
        console.restore()
    finally:
        os.close(master_fd)
        os.close(slave_fd)
test_ioctl.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Issue TIOCSWINSZ on a pty master with both spellings of the request code.

        The code is passed to ``fcntl.ioctl`` once as a non-negative integer
        and once as the signed 32-bit reading of the same bits (which may be
        negative).  There are no assertions: the test passes when neither
        call raises.  Skipped when the ``pty`` module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        master_fd, slave_fd = pty.openpty()
        try:
            request = termios.TIOCSWINSZ
            if request >= 0:
                unsigned_request = request
                # Reinterpret the unsigned 32-bit pattern as a signed int.
                (signed_request,) = struct.unpack("i",
                                                  struct.pack("I", request))
            else:
                signed_request = request
                unsigned_request = request & 0xffffffff

            winsize = struct.pack("HHHH", 80, 25, 0, 0)
            # Positive code first, then the potentially negative one.
            for code in (unsigned_request, signed_request):
                fcntl.ioctl(master_fd, code, winsize)
        finally:
            os.close(master_fd)
            os.close(slave_fd)
hdlc_to_ax25.py 文件源码 项目:GroundStation 作者: ClydeSpace-GroundStation 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        """Set up the block, allocate a pty and link its slave at /tmp/kiss_pty.

        The block is registered as a ``gr.sync_block`` named
        ``hdlc_to_ax25`` with no stream input or output signature; data
        arrives through the ``in`` message port, handled by
        ``self.handle_msg``.  The ``count`` and ``dropped`` counters start
        at zero.
        """
        gr.sync_block.__init__(self,
                               name="hdlc_to_ax25",
                               in_sig=None,
                               out_sig=None)

        (self.pty_master, self.pty_slave) = pty.openpty()
        # Resolved once and reused below; named so it does not shadow the
        # stdlib ``tty`` module.
        slave_path = os.ttyname(self.pty_slave)

        # Remove any existing symlink at the well-known path before
        # pointing it at the new slave device.
        if os.path.islink('/tmp/kiss_pty'):
            os.unlink('/tmp/kiss_pty')
        os.symlink(slave_path, '/tmp/kiss_pty')

        # Parenthesised single-argument form prints the same text on
        # Python 2 and is valid syntax on Python 3.
        print('KISS PTY is: /tmp/kiss_pty (%s)' % slave_path)

        self.message_port_register_in(pmt.intern('in'))
        self.set_msg_handler(pmt.intern('in'), self.handle_msg)
        self.count = 0
        self.dropped = 0
test_ioctl.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Call ioctl(TIOCSWINSZ) with a positive and a possibly negative code.

        A pty pair is opened and the window size is set on the master twice:
        once with the request code as a non-negative integer and once with
        the signed 32-bit reading of the same bits.  There are no
        assertions; the test passes when neither call raises.  Skipped when
        the ``pty`` module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                # No ``L`` suffix: the plain literal yields the same value
                # on Python 2 and is the only spelling Python 3 accepts.
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
test_ioctl.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Set a pty's window size using both signed and unsigned request codes.

        ``fcntl.ioctl`` is called on the master fd with TIOCSWINSZ expressed
        as a non-negative integer and again as the signed 32-bit value with
        the same bit pattern.  The test has no assertions and succeeds when
        both calls return without raising.  Skipped when the ``pty`` module
        is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            opcode = termios.TIOCSWINSZ
            if opcode < 0:
                set_winsz_opcode_maybe_neg = opcode
                # Plain hex literal instead of the Python-2-only ``L``
                # suffix; the masked value is identical.
                set_winsz_opcode_pos = opcode & 0xffffffff
            else:
                set_winsz_opcode_pos = opcode
                # Same 32 bits read back as a signed int.
                set_winsz_opcode_maybe_neg, = struct.unpack(
                    "i", struct.pack("I", opcode))

            our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
            # test both with a positive and potentially negative ioctl code
            fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
process.py 文件源码 项目:pwndemo 作者: zh-explorer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _handles(stdin, stdout, stderr):
        """Swap the ``PTY`` sentinel on stdout for the slave end of a raw pty.

        Returns the 4-tuple ``(stdin, stdout, stderr, master)``.  ``master``
        is ``None`` unless ``stdout`` was ``PTY``; in that case ``stdout`` is
        the slave fd of a newly opened pty and ``master`` is its master fd.
        ``stdin`` and ``stderr`` are always returned unchanged.
        """
        if stdout is not PTY:
            # Nothing to allocate: hand the arguments straight back.
            return stdin, stdout, stderr, None

        # subprocess.PIPE would normally be enough, but with a pipe
        # printf() and similar functions buffer their data instead of
        # sending it directly.  With a PTY on STDOUT the libc routines
        # do not buffer.
        pty_master, pty_slave = pty.openpty()

        # On a PTY the OS will attempt to interpret terminal control
        # codes.  Raw mode on both ends keeps the data passed through
        # exactly as written.
        for fd in (pty_master, pty_slave):
            tty.setraw(fd)

        # The child gets the slave side; the master is returned alongside.
        return stdin, pty_slave, stderr, pty_master
test_ioctl.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Check that ioctl accepts TIOCSWINSZ as a positive and as a signed code.

        Opens a pty pair and sets the master's window size twice, first with
        the non-negative request code and then with the signed 32-bit value
        sharing its bit pattern.  The test passes if no call raises; it is
        skipped when the ``pty`` module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        master_fd, slave_fd = pty.openpty()
        try:
            code = termios.TIOCSWINSZ
            if code >= 0:
                positive_code = code
                # Round-trip through struct to get the signed 32-bit view.
                packed = struct.pack("I", code)
                (signed_code,) = struct.unpack("i", packed)
            else:
                signed_code = code
                positive_code = code & 0xffffffff

            window = struct.pack("HHHH", 80, 25, 0, 0)
            # Positive code first, then the potentially negative one.
            fcntl.ioctl(master_fd, positive_code, window)
            fcntl.ioctl(master_fd, signed_code, window)
        finally:
            os.close(master_fd)
            os.close(slave_fd)
local_operations.py 文件源码 项目:needle 作者: mwrlabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def command_background_start(self, cmd):
        """Run a background command: run it in a new thread and resume execution immediately.

        The command is started through the shell with stdout and stderr
        attached to the slave end of a new pty.  The calling thread sleeps
        for two seconds after starting the worker thread and then returns
        ``None``.
        """
        self.printer.debug('[LOCAL CMD] Local Background Command: %s' % cmd)

        def daemon(cmd):
            """Daemon used to run the command so to avoid blocking the UI"""
            # Run command
            master, slave = pty.openpty()
            # NOTE(review): neither ``proc`` nor ``stdout`` is used after
            # being created, and ``slave`` is never closed in this process.
            # Left as-is because the intended lifetime of these handles is
            # not visible from here.
            proc = subprocess.Popen(cmd, shell=True, stdout=slave, stderr=slave, close_fds=True)
            stdout = os.fdopen(master)
            self.printer.info("Monitoring in background...Kill this process when you want to see the dumped content")

        # Run command in a thread
        d = threading.Thread(name='daemon', target=daemon, args=(cmd,))
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        d.daemon = True
        d.start()
        time.sleep(2)
test_ioctl.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Exercise fcntl.ioctl with signed and unsigned forms of TIOCSWINSZ.

        The window size of a new pty master is set with the request code
        given as a non-negative integer and then as the signed 32-bit
        reading of the same bits.  Passing means that neither ioctl call
        raised.  Skipped when the ``pty`` module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                # The ``L`` long suffix is dropped: it is a syntax error on
                # Python 3 and unnecessary on Python 2.
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            for opcode in (set_winsz_opcode_pos, set_winsz_opcode_maybe_neg):
                fcntl.ioctl(mfd, opcode, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
test_ioctl.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Run TIOCSWINSZ through ioctl as an unsigned and as a signed code.

        Both ioctl calls target the master fd of a freshly opened pty and
        carry the same packed window size.  No result is asserted; the test
        succeeds when the calls complete without raising, and is skipped
        when the ``pty`` module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        master_fd, slave_fd = pty.openpty()
        try:
            tiocswinsz = termios.TIOCSWINSZ
            if tiocswinsz >= 0:
                code_pos = tiocswinsz
                # Signed 32-bit interpretation of the same bit pattern.
                (code_maybe_neg,) = struct.unpack(
                    "i", struct.pack("I", tiocswinsz))
            else:
                code_maybe_neg = tiocswinsz
                code_pos = tiocswinsz & 0xffffffff

            packed_winsz = struct.pack("HHHH", 80, 25, 0, 0)
            # Positive code first, then the potentially negative one.
            for request in (code_pos, code_maybe_neg):
                fcntl.ioctl(master_fd, request, packed_winsz)
        finally:
            os.close(master_fd)
            os.close(slave_fd)
test_ioctl.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Verify ioctl takes TIOCSWINSZ both as a positive and a signed value.

        A pty pair is opened, and the master's window size is set once per
        form of the request code: the non-negative integer and the signed
        32-bit value with the same bits.  The test has no assertions and
        passes when neither call raises.  Skipped when the ``pty`` module
        is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        mfd, sfd = pty.openpty()
        try:
            if termios.TIOCSWINSZ < 0:
                set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
                # Written without the ``L`` suffix so the module also
                # parses on Python 3; the value is unchanged on Python 2.
                set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
            else:
                set_winsz_opcode_pos = termios.TIOCSWINSZ
                set_winsz_opcode_maybe_neg, = struct.unpack("i",
                        struct.pack("I", termios.TIOCSWINSZ))

            our_winsz = struct.pack("HHHH",80,25,0,0)
            # test both with a positive and potentially negative ioctl code
            fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
            fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
        finally:
            os.close(mfd)
            os.close(sfd)
test_ioctl.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ioctl_signed_unsigned_code_param(self):
        """Pass TIOCSWINSZ to ioctl in its positive and its signed 32-bit form.

        The target is the master fd of a new pty; the payload is a packed
        window size.  The test makes no assertions and passes when both
        ioctl calls return without raising.  Skipped when the ``pty``
        module is unavailable.
        """
        if not pty:
            raise unittest.SkipTest('pty module required')
        master_fd, slave_fd = pty.openpty()
        try:
            raw_code = termios.TIOCSWINSZ
            is_negative = raw_code < 0
            if is_negative:
                maybe_negative = raw_code
                positive = raw_code & 0xffffffff
            else:
                positive = raw_code
                # Reinterpret the unsigned value as a signed 32-bit int.
                (maybe_negative,) = struct.unpack("i",
                                                  struct.pack("I", raw_code))

            winsz_payload = struct.pack("HHHH", 80, 25, 0, 0)
            # Positive code first, then the potentially negative one.
            fcntl.ioctl(master_fd, positive, winsz_payload)
            fcntl.ioctl(master_fd, maybe_negative, winsz_payload)
        finally:
            os.close(master_fd)
            os.close(slave_fd)
persistent.py 文件源码 项目:ansible-provider-docs 作者: alibaba 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _do_it(self, action):
        """Run ``ansible-connection`` and feed it the play context and action.

        The child's stdin is the slave end of a new pty; the pickled play
        context, an ``#END_INIT#`` marker line and the action are written
        to the master end.  The child's stdout and stderr are collected
        through pipes.

        :param action: payload written after the init marker (converted
            with ``to_bytes``).
        :returns: ``(returncode, stdout, stderr)`` of the child process.
        """

        master, slave = pty.openpty()
        try:
            p = subprocess.Popen(["ansible-connection"], stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except BaseException:
            # Nothing will ever own the master end if the spawn failed.
            os.close(master)
            raise
        finally:
            # The child has its own copy of the slave end.
            os.close(slave)

        stdin = os.fdopen(master, 'wb', 0)
        try:
            # Need to force a protocol that is compatible with both py2 and py3.
            # That would be protocol=2 or less.
            # Also need to force a protocol that excludes certain control chars as
            # stdin in this case is a pty and control chars will cause problems.
            # that means only protocol=0 will work.
            src = cPickle.dumps(self._play_context.serialize(), protocol=0)
            stdin.write(src)

            stdin.write(b'\n#END_INIT#\n')
            stdin.write(to_bytes(action))
            stdin.write(b'\n\n')

            (stdout, stderr) = p.communicate()
        finally:
            # Release the master end even when a write or communicate() fails.
            stdin.close()

        return (p.returncode, stdout, stderr)
common.py 文件源码 项目:dcos-tunnel 作者: dcos 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command runs through the shell as the leader of a new session;
    its stdout and stderr are captured via pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    :raises: any exception from ``subprocess.Popen``; both pty fds are
       closed before it propagates.

    """
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # On failure the master is never handed to the caller, so close
        # it here instead of leaking it.
        os.close(master)
        raise
    finally:
        # Only the child needs the slave end.
        os.close(slave)

    return (proc, master)
process.py 文件源码 项目:black_zone 作者: zh-explorer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _handles(stdin, stdout, stderr):
        """Return the stdio handles for a child, allocating a raw pty on request.

        When ``stdout`` is the ``PTY`` sentinel, a pty pair is opened, both
        ends are put into raw mode and the slave fd takes the place of
        ``stdout``.  The result is always
        ``(stdin, stdout, stderr, master)``; ``master`` is ``None`` when no
        pty was opened.
        """
        wants_pty = stdout is PTY
        if not wants_pty:
            return stdin, stdout, stderr, None

        # A subprocess.PIPE would usually do, but it makes printf() and
        # similar functions buffer data instead of sending it directly.
        # With a PTY on STDOUT the libc routines do not buffer.
        master_fd, slave_fd = pty.openpty()

        # With a PTY the OS will attempt to interpret terminal control
        # codes; raw mode on both ends passes input through exactly.
        tty.setraw(master_fd)
        tty.setraw(slave_fd)

        # The slave side goes to the child in place of the sentinel.
        return stdin, slave_fd, stderr, master_fd
common.py 文件源码 项目:deploy-marathon-bluegreen 作者: softonic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    ``cmd`` is executed by the shell in its own session, with stdout and
    stderr attached to pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side
       of the tty-pair.  It is the responsibility of the caller
       to close the master fd, and to perform any cleanup (including
       waiting for completion) of the Popen object.
    :rtype: (Popen, int)
    :raises: exceptions from ``subprocess.Popen`` propagate unchanged
       after both ends of the tty-pair have been closed.

    """
    master, slave = pty.openpty()
    spawned = False
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
        spawned = True
    finally:
        # The parent never needs the slave end once the child has (or
        # failed to get) its copy.
        os.close(slave)
        if not spawned:
            # No caller will receive the master fd, so release it too.
            os.close(master)

    return (proc, master)
__init__.py 文件源码 项目:binja_dynamics 作者: nccgroup 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, message_q):
        """Initialise the thread, keep the message queue and open a pty pair.

        Stores the queue as ``self.messages``, the pty fds as
        ``self.master`` / ``self.slave`` and the slave's device path as
        ``self.tty``.
        """
        QThread.__init__(self)
        self.messages = message_q
        # Only the inferior process need use the slave file descriptor
        master_fd, slave_fd = pty.openpty()
        self.master = master_fd
        self.slave = slave_fd
        self.tty = os.ttyname(slave_fd)
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getPty(self, term, windowSize, modes):
        """Record the terminal settings and allocate a pty.

        ``TERM`` and ``SSH_TTY`` are stored in ``self.environ``; the window
        size and modes are kept on ``self.winSize`` and ``self.modes``; and
        ``self.ptyTuple`` becomes ``(master_fd, slave_fd, slave_tty_name)``.
        """
        self.environ['TERM'] = term
        self.winSize = windowSize
        self.modes = modes
        pty_pair = pty.openpty()
        slave_name = os.ttyname(pty_pair[1])
        self.environ['SSH_TTY'] = slave_name
        # (master, slave) extended with the slave's device path.
        self.ptyTuple = tuple(pty_pair) + (slave_name,)
test_tpe.py 文件源码 项目:pyTeliumManager 作者: Ousret 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        """Open a pty pair and prepare the Faker instance and device thread.

        The thread targeting ``self.__run`` is created here but not started.
        """
        master_fd, slave_fd = pty.openpty()
        self._master = master_fd
        self._slave = slave_fd
        self._s_name = os.ttyname(slave_fd)

        self._fake = Faker()

        # NOTE(review): presumably this lives inside a class, where the
        # double-underscore name is mangled; keep the spelling as-is.
        self._fake_device = threading.Thread(target=self.__run)
test_readline.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_readline():
    """readline() on a pty-backed wrapper returns the typed line as bytes.

    ``b'input\\n'`` is written to the pty master and the wrapper, built on
    the slave fd for both input and output, must return ``b'input'``.
    Both pty fds are closed when the test finishes.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.get_reader().readline()
        assert result == b'input'
        assert isinstance(result, bytes_type)
    finally:
        # Release the pty even when an assertion fails.
        os.close(master)
        os.close(slave)
test_readline.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_readline_returns_unicode():
    """readline(returns_unicode=True) returns the typed line as text.

    ``b'input\\n'`` is written to the pty master; the reader obtained from
    the slave-backed wrapper must return ``'input'`` as ``unicode_type``.
    Both pty fds are closed when the test finishes.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.get_reader().readline(returns_unicode=True)
        assert result == 'input'
        assert isinstance(result, unicode_type)
    finally:
        # Release the pty even when an assertion fails.
        os.close(master)
        os.close(slave)
test_readline.py 文件源码 项目:pyrepl 作者: dajose 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_raw_input():
    """raw_input() on a pty-backed wrapper returns the typed line as bytes.

    ``b'input\\n'`` is written to the pty master and
    ``raw_input('prompt:')`` on the slave-backed wrapper must return
    ``b'input'``.  Both pty fds are closed when the test finishes.
    """
    master, slave = pty.openpty()
    try:
        readline_wrapper = _ReadlineWrapper(slave, slave)
        os.write(master, b'input\n')

        result = readline_wrapper.raw_input('prompt:')
        assert result == b'input'
        assert isinstance(result, bytes_type)
    finally:
        # Release the pty even when an assertion fails.
        os.close(master)
        os.close(slave)
ptyshell.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def spawn(self, argv=None, term=None):
        """Start a program (a shell by default) attached to a new pty.

        :param argv: argument list to execute.  When ``None``, ``$SHELL`` is
            used if set; otherwise each of bash, sh, ksh, zsh, csh, ash is
            looked up in the ``$PATH`` directories in that order.  Falls
            back to ``/bin/sh`` when nothing is found or ``argv`` is empty.
        :param term: if not ``None``, stored in ``os.environ['TERM']`` before
            the program is started.

        Afterwards ``self.slave`` is the slave fd, ``self.master`` is an
        unbuffered, non-blocking file object on the master fd and
        ``self.prog`` is the ``subprocess.Popen`` object.
        """
        if argv is None:
            if 'SHELL' in os.environ:
                argv = [os.environ['SHELL']]
            elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android
                for shell in ["bash","sh","ksh","zsh","csh","ash"]:
                    for path in os.environ['PATH'].split(':'):
                        fullpath=os.path.join(path.strip(),shell)
                        if os.path.isfile(fullpath):
                            argv=[fullpath]
                            break
                    if argv:
                        break
        if not argv:
            argv= ['/bin/sh']

        if term is not None:
            os.environ['TERM']=term

        master, slave = pty.openpty()
        self.slave=slave
        # 'rb+' is the valid spelling of binary read/write; the previous
        # 'rb+wb' is rejected as an invalid mode by Python 3.
        self.master = os.fdopen(master, 'rb+', 0) # open file in an unbuffered mode
        # fcntl raises on failure, so no return-value checks are needed.
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK)
        self.prog = subprocess.Popen(
            shell=False,
            args=argv,
            stdin=slave,
            stdout=slave,
            stderr=subprocess.STDOUT,
            preexec_fn=prepare
        )
terminal.py 文件源码 项目:Prism 作者: Stumblinbear 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, command=None, return_url=None, restart=False):
        """Store the terminal settings, give it an id and open its pty.

        ``command``, ``return_url`` and ``restart`` are stored unchanged.
        The id comes from ``prism.generate_random_string(8)`` and the
        process wrapper is a ``TerminalProcess`` built from the two fds
        returned by ``pty.openpty()``.
        """
        self.command = command
        self.terminal_id = prism.generate_random_string(8)
        self.return_url = return_url
        self.restart = restart

        master_fd, slave_fd = pty.openpty()
        self.process = TerminalProcess(master_fd, slave_fd)

        opened_message = 'Terminal Opened: %s' % self.terminal_id
        logging.info(opened_message)
ptyshell.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def spawn(self, argv=None, term=None):
        """Launch ``argv`` (or a discovered shell) on a newly opened pty.

        :param argv: command as an argument list.  If ``None``, ``$SHELL``
            is preferred; failing that, bash, sh, ksh, zsh, csh and ash are
            searched for, in that order, across the ``$PATH`` entries.
            ``/bin/sh`` is the last resort, also used for an empty ``argv``.
        :param term: value for ``os.environ['TERM']``; left untouched when
            ``None``.

        Sets ``self.slave`` (slave fd), ``self.master`` (unbuffered,
        non-blocking file object on the master fd) and ``self.prog`` (the
        ``subprocess.Popen`` object).
        """
        if argv is None:
            if 'SHELL' in os.environ:
                argv = [os.environ['SHELL']]
            elif 'PATH' in os.environ: #searching sh in the path. It can be unusual like /system/bin/sh on android
                search_dirs = os.environ['PATH'].split(':')
                for shell in ["bash","sh","ksh","zsh","csh","ash"]:
                    for path in search_dirs:
                        fullpath=os.path.join(path.strip(),shell)
                        if os.path.isfile(fullpath):
                            argv=[fullpath]
                            break
                    if argv:
                        break
        if not argv:
            argv= ['/bin/sh']

        if term is not None:
            os.environ['TERM']=term

        master, slave = pty.openpty()
        self.slave=slave
        # Binary read/write is spelled 'rb+'; Python 3 raises ValueError
        # for the former 'rb+wb'.
        self.master = os.fdopen(master, 'rb+', 0) # open file in an unbuffered mode
        # A failing fcntl call raises, so the results need no assertions.
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        fcntl.fcntl(self.master, fcntl.F_SETFL , flags | os.O_NONBLOCK)
        self.prog = subprocess.Popen(
            shell=False,
            args=argv,
            stdin=slave,
            stdout=slave,
            stderr=subprocess.STDOUT,
            preexec_fn=prepare
        )
__init__.py 文件源码 项目:binja_dynamics 作者: ehennenfent 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, message_q):
        """Set up the thread with its message queue and a new pty pair.

        After construction ``self.messages`` is the queue, ``self.master``
        and ``self.slave`` are the pty fds, and ``self.tty`` is the device
        path of the slave.
        """
        QThread.__init__(self)
        self.messages = message_q
        # Only the inferior process need use the slave file descriptor
        pty_fds = pty.openpty()
        self.master, self.slave = pty_fds
        self.tty = os.ttyname(self.slave)
unix.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getPty(self, term, windowSize, modes):
        """Allocate a pty and remember it together with the terminal settings.

        Sets ``TERM`` and ``SSH_TTY`` in ``self.environ``, stores
        ``windowSize`` and ``modes`` on the instance, and saves
        ``(master_fd, slave_fd, slave_tty_name)`` as ``self.ptyTuple``.
        """
        self.environ['TERM'] = term
        self.winSize = windowSize
        self.modes = modes
        master_fd, slave_fd = pty.openpty()
        slave_path = os.ttyname(slave_fd)
        self.environ['SSH_TTY'] = slave_path
        self.ptyTuple = (master_fd, slave_fd, slave_path)
serial_interface.py 文件源码 项目:vbcg 作者: nspi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, name):
        """If possible, connect to serial port

        Tries to open ``name`` at 9600 baud.  If that raises
        ``serial.SerialException``, a pty pair is opened instead and the
        serial connection is made to the pty's slave device, with a warning
        logged.  Also creates the trigger and program-end events, records
        the current time and initialises the ``threading.Thread`` base
        class.

        :param name: serial port to open.
        """

        # Try to establish connection to serial port
        try:
            self.serial_connection = serial.Serial(name, 9600)
            self.serial_connection_established = True

        # If it fails because there is no device, use virtual serial port
        except serial.SerialException:

            # Create virtual serial port
            self.master, self.slave = pty.openpty()
            self.vPort = os.ttyname(self.slave)

            # Create instance
            self.serial_connection = serial.Serial(self.vPort, 9600)
            self.serial_connection_established = True

            # logging.warn is a deprecated alias that newer Python
            # versions no longer provide; logging.warning is equivalent.
            logging.warning("Trigger device not found -> Using virtual device")

        # Create events
        self.trigger_event = threading.Event()
        self.eventProgramEnd = threading.Event()

        # Store current time
        self.last_trigger_time = time.time()
        self.firstRun = True

        # Call initialization of thread class
        threading.Thread.__init__(self)
runner.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_slave_pty():
    """Yield the slave fd of a new pty pair, closing both ends afterwards.

    This is a generator that yields exactly once.  Both fds are closed
    when the generator is resumed, closed, or has an exception thrown into
    it at the ``yield``, so an error in the consumer does not leak them.
    """
    master_pty, slave_pty = pty.openpty()
    try:
        yield slave_pty
    finally:
        try:
            os.close(slave_pty)
        finally:
            # Still close the master if closing the slave raised.
            os.close(master_pty)


问题


面经


文章

微信
公众号

扫码关注公众号