def test_getoutput(self):
    """Exercise ``subprocess.getoutput`` and ``subprocess.getstatusoutput``.

    Checks the text and status produced by a trivial ``echo`` and then
    checks that ``cat`` on a path that does not exist reports a non-zero
    status.
    """
    echoed = subprocess.getoutput('echo xyzzy')
    self.assertEqual(echoed, 'xyzzy')
    status_and_text = subprocess.getstatusoutput('echo xyzzy')
    self.assertEqual(status_and_text, (0, 'xyzzy'))

    # mkdtemp hands back an empty directory under our exclusive control,
    # so a name invented inside it is one we _know_ won't exist and the
    # command reading it is guaranteed to fail.
    scratch = tempfile.mkdtemp()
    try:
        missing = os.path.join(scratch, "foo")
        exit_code, _ = subprocess.getstatusoutput('cat ' + missing)
        self.assertNotEqual(exit_code, 0)
    finally:
        os.rmdir(scratch)
# Example source code using Python's getstatusoutput()
def __init__(self):
    """
    Determines whether pkg-config exists on this machine.

    On win32 only ``has_pkgconfig`` is set (to False).  Elsewhere the
    command name is taken from the ``PKG_CONFIG`` environment variable,
    falling back to ``pkg-config``; ``has_pkgconfig`` records whether
    running it with ``--help`` exited with status 0, and a warning is
    printed when it did not.
    """
    if sys.platform == 'win32':
        self.has_pkgconfig = False
        return

    self.pkg_config = os.environ.get('PKG_CONFIG', 'pkg-config')
    self.set_pkgconfig_path()

    exit_code, _ = getstatusoutput(self.pkg_config + " --help")
    self.has_pkgconfig = exit_code == 0
    if self.has_pkgconfig:
        return

    print("IMPORTANT WARNING:")
    print(
        " pkg-config is not installed.\n"
        " matplotlib may not be able to find some of its dependencies")
def check(self):
    """Check for FreeType and return the outcome of the pkg-config check.

    On win32 only the ``ft2build.h`` header is checked and a fixed message
    is returned.  Elsewhere the version printed by ``freetype-config`` is
    used, falling back to ``self.version_from_header()`` when that command
    fails or prints a grep error.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'ft2build.h', 'freetype')
        return 'Using unknown version found on system.'

    exit_code, reported = getstatusoutput("freetype-config --ftversion")
    version = reported if exit_code == 0 else None

    # Early versions of freetype grep badly inside freetype-config,
    # so catch those cases. (tested with 2.5.3).
    unusable = (version is None
                or 'No such file or directory\ngrep:' in version)
    if unusable:
        version = self.version_from_header()

    # pkg_config returns the libtool version rather than the
    # freetype version so we need to explicitly pass the version
    # to _check_for_pkg_config
    return self._check_for_pkg_config(
        'freetype2', 'ft2build.h', min_version='2.3', version=version)
def check(self):
    """Check for libpng and return the outcome of the pkg-config check.

    On win32 only the ``png.h`` header is checked and a fixed message is
    returned.  Elsewhere the version printed by ``libpng-config`` (or None
    when that command fails) is handed to ``_check_for_pkg_config``.  If
    that check raises ``CheckFailed`` but ``png.h`` can still be found, the
    failure text plus a note is returned instead of raising.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'png.h', 'png')
        return 'Using unknown version found on system.'

    exit_code, reported = getstatusoutput("libpng-config --version")
    version = reported if exit_code == 0 else None

    try:
        return self._check_for_pkg_config(
            'libpng', 'png.h', min_version='1.2', version=version)
    except CheckFailed as e:
        if not has_include_file(get_include_dirs(), 'png.h'):
            raise
        return str(e) + ' Using unknown version found on system.'
def test_getoutput(self):
    """Verify ``getoutput``/``getstatusoutput`` on a succeeding and a failing command."""
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    expected = (0, 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), expected)

    # An empty directory from mkdtemp is under our exclusive control;
    # a pathname invented inside it is one that we _know_ won't exist,
    # so reading it is guaranteed to fail.
    tmp_dir = tempfile.mkdtemp()
    try:
        absent_path = os.path.join(tmp_dir, "foo")
        result = subprocess.getstatusoutput('cat ' + absent_path)
        self.assertNotEqual(result[0], 0)
    finally:
        os.rmdir(tmp_dir)
def exec2():
    """Create the ``zach`` account and switch to it in the interactive session.

    Drives the module-level ``child`` session (presumably a pexpect spawn
    -- confirm): waits for a ``#`` prompt, runs ``useradd`` and ``passwd``
    for ``zach``, then sends ``su - zach`` followed by ``whoami``.
    """
    child.expect('#')
    # NOTE(review): getstatusoutput() returns a (status, output) tuple, so
    # this comparison with 0 is always true and the account setup below
    # always runs.  The probe also looks up ``root`` rather than ``zach``
    # and uses ``2&1`` where ``2>&1`` was presumably meant.  Left unchanged
    # because the intended check cannot be determined from this code.
    if subprocess.getstatusoutput('id root >> /dev/null 2&1 && echo $?') != 0:
        # NOTE(review): hard-coded credential; consider reading it from a
        # secret store or prompting for it.
        new_password = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        run('passwd zach', events={'(?i)password:': new_password})
        # TODO: the run() call above is meant to be equivalent to:
        #   child.expect('password:')
        #   child.sendline()
        #   child.expect('password:')
        #   child.sendline(new_password)
    child.expect('#')
    child.sendline('su - zach')
    # expect() patterns are regular expressions, where a bare '$' is the
    # end-of-input anchor and matches immediately; escape it so that we
    # really wait for the '$' shell prompt.
    child.expect(r'\$')
    child.sendline('whoami')
def crawl():
    """Run the ``news`` spider forever, ingesting its result files each pass.

    Every pass runs ``scrapy crawl news``, prints its output (prefixed with
    ``crawl failed`` on a non-zero status), then loads every ``res_`` file
    in the current directory as JSON, copies it to ``./log``, passes the
    parsed data to ``insert_value`` and deletes it.  The loop then sleeps
    for 30 minutes.  This function never returns.
    """
    make_dir('./log')
    count = 0
    while True:
        count += 1
        status, res = subprocess.getstatusoutput('scrapy crawl news')
        print(res if status == 0 else 'crawl failed {}'.format(res))

        for file in os.listdir(os.getcwd()):
            if not (os.path.isfile(file) and 'res_' in file):
                continue
            with open(file, 'r') as fobj:
                try:
                    res = json.load(fobj)
                except Exception as e:
                    # An unparsable file is reported and treated as empty.
                    print(e)
                    res = None
            if res:
                shutil.copy(file, './log/{}'.format(file))
                insert_value(res)
                print(res)
                # NOTE(review): the pasted source lost its indentation;
                # the removal is assumed to apply only to files that were
                # ingested -- confirm against the original.
                os.remove(file)

        print('loop {} finished'.format(count))
        time.sleep(60*30)
def test_getoutput(self):
    """Exercise ``getoutput``/``getstatusoutput`` with a platform-aware failing command.

    The failing command is ``type`` when ``mswindows`` is true and ``cat``
    otherwise.
    """
    echoed = subprocess.getoutput('echo xyzzy')
    self.assertEqual(echoed, 'xyzzy')
    status_and_text = subprocess.getstatusoutput('echo xyzzy')
    self.assertEqual(status_and_text, (0, 'xyzzy'))

    # mkdtemp hands back an empty directory under our exclusive control,
    # so a name invented inside it is one we _know_ won't exist and the
    # command reading it is guaranteed to fail.
    scratch = tempfile.mkdtemp()
    try:
        missing = os.path.join(scratch, "foo")
        reader = "type " if mswindows else "cat "
        exit_code, _ = subprocess.getstatusoutput(reader + missing)
        self.assertNotEqual(exit_code, 0)
    finally:
        os.rmdir(scratch)
def create_backup_directory(self):
    """
    Create a timestamped backup directory owned by ``mysql:mysql``.

    The directory is created under ``self.backupdir`` and named after the
    current local time in ``YYYY-MM-DD_HH-MM-SS`` form.

    :return: Path of the new directory when it was created and its owner
        was changed; ``None`` when either step failed (a message is
        printed in that case).
    """
    import shlex

    new_backup_dir = join(self.backupdir,
                          datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
    try:
        # Creating backup directory
        makedirs(new_backup_dir)
        # Changing owner.  The path is quoted so that a backup location
        # containing spaces or shell metacharacters is passed to chown as
        # a single argument.
        chown_command = "chown mysql:mysql %s" % shlex.quote(new_backup_dir)
        status, _ = subprocess.getstatusoutput(chown_command)
    except Exception as err:
        print("Something went wrong in create_backup_directory(): {}".format(err))
        return None

    if status == 0:
        return new_backup_dir
    print("Could not change owner of backup directory!")
    return None
def backup_tables(input_date, backup_table_list):
    """Dump the DDL of every listed table with ``db2look``.

    Each entry of *backup_table_list* is a ``schema.table`` name.  A table
    whose backup file already exists is skipped.  If ``db2look`` reports a
    non-zero status, the error and its output are printed and the process
    exits with status -1.
    """
    # NOTE(review): the original notes in this function were mis-encoded;
    # what survives mentions DELTA, DROP, ALTER TABLE and RENAME handling.
    # Recover the wording from version history if it matters.
    for table in backup_table_list:
        print("backup table %s" % table)
        schema, tablename = table.split('.')
        backup_path = config.backup_path.format(date=input_date) + table + ".ddl.bak"
        print(backup_path)

        if os.path.exists(backup_path):
            print("backup exists %s" % table)
            continue

        cmd = "db2look -d {edwdb} -i {edwuser} -w {edwpwd} -z {schema} -e -t {tablename} -nofed -o /etl/etldata/script/yatop_update/{date}/backup/{table}.ddl.bak".format(
            edwdb=config.edwdb,
            edwuser=config.edwuser,
            edwpwd=config.edwpwd,
            schema=schema,
            tablename=tablename,
            date=input_date,
            table=table,
        )
        status, output = subprocess.getstatusoutput(cmd)
        if not status:
            continue

        print("\033[1;31;40mcreate ddl error %s\033[0m" % table)
        print(output)
        sys.exit(-1)
def backup_schedule(input_date):
    """Export the JOB_METADATA and JOB_SEQ tables to ``.del`` files.

    A table whose export file (``config.job_metadata_path`` or
    ``config.job_seq_path`` formatted with *input_date*) already exists is
    skipped.  Returns 0 when every table was handled and -1 as soon as one
    export command reports a non-zero status.
    """
    for table in ("JOB_METADATA", "JOB_SEQ"):
        template = (config.job_metadata_path if table == "JOB_METADATA"
                    else config.job_seq_path)
        if os.path.exists(template.format(date=input_date)):
            print("backup exists %s" % table)
            continue

        print("export %s..." % table)
        cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "export to /etl/etldata/script/yatop_update/{date}/backup/{table}.del of del select * from ETL.{table}"'.format(
            dwmmdb=config.dwmmdb,
            dwmmuser=config.dwmmuser,
            dwmmpwd=config.dwmmpwd,
            date=input_date,
            table=table,
        )
        print(cmd)
        status, output = subprocess.getstatusoutput(cmd)
        if status:
            print("\033[1;31;40mexport %s error\033[0m" % table)
            print(output)
            return -1
    return 0
def load_schedule(input_date):
    """Load JOB_METADATA and then JOB_SEQ back from their ``.del`` exports.

    Each table is loaded with a ``db2 load ... replace into`` command.  When
    a command reports a non-zero status its output is written to
    ``<TABLE>.error`` in the current directory and -1 is returned without
    attempting the remaining table.  Returns 0 when both loads succeed.
    """
    jobs = (
        ("JOB_METADATA",
         'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_METADATA.del of del modified by identityoverride replace into ETL.JOB_METADATA"'),
        ("JOB_SEQ",
         'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_SEQ.del of del replace into ETL.JOB_SEQ"'),
    )
    for table, template in jobs:
        print("load %s..." % table)
        cmd = template.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser,
                              dwmmpwd=config.dwmmpwd, date=input_date)
        print(cmd)
        status, output = subprocess.getstatusoutput(cmd)
        if not status:
            continue
        print("\033[1;31;40mload %s error, cat %s.error see detail \033[0m"
              % (table, table))
        with open('%s.error' % table, 'w') as f:
            f.write(output)
        return -1
    return 0
def test_getoutput(self):
    """Verify ``getoutput``/``getstatusoutput`` on a succeeding and a failing command.

    The failing command is ``type`` when ``mswindows`` is true, else ``cat``.
    """
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    expected = (0, 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), expected)

    # An empty directory from mkdtemp is under our exclusive control;
    # a pathname invented inside it is one that we _know_ won't exist,
    # so reading it is guaranteed to fail.
    tmp_dir = tempfile.mkdtemp()
    try:
        absent_path = os.path.join(tmp_dir, "foo")
        command = ("type " if mswindows else "cat ") + absent_path
        result = subprocess.getstatusoutput(command)
        self.assertNotEqual(result[0], 0)
    finally:
        os.rmdir(tmp_dir)
def detect_ipmi():
    """Probe the host for IPMI support.

    :return: A ``(detected, detail)`` tuple:
        ``(False, None)`` when /proc/cpuinfo lists a QEMU CPU model;
        ``(True, "UNKNOWN: <devices>")`` when ``ipmi-locate`` printed no
        version but ``/dev/ipmi[0-9]`` devices exist;
        ``(False, "")`` when nothing was found or ``bmc-config`` failed;
        ``(True, version)`` with the version printed by ``ipmi-locate``
        otherwise.
    """
    # XXX: andreserl 2013-04-09 bug=1064527: Try to detect if node
    # is a Virtual Machine. If it is, do not try to detect IPMI.
    with open('/proc/cpuinfo', 'r') as cpuinfo:
        for line in cpuinfo:
            if line.startswith('model name') and 'QEMU' in line:
                return (False, None)

    # Only the text matters here; the version is looked up in the output
    # whatever the exit status was.
    _, output = subprocess.getstatusoutput('ipmi-locate')
    # Raw string: the pattern is unchanged, but its backslashes are no
    # longer invalid string escapes.
    show_re = re.compile(r'(IPMI\ Version:) (\d\.\d)')
    res = show_re.search(output)
    if res is None:
        found = glob.glob("/dev/ipmi[0-9]")
        if found:
            return (True, "UNKNOWN: %s" % " ".join(found))
        return (False, "")

    # We've detected IPMI, but it doesn't necessarily mean we can access
    # the BMC. Let's test if we can.
    cmd = 'bmc-config --checkout --key-pair=Lan_Conf:IP_Address_Source'
    status, _ = subprocess.getstatusoutput(cmd)
    if status != 0:
        return (False, "")
    return (True, res.group(2))
def _get_disk_info(self):
    """
    Collect the ``df -h`` rows whose first column mentions ``/dev/``.

    :return: list of ``self.DiskInfo`` objects built from the first five
        columns plus the last column of each matching row; an empty list
        when ``df`` reports a non-zero status.
    """
    status, output = getstatusoutput("df -h")
    disk_info_list = []
    if status:
        return disk_info_list

    # The first line is the column header.
    for line in output.split('\n')[1:]:
        fields = line.split()
        # Blank lines and rows that df wrapped because of a long device
        # name have fewer than six columns; indexing them would raise or
        # hand DiskInfo the wrong number of values, so skip them.
        # NOTE(review): assumes DiskInfo takes exactly six fields -- confirm.
        if len(fields) < 6:
            continue
        if '/dev/' in fields[0]:
            # Keep the last column on its own so that extra middle columns
            # do not displace it.
            disk_info_list.append(self.DiskInfo(*(fields[:5] + [fields[-1]])))
    return disk_info_list
def log_results(clf_ner, description, filen='', subf=''):
    """Apply *clf_ner* to the CoNLL-2003 splits and append the scores to a log.

    Predictions for the train, testa and testb splits are written under
    ``data/conll2003_results<subf>``.  The ``conlleval`` output for each
    split is then appended, under *description*, to
    ``data/conll2003_results/output_all_<filen>.txt``.
    """
    import os
    import subprocess

    run_dir = 'data/conll2003_results%s' % subf
    for directory in ('data/conll2003_results', run_dir):
        if not os.path.exists(directory):
            os.mkdir(directory)

    print("applying to training set")
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.train',
                        '%s/eng.out_train.txt' % run_dir)
    print("applying to test set")
    for split in ('testa', 'testb'):
        apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.%s' % split,
                            '%s/eng.out_%s.txt' % (run_dir, split))

    # write out results
    sections = (
        ('results on training data\n', 'train'),
        ('results on testa\n', 'testa'),
        ('results on testb\n', 'testb'),
    )
    with open('data/conll2003_results/output_all_%s.txt' % filen, 'a') as f:
        f.write('%s\n' % description)
        for heading, split in sections:
            f.write(heading)
            scoring = 'data/conll2003/ner/bin/conlleval < %s/eng.out_%s.txt' % (run_dir, split)
            # Only the captured text is logged; the exit status is ignored.
            out = subprocess.getstatusoutput(scoring)[1]
            f.write(out)
            f.write('\n')
        f.write('\n')
def _get_nix_font_path(self, name, style):
    """Look up the file of font *name* in *style* with ``fc-list``.

    Returns the first line of the ``fc-list`` output with surrounding
    whitespace and colons stripped, or None when the command reports a
    non-zero status or prints nothing.
    """
    # ``commands`` is tried first; ``subprocess`` is the fallback when it
    # cannot be imported.
    try:
        from commands import getstatusoutput
    except ImportError:
        from subprocess import getstatusoutput

    query = 'fc-list "%s:style=%s" file' % (name, style)
    status, listing = getstatusoutput(query)
    if status:
        return None
    matches = listing.splitlines()
    if not matches:
        return None
    return matches[0].strip().strip(':')
def expandName(fn):
    """Resolve *fn* to a canonical path, searching the PATH for executables.

    :param fn: file name or path to resolve
    :return: the real path (symbolic links resolved) of the executable
        found on the PATH for *fn*, or of *fn* itself when no such
        executable is found
    """
    import shutil

    # Many of the files we'll be testing are executables, and therefore
    # looked up on the path.  shutil.which does that lookup in-process, so
    # names containing spaces or shell metacharacters are handled safely.
    located = shutil.which(fn)
    if located is not None:
        fn = located
    # The file may be a link, or the path may contain one...
    return os.path.realpath(fn)
def getDLLs(self):
    """Return the supporting libraries that ``ldd`` reports for ``self.name``.

    :return: one entry per line of ``ldd`` output (warning lines excluded):
        the path after ``=>`` when there is one, otherwise the text before
        the load address.  An empty list when the file is not effectively
        readable or ``ldd`` reports a non-zero status.
    """
    import shlex

    if not self.effectivelyReadable():
        return []
    # Uses the system's ldd command to get all the supporting libraries.
    # The path is quoted so that names containing spaces or shell
    # metacharacters reach ldd as a single argument.
    (status, result) = subprocess.getstatusoutput('ldd ' + shlex.quote(self.name))
    if status != 0:
        return []

    processed = []
    for entry in result.split('\n'):
        # One dll at a time. Prune the address from the right.
        # If there is a path, it is the second element, use it.
        # If that's empty, use the first element.
        if 'ldd: warning:' in entry:
            continue
        names = entry.split(' (')[0].split('=>')
        name = names[1].strip() if len(names) > 1 else ''
        if name == '':
            name = names[0].strip()
        processed.append(name)
    return processed
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    """Run *cmd* through ``getstatusoutput`` and report whether it succeeded.

    :param cmd: command line to run
    :param successful_status: exit statuses that count as success
    :param stacklevel: ``stacklevel`` passed to ``warnings.warn`` when the
        command cannot be run
    :return: ``(ok, output)`` -- *ok* is True when the command exited with
        one of *successful_status*; *output* is the captured text, or an
        empty string when running the command raised ``EnvironmentError``
    """
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError as e:
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        # Nothing was captured: ``output`` is unbound on this path, so
        # return an empty string rather than raising UnboundLocalError.
        return False, ''
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
def intltool_version():
    '''
    Return the version of intltool as a tuple.

    On win32 the version is extracted by a perl one-liner and ``(0, 0, 0)``
    is returned when that fails or prints nothing.  On other platforms
    ``intltool-update --version`` is run and ``None`` is returned when it
    reports a non-zero status.  Otherwise the dotted version string is
    returned as a tuple of ints.
    '''
    import subprocess

    if sys.platform == 'win32':
        # Raw string: the perl regex is unchanged, but its backslashes are
        # no longer invalid Python string escapes.
        cmd = ["perl", r"-e print qx(intltool-update --version) =~ m/(\d+.\d+.\d+)/;"]
        try:
            ver, ret = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        shell=True).communicate()
            ver = ver.decode("utf-8")
        except Exception:
            # Best effort: any failure to run or decode means "unknown".
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            return (0, 0, 0)
        if ver > "":
            version_str = ver
        else:
            return (0, 0, 0)
    else:
        cmd = 'intltool-update --version 2> /dev/null'  # pathological case
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0:
            return None
        cmd = 'intltool-update --version 2> /dev/null | head -1 | cut -d" " -f3'
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0:  # unlikely but just barely imaginable, so leave it
            return None
    return tuple(int(num) for num in version_str.split('.'))
def shell_run(text):
    """Run *text* as a shell command and add its output to the scrollback.

    The output is added with the ``'OUTPUT'`` style when the command's
    status is zero and with the ``'ERROR'`` style otherwise.
    """
    import subprocess

    status, output = subprocess.getstatusoutput(text)
    add_scrollback(output, 'ERROR' if status else 'OUTPUT')
def get_uwsgi_version():
    """Return the text printed by ``uwsgi --version``.

    :return: the command's output, or None when it reports a non-zero status
    """
    # getstatusoutput() runs its argument through the shell and expects a
    # single command string, not an argument list.
    status, output = subprocess.getstatusoutput('uwsgi --version')
    return None if status else output
def check_zone(self, zone, contents):
    """Write *contents* to the zone's file and run the zone checker on it.

    The file is ``<self.checkzone_dir>/<zone.domain_name>``.  Returns the
    ``(status, output)`` tuple produced by running ``self.checkzone_bin``
    with the domain name and the file path as arguments.
    """
    domain = zone.domain_name
    zonefile = self.checkzone_dir + '/' + domain
    with open(zonefile, 'w') as f:
        f.write(contents)
    command = "%s %s %s" % (self.checkzone_bin, domain, zonefile)
    return getstatusoutput(command)
def _get_nix_font_path(self, name, style):
    """Return the font file that ``fc-list`` reports for *name* and *style*.

    The result is the first output line with surrounding whitespace and
    colons removed.  None is returned when ``fc-list`` reports a non-zero
    status or prints no lines.
    """
    # Prefer ``commands``; use ``subprocess`` when it is not importable.
    try:
        from commands import getstatusoutput
    except ImportError:
        from subprocess import getstatusoutput

    status, listing = getstatusoutput(
        'fc-list "%s:style=%s" file' % (name, style))
    first_line = None
    if not status:
        first_line = next(iter(listing.splitlines()), None)
    if first_line is None:
        return None
    return first_line.strip().strip(':')
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    """Run *cmd* through ``getstatusoutput`` and report whether it succeeded.

    :param cmd: command line to run
    :param successful_status: exit statuses that count as success
    :param stacklevel: ``stacklevel`` passed to ``warnings.warn`` when the
        command cannot be run
    :return: ``(ok, output)`` -- *ok* is True when the command exited with
        one of *successful_status*; *output* is the captured text, or an
        empty string when running the command raised ``EnvironmentError``
    """
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError as e:
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        # ``output`` was never assigned on this path; returning it would
        # raise UnboundLocalError, so report empty output instead.
        return False, ''
    succeeded = os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status
    return (True, output) if succeeded else (False, output)
def _twitting(self):
    """Build a tweet from the buffered entry, post it and return its text.

    The URL comes from ``self.buf[3]`` (minus a leading ``**URL:**`` marker)
    and is passed through ``shorten_url``; the text comes from
    ``self.buf[4]`` (minus a leading ``**Notes:**`` marker).  Text that does
    not fit within ``TWEET_LIMIT`` is cut back word by word and suffixed
    with ``"... "``.

    :return: the double-quoted tweet text that was appended to
        ``self.twitter_command``
    """
    raw_url = self.buf[3]
    url = shorten_url(raw_url if raw_url[:8] != "**URL:**" else raw_url[9:])
    raw_text = self.buf[4]
    text = raw_text if raw_text[:10] != "**Notes:**" else raw_text[11:]

    if len(text) > TWEET_LIMIT - len(url) - 1 - 3:  # one symbol for space, three symbols more
        premature_ending = "... "
        # FIXME: for some reason twitter counts for three symbols more, than len()
        budget = TWEET_LIMIT - len(premature_ending) - len(url) - 3
        while len(text) > budget:
            head, sep, _ = text.rpartition(" ")
            if not sep:
                # No word boundary left: dropping the last word would leave
                # the text unchanged and loop forever, so cut it hard.
                text = text[:max(budget, 0)]
                break
            text = head
        twit = "\"" + text + premature_ending + url + "\""
    else:
        twit = "\"" + text + " " + url + "\""

    # NOTE(review): the text is only wrapped in double quotes, so quotes,
    # ``$`` or backticks inside it are still interpreted by the shell.
    cmd.getstatusoutput(self.twitter_command + " " + twit)
    return twit
def parse_args():
    """Parse the command line and make sure a ToC maker is available.

    When ``--toc-maker`` is not given, ``./gh-md-toc`` is used; if that file
    is missing, the release archive for the current kernel name is
    downloaded with ``wget`` (unless already present), unpacked with
    ``tar``, deleted, and the extracted tool is made executable for its
    owner.

    :return: ``(known_args, filepath)`` -- the parsed namespace and the
        first unrecognised argument, taken as the file to work with
    :raises EnvironmentError: when the download or the unpacking fails
    :raises SystemExit: with status 1 when no file path was given
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--toc-maker", help="path to ToC making tool")
    parser.add_argument("--twitter-poster", default="t update", help="twitter poster command")
    parser.add_argument("-t", "--use-twitter", action="store_true")

    known_args, unknown_args = parser.parse_known_args()

    if not known_args.toc_maker:
        known_args.toc_maker = "./gh-md-toc"
        if not os.path.isfile(known_args.toc_maker):
            s = cmd.getoutput("uname -s").lower()
            f = "gh-md-toc.%s.amd64.tgz" % s
            URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
            if not os.path.isfile(f):
                if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
                    raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
            if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
                raise EnvironmentError("Cannot untar toc maker from file %s" % f)
            os.remove(f)

            current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
            # OR the owner-execute bit into the existing mode; AND-ing would
            # clear every other permission bit and leave the tool unreadable.
            os.chmod(known_args.toc_maker, current_permissions | stat.S_IXUSR)

    if unknown_args:
        filepath = unknown_args[0]
    else:
        print("You should specify the path for file to work with!")
        # Same exit status as quit(1), without relying on the site module.
        raise SystemExit(1)

    return known_args, filepath
def get_version(self, package):
    """
    Get the version of the package from pkg-config.

    Returns None when pkg-config is unavailable (``self.has_pkgconfig`` is
    false) or when the ``--modversion`` query reports a non-zero status.
    """
    if not self.has_pkgconfig:
        return None

    command = self.pkg_config + " %s --modversion" % (package)
    exit_code, reported = getstatusoutput(command)
    return reported if exit_code == 0 else None
# The PkgConfig class should be used through this singleton
def dynamicinfo():
    """Collect a snapshot of host status by running shell pipelines.

    :return: dict with the keys ``sysver``, ``loadavg``, ``uptime``,
        ``diskusage`` and ``ipv4``; each value is the ``(status, output)``
        tuple returned by ``subprocess.getstatusoutput`` for that probe
    """
    commands = {
        # The awk program drops the literal "\n" and "\l" placeholders found
        # in /etc/issue.  "\\l" spells the same two characters as before
        # without relying on an invalid string escape.
        'sysver': ("head -1 /etc/issue | awk '{ for(;i++<NF;) "
                   "if ($i==\"\\n\" || $i==\"\\l\") continue ; else print $i }'"),
        # Probes kept from the original as disabled notes:
        #   hostname: subprocess.getstatusoutput("hostname")
        #   currentproccessnum = "(ps -ef | wc -l) -1"
        'loadavg': "more /proc/loadavg | cut -d \" \" -f 1-3",
        'uptime': "uptime | cut -d \",\" -f 1",
        'diskusage': "df -h | grep ^/dev/* | awk '{print $4,$5}'",
        'ipv4': "ip -4 a | grep inet | grep -v \"127.0.0.1\" | cut -d \" \" -f 6,11 | head -1",
    }
    # Dict order is preserved, so the probes run in the order listed above.
    return {key: subprocess.getstatusoutput(command)
            for key, command in commands.items()}