def testHelpMessageForSQLitePlugin(self):
    """Check the text printed by ``main.py sqlite --help``.

    Only implemented for Linux; on every other platform the test raises
    NotImplementedError instead of running the command.
    """
    helper = end_to_end_test_helper.EndToEndTestHelper('not needed',
                                                       'not needed')
    if platform.system() not in ['Linux']:
        raise NotImplementedError("test only implemented for linux platform")

    # Text that has to appear verbatim in the output of the command.
    expected_help = (
        'Usage: main.py sqlite [OPTIONS]\r\n\r\n'
        'Options:\r\n '
        '--path TEXT The path to plaso\r\n '
        '--name TEXT The plugin name\r\n '
        '--testfile TEXT The testfile path\r\n '
        '--sql / --no-sql The output example flag for the SQL Query for the '
        'plugin.\r\n '
        '--help Show this message and exit.')
    help_process = pexpect.spawn(
        'python {0} sqlite --help'.format(helper.MAIN_PATH))
    help_process.expect_exact(expected_help)
python类spawn()的实例源码
test_help_function_of_sqlite.py 文件源码
项目:PlasoScaffolder
作者: ClaudiaSaxer
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_vi_mode(self):
    """Drive the prompt with vi key bindings enabled and edit a line."""
    self.write_config('vi = True\n')
    session = pexpect.spawn(get_http_prompt_path(), env=os.environ)
    session.expect_exact('http://localhost:8000> ')

    # Type 'htpie', leave insert mode (ESC), then send 'hhit': move two
    # characters to the left (hh) and insert (i) the missing 't'.
    for keys in ('htpie', '\x1b'):
        session.send(keys)
    session.sendline('hhit')
    session.expect_exact('http http://localhost:8000')

    # Back to insert mode (ESC, i) and leave the program with 'exit'.
    for keys in ('\x1b', 'i'):
        session.send(keys)
    session.sendline('exit')
    session.expect_exact('Goodbye!', timeout=20)
    session.close()
def testHelpMessage(self):
    """Check the text printed by the top-level ``main.py --help``.

    Only implemented for Linux; on every other platform the test raises
    NotImplementedError instead of running the command.
    """
    helper = end_to_end_test_helper.EndToEndTestHelper('not needed',
                                                       'not needed')
    if platform.system() not in ['Linux']:
        raise NotImplementedError("test only implemented for linux platform")

    # Text that has to appear verbatim in the output of the command.
    expected_help = ('Usage: main.py [OPTIONS] COMMAND [ARGS]...\r\n\r\n'
                     'Options:\r\n'
                     ' --help Show this message and exit.\r\n\r\n'
                     'Commands:\r\n'
                     ' sqlite')
    help_process = pexpect.spawn('python {0} --help'.format(helper.MAIN_PATH))
    help_process.expect_exact(expected_help)
def testCommands(self):
    """Run a handful of shell commands and check a marker in each reply."""
    shell = pexpect.spawn(self.ravelCmd)
    shell.expect("ravel>")

    # (command to send, pattern that must show up before the next command)
    exchanges = (
        ("help", "Commands"),
        ("p SELECT * FROM hosts;", "hid"),
        ("stat", "app path"),
        ("apps", "offline"),
    )
    for command, expected in exchanges:
        shell.sendline(command)
        shell.expect(expected)

    shell.sendline("exit")
    shell.sendeof()
    time.sleep(1)
# Part 3 - Orchestration
def GetGcloud(args, project=None, service=None):
    """Build the argument list for a gcloud invocation.

    The command is only assembled here, it is not executed.

    Args:
      args: command with arguments as an array
      project: the project on which the gcloud command will work; when given,
        "--project <project>" is inserted before args
      service: the gcloud service to use (for example "compute"); when given,
        it is placed right after "gcloud". Nothing is inserted when omitted.
    Returns:
      The formatted gcloud command as a new list.
    """
    prefix = ["gcloud"]
    if service:
        prefix += [service]
    if project:
        prefix += ["--project", project]
    return prefix + list(args)
# TODO(sohamcodes): pexpect.spawn can be changed to subprocess call
# However, timeout for subprocess is available only on python3
# So, we can implement it later.
def RunCommand(args, timeout=None, logfile=None):
    """Runs a given command through pexpect.spawn and waits for it to finish.

    The command is spawned, its output is consumed until EOF and the child is
    closed so that its exit information becomes available.

    Args:
      args: command with arguments as an array
      timeout: timeout handed to pexpect.spawn; it applies to the wait for EOF
      logfile: an opened filestream to write the output
    Raises:
      RuntimeError: the command's exit status is not zero, or the command was
        terminated by a signal
    Returns:
      0 when the command exited with status zero
    """
    child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                          logfile=logfile)
    child.expect(pexpect.EOF)
    child.close()
    # exitstatus is None when the child was killed by a signal, so looking at
    # it alone would report such a command as successful.
    if child.exitstatus or child.signalstatus:
        print(args)
        if child.exitstatus is None:
            status = "terminated by signal {}".format(child.signalstatus)
        else:
            status = child.exitstatus
        raise RuntimeError(("Error: {}\nProblem running command. "
                            "Exit status: {}").format(child.before, status))
    return 0
def bash(command="bash"):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    The shell is started with the ``bashrc.sh`` file that sits next to this
    module as its rc file, with echo disabled and UTF-8 decoding.
    """
    bashrc = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
    child = pexpect.spawn(command, ['--rcfile', bashrc], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To avoid
    # replwrap seeing that as the next prompt, we'll embed the marker characters
    # for invisible characters in the prompt; these show up when inspecting the
    # environment variable, but not when bash displays the prompt.
    # The backslashes are doubled so the literals hold the same text as before
    # without relying on unrecognised escape sequences.
    ps1 = PEXPECT_PROMPT[:5] + u'\\[\\]' + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + u'\\[\\]'
           + PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(child, u'\\$', prompt_change,
                       extra_init_cmd="export PAGER=cat")
def disconnect(child):
    """
    Obsolete helper that disconnects a session object (pexpect/paramiko).

    - A WarriorCli object is disconnected through its own 'disconnect'
      method and the value of that call is returned.
    - A pexpect.spawn object is wrapped in a temporary WarriorCli with a
      PexpectConnect connection, disconnected through it and returned.
    - Any other object is returned untouched.
    """
    print_warning("This method is obsolete and will be deprecated soon. Please"
                  " use 'disconnect' method of 'PexpectConnect' class "
                  "in 'warrior/Framework/ClassUtils/WNetwork/warrior_cli_class.py'")
    cli_module = WNetwork.warrior_cli_class
    if isinstance(child, cli_module.WarriorCli):
        return child.disconnect()
    if isinstance(child, pexpect.spawn):
        wrapper = cli_module.WarriorCli()
        wrapper.conn_obj = cli_module.PexpectConnect()
        wrapper.conn_obj.target_host = child
        wrapper.disconnect()
    return child
def _send_cmd(obj_session, **kwargs):
    """Send a command through whichever kind of session object is given.

    - WarriorCli: delegates to its own '_send_cmd' with the keyword arguments.
    - pexpect.spawn: wrapped in a temporary WarriorCli/PexpectConnect pair,
      then handled like a WarriorCli.
    - SSHComm: the 'command' keyword argument is passed to 'get_response'
      and the response is printed.
    - Anything else: a warning is printed and (False, "") is returned.

    Returns a (result, response) tuple.
    """
    # Defaults for an unsupported session type; without them the final
    # return would fail with UnboundLocalError in that case.
    result, response = False, ""
    if isinstance(obj_session, WNetwork.warrior_cli_class.WarriorCli):
        result, response = obj_session._send_cmd(**kwargs)
    elif isinstance(obj_session, pexpect.spawn):
        wc_obj = WNetwork.warrior_cli_class.WarriorCli()
        wc_obj.conn_obj = WNetwork.warrior_cli_class.PexpectConnect()
        wc_obj.conn_obj.target_host = obj_session
        result, response = wc_obj._send_cmd(**kwargs)
    elif isinstance(obj_session, ssh_utils_class.SSHComm):
        command = kwargs.get('command')
        result, response = obj_session.get_response(command)
        print_info(response)
    else:
        print_warning('session object type is not supported')
    return result, response
def test_debug_bots(self, env):
    """Make sure the debug server runs to completion with bots."""
    debug_run = pexpect.spawn(
        'dallinger',
        ['debug', '--verbose', '--bot'],
        env=env,
    )
    debug_run.logfile = sys.stdout

    # Output markers in the order they must appear, each with the number of
    # seconds it is allowed to take.
    milestones = (
        ('Server is running', 300),
        ('Recruitment is complete', 600),
        ('Experiment completed', 60),
        ('Local Heroku process terminated', 10),
    )
    try:
        for marker, limit in milestones:
            debug_run.expect_exact(marker, timeout=limit)
    finally:
        # Whatever happened above, send Ctrl-C and read the remaining
        # output; an IOError while doing so is ignored.
        try:
            debug_run.sendcontrol('c')
            debug_run.read()
        except IOError:
            pass
def sshcmd(cmd, host, password,):
    ''' runs single command on host using ssh

    Returns 1 when the password is refused, None otherwise.
    '''
    session = expect.spawn("ssh " + host)
    if session.expect([r'.+password:.*']) != 0:
        print('Not expected')
        return None

    session.sendline(password)
    if session.expect([r'Sorry.+', r'.*']) == 0:
        print('Wrongs password')
        return 1

    print('inside')
    session.sendline(cmd)
MiraiScanner.py 文件源码
项目:IoTScanner
作者: SecurityInEmergingEnvironments
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def launchMiraiScan(ip):
    """Run the Perl scanner against ``ip`` and record the verdict via setStatus.

    The verdict is 'Vulnerable' when the scanner reports a missing or default
    password, and 'Non-vulnerable' for every other recognised output.
    """
    global status
    cmd = 'perl iotScanner.pl %s' % ip
    starttime = time.time()
    scanner = pexpect.spawn('/bin/bash', ['-c', cmd], timeout = 600)

    # The position of the first matching pattern decides the verdict below.
    scanner_outputs = [
        'failed to establish TCP connection',
        'doesnot have any password',
        'still has default password',
        'has changed password',
        'didnot find dev type after trying all devices',
        'due to 404 response',
        'failed to establish TCP connection',
        'http redirect to',
        'unexpected status code',
        'didnot find devType for',
        'unexpected partial url',
        TERMINAL_PROMPT,
        COMMAND_PROMPT,
    ]
    outcome = scanner.expect(scanner_outputs)

    if outcome in (1, 2):
        setStatus('Vulnerable')
        return
    if outcome == 0:
        print('Run time:', round((time.time() - starttime),3))
    setStatus('Non-vulnerable')
def xonsh_shell(exe_dir, shell_path):
    """Start an interactive xonsh with VIRTUAL_ENV set and return its exit code.

    ``shell_path`` is the shell executable; 'xonsh' is used when it is empty.
    VIRTUAL_ENV is set to the parent directory of ``exe_dir``.
    """
    executable = shell_path or 'xonsh'
    shell_args = ['-i', '-D',
                  'VIRTUAL_ENV={}'.format(os.path.dirname(exe_dir))]

    if ON_WINDOWS:
        completed = subprocess.run(
            [executable] + shell_args,
            shell=NEED_SUBPROCESS_SHELL
        )
        return completed.returncode

    terminal = pexpect.spawn(
        executable,
        args=shell_args,
        dimensions=get_terminal_dimensions()
    )

    def _forward_resize(sig, data):
        # Keep the child's window size in step with the real terminal.
        terminal.setwinsize(*get_terminal_dimensions())

    signal.signal(signal.SIGWINCH, _forward_resize)
    # Just in case pyenv works with xonsh, supersede it.
    terminal.sendline('$PATH.insert(0, "{}")'.format(exe_dir))
    terminal.interact(escape_character=None)
    terminal.close()
    return terminal.exitstatus
def start_MAVProxy_SITL(atype, aircraft=None, setup=False, master='tcp:127.0.0.1:5760',
                        options=None, logfile=sys.stdout):
    """Launch mavproxy connected to a SITL instance.

    The executable comes from the MAVPROXY_CMD environment variable and
    falls back to 'mavproxy.py'. When no aircraft name is given,
    'test.<atype>' is used. Returns the spawned pexpect object.
    """
    import pexpect
    global close_list
    mavproxy = os.getenv('MAVPROXY_CMD', 'mavproxy.py')

    # Collect the command line piece by piece, then glue it together.
    pieces = [mavproxy + ' --master=%s --out=127.0.0.1:14550' % master]
    if setup:
        pieces.append(' --setup')
    if aircraft is None:
        aircraft = 'test.%s' % atype
    pieces.append(' --aircraft=%s' % aircraft)
    if options is not None:
        pieces.append(' ' + options)
    cmd = ''.join(pieces)

    mavproxy_process = pexpect.spawn(cmd, logfile=logfile, encoding=ENCODING, timeout=60)
    mavproxy_process.delaybeforesend = 0
    pexpect_autoclose(mavproxy_process)
    return mavproxy_process
def behind_source(org_config_filename, verbose=False, dry_run=False):
    """Create a source object backed by a batch-mode emacs REPL.

    emacs loads ``org_config_filename`` and 'ts-org-interaction.el' and then
    evaluates (ts-repl). With ``dry_run`` the source is built from
    o.DryRunSource, otherwise from o.Source.
    """
    emacs_args = ['-batch',
                  '-l', org_config_filename,
                  '-l', 'ts-org-interaction.el',
                  '--eval=(ts-repl)']
    emacs_process = pexpect.spawn('emacs', emacs_args, encoding='utf-8')
    repl = pexpect.replwrap.REPLWrapper(
        emacs_process, "Lisp expression: ", None)

    source_class = o.DryRunSource if dry_run else o.Source
    source_class.make_fn = make_fn
    source = source_class.from_emacs_repl(repl, verbose)

    source.get_tree = lambda: source.get_all_items(
        ['asana_id', 'asana_project_id'])
    return source
def bash(command="bash"):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    The rc file used for the shell is ``bashrc.sh`` from this module's
    directory; echo is turned off and output is decoded as UTF-8.
    """
    rc_file = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
    child = pexpect.spawn(command, ['--rcfile', rc_file], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To avoid
    # replwrap seeing that as the next prompt, we'll embed the marker characters
    # for invisible characters in the prompt; these show up when inspecting the
    # environment variable, but not when bash displays the prompt.
    # Backslashes are written doubled: the text is unchanged, but the literal
    # no longer contains unrecognised escape sequences.
    invisible_marker = u'\\[\\]'
    ps1 = PEXPECT_PROMPT[:5] + invisible_marker + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + invisible_marker
           + PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(child, u'\\$', prompt_change,
                       extra_init_cmd="export PAGER=cat")
def _play(self):
    """Start mpv on the current track's URL and monitor it in a daemon thread."""
    self.track_current = 0
    self.track_percent = 0
    self.loading = True

    player_args = ["--no-ytdl", "%s" % self.track.url]
    self.playing = pexpect.spawn('mpv', player_args, timeout=None)
    self.active = True
    self.loading = False

    # The monitor receives the player process and this object.
    watcher = threading.Thread(target=self._monitor,
                               args=(self.playing, self))
    watcher.daemon = True
    watcher.start()
def change_password(old=None, new=None):
    """Change the password by driving the ``passwd`` program.

    Args:
        old: the current password. When None, both passwords are obtained
            from get_passes().
        new: the password to set.

    Returns:
        The tuple ``(old, new)`` when passwd reports success, otherwise
        ``False``.
    """
    if old is None:
        old, new = get_passes()
    p = pexpect.spawn('passwd')
    try:
        p.expect('password')
        p.sendline(old)
        # Index 0 ('New') is the only outcome in which passwd goes on to ask
        # for the new password; do not send it after 'incorrect' or 'error'.
        if p.expect(['New', 'incorrect', 'error']) != 0:
            print('{0} {1}'.format(p.buffer, 'top level'))
            return False
        p.sendline(new)
        try:
            # Second prompt for the new password, then the final confirmation.
            p.expect('ew password:', timeout=1)
            p.sendline(new)
            p.expect(['success'], timeout=1)
            return old, new
        except Exception:
            # Covers pexpect timeouts/EOF without swallowing
            # KeyboardInterrupt or SystemExit.
            print('{0} {1}'.format(p.buffer, 'top level'))
        return False
    finally:
        # Never leave the passwd process and its pty behind.
        p.close()
def tar2db(firmware_id):
    """Populates the db with information related to the filesystem.

    Returns True when the information is in the database (written now or
    already present) and False when the command reports a missing file.
    """
    print(bcolors.OKBLUE + "[-] Writing filesystem information into database..." + bcolors.ENDC)
    command = TAR2DB_COMMAND.format(FIRMADYNE_PATH, firmware_id, OUTPUT_DIR)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    writer = pexpect.spawn(command)

    # Three possible endings: the keys already exist (the information was
    # written before), a file is missing, or the command simply finishes.
    outcome = writer.expect(['Key.*already exists', 'No such file or directory: .*\n', pexpect.EOF])

    if outcome == 1:
        first_line = writer.after.split('\n')[0]
        missing_file = first_line.split(':')[1].strip()
        print(bcolors.FAIL + "[!] The file {} does not exist...".format(missing_file) + bcolors.ENDC)
        return False

    if outcome == 0:
        print(bcolors.WARNING + "[!] This step was already performed earlier..." + bcolors.ENDC)
    else:
        print(bcolors.OKGREEN + "[+] Filesystem information successfully written!" + bcolors.ENDC)
    return True
def network_setup(firmware_id, arch):
    """Determines the network configuration of the firmware by emulating it for a certain amount of time.

    Returns True when at least one interface is reported by the command and
    False when the reported interface list is empty.
    """
    runtime = 60
    print(bcolors.OKBLUE + "[-] Determining the network configuration of the firmware..." + bcolors.ENDC)
    print(bcolors.OKBLUE + "[-] The firmware will now be running for {} seconds...".format(runtime) + bcolors.ENDC)
    command = INFERNETWORK_COMMAND.format(FIRMADYNE_PATH, firmware_id) + " " + arch
    print(bcolors.ITALIC + command + bcolors.ENDC)
    # A few extra seconds on top of the emulation time before giving up.
    setup = pexpect.spawn(command, timeout=runtime + 5)
    # info should be provided as regards the interfaces available to access the emulated firmware
    # Raw string: same regular expression, without unrecognised escape
    # sequences in the literal.
    setup.expect(r'Interfaces: \[(.*)\]\n')
    interfaces = setup.match.group(1)
    # Truthiness works whether the match holds text or bytes.
    if not interfaces:
        print(bcolors.WARNING + "[!] No network interface could be determined..." + bcolors.ENDC)
        return False
    print(bcolors.OKGREEN + "[+] Your firmware will be accessible at {}!".format(interfaces) + bcolors.ENDC)
    return True
def _bash_repl(command="bash", remove_ansi=True):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    ``remove_ansi`` is passed through to the REPLWrapper unchanged.
    """
    # `repl_bashrc.sh` suppresses user-defined PS1
    bashrc = os.path.join(os.path.dirname(__file__), 'repl_bashrc.sh')
    child = pexpect.spawn(command, ['--rcfile', bashrc], echo=False,
                          encoding='utf-8')

    # If the user runs 'env', the value of PS1 will be in the output. To avoid
    # replwrap seeing that as the next prompt, we'll embed the marker characters
    # for invisible characters in the prompt; these show up when inspecting the
    # environment variable, but not when bash displays the prompt.
    # The backslashes are doubled so the literals keep their text without
    # relying on unrecognised escape sequences.
    ps1 = PEXPECT_PROMPT[:5] + u'\\[\\]' + PEXPECT_PROMPT[5:]
    ps2 = (PEXPECT_CONTINUATION_PROMPT[:5] + u'\\[\\]'
           + PEXPECT_CONTINUATION_PROMPT[5:])
    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)

    return REPLWrapper(child, u'\\$', prompt_change,
                       remove_ansi=remove_ansi,
                       extra_init_cmd="export PAGER=cat")
def connectToUserWifi():
    """Connect to the user's wifi, fetching fresh credentials if needed.

    First tries 'wifi autoconnect'. If that fails, the network name and
    password are downloaded from the configuration URL, any stored user
    wifi entry is deleted, and the network is both connected to and stored
    again under the name 'userWifi'.
    """
    try:
        child = pexpect.spawn('sudo wifi autoconnect')
        child.expect('Connecting *')
        time.sleep(20)
    except Exception:
        # Any ordinary failure triggers the fallback, but Ctrl-C and
        # interpreter shutdown are no longer swallowed.
        response = requests.get('http://joedye.me/pi-order/config').json()
        response = json.loads(response)
        deleteUserWifiIfPresent()
        child = pexpect.spawn('sudo wifi connect --ad-hoc ' + response['wifi-network'])
        child.expect('passkey>')
        child.sendline(str(response['wifi-password']))
        childForAdd = pexpect.spawn('sudo wifi add userWifi '+ response['wifi-network'])
        childForAdd.expect('passkey>')
        childForAdd.sendline(str(response['wifi-password']))
        time.sleep(30)
def connect(self):
    """Open a local bash and ssh through every hop, then to the target host.

    ``self.hostname`` ends up as the last host an ssh command was sent for.
    After the ssh commands the PS1 export command is sent and the method
    waits for the prompt pattern. Returns ``self``.
    """
    self.con = pexpect.spawn('/usr/bin/env bash')

    # The target host, when set, is simply the last stop on the route.
    route = list(self.hops)
    if self.target_host:
        route.append(self.target_host)
    for stop in route:
        self.con.sendline('ssh -tt {host}'.format(host = stop))
        self.hostname = stop

    self.con.sendline(self.ps1_export_cmd)
    self.con.expect(self.ps1_re)
    return self
def run(command, fail_on_error=False, interactively=False):
    """Run ``command`` through /bin/sh and return ``{'child': <spawn object>}``.

    Everything read from and sent to the child is captured in in-memory
    buffers attached as ``logfile_read`` and ``logfile_send``. Unless
    ``interactively`` is set, the call waits for the command to reach EOF;
    with ``fail_on_error`` an exit status above zero then raises Exception
    carrying the captured output.
    """
    shell = pexpect.spawn('/bin/sh', ['-c', command], echo=False)
    shell.logfile_read = StringIO.StringIO()
    shell.logfile_send = StringIO.StringIO()
    outcome = {
        'child': shell,
    }

    if interactively:
        # The caller drives the process itself.
        return outcome

    shell.expect(pexpect.EOF)
    if fail_on_error and shell.exitstatus > 0:
        captured_output = shell.logfile_read.getvalue()
        raise Exception(
            '%s (exit code %s)' % (captured_output, shell.exitstatus)
        )
    return outcome
def main ():
    """Check for the ssh tunnel process forever and restart it when missing.

    Each round runs 'ps' and looks for '/usr/bin/ssh' in its output. When
    the output ends without a match the tunnel is restarted through
    start_tunnel(); a timeout of 'ps' itself is reported and followed by a
    longer pause.
    """
    while True:
        ps = pexpect.spawn ('ps')
        time.sleep (1)
        index = ps.expect (['/usr/bin/ssh', pexpect.EOF, pexpect.TIMEOUT])
        if index == 2:
            print('TIMEOUT in ps command...')
            print(str(ps))
        # This loop never ends, so release each 'ps' child explicitly
        # instead of leaving its cleanup to garbage collection.
        ps.close ()
        if index == 2:
            time.sleep (13)
        if index == 1:
            print(time.asctime(), end=' ')
            print('restarting tunnel')
            start_tunnel ()
            time.sleep (11)
            print('tunnel OK')
        else:
            time.sleep (7)
def clear_bond(self, address=None):
    """Use the 'bluetoothctl' program to erase a stored BLE bond.

    address -- address of the device whose bond is removed. It is
               upper-cased before use, so it has to be a string; leaving
               the default of None raises AttributeError.
    """
    con = pexpect.spawn('sudo bluetoothctl')
    try:
        con.expect("bluetooth", timeout=1)

        log.info("Clearing bond for %s", address)
        con.sendline("remove " + address.upper())
        try:
            con.expect(
                ["Device has been removed", "# "],
                timeout=.5
            )
        except pexpect.TIMEOUT:
            log.error("Unable to remove bonds for %s: %s",
                      address, con.before)
        else:
            # Only report success when bluetoothctl actually answered.
            log.info("Removed bonds for %s", address)
    finally:
        # Do not leave the bluetoothctl process behind.
        con.close()
def run_interactive_task(cmd, logger, msg):
    """Start a task with ``pexpect.spawnu`` and log that it was started.

    The message is logged at info level and the command at debug level.
    When the 'VERBOSITY' setting is 'debug', everything read from the task
    is copied to stdout.

    Nothing here checks that the process started or is still running, and
    nothing kills it: both are left to the caller.

    :param cmd: Exact command to be executed
    :param logger: Logger to write details to
    :param msg: Message to be shown to user

    :returns: the spawned ``pexpect`` child object
    """
    logger.info(msg)
    logger.debug('%s%s', CMD_PREFIX, cmd)
    task = pexpect.spawnu(cmd)

    debug_output = settings.getValue('VERBOSITY') == 'debug'
    if debug_output:
        task.logfile_read = sys.stdout
    return task
def test_place_match(place):
    """Add and delete matches on the 'test' place, then check what 'show' lists."""
    # Both commands only have to run to the end and exit with status 0.
    match_commands = (
        'python -m labgrid.remote.client -p test add-match e1/g1/r1 e2/g2/*',
        'python -m labgrid.remote.client -p test del-match e1/g1/r1',
    )
    for command in match_commands:
        with pexpect.spawn(command) as client:
            client.expect(pexpect.EOF)
            client.close()
            assert client.exitstatus == 0

    with pexpect.spawn('python -m labgrid.remote.client -p test show') as client:
        client.expect(" matches:")
        client.expect(" e2/g2/*")
        client.expect(pexpect.EOF)
        client.close()
        assert client.exitstatus == 0
def test_autoinstall_simple(tmpdir):
    """Run the autoinstall entry point once with a minimal configuration.

    The configuration defines one target without resources or drivers and a
    handler that prints a marker; the test waits for that marker and for a
    zero exit status.
    """
    c = tmpdir.join("config.yaml")
    # The nesting matters: 'test1' has to sit under 'targets', and the
    # print call has to be inside the 'handler' block scalar.
    c.write("""
targets:
  test1:
    resources: {}
    drivers: {}
autoinstall:
  handler: |
    print("handler-test-output")
""")
    with pexpect.spawn('python -m labgrid.autoinstall.main --once {}'.format(c)) as spawn:
        spawn.expect("handler-test-output")
        spawn.expect(pexpect.EOF)
        spawn.close()
        assert spawn.exitstatus == 0
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
             new_prompt=PEXPECT_PROMPT,
             continuation_prompt=PEXPECT_CONTINUATION_PROMPT):
    """Wrap a child process and wait for its first prompt.

    ``cmd_or_spawn`` is either a command string, which is spawned here with
    echo disabled, or an already spawned child. When ``prompt_change`` is
    None the original prompt stays in use; otherwise it is formatted with
    the new and the continuation prompt and applied through set_prompt().
    """
    if not isinstance(cmd_or_spawn, str):
        self.child = cmd_or_spawn
    else:
        self.child = pexpect.spawnu(cmd_or_spawn, echo=False)

    if self.child.echo:
        # The child still echoes its input; switch that off so that what
        # we send is not repeated in the output.
        self.child.setecho(False)
        self.child.waitnoecho()

    if prompt_change is not None:
        formatted_change = prompt_change.format(new_prompt,
                                                continuation_prompt)
        self.set_prompt(orig_prompt, formatted_change)
        self.prompt = new_prompt
    else:
        self.prompt = orig_prompt
    self.continuation_prompt = continuation_prompt

    self._expect_prompt()