def cmd(command):
    """Run ``command`` and capture its exit status and output.

    The command string is tokenised with ``shlex.split`` and executed
    without a shell. When the exit status is non-zero, the command, its
    stderr and its stdout are printed.

    :param command: Command line to execute, as one string.
    :return: A ``Result`` with ``exit_code``, ``stdout``, ``stderr`` and
             ``command`` filled in.
    """
    result = Result()
    proc = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()

    result.exit_code = proc.returncode
    result.stdout = out
    result.stderr = err
    result.command = command

    if proc.returncode != 0:
        # Single-argument print() behaves the same on Python 2 and 3.
        print('Error executing command [%s]' % command)
        print('stderr: [%s]' % err)
        print('stdout: [%s]' % out)
    return result
# Example source code for Python's Popen() class
def run_command(args, wait=False):
    """Run ``args`` as a child process and return its captured stdout.

    The call blocks until the child exits in both modes, because
    ``communicate()`` waits for termination.

    :param args: Program and arguments, handed to ``subprocess.Popen``.
    :param wait: When true, the child's stdout is captured through a pipe
                 and returned. When false, the child inherits this
                 process's standard streams (``close_fds=True``) and
                 nothing is captured.
    :return: The captured stdout when ``wait`` is true, otherwise ``None``.
    """
    # Bound up front so the ``return`` below never hits an unbound name if
    # the except branch is taken.
    result = None
    try:
        if wait:
            proc = subprocess.Popen(args, stdout=subprocess.PIPE)
        else:
            proc = subprocess.Popen(
                args,
                stdin=None, stdout=None, stderr=None, close_fds=True)
        # communicate() drains the pipe while waiting. Calling wait() first
        # with stdout=PIPE can deadlock once the child fills the OS pipe
        # buffer, so no separate wait() is issued.
        result, _ = proc.communicate()
    except subprocess.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))
    return result
def _git_output(git_args, cwd):
    """Return the stripped stdout of ``git <git_args>`` run in ``cwd`` as text."""
    command = ['git'] + list(git_args)
    try:
        proc = Popen(command, stdout=PIPE, cwd=cwd)
        stdout, _ = proc.communicate()
    except OSError as e:
        # git missing or cwd unusable: record the reason instead of aborting
        # the whole run over a bookkeeping file.
        return '%s: %s' % (' '.join(command), e.strerror)
    if not isinstance(stdout, str):
        # Python 3 pipes yield bytes; decode so the report does not contain
        # a b'...' repr.
        stdout = stdout.decode('utf-8', 'replace')
    return stdout.strip()


def store_revision_info(src_path, output_dir, arg_string):
    """Write ``revision_info.txt`` describing the source revision of a run.

    The file records ``arg_string``, the output of ``git rev-parse HEAD``
    and the output of ``git diff HEAD``, both run in ``src_path``.

    :param src_path: Directory in which the git commands are executed.
    :param output_dir: Directory that receives ``revision_info.txt``.
    :param arg_string: Text stored on the ``arguments:`` line.
    """
    # Get git hash
    git_hash = _git_output(['rev-parse', 'HEAD'], src_path)
    # Get local changes
    git_diff = _git_output(['diff', 'HEAD'], src_path)

    # Store a text file in the log directory
    rev_info_filename = os.path.join(output_dir, 'revision_info.txt')
    with open(rev_info_filename, "w") as text_file:
        text_file.write('arguments: %s\n--------------------\n' % arg_string)
        text_file.write('git hash: %s\n--------------------\n' % git_hash)
        text_file.write('%s' % git_diff)
def run_command(command, wait=False):
    """Run ``command`` through the shell and return its captured stdout.

    The call blocks until the child exits in both modes, because
    ``communicate()`` waits for termination.

    :param command: Shell command line, as one string.
    :param wait: When true, the child's stdout is captured through a pipe
                 and returned. When false, the child inherits this
                 process's standard streams (``close_fds=True``) and
                 nothing is captured.
    :return: The captured stdout when ``wait`` is true, otherwise ``None``.
    """
    # Bound up front so the ``return`` below never hits an unbound name if
    # the except branch is taken.
    result = None
    try:
        if wait:
            proc = subprocess.Popen(
                [command],
                stdout=subprocess.PIPE,
                shell=True)
        else:
            proc = subprocess.Popen(
                [command],
                shell=True,
                stdin=None, stdout=None, stderr=None, close_fds=True)
        # communicate() drains the pipe while waiting. Calling wait() first
        # with stdout=PIPE can deadlock once the child fills the OS pipe
        # buffer, so no separate wait() is issued.
        result, _ = proc.communicate()
    except subprocess.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))
    return result
def call(*popenargs, **kwargs):
    """Run command with arguments. Wait for command to complete. Sends
    output to logging module. The arguments are the same as for the Popen
    constructor.

    ``stdout`` and ``stderr`` in ``kwargs`` are always replaced with pipes
    so the output can be captured. Each stdout line is logged with
    ``logger.info`` and each stderr line with ``logger.error``.

    :return: The exit status of the command.
    """
    from subprocess import Popen, PIPE

    kwargs['stdout'] = PIPE
    kwargs['stderr'] = PIPE
    proc = Popen(*popenargs, **kwargs)
    stdout, stderr = proc.communicate()

    for captured, log in ((stdout, logger.info), (stderr, logger.error)):
        if not captured:
            continue
        if not isinstance(captured, str):
            # Python 3 pipes yield bytes, and bytes.split("\n") raises
            # TypeError; decode first so the same code runs on 2 and 3.
            captured = captured.decode('utf-8', 'replace')
        for line in captured.strip().split("\n"):
            log(line)
    return proc.returncode
def mathjax(s):
    """Render ``s`` with the configured ``mjpage`` executable.

    ``s`` is first written to ``temp.log`` in the current directory, then
    sent UTF-8 encoded to the stdin of the program named by
    ``app.config['mjpage']``. The program's stdout is parsed as HTML.

    :param s: Text to render.
    :return: ``(style, body)`` where ``style`` is ``str(soup.style)`` and
             ``body`` is the concatenated string form of the children of
             the parsed ``<body>``.
    """
    with open("temp.log", "w") as dump:
        dump.write(s)

    font_url = ("https://cdnjs.cloudflare.com/ajax/libs/"
                "mathjax/2.7.0/fonts/HTML-CSS")
    argv = [
        app.config['mjpage'],
        '--dollars',
        '--output', "CommonHTML",
        '--fontURL', font_url,
    ]
    proc = Popen(argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)
    raw_out, raw_err = proc.communicate(input=s.encode('utf-8'))
    out = raw_out.decode('utf-8')
    err = raw_err.decode('utf-8')  # decoded but not otherwise used

    soup = BeautifulSoup(out, 'html.parser')
    style = str(soup.style)
    pieces = [str(child) for child in soup.body.children]
    return style, "".join(pieces)
def _compile_proto(full_path, dest):
    """Compile one protobuf file to Python with the protoc executable.

    :param full_path: Path of the file to compile; its directory is passed
                      as ``--proto_path``.
    :param dest: Directory passed as ``--python_out``.
    :return: ``True`` when protoc exits with status 0, ``False`` when it
             does not finish within five seconds (the process is killed).
    :raises BadProtobuf: protoc exited with a non-zero status; the message
                         carries its stderr and stdout.
    """
    source_dir = os.path.dirname(full_path)
    command = [
        find_protoc(),
        '--python_out={}'.format(dest),
        '--proto_path={}'.format(source_dir),
        full_path,
    ]
    compiler = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        outs, errs = compiler.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        compiler.kill()
        # Collect the remaining output so the killed child is reaped.
        outs, errs = compiler.communicate()
        return False

    if compiler.returncode == 0:
        return True
    raise BadProtobuf(
        'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
            full_path, errs.decode('utf-8'), outs.decode('utf-8')))
def run(daemon):
    """Start the server, either as a background process or in-process.

    In daemon mode, the process whose pid is stored in ``./sensor21.pid``
    (if that file exists) is asked to terminate on a best-effort basis and
    the pid file is removed. ``sensor21-server.py`` is then launched with
    ``python3`` and its pid is written to the pid file. Otherwise the app
    is served in-process on host ``::`` port 5002.

    :param daemon: Truthy to launch the background server process.
    :raises ValueError: If launching raises ``subprocess.CalledProcessError``.
    """
    if not daemon:
        print("Server running...")
        app.run(host='::', port=5002)
        return

    pid_file = './sensor21.pid'
    if os.path.isfile(pid_file):
        # Context manager so the handle is closed before the file is removed.
        with open(pid_file) as handle:
            pid = int(handle.read())
        os.remove(pid_file)
        try:
            psutil.Process(pid).terminate()
        except Exception:
            # Best effort only: the recorded process may be gone already.
            # ``Exception`` rather than a bare except so that
            # KeyboardInterrupt and SystemExit still propagate.
            pass
    try:
        proc = subprocess.Popen(['python3', 'sensor21-server.py'])
        with open(pid_file, 'w') as handle:
            handle.write(str(proc.pid))
    except subprocess.CalledProcessError:
        # NOTE(review): Popen reports a missing executable as OSError, which
        # is not caught here and propagates unchanged -- confirm intent.
        raise ValueError("error starting sensor21-server.py daemon")
def exec_cmd(cmd, **kwds):
    """
    Run ``cmd`` as a sub-process and collect its results.

    Only the ``stdin`` keyword is consulted. When it is given and not
    ``None``, its value is sent to the child's standard input through a
    pipe; otherwise no stdin pipe is created.

    :return: ``(returncode, stdout, stderr)`` of the finished process.
    """
    payload = kwds.get('stdin')
    child = subprocess.Popen(
        cmd,
        stdin=None if payload is None else subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    out, err = child.communicate(payload)
    return (child.returncode, out, err)
#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
def epubcheck_help():
    """Fetch the command-line help text printed by epubcheck.jar.

    Runs ``java -Duser.language=en -jar <epubcheck> -h`` with stderr sent
    to the null device and returns the decoded stdout.

    :return unicode: helptext from epubcheck.jar
    """
    command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
    with open(os.devnull, "w") as sink:
        proc = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=sink,
        )
        help_bytes, _ = proc.communicate()
    return help_bytes.decode()
def get_verify_command(self, signature_filename, data_filename,
                       keystore=None):
    """
    Build the gpg command line that verifies a signed file.

    :param signature_filename: The pathname to the file containing the
                               signature.
    :param data_filename: The pathname to the file containing the
                          signed data.
    :param keystore: The path to a directory which contains the keys
                     used in verification. If ``None``, the instance's
                     ``gpg_home`` attribute is used; if the resulting
                     value is falsy, no ``--homedir`` option is added.
    :return: The verifying command as a list suitable to be
             passed to :class:`subprocess.Popen`.
    """
    command = [self.gpg, '--status-fd', '2', '--no-tty']
    homedir = self.gpg_home if keystore is None else keystore
    if homedir:
        command += ['--homedir', homedir]
    command += ['--verify', signature_filename, data_filename]
    logger.debug('invoking: %s', ' '.join(command))
    return command
def run_command_with_code(self, cmd, redirect_output=True,
                          check_exit_code=True):
    """Run ``cmd`` as a child process with ``self.root`` as working directory.

    :param cmd: Program and arguments as a list.
    :param redirect_output: When true, stdout is captured through a pipe;
                            otherwise the child inherits stdout and the
                            returned output is ``None``.
    :param check_exit_code: When true, a non-zero exit status is reported
                            through ``self.die``.
    :return: ``(output, returncode)``.
    """
    stdout_target = subprocess.PIPE if redirect_output else None
    child = subprocess.Popen(cmd, cwd=self.root, stdout=stdout_target)
    output, _ = child.communicate()
    if check_exit_code and child.returncode != 0:
        self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
    return (output, child.returncode)
def test_loadSADFile_startorder(self):
    """Load the ticket_462_w waveform and compare its two components' pids.

    After ``sb.loadSADFile`` succeeds, both ``ticket_462_ac_1`` and
    ``ticket_462_1`` must exist. When the pid of ``ticket_462_ac_1`` is at
    most ``maxpid - 1`` it must be lower than the pid of ``ticket_462_1``;
    otherwise the reverse is required.
    """
    maxpid = 32768
    try:
        # Read the kernel limit directly instead of spawning ``cat``; any
        # failure keeps the default above.
        with open('/proc/sys/kernel/pid_max') as pid_max_file:
            maxpid = int(pid_max_file.read().strip())
    except (IOError, OSError, ValueError):
        pass

    retval = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
    self.assertEqual(retval, True)
    comp_ac = sb.getComponent('ticket_462_ac_1')
    self.assertNotEqual(comp_ac, None)
    comp = sb.getComponent('ticket_462_1')
    self.assertNotEqual(comp, None)

    if comp_ac._pid <= maxpid - 1:
        isless = comp_ac._pid < comp._pid
    else:
        # NOTE(review): presumably covers pid wrap-around -- confirm.
        isless = comp._pid < comp_ac._pid
    self.assertTrue(isless)
def test_UserOrGroupNoDaemon(self):
    """Check nodeBooter's error when --user and/or --group are passed.

    For each of the three argument combinations below, nodeBooter must
    write 'If either group or user are specified, daemon must be set' to
    stderr.
    """
    domainName = scatest.getTestDomainName()

    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass  # This exception is expected

    expected = 'If either group or user are specified, daemon must be set'
    base_args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist"]
    option_sets = (
        ['--user', 'domuser', '--group', 'somegroup'],
        ['--group', 'somegroup'],
        ['--user', 'domuser'],
    )
    for options in option_sets:
        nb = Popen(base_args + options, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        # communicate() drains both pipes and reaps the child. Reading only
        # stderr could block if the child fills the stdout pipe, and it
        # would leave the finished process unreaped.
        _, err = nb.communicate()
        self.assertNotEqual(err.find(expected), -1)
def test_BadUserOrBadGroup(self):
    """Check nodeBooter's error for ``--user=...`` / ``--group=...`` syntax.

    For each of the two options below, nodeBooter must write
    'Separator must be a space' to stderr.
    """
    domainName = scatest.getTestDomainName()

    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass  # This exception is expected

    expected = 'Separator must be a space'
    base_args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist"]
    for option in ('--user=domuser', '--group=somegroup'):
        nb = Popen(base_args + [option], cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        # communicate() drains both pipes and reaps the child. Reading only
        # stderr could block if the child fills the stdout pipe, and it
        # would leave the finished process unreaped.
        _, err = nb.communicate()
        self.assertNotEqual(err.find(expected), -1)
def plot(self):
    """Launch the external plotter script for this object's connection.

    Builds ``<eclipsePath>/bin/plotter.sh -portname ... -repid ... -ior ...``
    from the instance attributes, starts it with stdout sent to
    ``/dev/null`` and ``os.setpgrp`` as ``preexec_fn``, and stores the
    process in ``self._processes`` keyed by pid.

    :raises AssertionError: If ``usesPortIORString``, ``_usesPortName`` or
        ``_dataType`` is unset, or if launching the process fails.
    """
    debug = _domainless._DEBUG == True
    if debug:
        print("Plot:plot()")

    # Error checking before launching plot
    if self.usesPortIORString is None:
        raise AssertionError("Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component")
    if self._usesPortName is None:
        raise AssertionError("Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component")
    if self._dataType is None:
        raise AssertionError("Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component")

    plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
    if debug:
        print("Plot:plotCommand " + str(plotCommand))
    args = _shlex.split(plotCommand)
    if debug:
        print("Plot:args " + str(args))

    try:
        # The child gets its own copy of the descriptor at spawn time, so
        # the parent's handle can be closed right after Popen returns.
        with open('/dev/null', 'w') as dev_null:
            sub_process = _subprocess.Popen(args, stdout=dev_null, preexec_fn=_os.setpgrp)
        pid = sub_process.pid
        self._processes[pid] = sub_process
    except Exception as e:
        raise AssertionError("Plot:plot() Failed to launch plotting due to %s" % (e,))
def print_card(pdf, printer_name):
    """
    Print a PDF by piping it to ``lpr -P <printer_name>``.

    :param pdf: Binary PDF buffer, written to the stdin of ``lpr``
    :param printer_name: Name of the printer on the system (ie. CUPS name)
    :raises PrintingError: If ``lpr`` exits with a non-zero status.
    """
    lpr = subprocess.Popen(
        ['lpr', '-P', printer_name],
        stdin=subprocess.PIPE
    )
    lpr.communicate(pdf)
    status = lpr.returncode
    if status == 0:
        return
    raise PrintingError('Return code {}'.format(status))
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """ Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Currently disabled: the function raises ``ValueError`` before doing any
    work, so everything after the ``raise`` is unreachable.
    """
    raise ValueError('Unsupported method at the moment')

    # --- unreachable while the guard above is in place ---
    sink = open(os.devnull, 'w')
    command = [
        LOUVAIN_CONVERT_BINPATH,
        '-i', '/dev/stdin',
        '-o', bin_filename,
        '-w', weight_filename,
    ]
    converter = subprocess.Popen(
        command, stdin=subprocess.PIPE, stdout=sink, stderr=sink)

    # Stream text triplets to 'convert'
    triplets = itertools.izip(matrix.row, matrix.col, matrix.data)
    for triplet in triplets:
        converter.stdin.write('%d\t%d\t%f\n' % triplet)
    converter.stdin.close()
    converter.wait()
    sink.close()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """ Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Each ``(row, col)`` pair taken from ``matrix.row`` and ``matrix.col`` is
    written to the converter's stdin as a tab-separated line. The
    converter's stdout and stderr are discarded.

    :param matrix: Object exposing parallel ``row`` and ``col`` sequences.
    :param bin_filename: Path passed to the converter's ``-o`` option.
    """
    # Lazy pairing on both Python 2 (izip) and Python 3 (zip).
    pair_up = getattr(itertools, 'izip', zip)

    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 '-o', bin_filename,
                                 ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
        try:
            try:
                # Stream text pairs to 'convert'
                for ij in pair_up(matrix.row, matrix.col):
                    # The pipe is binary; encode so the write also works on
                    # Python 3.
                    proc.stdin.write(('%d\t%d\n' % ij).encode('ascii'))
            finally:
                proc.stdin.close()
        finally:
            # Always reap the child, even when streaming failed part-way.
            proc.wait()
def read_fastq(self):
    """Yield one ``FastqRow(header, seq, qual)`` per four-line record.

    When ``self.file`` ends in ``gz`` the lines are read from the stdout of
    ``gunzip --stdout``; otherwise the file is opened directly. The third
    line of each record is discarded. When ``self.rc`` is truthy, ``seq``
    is replaced by ``tk_seq.get_rev_comp(seq)`` and ``qual`` is reversed.
    Iteration stops at end of input; a trailing incomplete record is
    dropped.
    """
    proc = None
    if self.file[-2:] == "gz":
        proc = subprocess.Popen(["gunzip", "--stdout", self.file], stdout=subprocess.PIPE)
        reader = proc.stdout
    else:
        reader = open(self.file, "r")

    try:
        while True:
            try:
                header = next(reader).strip()
                seq = next(reader).strip()
                next(reader)  # skip the third line of the record
                qual = next(reader).strip()
            except StopIteration:
                # End of input. Returning explicitly avoids leaking
                # StopIteration out of the generator body, which PEP 479
                # turns into RuntimeError.
                return

            if self.rc:
                seq = tk_seq.get_rev_comp(seq)
                qual = qual[::-1]

            yield FastqRow(header, seq, qual)
    finally:
        # Runs on exhaustion and on early abandonment of the generator.
        reader.close()
        if proc is not None:
            proc.wait()
def parse_ab3_defines(defines_file):  # , pkg_name):
    """Evaluate an autobuild defines file and return selected variables.

    The file's content is prefixed with an ``ARCH=...`` line, suffixed with
    the output of ``gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])``
    and run through the shell. The shell's output is parsed as the body of
    an INI section.

    :param defines_file: Path of the defines file to read.
    :return: A dict mapping upper-cased option names to their values, or
             ``False`` when the file cannot be read or the shell command
             fails (a message is printed in both cases).
    """
    try:
        with open(defines_file, 'rt') as fp:
            abd_cont = fp.read()
    except Exception:
        # ``Exception`` rather than a bare except so that Ctrl-C and
        # SystemExit are not reported as a read failure.
        print('[E] Failed to load autobuild defines file! Do you have read permission?')
        return False

    script = "ARCH={}\n".format(
        get_arch_name()) + abd_cont + gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
    try:
        # Better to be replaced by subprocess.Popen
        abd_out = subprocess.check_output(script, shell=True)
    except Exception:
        print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
        return False

    abd_config = RawConfigParser()
    # The output has no section header, so wrap it in a synthetic one.
    abd_config.read_file(io.StringIO('[wrap]\n' + abd_out.decode('utf-8')))
    return {key.upper(): value for key, value in abd_config['wrap'].items()}
def manage_test_process(queue, f, cpopen):
    """Start the next queued command once the current process has finished.

    :param queue: List of argument lists; the last entry is popped and
                  launched when a new process is started.
    :param f: File object used as both stdout and stderr of the new process.
    :param cpopen: The currently tracked ``Popen`` object, or ``None``.
    :return: ``cpopen`` unchanged when the queue is empty or ``cpopen`` is
             still running; otherwise the newly started ``Popen``.
    """
    if not queue:
        return cpopen

    still_running = cpopen is not None and cpopen.poll() is None
    if still_running:
        return cpopen  # not finish

    print("OK create new process")
    child_env = os.environ.copy()
    child_env["CUDA_VISIBLE_DEVICES"] = ""  # test on CPU
    next_args = queue.pop()
    return subprocess.Popen(next_args, stdout=f, stderr=f, env=child_env)
def runitt():
    """Run one timed zenbot trading cycle for the coin in global ``variable``.

    Steps, in order: create the file named by ``pid``; start
    ``./zenbot.sh trade`` in the background; sleep for 3600 seconds; start
    ``sudo killall node``; run the ``./zenbot.sh sell`` command twice,
    waiting for each run; remove the ``pid`` file.

    The global ``variable`` is replaced by its string form.
    """
    global variable
    open(pid, 'w').close()
    variable = str(variable)
    variablestr = variable

    # NOTE(review): ``variablestr`` is concatenated into shell command
    # lines; make sure it never carries untrusted text.
    print('Starting Trade of: ' + variablestr)
    process0 = './zenbot.sh trade poloniex.' + variablestr
    subprocess.Popen(process0, shell=True)
    time.sleep(3600)

    print('Starting node kill process')
    process1 = 'sudo killall node'
    subprocess.Popen(process1, shell=True)

    # The original issued the identical sell command twice in a row; both
    # passes are kept (presumably the second is a retry -- confirm).
    sell_command = './zenbot.sh sell --order_adjust_time=1000000000 --sell_pct=100 --markup_pct=0 poloniex.' + variablestr
    for _ in range(2):
        print('Starting final sell Of:' + variablestr + ' In Case of Error')
        subprocess.Popen(sell_command, shell=True).communicate()

    os.remove(pid)
    print('Done running loop, process file deleted. Waiting for another coin...')
# From now on, any update received will be passed to 'update_handler' NOTE... Later, Zenbot will be modified to cancel on order adjust.
def add_to_vcs(self, summary):
    """Stage created and removed files in git after user confirmation.

    ``git add -A`` is run on ``app_settings.SYNC_DIRECTORY`` only when
    ``self.git_add`` is set, ``summary`` contains ``SyncStatus.DELETED`` or
    ``SyncStatus.ADDED``, ``self.dry_run`` is not set, and the user
    confirms the question.

    :param summary: Container of ``SyncStatus`` values.
    :raises: The exception built by ``self.error`` when git exits with a
             non-zero status.
    """
    # Guard clauses keep the original short-circuit order, so the user is
    # only asked once every earlier condition holds.
    if not self.git_add:
        return
    if not (SyncStatus.DELETED in summary or SyncStatus.ADDED in summary):
        return
    if self.dry_run:
        return
    if not self.confirm(
        question=(
            'Do you want to add created and removed files to GIT?'
        )
    ):
        return

    proc = subprocess.Popen(
        ['git', '-C', app_settings.SYNC_DIRECTORY,
         'add', '-A', app_settings.SYNC_DIRECTORY],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    proc.communicate()
    # Judge success by the exit status: git can print warnings on stderr
    # while succeeding.
    if proc.returncode != 0:
        raise self.error('Adding file changes to GIT failed!')
def get_audio_streams(self):
    """Probe ``self.input_video`` with FFPROBE and parse the stream info.

    Runs ``FFPROBE -i <input_video> -of json -show_streams`` with stderr
    discarded, decodes the JSON written to stdout, and passes it to the
    module-level ``get_audio_streams`` (the bare name does not refer to
    this method when it is defined inside a class).

    If the FFPROBE executable is not found, an error is logged and the
    process exits with status 0. Any other ``OSError`` is re-raised.
    """
    import errno  # ``os.errno`` was removed in Python 3.7

    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and parse it
        try:
            proc = subprocess.Popen([
                FFPROBE,
                '-i', self.input_video,
                '-of', 'json',
                '-show_streams'
            ], stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno == errno.ENOENT:
                Logger.error("FFPROBE not found, install on your system to use this script")
                # NOTE(review): exits with status 0 although this is a
                # failure path -- confirm callers expect that.
                sys.exit(0)
            # Previously fell through to an unbound ``proc``; surface the
            # real error instead.
            raise
        # communicate() reads stdout to EOF and reaps the child.
        output, _ = proc.communicate()
    return get_audio_streams(json.loads(output))
def launch(self, cfg, path, flags):
    """Start the rpc-agent binary matching the host OS and architecture.

    The binary is looked up under ``$WPW_HOME/bin`` when ``WPW_HOME`` is
    set, otherwise under ``<path>/wpwithinpy/iot-core-component/bin``. Its
    stdout and stderr are sent to the null device.

    :param cfg: Configuration passed to ``self.validateConfig``.
    :param path: Base directory used when ``WPW_HOME`` is not set.
    :param flags: Extra command-line arguments appended to the command.
    :return: The ``subprocess.Popen`` object, or ``None`` when
             ``self.validateConfig`` rejects the configuration.
    """
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    is_64bits = sys.maxsize > 2 ** 32
    hostArchitecture = 'x64' if is_64bits else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))

    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return None

    agent_name = 'rpc-agent-' + hostOS + '-' + self.detectHostArchitecture()
    wpw_home = os.environ.get("WPW_HOME")
    if wpw_home is not None:
        cmd = [wpw_home + '/bin/' + agent_name]
    else:
        cmd = [path + '/wpwithinpy/iot-core-component/bin/' + agent_name]
    cmd.extend(flags)

    # The child keeps its own copy of the descriptor, so the parent's
    # handle can be closed as soon as Popen returns.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
    return proc
def build_mkdocs(self):
    """
    Build the static documentation with ``mkdocs build -q``.

    The process working directory is changed to ``MKDOCS_DIR`` before the
    build and is not restored afterwards.

    :raises Error: If MkDocs writes anything to stderr.
    """
    # Setting the working directory
    os.chdir(MKDOCS_DIR)

    # Building the MkDocs project
    builder = subprocess.Popen(
        ["mkdocs", "build", "-q"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    _, build_errors = builder.communicate()
    if not build_errors:
        return
    raise Error("Could not build MkDocs !\n%s" %
                build_errors)
def goglib_get_games_list():
    """Store the detailed game list printed by lgogdownloader.

    Runs ``lgogdownloader --exclude 1,2,4,8,16,32 --list-details``. On exit
    status 0, every output line not containing 'Getting game info' is
    written to ``$HOME/.games_nebula/config/games_list``.

    :return: ``0`` when the list was written, ``1`` when lgogdownloader
             exited with a non-zero status.
    """
    proc = subprocess.Popen(
        ['lgogdownloader', '--exclude', '1,2,4,8,16,32', '--list-details'],
        stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    # stdout is already drained; wait() just collects the exit status.
    proc.wait()

    if proc.returncode != 0:
        return 1

    file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
    # Context manager so the list is flushed and closed before returning.
    with open(file_path, 'w') as games_list_file:
        games_list_file.writelines(
            line for line in games_detailed_list
            if 'Getting game info' not in line)
    return 0
def win64_available(self):
    """Probe the ``<wine_bin>64`` executable and update the arch selector.

    The executable is run with no arguments and its output discarded. An
    exit status of 1 is treated as "available". In that case the
    ``combobox_winearch`` widget is shown. In every other case, including
    any failure to launch, the widget is hidden and ``self.winearch`` is
    set to ``'win32'``.

    :return: ``True`` when the probe exited with status 1, else ``False``.
    """
    wine_bin, _wineserver_bin, _wine_lib = self.get_wine_bin_path()

    try:
        # Context manager so the handle is closed even when Popen raises.
        with open(os.devnull, 'w') as dev_null:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null,
                                    stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        available = proc.returncode == 1
    except Exception:
        # ``Exception`` rather than a bare except so that KeyboardInterrupt
        # and SystemExit still propagate.
        available = False

    self.combobox_winearch.set_visible(available)
    if not available:
        self.winearch = 'win32'
    return available
def writerow(self, row):
    """
    Write one row to the current CSV file, rotating it when it grows too big.

    The row is formatted through ``self._out_writer`` into ``self._buffer``,
    copied to ``self._out_csv`` and flushed. Once ``self._bytes_written``
    exceeds ``self.max_bytes``, the file is closed, ``_make_csv_writer()``
    is called and a ``7z`` process is started without waiting for it.

    :param row: the row handed to the underlying CSV writer
    :return: the text that was written for this row
    """
    self._bytes_written += self._out_writer.writerow(row)

    row_txt = self._buffer.getvalue()
    self._out_csv.write(row_txt)
    self._reset_buffer()
    self._out_csv.flush()

    if self._bytes_written > self.max_bytes:
        self._out_csv.close()
        self._make_csv_writer()
        # NOTE(review): the name is read after _make_csv_writer(); if that
        # call replaces self._out_csv, the new file is archived rather than
        # the one just closed -- confirm.
        out_name = str(Path(self._out_csv.name).absolute())
        archive_cmd = [
            '7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64', '-md=16m',
            out_name + '.7z', out_name,
        ]
        subprocess.Popen(archive_cmd)
    return row_txt