python类local()的实例源码

fabfile.py 文件源码 项目:pyconapac-2016 作者: pythonkr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def deploy(target='dev', sha1=None):
    """Deploy a git revision to the ``dev`` or production checkout.

    Resets the server-side checkout to ``sha1``, refreshes the bower and pip
    dependencies, runs the Django maintenance commands and finally signals
    the worker to reload.

    :param target: Site name.  It selects the ``<target>.pycon.kr`` checkout
        directory and, when it is ``'dev'``, the dev pyenv environment.
    :param sha1: Commit to deploy.  Defaults to the HEAD of the local
        working copy.
    """
    if sha1 is None:
        # Fall back to whatever commit is checked out locally.
        sha1 = local('git rev-parse HEAD', capture=True)

    home_dir = '/home/pyconkr/{target}.pycon.kr/pyconkr-2016'.format(target=target)
    python_env = (
        '/home/pyconkr/.pyenv/versions/pyconkr-2016-dev'
        if target == 'dev'
        else '/home/pyconkr/.pyenv/versions/pyconkr-2016'
    )

    def as_pyconkr(command):
        # Every checkout-side command runs as the application user.
        return sudo(command, user='pyconkr')

    with settings(cd(home_dir), shell_env(DJANGO_SETTINGS_MODULE='pyconkr.settings_prod')):
        # Bring the server checkout to exactly the requested commit.
        as_pyconkr('git fetch --all -p')
        as_pyconkr('git reset --hard ' + sha1)
        as_pyconkr('bower install')
        as_pyconkr('%s/bin/pip install -r requirements.txt' % python_env)
        for subcommand in ('compilemessages', 'migrate', 'collectstatic --noinput'):
            as_pyconkr('%s/bin/python manage.py %s' % (python_env, subcommand))
        # worker reload
        run('echo r > /var/run/pyconkr-2016-%s.fifo' % target)
fabfile.py 文件源码 项目:gloss 作者: openhealthcare 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def deploy_test(key_file_name="../ec2.pem"):
    """Deploy the currently checked-out branch to the test server.

    If the local working copy has uncommitted changes the operator is asked
    whether to continue.  The remote checkout is then updated from
    ``origin``, requirements are installed, the alembic migrations are run
    and the twistd / gunicorn services are restarted.

    :param key_file_name: Path of the SSH private key, stored in
        ``env.key_filename``.
    """
    def _yes_no(answer):
        # ``prompt`` passes the raw text typed by the user (or the
        # ``default`` object on empty input) to the validator.  Plain
        # ``bool`` would treat every non-empty answer -- including "n" or
        # "no" -- as a yes.
        if isinstance(answer, bool):
            return answer
        normalized = str(answer).strip().lower()
        if normalized in ('y', 'yes', 'true', '1'):
            return True
        if normalized in ('', 'n', 'no', 'false', '0'):
            return False
        # Raising makes ``prompt`` report the problem and ask again.
        raise ValueError("please answer yes or no")

    env.key_filename = key_file_name
    changes = local("git status --porcelain", capture=True)
    if changes:
        print(" {}".format(changes))
        proceed = prompt(
            "you have uncommited changes, do you want to proceed",
            default=False,
            validate=_yes_no
        )

        if not proceed:
            return

    git_branch_name = local('git rev-parse --abbrev-ref HEAD', capture=True)
    with prefix(". /usr/share/virtualenvwrapper/virtualenvwrapper.sh"):
        with prefix("workon {}".format(virtual_env_name)):
            run("git pull origin {}".format(git_branch_name))
            run("pip install -r requirements.txt")
            run("alembic upgrade head")
            # "||true" keeps the deploy going when nothing was running.
            run("pkill twistd||true")
            run("pkill gloss||true")
            run("twistd multiple_mllp --receiver gloss.ohc_receiver.OhcReceiver")
            # NOTE(review): "0.0.0" looks like a typo for "0.0.0.0" -- confirm
            # before changing the bind address.
            run("gunicorn -w 1 -b 0.0.0:6767 -D gloss.api:app")
fabfile.py 文件源码 项目:dprr-django 作者: kingsdigitallab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def vagrant():
    """Point the Fabric environment at the local Vagrant box.

    Sets ``env.srvr``, ``env.path`` and ``env.within_virtualenv`` and fills in
    the connection settings (host, port, user, key file) from the output of
    ``vagrant ssh-config``.

    :raises KeyError: if the ssh-config output lacks ``HostName``, ``Port``,
        ``IdentityFile`` or ``User``.
    """
    env.srvr = 'vagrant'
    env.path = os.path.join('/', env.srvr)

    # this is necessary because ssh will fail when known hosts keys vary
    # every time vagrant is destroyed, a new key will be generated
    env.disable_known_hosts = True

    env.within_virtualenv = 'source {}'.format(
        os.path.join('~', 'venv', 'bin', 'activate'))

    result = {}
    for line in local('vagrant ssh-config', capture=True).splitlines():
        # Each setting is "<Keyword> <value>".  Split only once so values
        # containing spaces stay intact, and skip lines that do not carry a
        # keyword/value pair (e.g. blank separator lines) instead of
        # crashing on them.
        fields = line.strip().split(None, 1)
        if len(fields) != 2:
            continue
        keyword, value = fields
        # Presumably values containing spaces are wrapped in double quotes;
        # drop the quotes so the value is directly usable.
        result[keyword] = value.strip('"')

    env.hosts = ['%s:%s' % (result['HostName'], result['Port'])]
    env.key_filename = result['IdentityFile']
    env.user = result['User']

    print(env.key_filename, env.hosts, env.user)
fabfile.py 文件源码 项目:dprr-django 作者: kingsdigitallab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(version=None):
    """Pull the repository on the selected server and check out a version.

    :param version: Branch, tag or commit to check out.  When omitted,
        ``develop`` is used for the ``local``, ``vagrant`` and ``dev``
        servers and ``master`` for every other server.
    """
    require('srvr', 'path', 'within_virtualenv', provided_by=env.servers)

    if version:
        # An explicitly requested version always wins.
        to_version = version
    else:
        develops_on = ['local', 'vagrant', 'dev']
        to_version = 'develop' if env.srvr in develops_on else 'master'

    checkout_cmd = 'git checkout {}'.format(to_version)
    with cd(env.path), prefix(env.within_virtualenv):
        run('git pull')
        run(checkout_cmd)
zfs.py 文件源码 项目:znappy 作者: eBayClassifiedsGroup 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def zfs_get_properties(self, target):
        """Return the configured ZFS properties of ``target`` as a dict.

        Runs ``zfs get`` for the property names in ``self.properties``.  When
        no properties are configured nothing is executed and an empty dict is
        returned.

        :param target: Name passed to ``zfs get`` as the object to inspect.
        :returns: Mapping of property name to value (both strings).
        :raises SnapshotException: if ``zfs get`` exits with a non-zero code.
        """
        if not self.properties:
            return {}

        cmd = '/sbin/zfs get -H -o property,value {properties} {target}'.format(
            properties=','.join(self.properties),
            target=target
        )

        logger.debug('Getting properties from {}'.format(target))
        logger.debug('Executing {}'.format(cmd))

        result = local(cmd, capture=True)

        if result.return_code != 0:
            raise SnapshotException('Failed to get properties of {}'.format(target))

        props = {}
        for line in result.splitlines():
            # Split once only: the first field is the property name and the
            # remainder is the value, which may itself contain whitespace.
            fields = line.split(None, 1)
            if not fields:
                # Blank line: nothing to record.
                continue
            props[fields[0]] = fields[1].strip() if len(fields) > 1 else ''
        return props
zfs.py 文件源码 项目:znappy 作者: eBayClassifiedsGroup 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def zfs_clone(self, source, target, properties):
        """Clone ``source`` into ``target``, setting the given properties.

        :param source: Name of the object to clone.
        :param target: Name of the clone to create.
        :param properties: Mapping of property name to value; each entry is
            passed to ``zfs clone`` as ``-o name=value``.
        :raises SnapshotException: if ``zfs clone`` exits with a non-zero
            code.
        """
        option_flags = ["-o {}={}".format(name, properties[name]) for name in properties]

        cmd = '/sbin/zfs clone {prop_list} {source} {target}'.format(
            prop_list=' '.join(option_flags),
            source=source,
            target=target
        )

        logger.debug('Cloning {source} into {target}'.format(source=source, target=target))
        logger.debug('Executing: {}'.format(cmd))

        outcome = local(cmd)
        if outcome.return_code != 0:
            raise SnapshotException('Failed to clone {}'.format(source))
zfs.py 文件源码 项目:znappy 作者: eBayClassifiedsGroup 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_snapshot_sync(self, znappy, *args, **kwargs):
        """Compare the local snapshot names with the ones known to consul.

        :param znappy: Object whose ``host.snapshots`` mapping is keyed by
            snapshot name.
        :returns: ``(ok, (code, message))`` where ``code`` is 0 when both
            sides agree, 1 when exactly one name differs and 2 when more
            than one name differs.
        """
        # NOTE(review): ``self`` is passed explicitly to
        # ``self.zfs_snapshot_list`` -- confirm against that helper's
        # definition, which is not visible here.
        listed = self.zfs_snapshot_list(self, self.filesystem)

        # Keep only the part after the "@" of each listed snapshot.
        on_disk = {entry.split('@', 2)[1] for entry in listed}
        in_consul = set(znappy.host.snapshots.keys())

        logger.debug(on_disk)
        logger.debug(in_consul)

        # Names present on exactly one of the two sides.
        mismatched = on_disk.symmetric_difference(in_consul)
        count = len(mismatched)

        if count == 0:
            return True, (0, "OK: No differences in consul and local system")

        names = ', '.join(mismatched)
        if count == 1:
            return False, (1, "WARN: {} difference between consul and local system: {}".format(count, names))
        return False, (2, "CRITICAL: {} differences between consul and local system: {}".format(count, names))
fabfile.py 文件源码 项目:rq-scheduler-dashboard 作者: lamflam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def style():
    """Run flake8 over the ``rq_dashboard`` package.

    flake8 checks Python style, PEP8 conformance and McCabe complexity (the
    limit used here is 9).  See http://pypi.python.org/pypi/flake8/

    .. note::

        * Files carrying the following header are skipped::

            # flake8: noqa

        * Lines ending in a ``# NOQA`` comment do not produce a warning.

    """
    flake8_cmd = ' '.join([
        'flake8',
        '--exclude=".svn,CVS,.bzr,.hg,.git,__pycache__,._*"',
        '--max-complexity=9 .',
    ])
    package_dir = _relative_to_fabfile('rq_dashboard')
    with lcd(package_dir):
        local(flake8_cmd)
__init__.py 文件源码 项目:spacy-dev-resources 作者: explosion 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def build_vocab(language, corpus_files_root):
    """Build every vocabulary artefact for ``language``.

    Creates the corpus and model directories, merges the corpus files, then
    produces word frequencies, word2vec vectors and brown clusters before
    handing all of them to ``init_vocab``.

    :param language: Language code used in the directory and file names.
    :param corpus_files_root: Directory holding the input corpus files.
    """
    corpus_dir = CORPUS_DIR.format(lang=language)
    model_dir = MODEL_DIR.format(lang=language)
    for directory in (corpus_dir, model_dir):
        local("mkdir -p {}".format(directory))

    def wiki_file(directory, suffix):
        # All artefacts share the "<language>_wiki.<suffix>" naming scheme.
        return join(directory, "{}_wiki.{}".format(language, suffix))

    corpus_file = wiki_file(corpus_dir, "corpus")
    merge_corpus(corpus_files_root, corpus_file)

    word_freq_path = wiki_file(model_dir, "freqs")
    word_counts(corpus_files_root + "/*", word_freq_path)

    word2vec_model_path = wiki_file(model_dir, "word2vec")
    word2vec(corpus_file, word2vec_model_path)

    brown_out_dir = join(model_dir, "brown")
    brown_clusters(corpus_file, brown_out_dir)

    init_vocab(language, model_dir, word_freq_path, word2vec_model_path, brown_out_dir)
__init__.py 文件源码 项目:spacy-dev-resources 作者: explosion 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def word2vec(corpus_path, out_path, dim=150, threads=4, min_count=10, cbow=0):
    """Train word2vec vectors with gensim and bzip2-compress the output.

    :param corpus_path: Corpus file passed as ``-train``.
    :param out_path: Output file passed as ``-output``; its directory is
        created first and the file is compressed with ``bzip2`` afterwards.
    :param dim: Value for ``-size``.
    :param threads: Value for ``-threads``.
    :param min_count: Value for ``-min_count``.
    :param cbow: Value for ``-cbow``.
    """
    out_dir = dirname(out_path)
    local("mkdir -p {}".format(out_dir))

    train_cmd = (
        "python -m gensim.scripts.word2vec_standalone "
        "-train {corpus_file} -output {file} -size {dim} -threads {threads} -min_count {min} -cbow {cbow}"
    ).format(
        corpus_file=corpus_path,
        file=out_path,
        dim=dim,
        threads=threads,
        min=min_count,
        cbow=cbow,
    )
    local(train_cmd)

    local("bzip2 {}".format(out_path), capture=True)
fabfile.py 文件源码 项目:service-notifications 作者: rehive 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def provision(provider='digitalocean'):
    """Create the docker-machine host ``env.host_name`` on a cloud provider.

    :param provider: ``'digitalocean'`` or ``'gcloud'``.  Any other value
        matches neither branch, so no machine is created.
    """
    if provider == 'digitalocean':
        # The trailing space after ``{user}`` matters: without it the SSH
        # user and the machine name are fused into one argument and
        # docker-machine receives no machine name at all.
        local('docker-machine create '
              '--driver digitalocean '
              '--digitalocean-region=ams2 '
              '--digitalocean-size=1gb '
              '--digitalocean-access-token={digital_ocean_token} '
              '--digitalocean-ssh-user {user} '
              '{host_name}'.format(digital_ocean_token=env.digital_ocean_token,
                                   user=env.username,
                                   host_name=env.host_name))

    # for gcloud, first install gcloud and do gcloud auth login
    elif provider == 'gcloud':
        local('docker-machine create '
              '--driver google '
              '--google-project play-server '
              '--google-zone europe-west1-c '
              '--google-machine-type g1-small '
              '--google-disk-size 20 '
              '--google-disk-type pd-standard '
              '--google-username {user} '
              '{host_name}'.format(host_name=env.host_name, user=env.username))
fabfile.py 文件源码 项目:service-notifications 作者: rehive 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ssh_config():
    """Append an SSH host entry for the docker machine to ``~/.ssh/config``.

    The machine's IP address is looked up with ``docker-machine ip``.  The
    entry rendered from ``env.ssh_config_template`` is printed once the
    config file has been updated.
    """
    host = env.host_name
    ip_address = local(
        'docker-machine ip {host_name}'.format(host_name=host), capture=True)
    keyfile = '~/.docker/machine/machines/{host_name}/id_rsa'.format(host_name=host)

    rendered = env.ssh_config_template.format(
        host_name=host,
        ip=ip_address,
        port=env.sshd_port,
        user=env.username,
        keyfile=keyfile,
    )

    # The newlines and tabs are real characters inside the double-quoted
    # shell string, so ``echo`` writes a multi-line entry.
    append_cmd = (
        'echo "\nHost {host_name}\n\tHostName {ip}\n\tPort {ssh_port}\n\tUser {user}\n\tIdentityFile {keyfile}"'
        '>> ~/.ssh/config'
    ).format(
        host_name=host,
        ip=ip_address,
        ssh_port=env.sshd_port,
        user=env.username,
        keyfile=keyfile,
    )
    local(append_cmd)
    print(rendered)
fabfile.py 文件源码 项目:service-notifications 作者: rehive 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def install():
    """Prepare a freshly provisioned host for running the services.

    Grants ``env.username`` sudo and docker group membership, installs the
    Docker Compose release ``env.docker_compose_version`` and creates the
    ``/srv`` directory layout owned by that user.
    """
    user = env.username

    # Add user to sudo:
    sudo('adduser {user} sudo'.format(user=user))

    # Install Docker Compose:
    download_cmd = (
        'curl -L '
        'https://github.com/docker/compose/releases/download/{docker_compose_version}/'
        'docker-compose-`uname -s`-`uname -m`'
        ' > /usr/local/bin/docker-compose'
    ).format(docker_compose_version=env.docker_compose_version)
    sudo(download_cmd)
    sudo('chmod +x /usr/local/bin/docker-compose')

    # Add user to docker group:
    sudo('gpasswd -a {user} docker'.format(user=user))
    sudo('service docker restart')

    # Create server directory structure:
    sudo("mkdir -p /srv/certs /srv/config /srv/apps/default /srv/htdocs /srv/build")
    sudo("chown -R %s:%s /srv/" % (user, user))
fabfile.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_versioning_metadata():
    """ Extracts version metadata from the version control system

    Returns a ``(branch, commit_summary, version)`` tuple, where ``version``
    is the first word of the commit summary prefixed with ``0-``.

    Raises ``Exception`` when the working directory is neither a Mercurial
    nor a git checkout.
    """
    if _is_hg():
        # Remove "+" characters from the id.  ``str.replace`` is used
        # because ``translate("+")`` is not a deletion: on Python 2 it
        # raises (the table must be 256 characters long).
        commit_summary = local('hg id -i', capture=True).replace("+", "")
        # Extract the current branch/bookmark from the bookmarks list.
        bookmarks = local("hg bookmarks", capture=True)
        branch = "master"
        for line in bookmarks.split("\n"):
            # The active bookmark is the line marked with "*".
            if "*" in line:
                branch = line.split()[1]
                break
    elif _is_git():
        branch = local("git rev-parse --abbrev-ref HEAD", capture=True)
        # Same "+" removal as above; ``translate(None, "+")`` only works on
        # Python 2 byte strings.
        commit_summary = local('git rev-parse HEAD', capture=True).replace("+", "")
    else:
        raise Exception("Not git or hg")

    # dpkg requires the version start with a number, so lead with `0-`
    version = "0-%s" % commit_summary.split()[0]
    return branch, commit_summary, version
fabfile.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def package():
    """ [deploy] Build the endagaweb Debian package with fpm.

    The version, branch and commit summary come from
    ``_get_versioning_metadata``.  Returns the package version string.
    """
    branch, summary, version = _get_versioning_metadata()
    substitutions = {'branch': branch, 'cs': summary, 'version': version}

    # One long shell command: the backslash-newlines inside the literal join
    # the lines into a single string, which is why the literal is kept
    # exactly as it is.
    fpm_template = 'fpm -s dir -t deb -n endagaweb -a all -v %(version)s \
            --description "%(branch)s: %(cs)s" \
            -d byobu -d nginx -d python-pip -d python-dev \
            -d libpq-dev -d git -d supervisor \
            --after-install configs/deployment/endagaweb-postinst \
            endagaweb=/var/www ../common/ccm=/var/www \
            requirements.txt=/var/opt/ \
            sason=/var/www settings.py=/var/www urls.py=/var/www \
            manage.py=/var/www/ configs/nginx.conf=/etc/nginx/sites-enabled/ \
            configs/uwsgi.conf=/etc/init/ \
            configs/endagaweb.ini=/etc/uwsgi/apps-enabled/ \
            configs/celeryd.conf=/etc/supervisor/conf.d/ \
            configs/celerybeat.conf=/etc/supervisor/conf.d/ \
            configs/celerystick.conf=/etc/supervisor/conf.d/'

    # Builds the deployment package.
    local(fpm_template % substitutions)
    return version
fabfile.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def refresh_staging_db():
    """ [deploy] Delete staging DB and clone current prod DB.

    Polls every 30 seconds until the staging instance can no longer be
    described, then clones ``elephant`` into ``staging``.
    """
    delete_cmd = ("aws rds delete-db-instance --db-instance-identifier staging \
            --skip-final-snapshot")
    describe_cmd = "aws rds describe-db-instances --db-instance-identifier staging"

    # if this fails, db doesn't exist, so just continue
    with settings(warn_only=True):
        local(delete_cmd, capture=True)

    def staging_still_exists():
        # Describing the instance keeps succeeding until the DB is deleted.
        with settings(warn_only=True):
            return not local(describe_cmd, capture=True).failed

    puts("Waiting for DB to be deleted...")
    while staging_still_exists():
        time.sleep(30)

    clonedb("elephant", "staging")
shipping.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _push_packages_to_repo(repo_name="dev"):
    """Upload the locally staged .deb packages and refresh the freight repo.

    The packages are copied from the local ``/tmp/endaga-packages-deploy``
    directory to the same path on the remote host, added to ``apt/<repo_name>``
    with freight, and the remote staging directory is removed afterwards.
    Nothing is pushed unless ``env.pkgfmt`` is ``"deb"``.
    """
    if env.pkgfmt != "deb":
        # We only support freight, which is only for deb packages. We'd need to
        # add something that understands RPM repos as well if we want to add
        # support for CentOS here.
        print("Only pushing deb packages is supported, not pushing.")
        return

    staging_dir = '/tmp/endaga-packages-deploy'
    deb_glob = staging_dir + '/*.deb'
    apt_repo = 'apt/%s' % repo_name

    run('mkdir -p ' + staging_dir)
    put(local_path=deb_glob, remote_path=staging_dir + '/')
    sudo('freight add %s %s' % (deb_glob, apt_repo))
    sudo('freight cache %s' % apt_repo)
    run('rm -r ' + staging_dir)
fabfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def gitrepos(branch=None, fork='sympy'):
    """
    Clone the repo

    ``fab vagrant prepare`` (namely, checkout_cache()) must have been run
    beforehand. Unless a branch is given, the branch that is checked out
    locally is used. Releasing from master is refused--use a release branch
    (see the README). The release branch may have any name.

    To test the release, create a branch in your fork, and pass the fork
    option.
    """
    with cd("/home/vagrant"):
        cache_present = exists("sympy-cache.git")
        if not cache_present:
            error("Run fab vagrant prepare first")

    if not branch:
        # Use the current branch (of this git repo, not the one in Vagrant)
        branch = local("git rev-parse --abbrev-ref HEAD", capture=True)
    if branch == "master":
        raise Exception("Cannot release from master")

    clone_cmd = "git clone --reference ../sympy-cache.git https://github.com/{fork}/sympy.git".format(fork=fork)
    checkout_cmd = "git checkout -t origin/%s" % branch

    run("mkdir -p repos")
    with cd("/home/vagrant/repos"):
        run(clone_cmd)
    # The checkout directory is given as an absolute path, so it does not
    # need to be nested inside the previous ``cd``.
    with cd("/home/vagrant/repos/sympy"):
        run(checkout_cmd)
fabfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def release(branch=None, fork='sympy'):
    """
    Run every release step except the upload

    Builds all the release files and places them in the release/ directory
    next to this file. When it finishes it prints some things that have to
    be pasted into various places as part of the release.

    To test the release, push a branch to your fork on GitHub and pass your
    username as the fork option.
    """
    remove_userspace()
    gitrepos(branch, fork)

    # This has to be run locally because it itself uses fabric. I split it out
    # into a separate script so that it can be used without vagrant.
    local("../bin/mailmap_update.py")

    source_tarball()
    build_docs()
    copy_release_files()

    # The tarball is tested once per argument, '2' first and then '3'.
    for py_version in ('2', '3'):
        test_tarball(py_version)

    compare_tar_against_git()
    print_authors()
servers.py 文件源码 项目:django-starter-kit 作者: nprapps 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def render_confs():
    """
    Render every server configuration template into ``confs/rendered``.

    Each entry of ``app_config.SERVER_SERVICES`` is rendered with the
    attributes of ``app_config`` plus its secrets as the template context.
    """
    require('settings', provided_by=['production', 'staging'])

    # The directory may already exist; a failing mkdir is only a warning.
    with settings(warn_only=True):
        local('mkdir confs/rendered')

    # Copy the app_config so that when we load the secrets they don't
    # get exposed to other management commands
    context = copy.copy(app_config.__dict__)
    context.update(app_config.get_secrets())

    for service, remote_path, extension in app_config.SERVER_SERVICES:
        source_path = _get_template_conf_path(service, extension)
        target_path = _get_rendered_conf_path(service, extension)

        with open(source_path, 'r') as source, open(target_path, 'w') as target:
            template = Template(source.read())
            target.write(template.render(**context))
fabfile.py 文件源码 项目:Distributed-CellProfiler 作者: CellProfiler 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_or_update_ecs_service():
    """(Re)create the ECS service with no workers.

    If a service whose ARN ends with ``ECS_SERVICE_NAME`` is already listed,
    it is deleted first and the function waits ``WAIT_TIME`` seconds before
    creating the replacement.  The new service is created with a desired
    count of 0.
    """
    # NOTE(review): the listing does not pass ``--cluster`` while delete and
    # create do -- confirm the default cluster is the intended one.
    info = local('aws ecs list-services', capture=True)
    data = json.loads(info)
    service = [srv for srv in data['serviceArns'] if srv.endswith(ECS_SERVICE_NAME)]
    if service:
        # Only delete (and wait for the deletion) when the service really
        # exists; deleting a missing service would make the command fail.
        print('Service exists. Removing')
        local('aws ecs delete-service --cluster ' + ECS_CLUSTER +
              ' --service ' + ECS_SERVICE_NAME,
              capture=True)
        time.sleep(WAIT_TIME)

    # Create the service with no workers (0 desired count)
    print('Creating new service')
    local('aws ecs create-service --cluster ' + ECS_CLUSTER +
          ' --service-name ' + ECS_SERVICE_NAME +
          ' --task-definition ' + ECS_TASK_NAME +
          ' --desired-count 0 ',
          capture=True
    )


# Amazon SQS
fabfile.py 文件源码 项目:ottertune 作者: cmu-db 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def reset_website():
    """Drop and recreate the website database, then run the migrations.

    WARNING: destroys the existing website and creates with all
    of the required inital data loaded (e.g., the KnobCatalog)
    """
    db_settings = DATABASES['default']
    user = db_settings['USER']
    passwd = db_settings['PASSWORD']
    name = db_settings['NAME']

    # Recreate the ottertune database
    for statement in ("DROP DATABASE IF EXISTS {}", "CREATE DATABASE {}"):
        sql = statement.format(name)
        local("mysql -u {} -p{} -N -B -e \"{}\"".format(user, passwd, sql))

    # Reinitialize the website
    for migrate_cmd in ('python manage.py migrate website', 'python manage.py migrate'):
        local(migrate_cmd)
opbeat.py 文件源码 项目:soda-pylib 作者: sodavirtual 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def run(self):
        """Register the current git revision as a completed Opbeat release.

        Does nothing except emit a warning when ``self.roledef`` has no
        ``'opbeat'`` entry.
        """
        display.info('Registering deployment to Opbeat...')

        roledef = self.roledef

        # Do not communicate to Opbeat if it's not set up
        if 'opbeat' not in roledef:
            display.warning(
                'Opbeat is not set up for {}'.format(roledef['name']))
            return

        revision = local('git log -n 1 --pretty="format:%H"', capture=True)
        branch = local('git rev-parse --abbrev-ref HEAD', capture=True)

        release_cmd = (
            'curl {base_url}/organizations/{opbeat[ORGANIZATION_ID]}/apps'
            '/{opbeat[APP_ID]}/releases/'
            ' -H "Authorization: Bearer {opbeat[SECRET_TOKEN]}"'
            ' -d rev={rev}'
            ' -d branch={branch}'
            ' -d status=completed'
        ).format(
            base_url=BASE_URL,
            opbeat=roledef['opbeat'],
            rev=revision,
            branch=branch,
        )
        local(release_cmd)
fabfile.py 文件源码 项目:ansible-role 作者: mattvonrocketstein 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def vulture():
    """ try to find dead code paths

    Installs vulture with pip when it is not on the PATH, runs it over
    ``PKG_NAME`` from the directory of this file, prints the findings that
    are not filtered out by ``VULTURE_IGNORE_FUNCTIONS`` and exits with the
    return code of the command pipeline.
    """
    with api.quiet():
        if not api.local('which vulture').succeeded:
            print('vulture not found, installing it')
            api.local('pip install vulture')
    ignore_functions_grep = 'egrep -v "{0}"'.format(
        '|'.join(VULTURE_IGNORE_FUNCTIONS))
    excluded = ",".join(VULTURE_EXCLUDE_PATHS)
    excluded_paths = (' --exclude ' + excluded) if excluded else ''
    vulture_cmd = '\n  vulture {pkg_name}{exclude}{pipes}'
    vulture_cmd = vulture_cmd.format(
        pkg_name=PKG_NAME,
        exclude=excluded_paths,
        pipes='|'.join(['', ignore_functions_grep]))
    changedir = api.lcd(os.path.dirname(__file__))
    warn_only = api.settings(warn_only=True)
    be_quiet = api.hide('warnings')
    # A multi-manager ``with`` enters the managers in the same left-to-right
    # order as ``contextlib.nested`` did, and also works on Python 3 where
    # ``contextlib.nested`` no longer exists.
    with changedir, warn_only, be_quiet:
        result = api.local(vulture_cmd, capture=True)
        exit_code = result.return_code
    print(result.strip())
    raise SystemExit(exit_code)
backstage.py 文件源码 项目:astoptool 作者: zouliuyun 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def merge(self, s_server, t_server):
        """
        Ask the backstage service to run its ``mixServer`` action.

        :param s_server: Server name of the form ``<game>_<yx>_<id>``; sent
            as the ``mix`` field.
        :param t_server: Server name of the same form; sent as the
            ``master`` field.
        :returns: ``True`` when the reply's ``status`` is ``'1'``; otherwise
            the reply's ``msg`` is printed and ``False`` is returned.
        """
        import json

        # Only the last two of the three "_"-separated parts are used.
        _, t_yx, t_id = t_server.split('_')
        _, s_yx, s_id = s_server.split('_')

        cmd = '''curl -H "host:{}" --retry 3 --retry-delay 5 --data "mix={}_S{}&master={}_S{}" http://{}/backStage\\!mixServer.action'''.format(self.backstage_header, s_yx, s_id, t_yx, t_id, self.backstage_interface_url)
        reply = json.loads(local(cmd, capture=True))

        succeeded = reply['status'] == '1'
        if not succeeded:
            print('The error msg for "{}" is {}'.format(cmd, reply['msg']))
        return succeeded
mobile_www_release.py 文件源码 项目:astoptool 作者: zouliuyun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_diff(remote_script_dir, diff_from_lua, diff_to_lua, resource_dir, dest):
    """
    Run the remote ``make_diff.py`` script and write a companion ``.lua``
    file holding the size and md5 of the produced file.

    The paths are used as given, so the caller presumably wraps the call in
    ``with cd(dir):`` when they are relative -- TODO confirm.

    :param remote_script_dir: Remote directory containing ``make_diff.py``.
    :param diff_from_lua: Value for ``--diff-from``.
    :param diff_to_lua: Value for ``--diff-to``.
    :param resource_dir: Value for ``--resource-dir``.
    :param dest: Value for ``--dest``; a comma-separated list whose first
        entry names the file the ``.lua`` companion is derived from.

    Example:
        /app/opbak/make_diff_3/make_diff.py --resource-dir 3.6.1.0/res --diff-from 3.6.0.9/res/res.lua --diff-to 3.6.1.0/res/res.lua --dest /app/opbak/make_diff_20150909_xxxxx/3.6.1.0,/app/opbak/make_diff_20150909_xxxxx/3.6.1.0.zip
    """
    with hide('running', 'stdout'):
        run('''python {remote_script_dir}/make_diff.py --resource-dir {resource_dir} --diff-from {diff_from_lua} --diff-to {diff_to_lua} --dest {dest}'''.format(remote_script_dir=remote_script_dir, resource_dir=resource_dir, diff_from_lua=diff_from_lua, diff_to_lua=diff_to_lua, dest=dest))

    # Derive the name of the companion .lua file from the first dest entry.
    _zipfile = dest.split(',')[0]
    # Remove a literal ".zip" suffix only.  ``rstrip('.zip')`` would strip
    # every trailing '.', 'z', 'i' or 'p' character (e.g. "pi.zip" -> "").
    if _zipfile.endswith('.zip'):
        zipfile = _zipfile[:-len('.zip')]
    else:
        zipfile = _zipfile
    zip_lua = '{}.lua'.format(zipfile)
    with hide('running', 'stdout'):
        # NOTE(review): size and md5 are taken from the suffix-less path --
        # confirm that this is the intended file.
        file_size = run('stat --printf="%s" {}'.format(zipfile))
        md5 = run("md5sum {} | awk '{{print $1}}'".format(zipfile)).strip('\n')
        run('''echo -ne 'local updateZipSize = {{}}\nupdateZipSize.value = {file_size}\nupdateZipSize.md5 = "{md5}"\nreturn updateZipSize' >{zip_lua}'''.format(file_size=file_size, md5=md5, zip_lua=zip_lua))
getTodayServer.py 文件源码 项目:astoptool 作者: zouliuyun 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def gettodaylist(game, language):
    """List the servers in the window from 2 to 6 days after today.

    Runs the ``serverlist`` command of ``/app/opbin/work/bible/main.py`` and
    groups its ``<server>@<ip>`` output lines by IP.

    :param game: Value passed as ``-g``.
    :param language: Value passed as ``-l``.
    :returns: ``(servers, count)`` where ``servers`` maps each IP to the list
        of server names on it and ``count`` is the number of servers listed.
    :raises IndexError: if a non-blank output line contains no ``@``.
    """
    start_date = datetime.date.today() + datetime.timedelta(2)
    end_date = start_date + datetime.timedelta(4)
    result = local("python /app/opbin/work/bible/main.py  serverlist -g {} -l {} --startdate {} --enddate {} -s '.*'".format(game, language, start_date, end_date), capture=True)

    # Blank lines (for instance when the command prints nothing at all) do
    # not describe a server and would otherwise crash the parsing below.
    server_lines = [line for line in result.split("\n") if line.strip()]

    servers = {}
    for line in server_lines:
        fields = line.split("@")
        server, server_ip = fields[0], fields[1]
        servers.setdefault(server_ip, []).append(server)

    # Return the number of servers, not the ``len`` builtin itself.
    return servers, len(server_lines)
fabfile.py 文件源码 项目:hurricane 作者: johnson-li 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def aws_teardown(tags=None):
    '''Teardown resources on AWS

    Runs the ``aws_teardown.yml`` playbook, removes the known-hosts entries
    produced by grepping the inventory for ``.glow-dev.com`` and changes
    every RDS instance in ``aws_vars['rds_instances']`` to ``db.t2.micro``.

    Args:
        tags: only execute tasks matching specific tags (comma-separated)
    '''
    ansible_playbook('aws_teardown.yml', 'localhost,', tags=tags)
    domain = '.glow-dev.com'
    local('grep {} inventory/*.yml | xargs -n1 ssh-keygen -R '
          .format(domain))
    rds_conn = boto.rds.connect_to_region(aws_vars['region'])
    for rds in aws_vars['rds_instances']:
        # print() with a single argument behaves the same on Python 2 and 3.
        print('RDS: {} degraded to db.t2.micro'.format(rds['name']))
        rds_conn.modify_dbinstance(rds['name'],
                                   instance_class='db.t2.micro',
                                   apply_immediately=True)
git.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def last_commit(remote=True, short=False):
    '''
    Return the hash of the last commit of the git repository.

    Note: The current working directory (on the remote or the local host)
    must be a git repository, so set it before calling this.

    remote: run on the remote host when true, locally otherwise.
    short: pass ``--short`` to ``git rev-parse``.
    '''
    separator = ' --short ' if short else ' '
    cmd = 'git rev-parse{}HEAD'.format(separator)

    with hide('everything'):
        if remote:
            result = run(cmd)
        else:
            result = local(cmd, capture=True)

    return result.strip()
git.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def current_branch(remote=True):
    '''
    Return the name of the current branch of the git repository.

    Note: The current working directory (on the remote or the local host)
    must be a git repository, so set it before calling this.

    remote: run on the remote host when true, locally otherwise.
    '''
    cmd = 'git rev-parse --abbrev-ref HEAD'

    with hide('everything'):
        if remote:
            result = run(cmd)
        else:
            result = local(cmd, capture=True)

    return result.strip()


问题


面经


文章

微信
公众号

扫码关注公众号