def main():
fname = sys.argv[1]
key_old = sys.argv[2]
key_new = sys.argv[3]
params = np.load(fname)
new_params = OrderedDict()
flag = False
for k, v in params.iteritems():
if k == key_old:
k = key_new
flag = True
new_params[k] = v
if flag:
print '[info] %s found in %s'%(key_old, fname)
commands.getoutput('rm -f %s'%(fname))
np.savez(fname, **new_params)
else:
print '[info] %s not found in %s'%(key_old, fname)
python类getoutput()的实例源码
def my_cmd(self, query, flag):
rst = commands.getoutput(query)
print query
rst = ''.join(rst)
rst = rst.split('\r')[-1]
po = rst.split('\n')[:-1]
# print po
# print '\n\nsystem output 1 = \n' + rst
# print po
# print 'neighbours1', neighbours1
if flag:
def p (x): return [x.split(' ')[1], x.split(' ')[2]]
else :
def p (x): return [x.split(' ')[1], x.split(' ')[0]]
result = map(p, po)
# print 'result = '
# print result
return result
def get_dupreport_byins(insname):
flag = True
pc = prpcrypt()
for a in insname.db_name_set.all():
for i in a.db_account_set.all():
if i.role == 'admin':
tar_username = i.user
tar_passwd = pc.decrypt(i.passwd)
flag = False
break
if flag == False:
break
if vars().has_key('tar_username'):
cmd = incept.pttool_path + '/pt-duplicate-key-checker' + ' -u %s -p %s -P %d -h %s ' % (tar_username, tar_passwd, int(insname.port), insname.ip)
dup_result = commands.getoutput(cmd)
return dup_result
def get_diff(dbtag1,tb1,dbtag2,tb2):
if os.path.isfile(path_mysqldiff) :
tar_host1, tar_port1, tar_username1, tar_passwd1,tar_dbname1 = get_conn_info(dbtag1)
tar_host2, tar_port2, tar_username2, tar_passwd2,tar_dbname2 = get_conn_info(dbtag2)
server1 = ' -q --server1={}:{}@{}:{}'.format(tar_username1,tar_passwd1,tar_host1,str(tar_port1))
server2 = ' --server2={}:{}@{}:{}'.format(tar_username2,tar_passwd2,tar_host2,str(tar_port2))
option = ' --difftype=sql'
table = ' {}.{}:{}.{}'.format(tar_dbname1,tb1,tar_dbname2,tb2)
cmd = path_mysqldiff + server1 + server2 + option + table
output = os.popen(cmd)
result = output.read()
# result = commands.getoutput(cmd)
else :
result = "mysqldiff not installed"
return result
check_node_status.py 文件源码
项目:Cassandra-Automation-Scripts
作者: c446984928
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def check_node_status():
try:
b = commands.getoutput(PATH)
logging.info(b)
if "DN" in b or "Status=Up/Down" not in b:
if read_flag() == 'normal':
write_flag('abnormal')
send_mail_tool(b)
elif read_flag() == 'abnormal':
pass
else:
write_flag('abnormal')
send_mail_tool(b)
else:
write_flag('normal')
except:
send_mail_tool("Check node status error!")
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def discoverhostparams():
command_host_name = utils.hostnameCmdPath.cmd
result_dict = {}
resultString = ""
hostname = commands.getoutput(command_host_name)
result_dict['hostname'] = hostname
resultString = json.dumps(result_dict)
print resultString
sys.exit(utils.PluginStatusCode.OK)
###
# This plugin discovers all the host specific parameters
# Currently it gets only the hostname from the node
# but when we add support for discovering physical
# components like cpu,network,disk etc, all those will be
# addded as part of this module
###
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
try:
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
except Exception as exception:
print(exception.output)
test_04_ApplicationRegistrar.py 文件源码
项目:core-framework
作者: RedhawkSDR
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def pidExists(pid):
process_listing = commands.getoutput('ls /proc').split('\n')
return str(pid) in process_listing
def pidExists(pid):
process_listing = commands.getoutput('ls /proc').split('\n')
return str(pid) in process_listing
# This test suite requires log4cxx support because it checks the domain's log
# output
def test_OrphanProcesses(self):
"""
Tests that all child processes associated with a component get
terminated with that component.
"""
nb, domMgr = self.launchDomainManager()
self.assertNotEqual(domMgr, None)
nb, devMgr = self.launchDeviceManager('/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml')
self.assertNotEqual(devMgr, None)
domMgr.installApplication('/waveforms/orphaned_child/orphaned_child.sad.xml')
appFact = domMgr._get_applicationFactories()[0]
self.assertEqual(appFact._get_name(), 'orphaned_child')
app = appFact.create(appFact._get_name(), [], [])
pid = app._get_componentProcessIds()[0].processId
children = [int(line) for line in commands.getoutput('ps --ppid %d --no-headers -o pid' % (pid,)).split()]
app.releaseObject()
orphans = 0
for pid in children:
try:
os.kill(pid, 9)
orphans += 1
except OSError:
pass
self.assertEqual(orphans, 0)
def test_CppGppOrphanProcesses(self):
"""
Tests that all child processes associated with a component get
terminated with that component.
"""
nb, domMgr = self.launchDomainManager()
self.assertNotEqual(domMgr, None)
nb, devMgr = self.launchDeviceManager('/nodes/test_GPP_node/DeviceManager.dcd.xml')
self.assertNotEqual(devMgr, None)
domMgr.installApplication('/waveforms/orphaned_child/orphaned_child.sad.xml')
appFact = domMgr._get_applicationFactories()[0]
self.assertEqual(appFact._get_name(), 'orphaned_child')
app = appFact.create(appFact._get_name(), [], [])
pid = app._get_componentProcessIds()[0].processId
children = [int(line) for line in commands.getoutput('ps --ppid %d --no-headers -o pid' % (pid,)).split()]
app.releaseObject()
orphans = 0
for pid in children:
try:
os.kill(pid, 9)
orphans += 1
except OSError:
pass
self.assertEqual(orphans, 0)
def pidExists(pid):
process_listing = commands.getoutput('ls /proc').split('\n')
return str(pid) in process_listing
def getUnitTestFiles(rootpath, testFileGlob="test_*.py"):
rootpath = os.path.normpath(rootpath) + "/"
print "Searching for files in %s with prefix %s" % (rootpath, testFileGlob)
test_files = commands.getoutput("find %s -name '%s'" % (rootpath, testFileGlob))
files = test_files.split('\n')
if files == ['']:
files = []
files.sort()
return files
def uuidgen():
return commands.getoutput('uuidgen')
def terminateChildrenPidOnly(self, pid, signals=(signal.SIGINT, signal.SIGTERM)):
ls = commands.getoutput('ls /proc')
entries = ls.split('\n')
for entry in entries:
filename = '/proc/'+entry+'/status'
try:
fp = open(filename,'r')
stuff = fp.readlines()
fp.close()
except:
continue
ret = ''
for line in stuff:
if 'PPid' in line:
ret=line
break
if ret != '':
parentPid = ret.split('\t')[-1][:-1]
if parentPid == pid:
self.terminateChildrenPidOnly(entry, signals)
filename = '/proc/'+pid+'/status'
for sig in signals:
try:
os.kill(int(pid), sig)
except:
continue
done = False
attemptCount = 0
while not done:
try:
fp = open(filename,'r')
fp.close()
attemptCount += 1
if attemptCount == 10:
break
time.sleep(0.1)
except:
done = True
if not done:
continue
def uuidgen():
return commands.getoutput('uuidgen')
# Finds SDR root directory
def uuid1(node=None, clock_seq=None):
"""
Generate a UUID from a host ID, sequence number, and the cuurent time.
The 'node' and 'clock_seq' arguments are ignored.
Attempt to use 'uuidgen'
Attempt to use 'uuid' (for debian)
"""
(result, output) = _commands.getstatusoutput('uuidgen -t')
if (result == 0): return output
return _commands.getoutput('uuid -v 1 -m')
def uuid4():
"""
Generate a random UUID.
Attempt to use 'uuidgen'
Attempt to use 'uuid' (for debian)
"""
(result, output) = _commands.getstatusoutput('uuidgen -r')
if (result == 0): return output
return _commands.getoutput('uuid -v 4 -m')
def add_key_to_database(self):
aircrack_log = config_dir + 'aircrack-log.txt'
# read cracked key
key = commands.getoutput("cat " + aircrack_log + " | grep 'KEY FOUND' | tr '[]' '\n' | egrep '([a-fA-F0-9]:)+' | tr -d ' \t'")
# insert a row in the database
self.table_database.insertRow(0)
item=QtGui.QTableWidgetItem()
item.setText(essid)
self.table_database.setItem(0, 0, item)
item=QtGui.QTableWidgetItem()
item.setText(self.ac)
self.table_database.setItem(0, 1, item)
item=QtGui.QTableWidgetItem()
item.setText(self.canale)
self.table_database.setItem(0, 2, item)
item=QtGui.QTableWidgetItem()
item.setText(key)
self.table_database.setItem(0, 3, item)
item=QtGui.QTableWidgetItem()
item.setText((key_to_ascii(key)))
self.table_database.setItem(0, 4, item)
#
# Database changed
#