python类hosts()的实例源码

mobile_static_resources.py 文件源码 项目:astoptool 作者: zouliuyun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_env_mobile_static_resources(game, region, component, root_dir, ip):
    """Release one mobile static-resource component to the host `ip`.

    An inner Fabric task is bound to `ip` via ``@hosts`` and run with
    ``execute``. The task:

    1. checks ``md5.txt`` locally in the FTP upload directory,
    2. uploads ``<component>.zip`` and ``md5.txt`` to a timestamped
       remote staging directory and re-checks the md5 sums there,
    3. unzips the archive and asserts it produced a ``<component>`` dir,
    4. moves any existing ``<root_dir>/static_resources/<component>``
       into a timestamped backup directory,
    5. copies the fresh ``<component>`` directory into place,
    6. removes the local FTP upload directory.

    `game`, `region` and `component` are interpolated into local and
    remote paths; `root_dir` is the remote directory that holds
    ``static_resources``.
    """

    @hosts(ip)
    def _test_env_mobile_static_resources():
        """Inner Fabric task that performs the release on the bound host."""
        upload_src = "/app/online/{}/frontend/{}/{}".format(game, region, component)
        staging_dir = "/app/opbak/test_mobile_static_resources_release/{}_{}_{}_{}".format(game, region, component, TIMESTAMP)
        backup_dir = "/app/opbak/test_mobile_static_resources_release/backup_{}_{}_{}_{}".format(game, region, component, TIMESTAMP)
        archive_name = '{}.zip'.format(component)
        target_parent = '{}/static_resources'.format(root_dir)

        # Local sanity check: normalise line endings, fix ownership, then
        # verify the checksums listed in md5.txt.
        with lcd(upload_src):
            for local_cmd in (
                "dos2unix md5.txt >/dev/null 2>&1",
                "chown virtual_user.virtual_user md5.txt",
                "md5sum -c md5.txt >/dev/null",
            ):
                local(local_cmd)

        # Prepare the remote staging directory (remote_mkdir is a project
        # helper; presumably it creates the directory on the remote host).
        remote_mkdir(staging_dir)

        # Upload the zip archive together with its md5.txt.
        print('???? {}...'.format(archive_name))
        sys.stdout.flush()
        with lcd(upload_src):
            for upload_name in (archive_name, 'md5.txt'):
                put(upload_name, staging_dir)

        # Re-verify on the remote side, unpack, and make sure the archive
        # really contained a directory named after the component.
        with cd(staging_dir):
            for remote_cmd in (
                'dos2unix md5.txt',
                'md5sum -c md5.txt',
                "unzip -o -q {}".format(archive_name),
                'test -d "{}"'.format(component),
            ):
                run(remote_cmd)

        remote_mkdir(target_parent)

        # Move the currently deployed copy (if any) out of the way.
        with cd(target_parent):
            if remote_dir_exists(component):
                run('mkdir -p {}'.format(backup_dir))
                run('mv {} {}/'.format(component, backup_dir))

        # Put the freshly unpacked component in place.
        with cd(staging_dir):
            run('cp -r {} {}/'.format(component, target_parent))

        # Clean up the local FTP upload directory.
        local("rm -rf /app/online/{}/frontend/{}/{}".format(game, region, component))

    execute(_test_env_mobile_static_resources)
flush_mem_data.py 文件源码 项目:astoptool 作者: zouliuyun 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def flush(self, gameservers, command, table=None, path=None):
    """Send a gateway `command` to every given gameserver and collect results.

    ``self.game_pj.transform(gameservers)`` yields a mapping whose keys are
    used as Fabric hosts and whose values are the gameservers to handle on
    that host. On each host, every gameserver's local HTTP gateway is
    called with ``curl``; `table` and `path` are appended to the query
    string when they are not None. When `path` is given,
    ``download_from_resource(self.game, path)`` is called once per host
    before any gateway call.

    Returns a dict mapping each gameserver to ``(ok, response)``, where
    ``ok`` is True if the lower-cased response contains 'succ' or
    'success', and ``response`` is the value returned by ``run``.
    """
    servers_by_host = self.game_pj.transform(gameservers)
    ips = servers_by_host.keys()

    def _call_gateway(http_port):
        """Build the gateway URL for `http_port` and curl it on the host."""
        endpoint = 'http://127.0.0.1:{}/root/gateway.action'.format(http_port)
        pieces = ['{}?command={}'.format(endpoint, command)]
        if table is not None:
            pieces.append('&tableName={}'.format(table))
        if path is not None:
            pieces.append('&path={}'.format(path))
        return run('curl "{}"'.format(''.join(pieces)))

    def _flush_task():
        """Per-host Fabric task: flush every gameserver located on this host."""
        if path is not None:
            download_from_resource(self.game, path)

        outcome = {}
        for gameserver in servers_by_host[env.host_string]:
            print('Working on {}...'.format(gameserver))
            response = _call_gateway(get_http_port(gameserver))
            lowered = response.lower()
            succeeded = any(tag in lowered for tag in ['succ', 'success'])
            outcome[gameserver] = (succeeded, response)
        return outcome

    per_host = execute(_flush_task, hosts=ips)

    # Flatten {host: {gameserver: result}} into {gameserver: result}.
    total = {}
    for host in per_host:
        host_results = per_host[host]
        for name in host_results:
            total[name] = host_results[name]
    return total
remote_add_environment.py 文件源码 项目:f5-openstack-lbaasv2-driver 作者: F5Networks 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def add_diff_env_to_controller(differentiated_environment):
    """Add a differentiated environment remotely and bounce services.

    This function is used in:

     *  test/functional/test_environment_add.py

    Examine that example for further explanation.

    Given an appropriate host_string and password, this function:

    (0) halts services on a Neutron controller;
    (1) reconfigures the relevant files to add an "environment"
        service_provider;
    (2) restarts the services.

    The host string is built from ``pytest.symbols.tenant_name`` and
    ``pytest.symbols.controller_ip`` (port 22) and stored in
    ``env.host_string``; the password comes from
    ``pytest.symbols.tenant_password``.

    (CRITICAL NOTE: The relevant credentials are hardcoded
    via the 'source keystonerc_testlab' line.
    NOT appropriate for use in a production environment.)
    """
    env.host_string = ''.join(
        [pytest.symbols.tenant_name,
         '@',
         pytest.symbols.controller_ip,
         ':22'])

    @hosts(env.host_string)
    def setup_env_oncontroller(diff_env):
        """Stop services, switch the agent to `diff_env`, restart services."""
        env.password = pytest.symbols.tenant_password
        execute(lambda: run('sudo ls -la'))

        # Stop existing agent
        execute(lambda: run('sudo systemctl stop f5-openstack-agent'))
        # Stop neutron server / f5_plugin
        execute(lambda: run('sudo systemctl stop neutron-server'))
        # Edit agent configuration to use new environment.
        # The sed expression is a raw string so that the regex group
        # escapes (\( \) \1) reach the shell verbatim instead of relying on
        # Python passing unknown escape sequences through unchanged.
        sedtempl = r'''sed -i "s/^\(environment_prefix = \)\(.*\)$/\1%s/"''' +\
                   ''' /etc/neutron/services/f5/f5-openstack-agent.ini'''
        sedstring = 'sudo ' + sedtempl % diff_env
        execute(lambda: run(sedstring))
        # Add diff env to neutron_lbaas.conf and installed Python package
        add_string = 'sudo add_f5agent_environment %s' % diff_env
        execute(lambda: run(add_string))
        # Start neutron-server / f5_plugin
        execute(lambda: run('sudo systemctl start neutron-server'))
        # Start existing agent
        execute(lambda: run('source keystonerc_testlab && '
                            'sudo systemctl start f5-openstack-agent'))

    setup_env_oncontroller(differentiated_environment)


问题


面经


文章

微信
公众号

扫码关注公众号