python类fork()的实例源码

ssh_launcher.py 文件源码 项目:EmPyre 作者: EmpireProject 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def generate(self):
        login = self.options['Login']['Value']
        password = self.options['Password']['Value']
        listenerName = self.options['Listener']['Value']
        userAgent = self.options['UserAgent']['Value']
        littleSnitch = self.options['LittleSnitch']['Value']

        isEmpire = self.mainMenu.listeners.is_listener_empyre(listenerName)
        if not isEmpire:
            print helpers.color("[!] EmPyre listener required!")
            return ""

        # generate the launcher code
        launcher = self.mainMenu.stagers.generate_launcher(listenerName, userAgent=userAgent,  littlesnitch=littleSnitch)
        launcher = launcher.replace("'", "\\'")
        launcher = launcher.replace('"', '\\"')
        if launcher == "":
            print helpers.color("[!] Error in launcher command generation.")
            return ""
        script = """
import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0:
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1)

    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')

        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

""" % (launcher, login, password)
        return script
ssh_command.py 文件源码 项目:EmPyre 作者: EmpireProject 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def generate(self):
        login = self.options['Login']['Value']
        password = self.options['Password']['Value']
        command = self.options['Command']['Value']

        # generate the launcher code


        script = """

import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0: # Child
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1) # fail to execv

    # read '..... password:', write password
    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')

        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

""" % (command, login, password)
        return script
conque_subprocess.py 文件源码 项目:janus 作者: nks5295 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def open(self, command, env={}):
        """ Create subprocess using forkpty() """

        # parse command
        command_arr = shlex.split(command)
        executable = command_arr[0]
        args = command_arr

        # try to fork a new pty
        try:
            self.pid, self.fd = pty.fork()

        except:

            return False

        # child proc, replace with command after altering terminal attributes
        if self.pid == 0:

            # set requested environment variables
            for k in env.keys():
                os.environ[k] = env[k]

            # set tty attributes
            try:
                attrs = tty.tcgetattr(1)
                attrs[0] = attrs[0] ^ tty.IGNBRK
                attrs[0] = attrs[0] | tty.BRKINT | tty.IXANY | tty.IMAXBEL
                attrs[2] = attrs[2] | tty.HUPCL
                attrs[3] = attrs[3] | tty.ICANON | tty.ECHO | tty.ISIG | tty.ECHOKE
                attrs[6][tty.VMIN] = 1
                attrs[6][tty.VTIME] = 0
                tty.tcsetattr(1, tty.TCSANOW, attrs)
            except:

                pass

            # replace this process with the subprocess
            os.execvp(executable, args)

        # else master, do nothing
        else:
            pass
test_builtin.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except:
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr
            with open(fd, "r", encoding="ascii", errors="ignore") as child_output:
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.read()))
        os.close(fd)
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], {'tty = True', 'tty = False'})
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        input_result = eval(lines[1])   # ascii() -> eval() roundtrip
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
process.py 文件源码 项目:black_zone 作者: zh-explorer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __pty_make_controlling_tty(self, tty_fd):
        '''This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris. '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty. Harmless if not already connected.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
        # which exception, shouldnt' we catch explicitly .. ?
        except OSError:
            # Already disconnected. This happens if running inside cron.
            pass

        os.setsid()

        # Verify we are disconnected from controlling tty
        # by attempting to open it again.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            if fd >= 0:
                os.close(fd)
                raise Exception('Failed to disconnect from ' +
                    'controlling tty. It is still possible to open /dev/tty.')
        # which exception, shouldnt' we catch explicitly .. ?
        except OSError:
            # Good! We are disconnected from a controlling tty.
            pass

        # Verify we can open child pty.
        fd = os.open(child_name, os.O_RDWR)
        if fd < 0:
            raise Exception("Could not open child pty, " + child_name)
        else:
            os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        if fd < 0:
            raise Exception("Could not open controlling tty, /dev/tty")
        else:
            os.close(fd)
pkg5unittest.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ptyPopen(args, executable=None, env=None, shell=False):
                """Less featureful but inspired by subprocess.Popen.
                Runs subprocess in a pty"""
                #
                # Note: In theory the right answer here is to subclass Popen,
                # but we found that in practice we'd have to reimplement most
                # of that class, because its handling of file descriptors is
                # too brittle in its _execute_child() code.
                #
                def __drain(masterf, outlist):
                        # Use a list as a way to pass by reference
                        while True:
                                chunksz = 1024
                                termdata = masterf.read(chunksz)
                                outlist.append(termdata)
                                if len(termdata) < chunksz:
                                        # assume we hit EOF
                                        break

                # This is the arg handling protocol from Popen
                if isinstance(args, six.string_types):
                        args = [args]
                else:
                        args = list(args)

                if shell:
                        args = ["/bin/sh", "-c"] + args
                        if executable:
                                args[0] = executable

                if executable is None:
                        executable = args[0]

                pid,fd = pty.fork()
                if pid == 0:
                        try:
                                # Child
                                if env is None:
                                        os.execvp(executable, args)
                                else:
                                        os.execvpe(executable, args, env)
                        except:
                                traceback.print_exc()
                                os._exit(99)
                else:
                        masterf = os.fdopen(fd, "rb")
                        outlist = []
                        t = threading.Thread(target=__drain,
                            args=(masterf, outlist))
                        t.start()
                        waitedpid, retcode = os.waitpid(pid, 0)
                        retcode = retcode >> 8
                        t.join()
                return retcode, b"".join(outlist)
pkg5unittest.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def fakeroot_create():

        test_root = os.path.join(g_tempdir, "ips.test.{0:d}".format(os.getpid()))
        fakeroot = os.path.join(test_root, "fakeroot")
        cmd_path = os.path.join(fakeroot, "pkg")

        try:
                os.stat(cmd_path)
        except OSError as e:
                pass
        else:
                # fakeroot already exists
                raise RuntimeError("The fakeroot shouldn't already exist.\n"
                    "Path is:{0}".format(cmd_path))

        # when creating the fakeroot we want to make sure pkg doesn't
        # touch the real root.
        env_sanitize(cmd_path)

        #
        # When accessing images via the pkg apis those apis will try
        # to access the image containing the command from which the
        # apis were invoked.  Normally when running the test suite the
        # command is run.py in a developers workspace, and that
        # workspace lives in the root image.  But accessing the root
        # image during a test suite run is verboten.  Hence, here we
        # create a temporary image from which we can run the pkg
        # command.
        #

        # create directories
        mkdir_eexist_ok(test_root)
        mkdir_eexist_ok(fakeroot)

        debug("fakeroot image create {0}".format(fakeroot))
        progtrack = pkg.client.progress.NullProgressTracker()
        api_inst = pkg.client.api.image_create(PKG_CLIENT_NAME,
            CLIENT_API_VERSION, fakeroot,
            pkg.client.api.IMG_TYPE_ENTIRE, False,
            progtrack=progtrack, cmdpath=cmd_path)

        #
        # put a copy of the pkg command in our fake root directory.
        # we do this because when recursive linked image commands are
        # run, the pkg interfaces may fork and exec additional copies
        # of pkg(1), and in this case we want them to run the copy of
        # pkg from the fake root.
        #
        fakeroot_cmdpath = os.path.join(fakeroot, "pkg")
        shutil.copy(os.path.join(g_pkg_path, "usr", "bin", "pkg"),
            fakeroot_cmdpath)

        return fakeroot, fakeroot_cmdpath


问题


面经


文章

微信
公众号

扫码关注公众号