def preprocess_source(self, in_file, additional_args=None):
    """Run the command held in ``self._args`` and return its standard output.

    The flags returned by ``self._build_compiler_flags()`` and any
    ``additional_args`` are appended to ``self._args`` in place before the
    command is executed, so they remain on the instance afterwards.

    :param in_file: Not used by this method.
        NOTE(review): presumably the input path is already part of
        ``self._args`` -- confirm against the caller.
    :param additional_args: Optional iterable of extra command-line
        arguments. ``None`` (the default) adds nothing.
    :return: The command's stdout as text when it exits with status 0,
        otherwise an empty string (stderr, if any, is reported first).
    """
    import subprocess

    self._args.extend(self._build_compiler_flags())
    # ``None`` replaces the former mutable ``[]`` default so no list object is
    # shared between calls.
    if additional_args is not None:
        self._args.extend(additional_args)

    result = subprocess.run(
        self._args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode == 0:
        return result.stdout

    if result.stderr:
        Style.error('Preprocess failed: ')
        print(result.stderr)
    return ''
# Example source code using Python's run()
def del_addr(linkname, address):
    """Remove ``address`` from the device ``linkname`` with ``ip address del``.

    Returns ``[True, <device name>]`` when the command succeeds, or
    ``[False, <message>]`` containing the command's combined stdout/stderr
    when it exits with a non-zero status.
    """
    command = ['ip', 'address', 'del', address, 'dev', str(linkname)]
    try:
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as suberror:
        failure_output = suberror.stdout.decode('utf-8')
        return [False, "delete address failed : %s" % failure_output]
    return [True, str(linkname)]
# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    :param builder_image str: Docker image the build script is run in.
    :param package_type str: "rpm" or "deb".
    :param version str: The package version.
    :param out_dir Path: Directory where package will be output.
    :param dependencies list: package names the resulting package should depend
        on.
    """
    command = ["docker", "run", "--rm"]
    # Build parameters are handed to the container as environment variables.
    command += ["-e", "PACKAGE_VERSION=" + version]
    command += ["-e", "PACKAGE_TYPE=" + package_type]
    # Mount this directory, its parent and the output directory.
    command += ["-v", "{}:/build-inside:rw".format(THIS_DIRECTORY)]
    command += ["-v", "{}:/source:rw".format(THIS_DIRECTORY.parent)]
    command += ["-v", str(out_dir) + ":/out"]
    command += ["-w", "/build-inside", builder_image]
    command.append("/build-inside/build-package.sh")
    # Dependencies become positional arguments of the build script.
    command.extend(dependencies)
    run(command, check=True)
def execute_task(host, tasks):
    """Call Fabric to execute tasks against a host

    Stdout and stderr of the Fabric process are merged into ``stdout`` and
    captured as text; a non-zero exit raises ``subprocess.CalledProcessError``.

    Return: CompletedProcess instance
    """
    # TODO: add support for groups, multiple hosts
    # TODO: add support for providing input data from team files
    command = [
        PYTHON2_PATH,
        FABRIC_PATH,
        '--abort-on-prompts',
        '--hosts=%s' % host,
        '--fabfile=%s' % FABFILE_PATH,
    ]
    command.extend(tasks)
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        # Combine out/err into stdout; stderr will be None
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        check=True,
    )
    return completed
def _upload_artifacts_to_version(self):
    """Recursively upload directory contents to S3.

    When ``self.s3props`` has a truthy ``content_metadata`` entry the upload
    is first attempted through ``self.content_metadata_uploads()``; if that
    does not report success (or is not configured) the directory is synced
    with ``aws s3 sync``.

    Raises:
        S3ArtifactNotFound: ``self.artifact_path`` is unset or the directory
            is empty.
        subprocess.CalledProcessError: the ``aws s3 sync`` command failed.
    """
    # Validate the path itself before listing it: os.listdir(None) would list
    # the current working directory and os.listdir('') raises
    # FileNotFoundError, so the emptiness test must come second.
    if not self.artifact_path or not os.listdir(self.artifact_path):
        raise S3ArtifactNotFound

    uploaded = False
    if self.s3props.get("content_metadata"):
        LOG.info("Uploading in multiple parts to set metadata")
        uploaded = self.content_metadata_uploads()

    if not uploaded:
        cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
            self.artifact_path, self.s3_version_uri, self.env)
        result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Upload Command Ouput: %s", result.stdout)

    LOG.info("Uploaded artifacts to %s bucket", self.bucket)
def _sync_to_uri(self, uri):
    """Copy the versioned directory to ``uri`` in S3, then sync it.

    Both steps run through the AWS CLI and raise
    ``subprocess.CalledProcessError`` on a non-zero exit status.

    Args:
        uri (str): S3 URI to sync version to.
    """
    source = self.s3_version_uri
    profile = self.env
    copy_command = 'aws s3 cp {} {} --recursive --profile {}'.format(source, uri, profile)
    # AWS CLI sync does not work as expected bucket to bucket with exact timestamp sync.
    sync_command = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
        source, uri, profile)

    run_options = dict(check=True, shell=True, stdout=subprocess.PIPE)

    copied = subprocess.run(copy_command, **run_options)
    LOG.debug("Copy to %s before sync output: %s", uri, copied.stdout)
    LOG.info("Copied version %s to %s", self.version, uri)

    synced = subprocess.run(sync_command, **run_options)
    LOG.debug("Sync to %s command output: %s", uri, synced.stdout)
    LOG.info("Synced version %s to %s", self.version, uri)
def test_example_manuscript(manuscript):
    """
    Run the manubot command line on an example manuscript and check that it
    exits successfully.
    """
    manuscript_dir = directory.joinpath('manuscripts', manuscript)
    content_dir = manuscript_dir.joinpath('content')
    output_dir = manuscript_dir.joinpath('output')
    args = ['manubot', '--log-level', 'INFO']
    args += ['--content-directory', content_dir]
    args += ['--output-directory', output_dir]
    if manuscript == 'variables':
        # Only the 'variables' manuscript is given a template-variables file.
        variables_path = manuscript_dir.joinpath('content/template-variables.json')
        args += ['--template-variables-path', variables_path]
    process = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Show what was executed and what it reported, to ease debugging failures.
    print(process.args)
    print(process.stderr.decode())
    assert process.returncode == 0
def do_start_build_stuff(ctx):
    """Create ``run_hls.tcl`` and write the project set-up commands into it.

    Reads ``ctx.obj.config``, ``ctx.obj.solution_num`` and
    ``ctx.params['keep']``.

    Returns the still-open file object so the caller can append more
    commands. Raises ``click.Abort`` (after printing a message) when the file
    cannot be created or written.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    file = None
    try:
        file = click.open_file("run_hls.tcl","w")
        file.write("open_project " + config["project_name"] + "\n")
        file.write("set_top " + config["top_level_function_name"] + "\n")
        for src_file in config["src_files"]:
            file.write("add_files " + config["src_dir_name"] + "/" + src_file + "\n")
        for tb_file in config["tb_files"]:
            file.write("add_files -tb " + config["tb_dir_name"] + "/" + tb_file + "\n")
        # NOTE(review): '-reset' is emitted when 'keep' is truthy -- confirm
        # this is the intended polarity.
        if ctx.params['keep']:
            file.write("open_solution -reset \"solution" + str(solution_num) + "\"" + "\n")
        else:
            file.write("open_solution \"solution" + str(solution_num) + "\"" + "\n")
        # "\\{" / "\\}" write the same backslash-brace pairs the former "\{" /
        # "\}" literals produced, without relying on invalid escape sequences.
        file.write("set_part \\{" + config["part_name"] + "\\}" + "\n")
        file.write("create_clock -period " + config["clock_period"] + " -name default" + "\n")
        return file
    except OSError:
        # Do not leak the handle of a half-written script.
        if file is not None:
            try:
                file.close()
            except OSError:
                pass
        click.echo("Woah! Couldn't create a Tcl run file in the current folder!")
        raise click.Abort()
# Function to write a default build into the HLS Tcl build script.
def build_end_callback(ctx,sub_command_returns,keep,report):
    """Finish the Tcl script, run Vivado HLS on it and handle the outcome.

    ``keep`` is accepted but not used in this function. ``click.Abort`` is
    raised when the HLS process exits with a non-zero status (or when the
    user declines the default-build prompt).
    """
    # Catch the case where no subcommands have been issued and offer a default build
    if not sub_command_returns:
        run_default = click.confirm("No build stages specified, would you like to run a default sequence using all the build stages?", abort=True)
        if run_default:
            do_default_build(ctx)

    ctx.obj.file.write("exit\n")
    ctx.obj.file.close()

    # Call the Vivado HLS process
    exit_status = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"]).returncode

    # Check return status of the HLS process.
    if exit_status == 0:
        do_end_build_stuff(ctx,sub_command_returns,report)
        return
    if exit_status > 0:
        click.echo("Warning: HLS Process returned an error, skipping report opening!")
    raise click.Abort()
# csim subcommand
def volume_plugin(colors):
    """Build the volume indicator string from two helper shell scripts.

    ``volume_level.sh`` and ``volume_muted.sh`` are looked up next to this
    file. The level is the integer formed by all ASCII digits in the first
    script's output; the volume counts as muted when the second script's
    output starts with ``y``.

    :param colors: mapping that provides the ``"lwhite"`` colour value.
    :return: the formatted string (colour tag, icon, colour tag, level and
        a percent sign).
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))

    level_output = subprocess.run(
        ['sh', script_dir + "/volume_level.sh"], stdout=subprocess.PIPE).stdout
    # Work on the decoded output rather than on ``str(bytes)``: the repr can
    # contain digits from escape sequences such as ``\x1b``.
    digits = ''.join(
        ch for ch in level_output.decode('ascii', 'ignore') if ch in '0123456789')
    # A script that prints nothing (or no digits) yields level 0 instead of
    # crashing in ``int('')``.
    level = int(digits) if digits else 0

    muted_output = subprocess.run(
        ['sh', script_dir + "/volume_muted.sh"], stdout=subprocess.PIPE).stdout
    muted = muted_output[:1] == b'y'

    string = '%{F' + colors["lwhite"] + '}'
    if level > 0 and not muted:
        string += '\uf028' if level >= 50 else '\uf027'
    else:
        string += '\uf026'
    string += " %{F" + colors["lwhite"] + "}%3.0f" % level + '%'
    return string
def generate_ssh_key(note, keypath='github_deploy_key'):
    """
    Generates an SSH deploy public and private key.

    A 4096-bit RSA key pair with an empty passphrase is written to
    ``keypath`` / ``keypath + ".pub"`` using ``note`` as its comment. The
    public key file is deleted after it has been read.

    Returns the public key as a str.
    Raises RuntimeError if ssh-keygen exits with a non-zero status.
    """
    public_keypath = keypath + ".pub"
    command = ['ssh-keygen', '-t', 'rsa', '-b', '4096']
    command += ['-C', note, '-f', keypath, '-N', '']
    if subprocess.run(command).returncode:
        raise RuntimeError("SSH key generation failed")

    with open(public_keypath) as public_file:
        public_key = public_file.read()
    os.remove(public_keypath)
    return public_key
def guess_github_repo():
    """
    Guesses the github repo for the current directory

    The URL of the ``origin`` remote is matched against ``GIT_URL`` and the
    first group of the match is returned.

    Returns False if no guess can be made.
    """
    remote = subprocess.run(
        ['git', 'ls-remote', '--get-url', 'origin'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    # Any error output or non-zero status means there is nothing to guess from.
    if remote.stderr or remote.returncode:
        return False
    remote_url = remote.stdout.decode('utf-8').strip()
    match = GIT_URL.fullmatch(remote_url)
    return match.group(1) if match else False
def checkout_deploy_branch(deploy_branch, canpush=True):
    """
    Checkout the deploy branch, creating it if it doesn't exist.

    Returns ``canpush`` unchanged.
    """
    # Create an empty branch with .nojekyll if it doesn't already exist
    create_deploy_branch(deploy_branch, push=canpush)
    remote_branch = "doctr_remote/{}".format(deploy_branch)
    print("Checking out doctr working branch tracking", remote_branch)
    clear_working_branch()
    # If gh-pages doesn't exist the above create_deploy_branch() will create
    # it we can push, but if we can't, it won't and the --track would fail.
    remote_exists = run(['git', 'rev-parse', '--verify', remote_branch], exit=False) == 0
    extra_args = ['--track', remote_branch] if remote_exists else []
    run(['git', 'checkout', '-b', DOCTR_WORKING_BRANCH] + extra_args)
    print("Done")
    return canpush
def push_docs(deploy_branch='gh-pages', retries=3):
    """
    Push the changes to the branch named ``deploy_branch``.

    Each attempt pulls from ``doctr_remote`` and then pushes the working
    branch; a failed push is retried after one second until ``retries`` is
    used up, at which point the process exits with "Giving up...".

    Assumes that :func:`setup_GitHub_push` has been run and returned True, and
    that :func:`commit_docs` has been run. Does not push anything if no changes
    were made.
    """
    attempts_left = retries
    while attempts_left:
        print("Pulling")
        # The pull's status is not inspected; only the push decides success.
        run(['git', 'pull', '-s', 'recursive', '-X', 'ours',
             'doctr_remote', deploy_branch], exit=False)
        print("Pushing commit")
        push_status = run(['git', 'push', '-q', 'doctr_remote',
                           '{}:{}'.format(DOCTR_WORKING_BRANCH, deploy_branch)], exit=False)
        if not push_status:
            return
        attempts_left -= 1
        print("Push failed, retrying")
        time.sleep(1)
    sys.exit("Giving up...")
def run_cmd(cmd, quiet=False):
    """Run the command line ``cmd`` and return its stripped stdout as bytes.

    Unless ``quiet`` is set, the command is logged at INFO, its stdout at
    DEBUG and its stderr at WARNING. The exit status is not checked.
    """
    verbose = not quiet
    if verbose:
        logging.info('command: {}'.format(cmd))
    # use shlex to keep quoted substrings
    completed = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    raw_stdout = completed.stdout.strip()
    stdout_text = raw_stdout.decode()
    stderr_text = completed.stderr.strip().decode()
    if verbose and stdout_text:
        logging.debug(stdout_text)
    if verbose and stderr_text:
        logging.warning(stderr_text)
    return raw_stdout
def meld(got, expected):
    """Open ``meld`` on pretty-printed dumps of two values that differ.

    Does nothing when ``got == expected``. Otherwise a directory named after
    the calling function is created and the two values are written to its
    ``got`` and ``expected`` files, which are then opened in ``meld``.
    """
    if got == expected:
        return

    import inspect
    import os
    import subprocess
    from os import path
    from pprint import pformat

    # Entry 1 of the outer frames is the caller; field 3 is its function name.
    outer_frames = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = outer_frames[1][3]

    os.makedirs(test_name, exist_ok=True)
    got_fn = path.join(test_name, 'got')
    expected_fn = path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        got_f.write(pformat(got))
        expected_f.write(pformat(expected))

    subprocess.run(['meld', got_fn, expected_fn])
def call_say(self, txt: str, speed=None, pitch=None, literal=False):
    """Speak ``txt`` with ``espeak`` or ``say``, depending on the options.

    ``espeak`` receives pitch and speed as ``-p`` / ``-s`` flags and, for
    ``literal``, the text with a space between every character. ``say``
    receives speed as ``-r`` and has pitch / literal mode embedded in the
    text. Nothing is executed unless ``self.enabled`` is truthy.
    """
    if self.get_option(self.Options.USE_ESPEAK):
        command = ["espeak"]
        if pitch:
            command.extend(["-p", str(pitch)])
        if speed:
            command.extend(["-s", str(speed)])
        if literal:
            txt = " ".join(txt)
    else:
        command = ["say"]
        if pitch:
            txt = f"[[ pbas +{pitch}]] {txt}"
        if speed:
            command.extend(["-r", str(speed)])
        if literal:
            txt = f"[[ char LTRL ]] {txt}"
    command.append(txt)

    if not self.enabled:
        return
    logger.debug(f"Saying '{txt}'")
    subprocess.run(command)
def main():
    """Parse the command line and run a dar backup with those settings."""
    parser = argparse.ArgumentParser(
        description="Backup system/data using dar and par2")
    parser.add_argument(
        "-c", "--config", dest="config", required=True,
        help="configuration file for dar and archive. " +
             "NOTE: the backup archive will be placed under " +
             "the same directory as this configuration file")
    parser.add_argument(
        "-n", "--dry-run", dest="dry_run", action="store_true",
        help="dry run, do not perform any action")
    parser.add_argument(
        "-v", "--verbose", dest="verbose", action="store_true",
        help="show verbose information")
    options = parser.parse_args()

    if options.verbose:
        logging.basicConfig(level=logging.INFO)

    settings = DarSettings(options.config, verbose=options.verbose,
                           dry_run=options.dry_run)
    backup = DarBackup(settings)
    backup.run(dry_run=options.dry_run)
async def run(self):
    '''
    Restarts the RabbitMQ broker using a method derived from the TEST_DISTRIBUTION
    environmental variable.

    If TEST_DISTRIBUTION=="arch", will try to restart rabbitmq using the linux ``systemctl``
    command.
    If TEST_DISTRIBUTION=="ubuntu", will try to restart rabbitmq using the linux ``service``
    command.

    The restart itself is delegated to ``self._run``, executed in the loop's
    default executor.
    Will wait for 20 seconds after restarting before returning.

    :raises ValueError: if TEST_DISTRIBUTION environmental variable not found

    .. note:: the user who invokes this method will likely require sudo access to the linux commands.
        This can be provided by editing the sudoers file.
    '''
    # This must be a coroutine function: ``await`` is a syntax error inside a
    # plain ``def``.
    await self.loop.run_in_executor(None, self._run)
    await asyncio.sleep(20)
def get_vpn_connections():
    """Return the names of the NetworkManager connections of type ``vpn``.

    Runs ``nmcli`` in terse tabular mode with the fields ``TYPE,NAME`` and
    collects the second field of every line whose first field is ``vpn``.

    :return: list of connection names, or ``False`` when ``nmcli`` fails or
        any other error occurs (the error is logged).
    """
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,NAME', 'connection', 'show'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()
        lines = output.stdout.decode('utf-8').split('\n')
        vpn_connections = []
        for line in lines:
            if line:
                elements = line.strip().split(':')
                if elements[0] == 'vpn':
                    vpn_connections.append(elements[1])
        return vpn_connections
    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False
    except Exception as ex:
        # Same fallback as get_interfaces(): e.g. a missing nmcli binary or a
        # line without a ':' separator is logged and reported as False.
        logger.error(ex)
        return False
def get_interfaces(wifi=True, ethernet=True):
    """Return the device names of the wifi and/or ethernet interfaces.

    Runs ``nmcli`` in terse tabular mode with the fields ``TYPE,DEVICE`` and
    keeps the devices whose type is enabled through ``wifi`` / ``ethernet``.

    :return: list of device names, or ``False`` after logging any error.
    """
    command = ['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,DEVICE', 'device', 'status']
    try:
        output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()
        interfaces = []
        for line in output.stdout.decode('utf-8').split('\n'):
            if not line:
                continue
            elements = line.strip().split(':')
            device_type = elements[0]
            wanted_wifi = wifi and device_type == 'wifi'
            wanted_ethernet = ethernet and device_type == 'ethernet'
            if wanted_wifi or wanted_ethernet:
                interfaces.append(elements[1])
        return interfaces
    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False
    except Exception as ex:
        logger.error(ex)
        return False
def get_num_processes(num_servers):
    """Return ``num_servers`` capped by the file descriptors still available.

    The cap is half of (soft ``RLIMIT_NOFILE`` limit minus the line count of
    ``ls -l /proc/<parent pid>/fd``).
    """
    # Since each process is not resource heavy and simply takes time waiting for pings,
    # maximise the number of processes (within constraints of the current configuration)

    # Maximum open file descriptors of current configuration
    soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Find how many file descriptors are already in use by the parent process
    parent_pid = os.getppid()
    count_command = 'ls -l /proc/' + str(parent_pid) + '/fd | wc -l'
    counted = subprocess.run(count_command, shell=True, stdout=subprocess.PIPE)
    used_file_descriptors = int(counted.stdout.decode('utf-8'))

    # Max processes is the number of file descriptors left, before the soft limit
    # (configuration maximum) is reached
    max_processes = int((soft_limit - used_file_descriptors) / 2)
    return max_processes if num_servers > max_processes else num_servers
def extract_all_features(save_dir, data_dir=DATA_DIR, extension=".cell"):
    """Extract the feature sets of every model into ``save_dir``.

    The kNN features are produced by running ``kNN.go`` through ``go run``;
    the random forest, svc1 and svc2 features go through
    ``extract_features``. Naive Bayes extraction is currently disabled.
    """
    from naive_bayes import extract_nb_features
    from random_forest import extract_rf_features
    from svc1 import extract_svc1_features
    from svc2 import extract_svc2_features
    import subprocess

    knn_dir = save_dir + '/knn_cells/'
    create_dir_if_not_exists(knn_dir)
    knn_command = ['go', 'run', dirname + '/kNN.go']
    knn_command += ['-folder', data_dir + '/']
    knn_command += ['-new_path', knn_dir]
    knn_command += ['-extension', extension]
    subprocess.run(knn_command)

    # (extractor, output sub-directory, model name); the naive bayes entry
    # (extract_nb_features, '/nb_cells', "naive bayes") is intentionally left out.
    python_extractors = (
        (extract_rf_features, '/rf_cells', "random forest"),
        (extract_svc1_features, '/svc1_cells', "svc1"),
        (extract_svc2_features, '/svc2_cells', "svc2"),
    )
    for extractor, subdir, model_name in python_extractors:
        extract_features(extractor, save_dir + subdir, data_dir=data_dir, extension=extension, model_name=model_name)

    stdout.write("Finished extracting features\n")
def fixRPath( self, filename ):
    """Rewrite the RPATH entries of ``filename`` with ``install_name_tool``.

    Every current rpath that appears in one of the original-path groups of
    ``self.all_rpath_replacements`` is replaced by that group's replacement.
    Afterwards every rpath that is not one of the known replacements is
    deleted.

    Raises ``subprocess.CalledProcessError`` if ``install_name_tool`` fails.
    """
    all_rpaths = self.getRPaths( filename )
    # The set of valid replacements does not depend on the rpath being
    # examined, so build it once instead of re-appending on every iteration.
    all_replacements = [replacement for _, replacement in self.all_rpath_replacements]

    print( 'Checking RPATH for %s' % (filename,) )
    for rpath in all_rpaths:
        print( '    Looking at RPATH %s' % (rpath,) )
        for all_orig_rpaths, replacement in self.all_rpath_replacements:
            if rpath in all_orig_rpaths:
                print( '        Need to replace %s with %s' % (rpath, replacement) )
                subprocess.run( ['install_name_tool', '-rpath', rpath, replacement, filename], check=True )
                # The old path no longer exists in the file; a second
                # '-rpath' call on it would fail, so stop after the first match.
                break

    # Re-read the rpaths so the clean-up sees the result of the replacements.
    all_rpaths = self.getRPaths( filename )
    for rpath in all_rpaths:
        if rpath not in all_replacements:
            print( '        Delete unused rpath %s' % (rpath,) )
            subprocess.run( ['install_name_tool', '-delete_rpath', rpath, filename], check=True )
def getRPaths( self, filename ):
    """Return the rpath entries listed by ``otool -l`` for ``filename``.

    The output is scanned line by line: after a ``cmd LC_RPATH`` line, the
    second word of the next line starting with ``path`` is collected.
    """
    res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )

    found_rpaths = []
    state = self.ST_IDLE
    for raw_line in res.stdout.split('\n'):
        tokens = raw_line.split()
        if state == self.ST_IDLE:
            # Waiting for the start of an LC_RPATH load command.
            if tokens == ['cmd','LC_RPATH']:
                state = self.ST_RPATH
            continue
        if state == self.ST_RPATH and tokens[:1] == ['path']:
            found_rpaths.append( tokens[1] )
            state = self.ST_IDLE

    return found_rpaths
def getDylibs( self, filename ):
    """Return the dylib names listed by ``otool -l`` for ``filename``.

    The output is scanned line by line: after a ``cmd LC_LOAD_DYLIB`` line,
    the second word of the next line starting with ``name`` is collected.
    """
    res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )

    found_dylibs = []
    state = self.ST_IDLE
    for raw_line in res.stdout.split('\n'):
        tokens = raw_line.split()
        if state == self.ST_IDLE:
            # Waiting for the start of an LC_LOAD_DYLIB load command.
            if tokens == ['cmd','LC_LOAD_DYLIB']:
                state = self.ST_DYNLIB
            continue
        if state == self.ST_DYNLIB and tokens[:1] == ['name']:
            found_dylibs.append( tokens[1] )
            state = self.ST_IDLE

    return found_dylibs
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating helium void fraction.

    Args:
        filename (str): path to input file.
        uuid (str): uuid for material, used as the framework name.
        simulation_config (dict): must provide 'simulation_cycles'.

    Writes RASPA input-file.

    """
    # Values substituted into the template
    substitutions = {}
    substitutions['NumberOfCycles'] = simulation_config['simulation_cycles']
    substitutions['FrameworkName'] = uuid

    # Render the template before touching the output file
    rendered = load_and_subs_template('helium_void_fraction.input', substitutions)

    # Write simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(rendered)
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating surface area.

    Args:
        filename (str): path to input file.
        uuid (str): uuid for material, used as the framework name.
        simulation_config (dict): must provide 'simulation_cycles'.

    Writes RASPA input-file.

    """
    # Values substituted into the template
    substitutions = {}
    substitutions['NumberOfCycles'] = simulation_config['simulation_cycles']
    substitutions['FrameworkName'] = uuid

    # Render the template before touching the output file
    rendered = load_and_subs_template('surface_area.input', substitutions)

    # Write simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(rendered)
def flush_git_rm_files():
    """Remove every path queued in ``git_rm_files`` and empty the queue.

    The paths are first passed to ``git rm -f`` (a failing git command is
    ignored), then unlinked from disk if they still exist.
    """
    if not git_rm_files:
        return

    try:
        subprocess.run(
            ["git", "rm", "-f", *git_rm_files],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except subprocess.CalledProcessError:
        pass

    # clean up
    for leftover in git_rm_files:
        try:
            os.unlink(leftover)
        except FileNotFoundError:
            pass

    git_rm_files.clear()
# @subcommand
# def noop():
# "Do-nothing command. Used for blurb smoke-testing."
# pass
def run_chef_knife(host):
    """Bootstrap ``host`` with ``knife``, or just print the command.

    For the placeholder host name ``'hostname'`` the command is only printed.
    Otherwise it is executed through the shell, provided the ``.chef``
    directory exists under ``homedir``.
    """
    knife_template = (
        "knife bootstrap --no-host-key-verify "
        "--ssh-user root --ssh-identity-file %s/.ssh/id_rsa_prox "
        "--environment=scicomp-env-compute "
        '--server-url "https://chef.fhcrc.org/organizations/cit" '
        "--run-list 'role[cit-base]','role[scicomp-base]' "
        "--node-name %s "
        "%s"
    )
    knife = knife_template % (homedir,host,host)

    if host == 'hostname':
        print('you can also execute this knife command manually:')
        print('************************************')
        print(knife)
        print('************************************')
    elif os.path.exists('%s/.chef' % homedir):
        print('*** executing knife command:')
        print(knife)
        subprocess.run(knife, shell=True)
    else:
        print ('chef/knife config dir %s/.chef does not exist.' % homedir)