def cmd(command):
result = Result()
p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, stderr) = p.communicate()
result.exit_code = p.returncode
result.stdout = stdout
result.stderr = stderr
result.command = command
if p.returncode != 0:
print 'Error executing command [%s]' % command
print 'stderr: [%s]' % stderr
print 'stdout: [%s]' % stdout
return result
python类PIPE的实例源码
def run_command(args, wait=False):
try:
if (wait):
p = subprocess.Popen(
args,
stdout = subprocess.PIPE)
p.wait()
else:
p = subprocess.Popen(
args,
stdin = None, stdout = None, stderr = None, close_fds = True)
(result, error) = p.communicate()
except subprocess.CalledProcessError as e:
sys.stderr.write(
"common::run_command() : [ERROR]: output = %s, error code = %s\n"
% (e.output, e.returncode))
return result
def preprocess_source(self, in_file, additional_args=[]):
import subprocess
self._args.extend(self._build_compiler_flags())
self._args.extend(additional_args)
result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if result.returncode == 0:
return result.stdout
else:
if result.stderr:
Style.error('Preprocess failed: ')
print(result.stderr)
return ''
def popen(cmd, mode="r", buffering=-1):
if not isinstance(cmd, str):
raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
if mode not in ("r", "w"):
raise ValueError("invalid mode %r" % mode)
if buffering == 0 or buffering is None:
raise ValueError("popen() does not support unbuffered streams")
import subprocess, io
if mode == "r":
proc = subprocess.Popen(cmd,
shell=True,
stdout=subprocess.PIPE,
bufsize=buffering)
return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
else:
proc = subprocess.Popen(cmd,
shell=True,
stdin=subprocess.PIPE,
bufsize=buffering)
return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
# Helper for popen() -- a proxy for a file whose close waits for the process
def store_revision_info(src_path, output_dir, arg_string):
# Get git hash
gitproc = Popen(['git', 'rev-parse', 'HEAD'], stdout = PIPE, cwd=src_path)
(stdout, _) = gitproc.communicate()
git_hash = stdout.strip()
# Get local changes
gitproc = Popen(['git', 'diff', 'HEAD'], stdout = PIPE, cwd=src_path)
(stdout, _) = gitproc.communicate()
git_diff = stdout.strip()
# Store a text file in the log directory
rev_info_filename = os.path.join(output_dir, 'revision_info.txt')
with open(rev_info_filename, "w") as text_file:
text_file.write('arguments: %s\n--------------------\n' % arg_string)
text_file.write('git hash: %s\n--------------------\n' % git_hash)
text_file.write('%s' % git_diff)
def run_command(command, wait=False):
try:
if (wait):
p = subprocess.Popen(
[command],
stdout = subprocess.PIPE,
shell = True)
p.wait()
else:
p = subprocess.Popen(
[command],
shell = True,
stdin = None, stdout = None, stderr = None, close_fds = True)
(result, error) = p.communicate()
except subprocess.CalledProcessError as e:
sys.stderr.write(
"common::run_command() : [ERROR]: output = %s, error code = %s\n"
% (e.output, e.returncode))
return result
def mathjax(s):
with open("temp.log", "w") as f:
f.write(s)
p = Popen([app.config['mjpage'],
'--dollars',
'--output', "CommonHTML",
'--fontURL',
("https://cdnjs.cloudflare.com/ajax/libs/"
"mathjax/2.7.0/fonts/HTML-CSS")], stdout=PIPE, stdin=PIPE,
stderr=PIPE)
#filename = hashlib.sha256(s.encode('utf-8')).hexdigest()
#with open(filename, 'w') as f:
# print(s, file=f)
res = p.communicate(input=s.encode('utf-8'))
out = res[0].decode('utf-8')
err = res[1].decode('utf-8')
soup = BeautifulSoup(out, 'html.parser')
style = str(soup.style)
body = "".join(str(s) for s in soup.body.children)
return style, body
def _compile_proto(full_path, dest):
'Helper to compile protobuf files'
proto_path = os.path.dirname(full_path)
protoc_args = [find_protoc(),
'--python_out={}'.format(dest),
'--proto_path={}'.format(proto_path),
full_path]
proc = subprocess.Popen(protoc_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
outs, errs = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
outs, errs = proc.communicate()
return False
if proc.returncode != 0:
msg = 'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
full_path, errs.decode('utf-8'), outs.decode('utf-8'))
raise BadProtobuf(msg)
return True
def exec_cmd(cmd, **kwds):
"""
Execute arbitrary commands as sub-processes.
"""
stdin = kwds.get('stdin', None)
stdin_flag = None
if not stdin is None:
stdin_flag = subprocess.PIPE
proc = subprocess.Popen(
cmd,
stdin=stdin_flag,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = proc.communicate(stdin)
return (proc.returncode, stdout, stderr)
#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
def del_addr(linkname, address):
try:
subprocess.run(['ip', 'address', 'del', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(linkname)]
except subprocess.CalledProcessError as suberror:
return [False, "delete address failed : %s" % suberror.stdout.decode('utf-8')]
# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
def epubcheck_help():
"""Return epubcheck.jar commandline help text.
:return unicode: helptext from epubcheck.jar
"""
# tc = locale.getdefaultlocale()[1]
with open(os.devnull, "w") as devnull:
p = subprocess.Popen(
[c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h'],
stdout=subprocess.PIPE,
stderr=devnull,
)
result = p.communicate()[0]
return result.decode()
def run_command_with_code(self, cmd, redirect_output=True,
check_exit_code=True):
"""Runs a command in an out-of-process shell.
Returns the output of that command. Working directory is self.root.
"""
if redirect_output:
stdout = subprocess.PIPE
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
return (output, proc.returncode)
def filter_region(view, txt, command):
try:
contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
contents.write(txt.encode('utf-8'))
contents.close()
script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False)
script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))
script.close()
p = subprocess.Popen([script.name],
stdout=PIPE,
stderr=PIPE,
startupinfo=get_startup_info())
out, err = p.communicate()
return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
finally:
os.remove(script.name)
os.remove(contents.name)
def test_loadSADFile_startorder(self):
maxpid=32768
try:
out=subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'], stdout=subprocess.PIPE)
res=out.communicate()
maxpid=int(res[0].strip())
except:
pass
retval = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
self.assertEquals(retval, True)
comp_ac = sb.getComponent('ticket_462_ac_1')
self.assertNotEquals(comp_ac, None)
comp = sb.getComponent('ticket_462_1')
self.assertNotEquals(comp, None)
if comp_ac._pid <= maxpid-1:
isless= comp_ac._pid < comp._pid
else:
isless=comp._pid < comp_ac._pid
self.assertTrue(isless)
def test_UserOrGroupNoDaemon(self):
"""Test that we read the correct domainname from the DMD file, the test domain
should have been created by the test runner"""
domainName = scatest.getTestDomainName()
# Test that we don't already have a bound domain
try:
domMgr = self._root.resolve(scatest.getDomainMgrURI())
self.assertEqual(domMgr, None)
except CosNaming.NamingContext.NotFound:
pass # This exception is expected
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user','domuser','--group','somegroup' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--group','somegroup' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user','domuser' ]
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'),-1)
def test_BadUserOrBadGroup(self):
"""Test that we read the correct domainname from the DMD file, the test domain
should have been created by the test runner"""
domainName = scatest.getTestDomainName()
# Test that we don't already have a bound domain
try:
domMgr = self._root.resolve(scatest.getDomainMgrURI())
self.assertEqual(domMgr, None)
except CosNaming.NamingContext.NotFound:
pass # This exception is expected
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--user=domuser']
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('Separator must be a space'),-1)
args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist",'--group=somegroup']
nb = Popen( args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
self.assertNotEqual(nb.stderr.read().find('Separator must be a space'),-1)
def print_card(pdf, printer_name):
"""
Send the PDF to the printer!
Shells out to `lpr` to do the work.
:param pdf: Binary PDF buffer
:param printer_name: Name of the printer on the system (ie. CUPS name)
"""
process = subprocess.Popen(
['lpr', '-P', printer_name],
stdin=subprocess.PIPE
)
process.communicate(pdf)
if process.returncode != 0:
raise PrintingError('Return code {}'.format(process.returncode))
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
""" Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility """
raise ValueError('Unsupported method at the moment')
devnull = open(os.devnull, 'w')
proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
'-i', '/dev/stdin',
'-o', bin_filename,
'-w', weight_filename,
], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
# Stream text triplets to 'convert'
for ijx in itertools.izip(matrix.row, matrix.col, matrix.data):
proc.stdin.write('%d\t%d\t%f\n' % ijx)
proc.stdin.close()
proc.wait()
devnull.close()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
""" Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility """
devnull = open(os.devnull, 'w')
proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
'-i', '/dev/stdin',
'-o', bin_filename,
], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
# Stream text triplets to 'convert'
for ij in itertools.izip(matrix.row, matrix.col):
proc.stdin.write('%d\t%d\n' % ij)
proc.stdin.close()
proc.wait()
devnull.close()
def read_fastq(self):
if self.file[-2:] == "gz":
proc = subprocess.Popen(["gunzip", "--stdout", self.file], stdout=subprocess.PIPE)
reader = proc.stdout
else:
reader = file(self.file, "r")
while True:
header = reader.next().strip()
seq = reader.next().strip()
reader.next() # incr line
qual = reader.next().strip()
if self.rc:
seq = tk_seq.get_rev_comp(seq)
qual = qual[::-1]
yield FastqRow(header, seq, qual)
reader.close()
def add_to_vcs(self, summary):
if (
self.git_add and
(SyncStatus.DELETED in summary or SyncStatus.ADDED in summary) and
not self.dry_run and
self.confirm(
question=(
'Do you want to add created and removed files to GIT?'
)
)
):
output, errors = subprocess.Popen(
['git', '-C', app_settings.SYNC_DIRECTORY,
'add', '-A', app_settings.SYNC_DIRECTORY],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
).communicate()
if errors:
raise self.error('Adding file changes to GIT failed!')
def get_audio_streams(self):
with open(os.devnull, 'w') as DEV_NULL:
#Get file info and Parse it
try:
proc = subprocess.Popen([
FFPROBE,
'-i', self.input_video,
'-of', 'json',
'-show_streams'
], stdout=subprocess.PIPE, stderr=DEV_NULL)
except OSError as e:
if e.errno == os.errno.ENOENT:
Logger.error("FFPROBE not found, install on your system to use this script")
sys.exit(0)
output = proc.stdout.read()
return get_audio_streams(json.loads(output))
def build_mkdocs(self):
"""
Invokes MkDocs to build the static documentation and moves the folder
into the project root folder.
"""
# Setting the working directory
os.chdir(MKDOCS_DIR)
# Building the MkDocs project
pipe = subprocess.PIPE
mkdocs_process = subprocess.Popen(
["mkdocs", "build", "-q"], stdout=pipe, stderr=pipe)
std_op, std_err_op = mkdocs_process.communicate()
if std_err_op:
raise Error("Could not build MkDocs !\n%s" %
std_err_op)
def goglib_get_games_list():
proc = subprocess.Popen(['lgogdownloader', '--exclude', \
'1,2,4,8,16,32', '--list-details'],stdout=subprocess.PIPE)
games_detailed_list = proc.stdout.readlines()
stdoutdata, stderrdata = proc.communicate()
if proc.returncode == 0:
file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
games_list_file = open(file_path, 'w')
for line in games_detailed_list:
if 'Getting game info' not in line:
games_list_file.write(line)
return 0
else:
return 1
def win64_available(self):
wine_bin, \
wineserver_bin, \
wine_lib = self.get_wine_bin_path()
dev_null = open(os.devnull, 'w')
try:
proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \
stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
dev_null.close()
stdoutdata, stderrdata = proc.communicate()
if proc.returncode == 1:
self.combobox_winearch.set_visible(True)
return True
else:
self.combobox_winearch.set_visible(False)
self.winearch = 'win32'
return False
except:
self.combobox_winearch.set_visible(False)
self.winearch = 'win32'
return False
def execute_task(host, tasks):
"""Call Fabric to execute tasks against a host
Return: CompletedProcess instance
"""
# TODO: add support for groups, multiple hosts
# TODO: add support for providing input data from team files
return subprocess.run(
[
PYTHON2_PATH,
FABRIC_PATH,
'--abort-on-prompts',
'--hosts=%s' % host,
'--fabfile=%s' % FABFILE_PATH,
*tasks,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Combine out/err into stdout; stderr will be None
universal_newlines=True,
check=True,
)
def index():
if request.args.get('code'):
unlock_code = request.args.get('code')
# unlock, new password
re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
if re:
jid = re.jid
re.password_code = None
s.merge(re)
s.commit()
# set new password and send email
email_address = re.email
password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10))
p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
p.communicate(args)
sendMail(email_address, 'new password', password)
content = render_template('success.html', message='password was sent')
else:
content = render_template('error.html', message='link invalid')
else:
content = render_template('index.html')
return content
def output_articles(articles):
if len(articles) == 0:
print('No articles found')
return
try:
pager = subprocess.Popen(['less'],
stdin=subprocess.PIPE,
stdout=sys.stdout)
for article in articles:
if int(article['reading_time']) <= 0:
article['reading_time'] = 'Unknown'
content = format_article(article, line=True)
if six.PY3:
content = bytearray(content, 'utf-8')
pager.stdin.write(content)
pager.stdin.close()
pager.wait()
except (KeyboardInterrupt, ValueError):
pass
def spawn(self, cmd, stdin_content="", stdin=False, shell=False, timeout=2):
"""
Spawn a new process using subprocess
"""
try:
if type(cmd) != list:
raise PJFInvalidType(type(cmd), list)
if type(stdin_content) != str:
raise PJFInvalidType(type(stdin_content), str)
if type(stdin) != bool:
raise PJFInvalidType(type(stdin), bool)
self._in = stdin_content
try:
self.process = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, shell=shell)
self.finish_read(timeout, stdin_content, stdin)
if self.process.poll() is not None:
self.close()
except KeyboardInterrupt:
return
except OSError:
raise PJFProcessExecutionError("Binary <%s> does not exist" % cmd[0])
except Exception as e:
raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
def _upload_artifacts_to_version(self):
"""Recursively upload directory contents to S3."""
if not os.listdir(self.artifact_path) or not self.artifact_path:
raise S3ArtifactNotFound
uploaded = False
if self.s3props.get("content_metadata"):
LOG.info("Uploading in multiple parts to set metadata")
uploaded = self.content_metadata_uploads()
if not uploaded:
cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(self.artifact_path,
self.s3_version_uri, self.env)
result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
LOG.debug("Upload Command Ouput: %s", result.stdout)
LOG.info("Uploaded artifacts to %s bucket", self.bucket)