python类run()的实例源码

compiler.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def preprocess_source(self, in_file, additional_args=None):
        """Run the configured compiler command and return its standard output.

        The flags returned by ``self._build_compiler_flags()`` followed by
        ``additional_args`` are appended to ``self._args`` in place, so the
        instance's argument list keeps those entries after the call.

        :param in_file: accepted for interface compatibility; this method does
            not add it to the command line itself.
        :param additional_args: optional iterable of extra command-line
            arguments. ``None`` (the default) means "no extra arguments".
        :return: the captured stdout text when the command exits with status
            0, otherwise an empty string (stderr, if any, is printed).
        """
        import subprocess

        # A ``None`` sentinel replaces the former mutable ``[]`` default so a
        # single list object is never shared between calls.
        extra_args = [] if additional_args is None else additional_args

        self._args.extend(self._build_compiler_flags())
        self._args.extend(extra_args)

        result = subprocess.run(
            self._args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,  # decode both streams to str
        )

        if result.returncode != 0:
            if result.stderr:
                Style.error('Preprocess failed: ')
                print(result.stderr)
            return ''

        return result.stdout
nettools.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def del_addr(linkname, address):
        """Remove ``address`` from the network device ``linkname`` using ``ip``.

        Returns a two-item list: ``[True, <device name>]`` when the command
        succeeds, or ``[False, <message>]`` when it exits non-zero. The
        message embeds the command's combined stdout/stderr output.
        """
        device = str(linkname)
        command = ['ip', 'address', 'del', address, 'dev', device]
        try:
            subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # fold stderr into the captured stdout
                shell=False,
                check=True,
            )
        except subprocess.CalledProcessError as suberror:
            failure_output = suberror.stdout.decode('utf-8')
            return [False, "delete address failed : %s" % failure_output]
        return [True, device]


# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
create-linux-packages.py 文件源码 项目:kubernaut 作者: datawire 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Run a fpm-within-docker builder image to produce a deb or RPM package.

    ``THIS_DIRECTORY`` is mounted at ``/build-inside`` (also the working
    directory), its parent at ``/source`` and ``out_dir`` at ``/out``; the
    container then runs ``/build-inside/build-package.sh`` with the
    dependency names as arguments.

    :param builder_image str: Docker image passed to ``docker run``.
    :param package_type str: "rpm" or "deb".
    :param version str: The package version.
    :param out_dir Path: Directory where package will be output.
    :param dependencies list: package names the resulting package should depend
        on.
    """
    environment = [
        "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
    ]
    volumes = [
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out",
    ]

    command = ["docker", "run", "--rm"]
    command += environment
    command += volumes
    command += ["-w", "/build-inside", builder_image,
                "/build-inside/build-package.sh"]
    command.extend(dependencies)

    # check=True: a failing docker run raises CalledProcessError.
    run(command, check=True)
fabric.py 文件源码 项目:err-fabric 作者: netquity 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def execute_task(host, tasks):
        """Invoke Fabric in a subprocess to run ``tasks`` against ``host``.

        The command is the interpreter at ``PYTHON2_PATH`` running
        ``FABRIC_PATH`` with the fabfile at ``FABFILE_PATH``.

        Return: CompletedProcess instance (``stdout`` holds the combined
        stdout/stderr text). A non-zero exit raises ``CalledProcessError``.
        """
        # TODO: add support for groups, multiple hosts
        # TODO: add support for providing input data from team files
        command = [PYTHON2_PATH, FABRIC_PATH, '--abort-on-prompts']
        command.append('--hosts=%s' % host)
        command.append('--fabfile=%s' % FABFILE_PATH)
        command.extend(tasks)

        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            # stderr is merged into stdout, so the result's stderr is None
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            check=True,
        )
        return completed
s3deploy.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _upload_artifacts_to_version(self):
        """Recursively upload directory contents to S3.

        When ``self.s3props`` has a truthy ``content_metadata`` entry the
        upload is first attempted through ``self.content_metadata_uploads()``.
        If that is not attempted, or returns a falsy value, the directory is
        synced to ``self.s3_version_uri`` with one ``aws s3 sync`` shell
        command using the ``self.env`` profile.

        Raises:
            S3ArtifactNotFound: ``self.artifact_path`` is unset/empty, or the
                directory it names has no entries.
            subprocess.CalledProcessError: the ``aws s3 sync`` command exited
                with a non-zero status.
        """
        # Check that a path is configured *before* listing it: with the checks
        # the other way round an unset path was handed to os.listdir(), which
        # lists the current directory for None and raises FileNotFoundError
        # for '' instead of reporting a missing artifact.
        if not self.artifact_path or not os.listdir(self.artifact_path):
            raise S3ArtifactNotFound

        uploaded = False
        if self.s3props.get("content_metadata"):
            LOG.info("Uploading in multiple parts to set metadata")
            uploaded = self.content_metadata_uploads()

        if not uploaded:
            cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(self.artifact_path,
                                                                                      self.s3_version_uri, self.env)
            # NOTE(review): the command is interpolated into a shell string;
            # paths containing spaces or shell metacharacters are not quoted.
            result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
            LOG.debug("Upload Command Ouput: %s", result.stdout)

        LOG.info("Uploaded artifacts to %s bucket", self.bucket)
s3deploy.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _sync_to_uri(self, uri):
        """Mirror the versioned S3 directory to ``uri``.

        Runs ``aws s3 cp --recursive`` first and ``aws s3 sync --delete``
        second, both through the shell with ``check=True``.

        Args:
            uri (str): S3 URI to sync version to.
        """
        source = self.s3_version_uri
        profile = self.env

        cmd_cp = 'aws s3 cp {} {} --recursive --profile {}'.format(source, uri, profile)
        # A bucket-to-bucket sync with exact timestamps does not behave as
        # expected on its own, hence the recursive copy that precedes it.
        cmd_sync = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
            source, uri, profile)

        copied = subprocess.run(cmd_cp, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Copy to %s before sync output: %s", uri, copied.stdout)
        LOG.info("Copied version %s to %s", self.version, uri)

        synced = subprocess.run(cmd_sync, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Sync to %s command output: %s", uri, synced.stdout)
        LOG.info("Synced version %s to %s", self.version, uri)
test_manubot.py 文件源码 项目:manubot 作者: greenelab 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def test_example_manuscript(manuscript):
    """
    Build one example manuscript through the ``manubot`` command line and
    assert that the process exits with status 0.
    """
    manuscript_dir = directory.joinpath('manuscripts', manuscript)
    content_dir = manuscript_dir.joinpath('content')
    output_dir = manuscript_dir.joinpath('output')

    args = ['manubot', '--log-level', 'INFO']
    args += ['--content-directory', content_dir]
    args += ['--output-directory', output_dir]
    # Only the 'variables' manuscript is given a template-variables file.
    if manuscript == 'variables':
        variables_path = manuscript_dir.joinpath('content/template-variables.json')
        args += ['--template-variables-path', variables_path]

    process = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Echo the command and its stderr so a failing run can be diagnosed.
    print(process.args)
    print(process.stderr.decode())
    assert process.returncode == 0
build_commands.py 文件源码 项目:hlsclt 作者: benjmarshall 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def do_start_build_stuff(ctx):
    """Create ``run_hls.tcl`` and write the project preamble into it.

    The preamble opens the project, sets the top-level function, adds the
    source and test-bench files, opens ``solution<N>`` (with ``-reset`` when
    ``ctx.params['keep']`` is truthy), and sets the part and the clock period,
    all taken from ``ctx.obj.config`` / ``ctx.obj.solution_num``.

    Returns the still-open file object so the caller can append further
    commands and close it.

    Raises ``click.Abort`` (after printing a message) when creating or
    writing the file fails with ``OSError``.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    try:
        tcl_file = click.open_file("run_hls.tcl", "w")
        tcl_file.write("open_project " + config["project_name"] + "\n")
        tcl_file.write("set_top " + config["top_level_function_name"] + "\n")
        for src_file in config["src_files"]:
            tcl_file.write("add_files " + config["src_dir_name"] + "/" + src_file + "\n")
        for tb_file in config["tb_files"]:
            tcl_file.write("add_files -tb " + config["tb_dir_name"] + "/" + tb_file + "\n")
        reset_flag = "-reset " if ctx.params['keep'] else ""
        tcl_file.write("open_solution " + reset_flag + '"solution' + str(solution_num) + '"' + "\n")
        # The backslashes are doubled so the script still receives a literal
        # ``\{`` and ``\}``; the former "\{" spelling is an invalid escape
        # sequence that newer Python versions warn about.
        tcl_file.write("set_part \\{" + config["part_name"] + "\\}" + "\n")
        tcl_file.write("create_clock -period " + config["clock_period"] + " -name default" + "\n")
        return tcl_file
    except OSError:
        click.echo("Woah! Couldn't create a Tcl run file in the current folder!")
        raise click.Abort()

# Function to write a default build into the HLS Tcl build script.
build_commands.py 文件源码 项目:hlsclt 作者: benjmarshall 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def build_end_callback(ctx,sub_command_returns,keep,report):
    """Finish the Tcl script, run Vivado HLS on it and handle the result.

    If no sub-commands were issued the user is asked whether to run the
    default build sequence (declining aborts). The script is terminated with
    ``exit`` and closed, then ``vivado_hls -f run_hls.tcl`` is executed. Any
    non-zero return code raises ``click.Abort`` (a positive one prints a
    warning first); a zero return code hands over to ``do_end_build_stuff``.
    """
    no_stages_requested = not sub_command_returns
    # abort=True makes click raise Abort itself when the user answers "no".
    if no_stages_requested and click.confirm("No build stages specified, would you like to run a default sequence using all the build stages?", abort=True):
        do_default_build(ctx)

    ctx.obj.file.write("exit" + "\n")
    ctx.obj.file.close()

    # Hand the finished script to Vivado HLS and inspect its exit status.
    status = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"]).returncode
    if status == 0:
        do_end_build_stuff(ctx,sub_command_returns,report)
        return
    if status > 0:
        click.echo("Warning: HLS Process returned an error, skipping report opening!")
    raise click.Abort()

# csim subcommand
mylemonbar.py 文件源码 项目:myDotFiles 作者: GuidoFe 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def volume_plugin(colors):
    """Build the lemonbar volume segment: an icon followed by the level.

    Runs ``volume_level.sh`` and ``volume_muted.sh`` from the directory of
    this file. The level is the integer formed by the ASCII digits in the
    first script's output (0 when it prints none); the volume counts as
    muted when the second script's output starts with ``y``.

    ``colors["lwhite"]`` supplies the foreground colour used in the
    ``%{F...}`` formatting blocks. Returns the formatted string.
    """
    path = os.path.dirname(os.path.realpath(__file__))

    level_run = subprocess.run(['sh', path + "/volume_level.sh"], stdout=subprocess.PIPE)
    # Decode the captured bytes rather than scanning their repr(): the repr
    # carries quoting and escape sequences that are not part of the output.
    level_text = level_run.stdout.decode('utf-8', errors='replace')
    digits = ''.join(ch for ch in level_text if '0' <= ch <= '9')
    # No digits at all (script missing or failed) used to crash in int('').
    level = int(digits) if digits else 0

    muted_run = subprocess.run(['sh', path + "/volume_muted.sh"], stdout=subprocess.PIPE)
    muted = muted_run.stdout.decode('utf-8', errors='replace').startswith('y')

    if level > 0 and not muted:
        icon = '\uf028' if level >= 50 else '\uf027'
    else:
        icon = '\uf026'

    colour_block = '%{F' + colors["lwhite"] + '}'
    return colour_block + icon + "  %{F" + colors["lwhite"] + "}%3.0f" % level + '%'
local.py 文件源码 项目:doctr 作者: drdoctr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def generate_ssh_key(note, keypath='github_deploy_key'):
    """
    Create a 4096-bit RSA key pair with ``ssh-keygen`` (empty passphrase,
    ``note`` as the key comment) at ``keypath``.

    The ``.pub`` file is read and then deleted; the private key file is left
    in place. Returns the public key as a str.

    Raises RuntimeError when ``ssh-keygen`` exits with a non-zero status.
    """
    command = [
        'ssh-keygen',
        '-t', 'rsa',
        '-b', '4096',
        '-C', note,
        '-f', keypath,
        '-N', '',
    ]
    if subprocess.run(command).returncode != 0:
        raise RuntimeError("SSH key generation failed")

    public_keypath = keypath + ".pub"
    with open(public_keypath) as public_key_file:
        public_key = public_key_file.read()
    os.remove(public_keypath)

    return public_key
local.py 文件源码 项目:doctr 作者: drdoctr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def guess_github_repo():
    """
    Derive the GitHub repo from the URL of the ``origin`` git remote.

    Returns group 1 of ``GIT_URL`` matched against that URL, or False if git
    fails, writes anything to stderr, or the URL does not match.
    """
    result = subprocess.run(
        ['git', 'ls-remote', '--get-url', 'origin'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if result.returncode or result.stderr:
        return False

    remote_url = result.stdout.decode('utf-8').strip()
    match = GIT_URL.fullmatch(remote_url)
    return match.group(1) if match else False
travis.py 文件源码 项目:doctr 作者: drdoctr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def checkout_deploy_branch(deploy_branch, canpush=True):
    """
    Switch to a fresh doctr working branch based on the deploy branch,
    creating the deploy branch first when it does not exist yet.

    Returns ``canpush`` unchanged.
    """
    # Creates an empty branch with .nojekyll if the branch is missing.
    create_deploy_branch(deploy_branch, push=canpush)
    remote_branch = "doctr_remote/{}".format(deploy_branch)
    print("Checking out doctr working branch tracking", remote_branch)
    clear_working_branch()

    # create_deploy_branch() only creates a missing deploy branch when we are
    # able to push. Without push access the remote branch may not exist and
    # --track would fail, so it is added only when the ref verifies.
    remote_exists = run(['git', 'rev-parse', '--verify', remote_branch], exit=False) == 0
    checkout = ['git', 'checkout', '-b', DOCTR_WORKING_BRANCH]
    if remote_exists:
        checkout += ['--track', remote_branch]
    run(checkout)
    print("Done")

    return canpush
travis.py 文件源码 项目:doctr 作者: drdoctr 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def push_docs(deploy_branch='gh-pages', retries=3):
    """
    Pull from and then push the working branch to ``deploy_branch`` on the
    ``doctr_remote`` remote, retrying a failed push up to ``retries`` times.

    Assumes that :func:`setup_GitHub_push` has been run and returned True, and
    that :func:`commit_docs` has been run. Does not push anything if no changes
    were made.

    Exits the interpreter with "Giving up..." once the retries are used up.
    """
    pull_cmd = ['git', 'pull', '-s', 'recursive', '-X', 'ours',
        'doctr_remote', deploy_branch]
    push_cmd = ['git', 'push', '-q', 'doctr_remote',
        '{}:{}'.format(DOCTR_WORKING_BRANCH, deploy_branch)]

    attempts_left = retries
    while attempts_left:
        print("Pulling")
        # The pull's exit status is not inspected; only the push decides.
        run(pull_cmd, exit=False)
        print("Pushing commit")
        if not run(push_cmd, exit=False):
            return
        attempts_left -= 1
        print("Push failed, retrying")
        time.sleep(1)
    sys.exit("Giving up...")
build_doc.py 文件源码 项目:python-everywhere 作者: wdv4758h 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run_cmd(cmd, quiet=False):
    """Run the command line ``cmd`` and return its stripped stdout as bytes.

    Unless ``quiet`` is set, the command is logged at INFO level, non-empty
    stdout at DEBUG level and non-empty stderr at WARNING level.
    """
    verbose = not quiet
    if verbose:
        logging.info('command: {}'.format(cmd))

    # shlex.split keeps quoted substrings together as single arguments
    completed = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    out_bytes = completed.stdout.strip()
    err_bytes = completed.stderr.strip()
    out_text = out_bytes.decode()
    err_text = err_bytes.decode()

    if verbose:
        if out_text:
            logging.debug(out_text)
        if err_text:
            logging.warning(err_text)

    return out_bytes
utils.py 文件源码 项目:zinc 作者: PressLabs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def meld(got, expected):
    """Open ``meld`` on pretty-printed dumps of two values that differ.

    Does nothing when ``got == expected``. Otherwise a directory named after
    the calling function is created and the pretty-printed values are written
    to its ``got`` and ``expected`` files, which are then passed to ``meld``.
    """
    if got == expected:
        return

    import inspect
    import os
    import subprocess
    from os import path
    from pprint import pformat

    # Index 1 of the outer frames is the caller; field 3 is its function name.
    outer_frames = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = outer_frames[1][3]

    os.makedirs(test_name, exist_ok=True)
    got_fn = path.join(test_name, 'got')
    expected_fn = path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_file, open(expected_fn, 'w') as expected_file:
        got_file.write(pformat(got))
        expected_file.write(pformat(expected))

    subprocess.run(['meld', got_fn, expected_fn])
plugin.py 文件源码 项目:neoreader 作者: MaxwellBo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def call_say(self, txt: str, speed=None, pitch=None, literal=False):
        """Speak ``txt`` through ``espeak`` or ``say``, if speech is enabled.

        The ``USE_ESPEAK`` option selects the program. For ``espeak`` pitch
        and speed become ``-p`` / ``-s`` arguments and ``literal`` spaces the
        characters out; for ``say`` pitch and ``literal`` become inline
        ``[[ ... ]]`` prefixes on the text and speed becomes ``-r``. Nothing
        is executed when ``self.enabled`` is falsy.
        """
        use_espeak = self.get_option(self.Options.USE_ESPEAK)
        if use_espeak:
            args = ["espeak"]
            args.extend(["-p", str(pitch)] if pitch else [])
            args.extend(["-s", str(speed)] if speed else [])
            if literal:
                # Joining on spaces separates the individual characters.
                txt = " ".join(txt)
        else:
            args = ["say"]
            if pitch:
                txt = f"[[ pbas +{pitch}]] {txt}"
            if speed:
                args.extend(["-r", str(speed)])
            if literal:
                txt = f"[[ char LTRL ]] {txt}"
        args.append(txt)

        if not self.enabled:
            return
        logger.debug(f"Saying '{txt}'")
        subprocess.run(args)
dar_backup.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main():
    """Parse the command line and run a dar backup with those settings.

    Options: ``-c/--config`` (required), ``-n/--dry-run`` and
    ``-v/--verbose``; verbose mode switches logging to INFO level.
    """
    config_help = ("configuration file for dar and archive. "
                   "NOTE: the backup archive will be placed under "
                   "the same directory as this configuration file")

    parser = argparse.ArgumentParser(
        description="Backup system/data using dar and par2")
    parser.add_argument("-c", "--config", dest="config", required=True,
                        help=config_help)
    parser.add_argument("-n", "--dry-run", dest="dry_run", action="store_true",
                        help="dry run, do not perform any action")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="show verbose information")
    options = parser.parse_args()

    if options.verbose:
        logging.basicConfig(level=logging.INFO)

    settings = DarSettings(options.config, verbose=options.verbose,
                           dry_run=options.dry_run)
    DarBackup(settings).run(dry_run=options.dry_run)
rabbit.py 文件源码 项目:mooq 作者: jeremyarr 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def run(self, settle_seconds=20):
        '''
        Restarts the RabbitMQ broker using a method derived from the TEST_DISTRIBUTION
        environmental variable.

        This is a coroutine (the body awaits, so it must be declared with
        ``async def``). The blocking restart, ``self._run``, is executed in
        the default executor of ``self.loop``.

        If TEST_DISTRIBUTION=="arch", will try to restart rabbitmq using the linux ``systemctl``
        command.

        If TEST_DISTRIBUTION=="ubuntu", will try to restart rabbitmq using the linux ``service``
        command.

        Will wait for ``settle_seconds`` (20 by default) after restarting before returning.

        :param settle_seconds: seconds to sleep after the restart call returns
        :raises ValueError: if TEST_DISTRIBUTION environmental variable not found

        .. note:: the user who invokes this method will likely require sudo access to the linux commands.
            This can be provided by editing the sudoers file.
        '''

        # Run the blocking restart off the event loop thread.
        await self.loop.run_in_executor(None, self._run)
        # Pause after the restart call before returning to the caller.
        await asyncio.sleep(settle_seconds)
networkmanager.py 文件源码 项目:NordVPN-NetworkManager 作者: Chadsr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_vpn_connections():
    """Return the names of NetworkManager connections whose type is ``vpn``.

    Parses the terse ``TYPE:NAME`` lines printed by ``nmcli connection
    show``. Returns a list of names, or False when nmcli exits non-zero or
    any other error occurs; the error is logged in both cases.
    """
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,NAME', 'connection', 'show'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()

        lines = output.stdout.decode('utf-8').split('\n')

        vpn_connections = []
        for line in lines:
            if line:
                elements = line.strip().split(':')

                if (elements[0] == 'vpn'):
                    vpn_connections.append(elements[1])

        return vpn_connections

    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False

    except Exception as ex:
        # Same fallback as get_interfaces(): failures other than a non-zero
        # exit (for example nmcli not being installed) are logged and
        # reported as False instead of propagating to the caller.
        logger.error(ex)
        return False
networkmanager.py 文件源码 项目:NordVPN-NetworkManager 作者: Chadsr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_interfaces(wifi=True, ethernet=True):
    """Return the device names of wifi and/or ethernet interfaces.

    Parses the terse ``TYPE:DEVICE`` lines printed by ``nmcli device
    status``; ``wifi`` and ``ethernet`` select which device types are kept.
    Returns a list of names, or False (after logging) on any failure.
    """
    wanted_types = set()
    if wifi:
        wanted_types.add('wifi')
    if ethernet:
        wanted_types.add('ethernet')

    try:
        output = subprocess.run(
            ['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,DEVICE', 'device', 'status'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        output.check_returncode()

        # One [type, device, ...] row per non-empty output line.
        rows = (
            line.strip().split(':')
            for line in output.stdout.decode('utf-8').split('\n')
            if line
        )
        return [row[1] for row in rows if row[0] in wanted_types]

    except subprocess.CalledProcessError:
        logger.error(utils.format_std_string(output.stderr))
        return False

    except Exception as ex:
        logger.error(ex)
        return False
benchmarking.py 文件源码 项目:NordVPN-NetworkManager 作者: Chadsr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_num_processes(num_servers):
    """Cap ``num_servers`` by the number of processes the fd limit allows.

    The cap is half of the difference between the soft RLIMIT_NOFILE limit
    and the line count of ``ls -l /proc/<parent pid>/fd``. Returns
    ``num_servers`` unless it exceeds that cap, in which case the cap.
    """
    # Each process is cheap and mostly waits on pings, so the aim is to run
    # as many as the current file-descriptor configuration permits.
    soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Count the entries listed for the parent process's fd directory.
    parent_pid = os.getppid()
    count_command = 'ls -l /proc/' + str(parent_pid) + '/fd | wc -l'
    completed = subprocess.run(count_command, shell=True, stdout=subprocess.PIPE)
    used_file_descriptors = int(completed.stdout.decode('utf-8'))

    # Half of the descriptors remaining below the soft limit.
    max_processes = int((soft_limit - used_file_descriptors) / 2)

    return max_processes if num_servers > max_processes else num_servers
feature_extraction.py 文件源码 项目:website-fingerprinting 作者: AxelGoetz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def extract_all_features(save_dir, data_dir=DATA_DIR, extension=".cell"):
    """Extract the feature sets for every model into ``save_dir``.

    The kNN features are produced by running ``kNN.go`` with ``go run``; the
    random forest, svc1 and svc2 features go through ``extract_features``
    with the matching extractor. Writes a completion line when finished.
    """
    from naive_bayes import extract_nb_features
    from random_forest import extract_rf_features
    from svc1 import extract_svc1_features
    from svc2 import extract_svc2_features
    import subprocess

    knn_dir = save_dir + '/knn_cells/'
    create_dir_if_not_exists(knn_dir)
    knn_command = [
        'go', 'run', dirname + '/kNN.go',
        '-folder', data_dir + '/',
        '-new_path', knn_dir,
        '-extension', extension,
    ]
    subprocess.run(knn_command)

    # Naive bayes extraction (extract_nb_features into '/nb_cells') is
    # currently disabled, so it is not part of the job list below.
    jobs = (
        (extract_rf_features, '/rf_cells', "random forest"),
        (extract_svc1_features, '/svc1_cells', "svc1"),
        (extract_svc2_features, '/svc2_cells', "svc2"),
    )
    for extractor, subdir, model_name in jobs:
        extract_features(extractor, save_dir + subdir, data_dir=data_dir, extension=extension, model_name=model_name)

    stdout.write("Finished extracting features\n")
build_fix_install_rpath.py 文件源码 项目:scm-workbench 作者: barry-scott 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fixRPath( self, filename ):
        """Rewrite and prune the RPATH entries of ``filename``.

        Every current RPATH equal to one of the originals listed in
        ``self.all_rpath_replacements`` is replaced with
        ``install_name_tool -rpath``. The RPATHs are then read again and any
        entry that is not one of the replacement values seen in the first
        pass is removed with ``install_name_tool -delete_rpath``.
        """
        current_rpaths = self.getRPaths( filename )

        all_replacements = []
        print( 'Checking RPATH for %s' % (filename,) )
        for rpath in current_rpaths:
            print( '    Looking at RPATH %s' % (rpath,) )
            for all_orig_rpaths, replacement in self.all_rpath_replacements:
                all_replacements.append( replacement )
                for orig in all_orig_rpaths:
                    if rpath != orig:
                        continue
                    print( '    Need to replace %s with %s' % (rpath, replacement) )
                    subprocess.run( ['install_name_tool', '-rpath', rpath, replacement, filename], check=True )

        # Second pass over the (possibly rewritten) RPATH list.
        unused_rpaths = [rpath for rpath in self.getRPaths( filename ) if rpath not in all_replacements]
        for rpath in unused_rpaths:
            print( '    Delete unused rpath %s' % (rpath,) )
            subprocess.run( ['install_name_tool', '-delete_rpath', rpath, filename], check=True )
build_fix_install_rpath.py 文件源码 项目:scm-workbench 作者: barry-scott 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def getRPaths( self, filename ):
        """Return the RPATH entries reported by ``otool -l`` for ``filename``.

        Scans the output with a two-state machine: a ``cmd LC_RPATH`` line
        arms it, and the next line starting with ``path`` supplies the entry
        (its second word) and disarms it again.
        """
        res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )

        found = []
        state = self.ST_IDLE
        for line in res.stdout.split('\n'):
            words = line.split()
            if state == self.ST_IDLE:
                if words == ['cmd','LC_RPATH']:
                    state = self.ST_RPATH
                continue

            if state == self.ST_RPATH and words[:1] == ['path']:
                found.append( words[1] )
                state = self.ST_IDLE

        return found
build_fix_install_rpath.py 文件源码 项目:scm-workbench 作者: barry-scott 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getDylibs( self, filename ):
        """Return the dylib names reported by ``otool -l`` for ``filename``.

        Scans the output with a two-state machine: a ``cmd LC_LOAD_DYLIB``
        line arms it, and the next line starting with ``name`` supplies the
        entry (its second word) and disarms it again.
        """
        res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )

        found = []
        state = self.ST_IDLE
        for line in res.stdout.split('\n'):
            words = line.split()
            if state == self.ST_IDLE:
                if words == ['cmd','LC_LOAD_DYLIB']:
                    state = self.ST_DYNLIB
                continue

            if state == self.ST_DYNLIB and words[:1] == ['name']:
                found.append( words[1] )
                state = self.ST_IDLE

        return found
helium_void_fraction.py 文件源码 项目:HTSOHM-dev 作者: akaija 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating helium void fraction.

    Args:
        filename (str): path to input file.
        uuid (str): value substituted for ``FrameworkName`` in the template.
        simulation_config (dict): its ``simulation_cycles`` entry is
            substituted for ``NumberOfCycles``.

    Writes RASPA input-file.

    """
    # Values substituted into the template
    substitutions = dict(
        NumberOfCycles=simulation_config['simulation_cycles'],
        FrameworkName=uuid,
    )

    # Render the helium void fraction template
    rendered = load_and_subs_template('helium_void_fraction.input', substitutions)

    # Write the rendered text to the simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(rendered)
surface_area.py 文件源码 项目:HTSOHM-dev 作者: akaija 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating surface area.

    Args:
        filename (str): path to input file.
        uuid (str): value substituted for ``FrameworkName`` in the template.
        simulation_config (dict): its ``simulation_cycles`` entry is
            substituted for ``NumberOfCycles``.

    Writes RASPA input-file.

    """
    # Values substituted into the template
    substitutions = dict(
        NumberOfCycles=simulation_config['simulation_cycles'],
        FrameworkName=uuid,
    )

    # Render the surface area template
    rendered = load_and_subs_template('surface_area.input', substitutions)

    # Write the rendered text to the simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(rendered)
blurb.py 文件源码 项目:core-workflow 作者: python 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def flush_git_rm_files():
    """Remove every path queued in ``git_rm_files`` and empty the queue.

    The paths are first passed to ``git rm -f``; a non-zero exit from git is
    ignored. Each path is then unlinked from disk (missing files are fine)
    and the queue is cleared. Does nothing when the queue is empty.
    """
    if not git_rm_files:
        return

    command = ["git", "rm", "-f"] + list(git_rm_files)
    try:
        subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError:
        # Best effort: the files are unlinked below regardless.
        pass

    # clean up
    for path in git_rm_files:
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass

    git_rm_files.clear()


# @subcommand
# def noop():
#     "Do-nothing command.  Used for blurb smoke-testing."
#     pass
cmdprox.py 文件源码 项目:proxmox-tools 作者: FredHutch 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run_chef_knife(host):
    """Bootstrap ``host`` with ``knife bootstrap``, or just print the command.

    For the placeholder host name ``'hostname'`` the command is only printed
    so it can be run by hand. Otherwise it is executed through the shell,
    provided the ``.chef`` directory exists under ``homedir``; if it does
    not, a message is printed and nothing is run.
    """
    knife = " ".join([
        "knife bootstrap --no-host-key-verify",
        "--ssh-user root --ssh-identity-file %s/.ssh/id_rsa_prox" % (homedir,),
        "--environment=scicomp-env-compute",
        '--server-url "https://chef.fhcrc.org/organizations/cit"',
        "--run-list 'role[cit-base]','role[scicomp-base]'",
        "--node-name %s" % (host,),
        "%s" % (host,),
    ])

    if host == 'hostname':
        banner = '************************************'
        print('you can also execute this knife command manually:')
        print(banner)
        print(knife)
        print(banner)
        return

    if not os.path.exists('%s/.chef' % homedir):
        print ('chef/knife config dir %s/.chef does not exist.' % homedir)
        return

    print('*** executing knife command:')
    print(knife)
    subprocess.run(knife, shell=True)


问题


面经


文章

微信
公众号

扫码关注公众号