python类fork()的实例源码

cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run_bg(cmd, debug=False, cwd=''):
    """Start 'cmd' in the background, attached to a new pseudo-terminal.

    Args:
        cmd: The command, either as a list of strings or as one string that
            is split on whitespace.
        debug: When true, echo the command line to stderr before forking.
        cwd: Directory the child changes into before exec; '' leaves the
            working directory untouched.

    Returns:
        The (child_pid, child_fd) pair produced by pty.fork() in the parent.

    Raises:
        RunError: If pty.fork() fails, or -- inside the child process -- if
            changing directory or executing the command fails.
    """
    # make sure 'cmd' is a list of strings
    if isinstance(cmd, (str, unicode)):
        # split() without arguments never yields empty strings, so no
        # extra filtering is needed.
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        ( child_pid, child_fd ) = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid != 0:
        # Parent: hand back the child's pid and the pty master descriptor.
        return child_pid, child_fd

    # Child process from here on.
    try:
        if cwd != '':
            os.chdir(cwd)
        os.execvp(cmd[0], cmd)
    except Exception as e:
        # NOTE(review): this raise unwinds inside the forked child, so the
        # child keeps running the caller's code -- confirm this is intended.
        raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
cmd.py 文件源码 项目:ave 作者: sonyxperiadev 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_bg(cmd, debug=False, cwd=''):
    """Start 'cmd' in the background, attached to a new pseudo-terminal.

    Args:
        cmd: The command, either as a list of strings or as one string that
            is split on whitespace.
        debug: When true, echo the command line to stderr before forking.
        cwd: Directory the child changes into before exec; '' leaves the
            working directory untouched.

    Returns:
        The (child_pid, child_fd) pair produced by pty.fork() in the parent.

    Raises:
        RunError: If pty.fork() fails, or -- inside the child process -- if
            changing directory or executing the command fails.
    """
    # make sure 'cmd' is a list of strings
    if isinstance(cmd, (str, unicode)):
        # split() without arguments never yields empty strings, so no
        # extra filtering is needed.
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        ( child_pid, child_fd ) = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid != 0:
        # Parent: hand back the child's pid and the pty master descriptor.
        return child_pid, child_fd

    # Child process from here on.
    try:
        if cwd != '':
            os.chdir(cwd)
        os.execvp(cmd[0], cmd)
    except Exception as e:
        # NOTE(review): this raise unwinds inside the forked child, so the
        # child keeps running the caller's code -- confirm this is intended.
        raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
rogueinabox.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def open_terminal(command="bash", columns=80, lines=24):
    """Run *command* on a new pseudo-terminal and return handles to it.

    Args:
        command: Shell-style command line; it is tokenised with shlex.split.
        columns: Terminal width, exported to the child as COLUMNS.
        lines: Terminal height, exported to the child as LINES.

    Returns:
        A tuple ``(Terminal(columns, lines), child_pid, p_out)`` where
        ``p_out`` is an unbuffered binary file object wrapping the pty
        master descriptor, switched to non-blocking mode.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        # The child gets a minimal, fixed environment rather than the
        # parent's.
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            print("Could not find the executable in %s. Press any key to exit." % path)
            exit()

    # Set non-blocking reads. The file *status* flags must be fetched with
    # F_GETFL: F_GETFD returns the descriptor flags (FD_CLOEXEC), which are
    # a different bit set and must not be written back through F_SETFL.
    flag = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
test_proxy.py 文件源码 项目:remote-docker 作者: phizaz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def spawn(self, argv):
        '''
        Run argv in a child attached to a new pty and return the decoded log.

        Based on the code for pty.spawn(). argv must be a list and no
        master descriptor may be open yet; both are checked with assert.
        '''
        assert self.master_fd is None
        assert isinstance(argv, list)

        child_pid, pty_fd = pty.fork()
        self.master_fd = pty_fd

        if child_pid == pty.CHILD:
            # Replaces the child process image; does not return on success.
            os.execlp(argv[0], *argv)

        self._init()
        try:
            self._copy()  # start communication
        except Exception:
            # Unexpected error: clean up first, then let it propagate.
            self._del()
            raise
        else:
            self._del()

        return self.log.decode()
check_handler.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main ():
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    nonblock (fd)
    tty.setraw(fd) #STDIN_FILENO)
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        print 'Sleep interrupted'
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
workers.py 文件源码 项目:jack 作者: jack-cli-cd-ripper 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start_new_process(args, nice_value=0):
    "start a new process in a pty and renice it; the parent gets a bookkeeping dict back"
    started = time.time()
    pid, master_fd = pty.fork()
    if pid != CHILD:
        # Parent: on Linux make the pty master non-blocking, then describe
        # the new process.
        if os.uname()[0] == "Linux":
            fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
        return {
            'start_time': started,
            'pid': pid,
            'fd': master_fd,
            'file': os.fdopen(master_fd),
            'cmd': args,
            'buf': "",
            'otf': 0,
            'percent': 0,
            'elapsed': 0,
        }

    # Child: restore default signals, renice if requested, then exec.
    default_signals()
    if nice_value:
        os.nice(nice_value)
    encoded_args = [a.encode(cf['_charset'], "replace") for a in args]
    os.execvp(args[0], encoded_args)
vterm.py 文件源码 项目:Adwear 作者: Uberi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def spawn(self):
        """Fork onto a new pty and run self.command in the child.

        A callable command is invoked in the child, which then always exits
        with status 0; any other command is treated as an argv list and
        exec'd with self.env (TERM forced to 'linux'). The parent registers
        self.terminate to run at interpreter exit.
        """
        child_env = self.env
        child_env['TERM'] = 'linux'

        self.pid, self.master = pty.fork()

        if self.pid == 0:
            if not callable(self.command):
                os.execvpe(self.command[0], self.command, child_env)
            else:
                try:
                    self.command()
                except:
                    # Report the failure on the child's stderr.
                    sys.stderr.write(traceback.format_exc())
                    sys.stderr.flush()
                finally:
                    # Never fall back into the parent's code path.
                    os._exit(0)

        if self.main_loop is None:
            fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK)

        atexit.register(self.terminate)
check_handler.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main ():
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    nonblock (fd)
    tty.setraw(fd) #STDIN_FILENO)
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        print 'Sleep interrupted'
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
linux_pty.py 文件源码 项目:TerminalView 作者: Wramberg 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, cmd, cwd):
        """Fork a child on a new pty and exec cmd (an argv list) inside cwd.

        The parent records the child's pid and the pty master descriptor;
        the child sets TERM to "linux", changes directory and execs.
        """
        self._cmd_return_code = 0
        self._cmd_kill_signal = 0
        child_pid, master_fd = pty.fork()
        self._shell_pid = child_pid
        self._master_fd = master_fd
        if child_pid != pty.CHILD:
            return

        # Child only from here on.
        os.environ["TERM"] = "linux"
        os.chdir(cwd)
        os.execv(cmd[0], cmd)
TerminalProcess.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def restart(self, cmd):
        """Stop the current process, if still running, and start cmd on a new pty.

        A running process is sent SIGHUP and its update thread is stopped
        before the new child is forked. The child runs cmd via
        subprocess.call and reports "t" (ran) or "f" (failed) on the exit
        pipe before exiting.
        """
        if not self.completed:
            self.process_input("\n\033[01;31mProcess killed.\033[00m\r\n")
            os.kill(self.pid, signal.SIGHUP)
            worker = self.thread
            worker.stop()
            # Keep the event loop running until the old thread has finished.
            while worker.alive:
                QCoreApplication.processEvents()

        self.exit_pipe, child_pipe = os.pipe()
        child_pid, master_fd = pty.fork()
        if child_pid == 0:
            try:
                os.environ["TERM"] = "xterm-256color"
                retval = subprocess.call(cmd, close_fds=True)
                os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                os.write(child_pipe, "t")
                os._exit(retval)
            except:
                # Any failure above is reported as a failed launch.
                os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
                os.write(child_pipe, "f")
                os._exit(1)

        os.close(child_pipe)
        self.process_input("\033[01;34mStarted process with PID %d.\033[00m\r\n" % child_pid)

        self.pid = child_pid
        self.data_pipe = master_fd
        self.completed = False

        # Initialize terminal settings
        window_size = struct.pack("hhhh", self.rows, self.cols, 0, 0)
        fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, window_size)
        attribute = termios.tcgetattr(self.data_pipe)
        termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)

        self.thread = TerminalUpdateThread(self, self.data_pipe, self.exit_pipe)
        self.thread.start()
proxy.py 文件源码 项目:dataplicity-agent 作者: wildfoundry 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def spawn(self, argv=None):
        '''
        Create a spawned process.
        Based on the code for pty.spawn().

        The child optionally switches to self.group and self.user and then
        execs argv (defaulting to the shell named by $SHELL). The parent
        copies data through self._copy() until it returns or fails with an
        I/O error, then closes the pty master.
        '''
        assert self.master_fd is None

        pid, master_fd = pty.fork()
        self.master_fd = master_fd
        self.pid = pid
        if pid == pty.CHILD:
            # Drop the group before the user: once the uid has been changed
            # away from root the process is no longer allowed to call
            # setgid(), so the original user-then-group order made the
            # group switch fail.
            if self.group is not None:
                try:
                    gid = grp.getgrnam(self.group).gr_gid
                except KeyError:
                    log.error("No such group: %s", self.group)
                else:
                    os.setgid(gid)
                    log.debug('switched group for remote process to %s(%s)', self.group, gid)
            if self.user is not None:
                try:
                    uid = pwd.getpwnam(self.user).pw_uid
                except KeyError:
                    log.error("No such user: %s", self.user)
                else:
                    os.setuid(uid)
                    log.debug('switched user for remote process to %s(%s)', self.user, uid)
            if not argv:
                argv = [os.environ['SHELL']]
            os.execlp(argv[0], *argv)
            # Previous command replaces the process
            return

        self._init_fd()
        try:
            self._copy()
        except (IOError, OSError):
            # An I/O error on the pty ends the session; there is nothing
            # left to do but clean up.
            pass

        os.close(master_fd)
        self.master_fd = None
__init__.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns the tuple (pid, parent_fd); pid is pty.CHILD in the child
        process, and parent_fd is the master side of the pty.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The standard descriptors now refer to the pty, so the original
            # descriptor is redundant; close it unless it already was one of
            # the standard descriptors.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
__init__.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind tty_fd the controlling tty.

        This should be more portable than the pty.fork() function.
        Specifically, this should work on Solaris. '''

        def probe(path, flags):
            # Open and immediately close 'path'; only success or failure
            # of the attempt matters.
            os.close(os.open(path, flags))

        child_name = os.ttyname(tty_fd)
        no_ctty_flags = os.O_RDWR | os.O_NOCTTY

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            probe("/dev/tty", no_ctty_flags)
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        os.setsid()

        # Verify the disconnect: opening /dev/tty again is expected to fail
        # with ENXIO; succeeding is the error case.
        try:
            probe("/dev/tty", no_ctty_flags)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        probe(child_name, os.O_RDWR)

        # Verify we now have a controlling tty.
        probe("/dev/tty", os.O_WRONLY)
check_signals.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def sig_test (sig_handler_type, fork_type, child_output):
    print 'Testing with:'
    print '\tsig_handler_type:', sig_handler_type
    print '\tfork_type:', fork_type
    print '\tchild_output:', child_output

    if sig_handler_type == 'SIG_IGN':
        signal.signal (signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal (signal.SIGCHLD, signal_handler)
    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        if child_output == 'yes':
            os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        pass
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
__init__.py 文件源码 项目:ssh-tunnel 作者: aalku 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns the tuple (pid, parent_fd); pid is pty.CHILD in the child
        process, and parent_fd is the master side of the pty.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The standard descriptors now refer to the pty, so the original
            # descriptor is redundant; close it unless it already was one of
            # the standard descriptors.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
__init__.py 文件源码 项目:ssh-tunnel 作者: aalku 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind tty_fd the controlling tty.

        This should be more portable than the pty.fork() function.
        Specifically, this should work on Solaris. '''

        def probe(path, flags):
            # Open and immediately close 'path'; only success or failure
            # of the attempt matters.
            os.close(os.open(path, flags))

        child_name = os.ttyname(tty_fd)
        no_ctty_flags = os.O_RDWR | os.O_NOCTTY

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            probe("/dev/tty", no_ctty_flags)
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        os.setsid()

        # Verify the disconnect: opening /dev/tty again is expected to fail
        # with ENXIO; succeeding is the error case.
        try:
            probe("/dev/tty", no_ctty_flags)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        probe(child_name, os.O_RDWR)

        # Verify we now have a controlling tty.
        probe("/dev/tty", os.O_WRONLY)
server.py 文件源码 项目:gits 作者: tolstoyevsky 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create(self, rows=24, cols=80):
        """Fork a login session on a new pty and register it as a client.

        The child execs /bin/login when running as root; otherwise it
        prompts for a login name and execs ssh to localhost. The parent
        makes the pty master non-blocking, sets its window size, records the
        session in TermSocketHandler.clients and returns the master fd.
        """
        pid, fd = pty.fork()
        if pid != 0:
            # Parent side.
            fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
            window_size = struct.pack('HHHH', rows, cols, 0, 0)
            fcntl.ioctl(fd, termios.TIOCSWINSZ, window_size)
            TermSocketHandler.clients[fd] = {
                'client': self,
                'pid': pid,
                'terminal': Terminal(rows, cols)
            }
            return fd

        # Child side.
        if os.getuid() != 0:
            # The prompt has to end with a newline character.
            sys.stdout.write(socket.gethostname() + ' login: \n')
            login = sys.stdin.readline().strip()

            cmd = [
                'ssh',
                '-oPreferredAuthentications=keyboard-interactive,password',
                '-oNoHostAuthenticationForLocalhost=yes',
                '-oLogLevel=FATAL',
                '-F/dev/null',
                '-l', login, 'localhost',
            ]
        else:
            cmd = ['/bin/login']

        env = {
            'COLUMNS': str(cols),
            'LINES': str(rows),
            'PATH': os.environ['PATH'],
            'TERM': 'linux',
        }
        os.execvpe(cmd[0], cmd, env)
__init__.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns the tuple (pid, parent_fd); pid is pty.CHILD in the child
        process, and parent_fd is the master side of the pty.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The standard descriptors now refer to the pty, so the original
            # descriptor is redundant; close it unless it already was one of
            # the standard descriptors.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
__init__.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind tty_fd the controlling tty.

        This should be more portable than the pty.fork() function.
        Specifically, this should work on Solaris. '''

        def probe(path, flags):
            # Open and immediately close 'path'; only success or failure
            # of the attempt matters.
            os.close(os.open(path, flags))

        child_name = os.ttyname(tty_fd)
        no_ctty_flags = os.O_RDWR | os.O_NOCTTY

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            probe("/dev/tty", no_ctty_flags)
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        os.setsid()

        # Verify the disconnect: opening /dev/tty again is expected to fail
        # with ENXIO; succeeding is the error case.
        try:
            probe("/dev/tty", no_ctty_flags)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        probe(child_name, os.O_RDWR)

        # Verify we now have a controlling tty.
        probe("/dev/tty", os.O_WRONLY)
check_signals.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def sig_test (sig_handler_type, fork_type, child_output):
    print 'Testing with:'
    print '\tsig_handler_type:', sig_handler_type
    print '\tfork_type:', fork_type
    print '\tchild_output:', child_output

    if sig_handler_type == 'SIG_IGN':
        signal.signal (signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal (signal.SIGCHLD, signal_handler)
    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        if child_output == 'yes':
            os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        pass
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
pexpect23.py 文件源码 项目:sardana 作者: sardana-org 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __fork_pty(self):
        """This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns the tuple (pid, parent_fd); pid is 0 in the child process,
        and parent_fd is the master side of the pty.
        """

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            # Call syntax instead of the Python-2-only "raise X, msg" form.
            raise ExceptionPexpect("Error! Could not open pty with os.openpty().")

        pid = os.fork()
        if pid < 0:
            raise ExceptionPexpect("Error! Failed os.fork().")
        elif pid == 0:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            # Point stdin, stdout and stderr at the pty.
            os.dup2(child_fd, 0)
            os.dup2(child_fd, 1)
            os.dup2(child_fd, 2)

            # The original descriptor is redundant unless it already was
            # one of the standard descriptors.
            if child_fd > 2:
                os.close(child_fd)
        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
pexpect23.py 文件源码 项目:sardana 作者: sardana-org 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        """This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris.

        Raises ExceptionPexpect if the process is still attached to its old
        controlling tty after os.setsid(), and OSError if the child pty or
        the new controlling tty cannot be opened.
        """
        # Imported locally so the method does not rely on a module-level
        # import of errno.
        import errno

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty if still connected. ENXIO means
        # there was no controlling tty to begin with; that case is tolerated
        # instead of crashing.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        os.setsid()

        # Verify we are disconnected from controlling tty: the open must
        # fail with ENXIO. Only OSError is caught, so the ExceptionPexpect
        # raised on a successful open is no longer swallowed.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
            raise ExceptionPexpect("Error! We are not disconnected from a controlling tty.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty. os.open() reports failure by raising
        # OSError, never by returning a negative descriptor, so no explicit
        # "fd < 0" check is needed.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
KDEAptDialogs.py 文件源码 项目:gdebi 作者: linuxmint 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def commit(self):
        """Install self.debfile with dpkg on a pty and mirror its output.

        Output read from the pty master is fed to the parent's konsole
        widget until reading fails; the child's exit status is then stored
        in self.exitstatus and the dialog widgets are updated.
        """
        from errno import EAGAIN

        # ui
        self.status.setText(_("Installing '%s'...") % os.path.basename(self.debfile))
        # the command
        dpkg = "/usr/bin/dpkg"
        argv = [dpkg, "--auto-deconfigure", "-i", self.debfile]
        (self.child_pid, self.master_fd) = pty.fork()

        if self.child_pid == 0:
            # Child: run dpkg non-interactively and exit with its status.
            os.environ["TERM"] = "dumb"
            if "DEBIAN_FRONTEND" not in os.environ:
                os.environ["DEBIAN_FRONTEND"] = "noninteractive"
            os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
            os._exit(subprocess.call(argv))

        while True:
            # Read from pty and write to DumbTerminal
            try:
                readable = select.select([self.master_fd], [], [], 0.001)[0]
                if readable:
                    chunk = os.read(self.master_fd, 255)
                    self.parent.konsole.insertWithTermCodes(utf8(chunk))
            except Exception as e:
                # EAGAIN means "try again"; any other error ends the loop.
                if hasattr(e, "errno") and e.errno == EAGAIN:
                    continue
                break
            KApplication.kApplication().processEvents()
        # at this point we got a read error from the pty, that most
        # likely means that the client is dead
        (pid, status) = os.waitpid(self.child_pid, 0)
        self.exitstatus = os.WEXITSTATUS(status)

        self.progress.setValue(100)
        close_button = self.parent.closeButton
        close_button.setEnabled(True)
        close_button.setVisible(True)
        self.parent.installationProgress.setVisible(False)
        QTimer.singleShot(1, self.parent.changeSize)
KDEAptDialogs.py 文件源码 项目:gdebi 作者: linuxmint 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def fork(self):
        """Fork onto a new pty and return the child pid (0 inside the child).

        The child's environment is prepared for non-interactive use before
        returning.
        """
        (self.child_pid, self.master_fd) = pty.fork()
        if self.child_pid != 0:
            return self.child_pid

        # Child only: dumb terminal, no interactive frontends.
        child_env = os.environ
        child_env["TERM"] = "dumb"
        if "DEBIAN_FRONTEND" not in child_env:
            child_env["DEBIAN_FRONTEND"] = "noninteractive"
        child_env["APT_LISTCHANGES_FRONTEND"] = "none"
        return self.child_pid
TerminalProcess.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, cmd, raw_debug = None):
        """Create the terminal state and, when cmd is given, run it on a new pty.

        cmd is passed to subprocess.call in the child; a falsy cmd creates
        an idle terminal that only shows a hint. raw_debug is stored as-is.
        Raises RuntimeError on Windows.
        """
        if os.name == "nt":
            # Call syntax instead of the Python-2-only "raise X, msg" form.
            raise RuntimeError("Windows does not support pseudo-terminal devices")

        if cmd:
            # Spawn the new process in a new PTY
            self.exit_pipe, child_pipe = os.pipe()
            self.exit_callback = None

            pid, fd = pty.fork()
            if pid == 0:
                try:
                    os.environ["TERM"] = "xterm-256color"
                    # Remove interpreter and loader search paths from the
                    # child's environment.
                    scrubbed = ["PYTHONPATH", "LD_LIBRARY_PATH"]
                    if sys.platform == "darwin":
                        scrubbed.append("DYLD_LIBRARY_PATH")
                    for name in scrubbed:
                        os.environ.pop(name, None)

                    retval = subprocess.call(cmd, close_fds=True)
                    os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                    os.write(child_pipe, "t")
                    os._exit(retval)
                except:
                    # Deliberately broad: any failure in the child falls
                    # through to the failure report below.
                    pass
                os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
                os.write(child_pipe, "f")
                os._exit(1)

            os.close(child_pipe)

            self.pid = pid
            self.data_pipe = fd
        else:
            # No command: mark every handle as invalid.
            self.exit_pipe = -1
            self.pid = -1
            self.data_pipe = -1

        self.rows = 25
        self.cols = 80
        self.term = TerminalEmulator(self.rows, self.cols)
        self.raw_debug = raw_debug
        self.completed = False

        self.term.response_callback = self.send_input

        if cmd:
            # Initialize terminal settings
            fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, struct.pack("hhhh", self.rows, self.cols, 0, 0))
            attribute = termios.tcgetattr(self.data_pipe)
            termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)
        else:
            self.process_input("\033[01;33mEnter desired command arguments, then run again.\033[00m\r\n")
            self.completed = True
test_builtin.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() in a child on a pty returns what was typed.

        A child is forked on a new pty and calls input(prompt); the parent
        writes terminal_input (bytes) to the pty master and reads the
        child's report back through a pipe. With stdio_encoding set, the
        child re-wraps sys.stdin/sys.stdout in that encoding first. The
        test is skipped when stdin/stdout are not ttys or pty.fork() fails.
        """
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        # Pipe used by the child to report its results to the parent.
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                # Two lines are reported: the tty status and the ascii()
                # form of what input() returned.
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except:
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        # Send the simulated keystrokes to the child's terminal.
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr
            with open(fd, "r", encoding="ascii", errors="ignore") as child_output:
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.read()))
        os.close(fd)
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], {'tty = True', 'tty = False'})
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # lines[1] was produced by ascii() in the child, so eval() here only
        # parses a string literal written by this test itself.
        input_result = eval(lines[1])   # ascii() -> eval() roundtrip
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
test_builtins.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() returns *terminal_input* when run on a pty.

        A child process is forked on a new pseudo-terminal.  It optionally
        re-wraps its stdin/stdout with *stdio_encoding*, calls
        ``input(prompt)`` and reports two lines back through a pipe: whether
        its standard streams are ttys, and the ascii() form of what it read.
        The parent writes *terminal_input* (bytes) followed by CR LF to the
        pty master and compares the reported value with the expected
        decoding of *terminal_input*.

        The test is skipped when the parent's stdin/stdout are not ttys,
        when pty.fork() is missing or fails, or when the child reports that
        its standard streams are not ttys.
        """
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {0}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except BaseException:
                # Deliberately catch everything: the traceback goes to the
                # pty, where the parent can pick it up for its failure message.
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        try:
            # Check the result was got and corresponds to the user's terminal input
            if len(lines) != 2:
                # Something went wrong, try to get at stderr.
                # Read the master with os.read() rather than a file object:
                # Linux raises EIO once the slave side is closed, and that
                # error must not replace the failure message below.
                child_output = bytearray()
                while True:
                    try:
                        chunk = os.read(fd, 3000)
                    except OSError:  # Assume EIO
                        break
                    if not chunk:
                        break
                    child_output.extend(chunk)
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.decode("ascii", "ignore")))
        finally:
            os.close(fd)
            # Reap the child so it does not linger as a zombie.
            try:
                os.waitpid(pid, 0)
            except OSError:
                # The child was already reaped elsewhere.
                pass
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], set(['tty = True', 'tty = False']))
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # ascii() -> eval() roundtrip.  NOTE(review): eval() is tolerable only
        # because the string was produced by ascii() in our own child process.
        input_result = eval(lines[1])
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
process.py 文件源码 项目:pwndemo 作者: zh-explorer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''Make the pseudo-terminal behind *tty_fd* the controlling tty.

        This should be more portable than the pty.fork() function.
        Specifically, this should work on Solaris.

        The steps are: look up the pty's device name, start a new session
        with os.setsid(), check that /dev/tty can no longer be opened, open
        the pty by name, and finally check that /dev/tty can be opened again.

        Raises Exception if /dev/tty can still be opened after os.setsid(),
        and OSError (from os.open) if the pty or the new /dev/tty cannot be
        opened.
        '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty. Harmless if not already connected.
        # os.open() raises OSError on failure instead of returning a negative
        # descriptor, so no check of the returned value is needed.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
        except OSError:
            # Already disconnected. This happens if running inside cron.
            pass

        os.setsid()

        # Verify we are disconnected from controlling tty
        # by attempting to open it again.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        except OSError:
            # Good! We are disconnected from a controlling tty.
            pass
        else:
            # The open succeeded, so a controlling tty is still attached.
            # Raised outside the try so that an error from close() cannot be
            # mistaken for a successful disconnect.
            os.close(fd)
            raise Exception('Failed to disconnect from '
                            'controlling tty. It is still possible to open /dev/tty.')

        # Verify we can open child pty.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
test_builtin.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() returns *terminal_input* when run on a pty.

        A child process is forked on a new pseudo-terminal.  It optionally
        re-wraps its stdin/stdout with *stdio_encoding*, calls
        ``input(prompt)`` and reports two lines back through a pipe: whether
        its standard streams are ttys, and the ascii() form of what it read.
        The parent writes *terminal_input* (bytes) followed by CR LF to the
        pty master and compares the reported value with the expected
        decoding of *terminal_input*.

        The test is skipped when the parent's stdin/stdout are not ttys,
        when pty.fork() is missing or fails, or when the child reports that
        its standard streams are not ttys.
        """
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except BaseException:
                # Deliberately catch everything: the traceback goes to the
                # pty, where the parent can pick it up for its failure message.
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        try:
            # Check the result was got and corresponds to the user's terminal input
            if len(lines) != 2:
                # Something went wrong, try to get at stderr.
                # Read the master with os.read() rather than a file object:
                # Linux raises EIO once the slave side is closed, and that
                # error must not replace the failure message below.
                child_output = bytearray()
                while True:
                    try:
                        chunk = os.read(fd, 3000)
                    except OSError:  # Assume EIO
                        break
                    if not chunk:
                        break
                    child_output.extend(chunk)
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.decode("ascii", "ignore")))
        finally:
            os.close(fd)
            # Reap the child so it does not linger as a zombie.
            try:
                os.waitpid(pid, 0)
            except OSError:
                # The child was already reaped elsewhere.
                pass
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], {'tty = True', 'tty = False'})
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # ascii() -> eval() roundtrip.  NOTE(review): eval() is tolerable only
        # because the string was produced by ascii() in our own child process.
        input_result = eval(lines[1])
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
test_builtin.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_child(self, child, terminal_input):
        """Run *child* in a forked pty process and return the lines it reports.

        *child* is called in the forked process with a writable text file
        object wrapping a pipe back to the parent.  The parent writes
        *terminal_input* (bytes) to the pty master, then collects the
        stripped lines the child wrote to the pipe.

        Returns the list of exactly two reported lines.  Fails the test,
        including whatever the child printed on the pty, if a different
        number of lines arrives.  Skips the test when pty.fork() is missing
        or fails.
        """
        r, w = os.pipe()  # Pipe test results from child back to parent
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            # skipTest() raises, so nothing after it in this handler runs.
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                with open(w, "w") as wpipe:
                    child(wpipe)
            except BaseException:
                # Deliberately catch everything: the traceback goes to the
                # pty, where the parent can pick it up for its failure message.
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input)
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        try:
            # Check the result was got and corresponds to the user's terminal input
            if len(lines) != 2:
                # Something went wrong, try to get at stderr
                # Beware of Linux raising EIO when the slave is closed
                child_output = bytearray()
                while True:
                    try:
                        chunk = os.read(fd, 3000)
                    except OSError:  # Assume EIO
                        break
                    if not chunk:
                        break
                    child_output.extend(chunk)
                child_output = child_output.decode("ascii", "ignore")
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output))
        finally:
            # Single cleanup point for both the success and failure paths.
            os.close(fd)
            # Reap the child so it does not linger as a zombie.
            try:
                os.waitpid(pid, 0)
            except OSError:
                # The child was already reaped elsewhere.
                pass
        return lines


问题


面经


文章

微信
公众号

扫码关注公众号