def dot2graph(self, dot, format='svg'):
# windows ???????????????????
# ?????? NamedTemporaryFile ?????
with NamedTemporaryFile(delete=False) as dotfile:
dotfile.write(dot)
outfile = NamedTemporaryFile(delete=False)
os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
dotfile.name, outfile.name, format))
result = outfile.read()
outfile.close()
os.unlink(dotfile.name)
os.unlink(outfile.name)
return result
python类system()的实例源码
def getMessagePayload(self):
self.logger.debug("Preparing client->device message payload")
salon = -127
try:
salon = read_temp()
except Exception as e:
self.logger.error("error reading local temp")
self.logger.exception(e)
piwnica = -127
relay = 0
try:
os.system("sudo ifconfig eth0 192.168.1.101 netmask 255.255.255.0")
txt = urllib2.urlopen(relay1_addr).read()
lines = string.split(txt, '\n')
piwnica = float(lines[1])
relay = int(lines[0])
except Exception as e:
self.logger.error("error reading data from {0}".format(relay1_addr))
self.logger.exception(e)
payloadDict = {"values":{}}
payloadDict["values"]["relay"] = relay
if salon > -127:
payloadDict["values"]["salon"] = salon
if piwnica > -127:
payloadDict["values"]["piwnica"] = piwnica
payload = json.dumps(payloadDict)
return payload
def get_cpus(node):
with open('/sys/devices/system/node/node%d/cpulist'%node) as f:
line = f.readline().strip()
return _parse_values(line, ',')
def get_distances(node):
with open('/sys/devices/system/node/node%d/distance'%node) as f:
line = f.readline().strip()
return _parse_values(line, ' ')
def is_numactl_available():
try:
return os.system('numactl --hardware > /dev/null 2>&1') == 0
except:
return False
def terminate(self):
"""
Terminate the Domain including the Node(s).
"""
if not self.NodeAlive:
return
# uninstall waveforms
if self.Waveforms != {}:
for waveform_name in self.Waveforms:
#waveform = self.Waveforms.pop(waveform_name)
waveform = self.Waveforms[waveform_name]
waveform.app.releaseObject()
self.Waveforms = {}
# uninstall devices
for device_entry in self.Devices:
if device_entry.reference != None:
device_entry.reference.releaseObject()
# clean up the node
os.system('pkill nodeBooter')
self.NodeAlive = False
dmn_ctx = [CosNaming.NameComponent(self.name,"")]
self.rootContext.unbind(dmn_ctx)
def run():
args = parser.parse_args()
cmds, notes = create_commands(
"a3c",
args.num_workers,
args.remotes,
args.env_id,
args.log_dir,
mode=args.mode,
hparams=hparams.get_hparams(args, ignore_default=True))
if args.dry_run:
print("Dry-run mode due to -n flag, otherwise the following commands would be executed:")
else:
print("Executing the following commands:")
print("\n".join(cmds))
print("")
if not args.dry_run:
if args.mode == "tmux":
os.environ["TMUX"] = ""
os.system("\n".join(cmds))
print('\n'.join(notes))
def run():
args = parser.parse_args()
cmds, notes = create_commands(
"a3c",
args.num_workers,
args.remotes,
args.env_id,
args.log_dir,
mode=args.mode,
hparams=hparams.get_hparams(args, ignore_default=True))
if args.dry_run:
print("Dry-run mode due to -n flag, otherwise the following commands would be executed:")
else:
print("Executing the following commands:")
print("\n".join(cmds))
print("")
if not args.dry_run:
if args.mode == "tmux":
os.environ["TMUX"] = ""
os.system("\n".join(cmds))
print('\n'.join(notes))
def exceptionhandler(exctype, value, traceback):
if exctype == IndexError:
parser.print_usage()
else:
sys.__excepthook__(exctype, value, traceback)
# Set the system exception handler to the above definition.
def check_verifications_done_before_modifying_system(script):
"""
Check if verifications are done before modifying the system
"""
ok = True
modify_cmd = ''
cmds = ("cp", "mkdir", "rm", "chown", "chmod", "apt-get", "apt", "service",
"find", "sed", "mysql", "swapon", "mount", "dd", "mkswap", "useradd")
cmds_before_exit = []
is_exit = False
for cmd in script:
if "ynh_die" == cmd or "exit " == cmd:
is_exit = True
break
cmds_before_exit.append(cmd)
if not is_exit:
return
for cmd in cmds_before_exit:
if "ynh_die" == cmd or "exit " == cmd:
break
if not ok or cmd in cmds:
modify_cmd = cmd
ok = False
break
if not ok:
print_wrong("[YEP-2.4] 'ynh_die' or 'exit' command is executed with system modification before (cmd '%s').\n"
"This system modification is an issue if a verification exit the script.\n"
"You should move this verification before any system modification." % (modify_cmd) , False)
def check_set_usage(script_name, script):
present = False
if script_name in ["backup", "remove"]:
present = "ynh_abort_if_errors" in script or "set -eu" in script
else:
present = "ynh_abort_if_errors" in script
if script_name == "remove":
# Remove script shouldn't use set -eu or ynh_abort_if_errors
if present:
print_wrong("[YEP-2.4] set -eu or ynh_abort_if_errors is present. "
"If there is a crash it could put yunohost system in "
"invalidated states. For details, look at "
"https://dev.yunohost.org/issues/419")
else:
if not present:
print_wrong("[YEP-2.4] ynh_abort_if_errors is missing. For details,"
"look at https://dev.yunohost.org/issues/419")
def convert_bedGraph_to_bigWig(self, remove_bedGraph=False):
for profile_id in sorted(self.profiles["Profiles"]):
bedGraph_file = self.path + profile_id + ".bedGraph"
bigWig_file = self.path + profile_id + ".bw"
# TODO: general case - any set of chromosome names/lengths.
# (use self.profiles["Reference"] to generate chrom.sizes)
cmd = self.tool_dir + "bedGraphToBigWig " + bedGraph_file + " " + self.tool_dir + "hg19.chrom.sizes " + bigWig_file
os.system(cmd)
if remove_bedGraph:
os.remove(outs_hp_bedGraph[hp])
# if remove
# for profile_id
# convert_bedGraph_to_bigWig
# class CopyNumberIO
def help():
for dirname in sys.path:
fullname = os.path.join(dirname, 'pdb.doc')
if os.path.exists(fullname):
sts = os.system('${PAGER-more} '+fullname)
if sts: print '*** Pager exit status:', sts
break
else:
print 'Sorry, can\'t find the help file "pdb.doc"',
print 'along the Python search path'
def copy(self, infile, outfile):
return os.system(self.makepipeline(infile, outfile))
def popen2(cmd, mode="t", bufsize=-1):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
may be a sequence, in which case arguments will be passed directly to
the program without shell intervention (as with os.spawnv()). If 'cmd'
is a string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdin, child_stdout) are returned."""
import warnings
msg = "os.popen2 is deprecated. Use the subprocess module."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
import subprocess
PIPE = subprocess.PIPE
p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
bufsize=bufsize, stdin=PIPE, stdout=PIPE,
close_fds=True)
return p.stdin, p.stdout
def popen3(cmd, mode="t", bufsize=-1):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
may be a sequence, in which case arguments will be passed directly to
the program without shell intervention (as with os.spawnv()). If 'cmd'
is a string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdin, child_stdout, child_stderr) are returned."""
import warnings
msg = "os.popen3 is deprecated. Use the subprocess module."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
import subprocess
PIPE = subprocess.PIPE
p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
bufsize=bufsize, stdin=PIPE, stdout=PIPE,
stderr=PIPE, close_fds=True)
return p.stdin, p.stdout, p.stderr
def popen4(cmd, mode="t", bufsize=-1):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
may be a sequence, in which case arguments will be passed directly to
the program without shell intervention (as with os.spawnv()). If 'cmd'
is a string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdin, child_stdout_stderr) are returned."""
import warnings
msg = "os.popen4 is deprecated. Use the subprocess module."
warnings.warn(msg, DeprecationWarning, stacklevel=2)
import subprocess
PIPE = subprocess.PIPE
p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
bufsize=bufsize, stdin=PIPE, stdout=PIPE,
stderr=subprocess.STDOUT, close_fds=True)
return p.stdin, p.stdout
def findmatch(caps, MIMEtype, key='view', filename="/dev/null", plist=[]):
"""Find a match for a mailcap entry.
Return a tuple containing the command line, and the mailcap entry
used; (None, None) if no match is found. This may invoke the
'test' command of several matching entries before deciding which
entry to use.
"""
entries = lookup(caps, MIMEtype, key)
# XXX This code should somehow check for the needsterminal flag.
for e in entries:
if 'test' in e:
test = subst(e['test'], filename, plist)
if test and os.system(test) != 0:
continue
command = subst(e[key], MIMEtype, filename, plist)
return command, e
return None, None
def test():
import sys
caps = getcaps()
if not sys.argv[1:]:
show(caps)
return
for i in range(1, len(sys.argv), 2):
args = sys.argv[i:i+2]
if len(args) < 2:
print "usage: mailcap [MIMEtype file] ..."
return
MIMEtype = args[0]
file = args[1]
command, e = findmatch(caps, MIMEtype, 'view', file)
if not command:
print "No viewer found for", type
else:
print "Executing:", command
sts = os.system(command)
if sts:
print "Exit status:", sts
def __init__(self, cmd, capturestderr=False, bufsize=-1):
"""The parameter 'cmd' is the shell command to execute in a
sub-process. On UNIX, 'cmd' may be a sequence, in which case arguments
will be passed directly to the program without shell intervention (as
with os.spawnv()). If 'cmd' is a string it will be passed to the shell
(as with os.system()). The 'capturestderr' flag, if true, specifies
that the object should capture standard error output of the child
process. The default is false. If the 'bufsize' parameter is
specified, it specifies the size of the I/O buffers to/from the child
process."""
_cleanup()
self.cmd = cmd
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
if capturestderr:
errout, errin = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
if capturestderr:
os.dup2(errin, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
if capturestderr:
os.close(errin)
self.childerr = os.fdopen(errout, 'r', bufsize)
else:
self.childerr = None
def popen2(cmd, bufsize=-1, mode='t'):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may
be a sequence, in which case arguments will be passed directly to the
program without shell intervention (as with os.spawnv()). If 'cmd' is a
string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdout, child_stdin) are returned."""
w, r = os.popen2(cmd, mode, bufsize)
return r, w
def popen3(cmd, bufsize=-1, mode='t'):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may
be a sequence, in which case arguments will be passed directly to the
program without shell intervention (as with os.spawnv()). If 'cmd' is a
string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdout, child_stdin, child_stderr) are returned."""
w, r, e = os.popen3(cmd, mode, bufsize)
return r, w, e
def popen2(cmd, bufsize=-1, mode='t'):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may
be a sequence, in which case arguments will be passed directly to the
program without shell intervention (as with os.spawnv()). If 'cmd' is a
string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdout, child_stdin) are returned."""
inst = Popen3(cmd, False, bufsize)
return inst.fromchild, inst.tochild
def popen3(cmd, bufsize=-1, mode='t'):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may
be a sequence, in which case arguments will be passed directly to the
program without shell intervention (as with os.spawnv()). If 'cmd' is a
string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdout, child_stdin, child_stderr) are returned."""
inst = Popen3(cmd, True, bufsize)
return inst.fromchild, inst.tochild, inst.childerr
def popen4(cmd, bufsize=-1, mode='t'):
"""Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may
be a sequence, in which case arguments will be passed directly to the
program without shell intervention (as with os.spawnv()). If 'cmd' is a
string it will be passed to the shell (as with os.system()). If
'bufsize' is specified, it sets the buffer size for the I/O pipes. The
file objects (child_stdout_stderr, child_stdin) are returned."""
inst = Popen4(cmd, bufsize)
return inst.fromchild, inst.tochild
def getpager():
"""Decide what method to use for paging through text."""
if type(sys.stdout) is not types.FileType:
return plainpager
if not sys.stdin.isatty() or not sys.stdout.isatty():
return plainpager
if 'PAGER' in os.environ:
if sys.platform == 'win32': # pipes completely broken in Windows
return lambda text: tempfilepager(plain(text), os.environ['PAGER'])
elif os.environ.get('TERM') in ('dumb', 'emacs'):
return lambda text: pipepager(plain(text), os.environ['PAGER'])
else:
return lambda text: pipepager(text, os.environ['PAGER'])
if os.environ.get('TERM') in ('dumb', 'emacs'):
return plainpager
if sys.platform == 'win32' or sys.platform.startswith('os2'):
return lambda text: tempfilepager(plain(text), 'more <')
if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
return lambda text: pipepager(text, 'less')
import tempfile
(fd, filename) = tempfile.mkstemp()
os.close(fd)
try:
if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
return lambda text: pipepager(text, 'more')
else:
return ttypager
finally:
os.unlink(filename)
def tempfilepager(text, cmd):
"""Page through text by invoking a program on a temporary file."""
import tempfile
filename = tempfile.mktemp()
file = open(filename, 'w')
file.write(text)
file.close()
try:
os.system(cmd + ' "' + filename + '"')
finally:
os.unlink(filename)
def __init__(self,cmd,mode='r',bufsize=None):
if mode != 'r':
raise ValueError,'popen()-emulation only supports read mode'
import tempfile
self.tmpfile = tmpfile = tempfile.mktemp()
os.system(cmd + ' > %s' % tmpfile)
self.pipe = open(tmpfile,'rb')
self.bufsize = bufsize
self.mode = mode
def _platform(*args):
""" Helper to format the platform string in a filename
compatible format e.g. "system-version-machine".
"""
# Format the platform string
platform = string.join(
map(string.strip,
filter(len, args)),
'-')
# Cleanup some possible filename obstacles...
replace = string.replace
platform = replace(platform,' ','_')
platform = replace(platform,'/','-')
platform = replace(platform,'\\','-')
platform = replace(platform,':','-')
platform = replace(platform,';','-')
platform = replace(platform,'"','-')
platform = replace(platform,'(','-')
platform = replace(platform,')','-')
# No need to report 'unknown' information...
platform = replace(platform,'unknown','')
# Fold '--'s and remove trailing '-'
while 1:
cleaned = replace(platform,'--','-')
if cleaned == platform:
break
platform = cleaned
while platform[-1] == '-':
platform = platform[:-1]
return platform
def _syscmd_uname(option,default=''):
""" Interface to the system's uname command.
"""
if sys.platform in ('dos','win32','win16','os2'):
# XXX Others too ?
return default
try:
f = os.popen('uname %s 2> %s' % (option, DEV_NULL))
except (AttributeError,os.error):
return default
output = string.strip(f.read())
rc = f.close()
if not output or rc:
return default
else:
return output