python类task()的实例源码

fabric.py 文件源码 项目:lambda-chef-node-cleanup 作者: awslabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def chef_query(query, api=None, hostname_attr=DEFAULT_HOSTNAME_ATTR, environment=_default_environment):
    """Decorate a Fabric task so it runs on the nodes matched by a Chef search.

    Works like Fabric's ``roles()`` decorator, except that the role is built
    from an arbitrary Chef search query. The query is registered in
    ``env.roledefs`` under the key ``'query_' + query`` and the decorated
    function is bound to that role.

    Example::

        from chef.fabric import chef_query

        @chef_query('roles:web AND tags:active')
        @task
        def deploy():
            pass

    :raises ChefAPIVersionError: if the API version is older than
        ``Environment.api_version_parsed`` while ``environment`` is not None.

    .. versionadded:: 0.2.1
    """
    chef_api = _api(api)
    if chef_api.version_parsed < Environment.api_version_parsed:
        # Old servers cannot scope a search to an environment.
        if environment is not None:
            raise ChefAPIVersionError('Environment support requires Chef API 0.10 or greater')

    role_key = 'query_' + query
    roledef = Roledef(query, chef_api, hostname_attr, environment)
    env.setdefault('roledefs', {})[role_key] = roledef

    def decorate(fn):
        # Delegate to Fabric's own roles() decorator with the synthetic role.
        return roles(role_key)(fn)

    return decorate
fabric.py 文件源码 项目:lambda-chef-node-cleanup 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def chef_tags(*tags, **kwargs):
    """Decorate a Fabric task so it runs on nodes carrying all given Chef tags.

    Works like Fabric's ``roles()`` decorator, but takes node tags. The tags
    may be passed as separate arguments or as one non-string iterable. They
    are combined into an ``AND`` search query that is handed to
    :func:`chef_query` together with any keyword arguments.

    Example::

        from chef.fabric import chef_tags

        @chef_tags('active', 'migrator')
        @task
        def migrate():
            pass

    .. versionadded:: 0.2.1
    """
    # A lone non-string argument is treated as the iterable of tags itself.
    got_single_iterable = len(tags) == 1 and not isinstance(tags[0], six.string_types)
    tag_names = tags[0] if got_single_iterable else tags
    clauses = ['tags:%s' % tag.strip() for tag in tag_names]
    return chef_query(' AND '.join(clauses), **kwargs)
fabfile.py 文件源码 项目:Simplechaindb 作者: BUAANLSDE 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def set_host(host_index):
    """A helper task to change env.hosts (and env.password) from the
    command line. It will only "stick" for the duration
    of the fab command that called it.

    Args:
        host_index (int): 0, 1, 2, 3, etc. Given as a string on the
            command line; it is converted with ``int()``.
    Example:
        fab set_host:4 fab_task_A fab_task_B
        will set env.hosts = [public_hosts[4]]
        and env.password = public_pwds[4]
        but only for doing fab_task_A and fab_task_B
    """
    index = int(host_index)
    env.hosts = [public_hosts[index]]
    # Fabric's env.password is a single password string, not a list;
    # wrapping it in a list hands the SSH layer an unusable value.
    env.password = public_pwds[index]


# Install base software
decorators.py 文件源码 项目:builder 作者: elifesciences 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def echo_output(func):
    """Print the wrapped function's result when it is the sole task being run.

    If ``_sole_task`` reports that the wrapped function is the only task
    being run, its return value is printed to stdout: strings are printed
    as-is, anything else is pretty-printed after ``remove_ordereddict``.
    In every case the wrapped function's return value is passed through.
    """
    # Resolve the text types once so the wrapper runs on Python 2 and 3.
    try:
        text_types = (str, unicode)  # Python 2: byte and unicode strings
    except NameError:
        text_types = (str,)  # Python 3: str already is unicode text

    @wraps(func)
    def _wrapper(*args, **kwargs):
        if not _sole_task(func.__name__):
            return func(*args, **kwargs)
        res = func(*args, **kwargs)
        errcho('output:')  # printing to stderr avoids corrupting structured data
        # Single-argument print(...) behaves the same on Python 2 and 3.
        if isinstance(res, text_types):
            print(res)
        else:
            print(pformat(remove_ordereddict(res)))
        return res
    return _wrapper


# avoid circular dependencies.
# TODO: this is a design smell. refactor.
init.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def define_preset_tasks(module, config):
    ''' Define tasks for the configured deployment preset.

    Imports the preset selected by ``config`` and copies every attribute
    of it that ``_is_task`` accepts onto ``module`` under the same name.
    '''
    deployment = deployer.import_preset(config)
    # Now that we have deployment preset set, import all the tasks.
    # items() (snapshotted) works on both Python 2 and 3; iteritems() is
    # Python 2 only.
    for task_name, func in list(vars(deployment).items()):
        if not _is_task(func):
            continue

        # Expose the preset's task under its own name in the given module.
        setattr(module, task_name, func)
init.py 文件源码 项目:boss 作者: kabirbaidhya 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def define_stage_tasks(module, config):
    ''' Define tasks for the stages dynamically.

    For every key of ``config['stages']`` a task wrapping ``configure_env``
    is created, named after the stage, and set on ``module``.
    '''
    # Only the stage names are needed, so iterate the keys directly
    # (also avoids the Python 2 only iteritems()).
    for stage_name in config['stages']:
        task_func = task(name=stage_name)(configure_env)
        task_func.__doc__ = 'Configures the {} server environment.'.format(
            stage_name)

        # Set a new task named as the stage name in the main fabfile module.
        setattr(module, stage_name, task_func)
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _uncrawl(task, _cache={}):
    """Return the dotted command path registered for ``task``, or None.

    ``_cache`` is a deliberately shared mutable default: it memoizes the
    reverse mapping (task object -> 'a.b.c' path) built from the nested
    ``state.commands`` mapping the first time it is found empty.
    """
    def _index(commands, path):
        # Walk the nested mapping depth-first, recording each leaf's path.
        for name, entry in commands.items():
            dotted = path + [name]
            if isinstance(entry, dict):
                _index(entry, dotted)
            else:
                _cache[entry] = '.'.join(dotted)

    if not _cache:
        _index(state.commands, [])
    return _cache.get(task)
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute(*args, **kwargs):
    """Run ``fab.execute`` for a task while temporarily giving it a name.

    The first positional argument is the task; the remaining arguments
    are forwarded to ``fab.execute``. While executing, the task's ``name``
    attribute is patched to the path found by ``_uncrawl`` or, if there is
    none, to ``'<command>.<task name>(<id>)'``.

    Raises:
        TypeError: if no task was provided.
    """
    if not args:
        raise TypeError('must provide task to execute')
    task, args = args[0], args[1:]
    try:
        task_name = task.name
    except AttributeError:
        # Fall back to __name__ lazily: tasks that define ``name`` are not
        # required to have a ``__name__`` attribute as well.
        task_name = task.__name__
    default_name = '{command}.{task_name}({id})'.format(
        command=fab.env.command,
        task_name=task_name,
        id=id(task),
    )
    with utils.patch(task, 'name', _uncrawl(task) or default_name):
        return fab.execute(task, *args, **kwargs)
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def skip_unknown_host(func):
    """Wrap ``func`` so it only runs when ``fab.env`` has a host string.

    Without a truthy ``host_string`` the call is skipped, a message is
    logged through ``fabricio.log`` and None is returned.
    """
    @functools.wraps(func)
    def _task(*args, **kwargs):
        if not fab.env.get('host_string', False):
            message = (
                "'{func}' execution was skipped due to no host provided "
                "(command: {command})"
            ).format(func=func.__name__, command=fab.env.command)
            fabricio.log(message)
            return None
        return func(*args, **kwargs)

    # compatibility with 'fab --display <task>' option
    _task.wrapped = func
    return _task
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __new__(cls, *args, **kwargs):
        """Create the instance and rebind every class-level task to it.

        Each class attribute accepted by ``is_task_object`` is re-created
        as a task whose callable is the original wrapped function
        partially applied to the new instance. The Fabric execution
        attributes present on the original callable are copied across.
        """
        instance = super(Tasks, cls).__new__(cls)
        copied_options = ('parallel', 'serial', 'pool_size', 'hosts', 'roles')
        for attr_name in dir(cls):
            class_task = getattr(cls, attr_name)
            if not is_task_object(class_task):
                continue
            make_task = fab.task(
                default=class_task.is_default,
                name=class_task.name,
                aliases=class_task.aliases,
                task_class=class_task.__class__,
            )
            # Bind the underlying function to this instance (acts as `self`).
            bound = functools.partial(class_task.wrapped, instance)
            instance_task = make_task(functools.wraps(class_task)(bound))
            for option in copied_options:
                if hasattr(class_task.wrapped, option):
                    value = getattr(class_task.wrapped, option)
                    setattr(instance_task.wrapped, option, value)
            setattr(instance, attr_name, instance_task)
        return instance
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, roles=(), hosts=(), create_default_roles=True):
        """Apply default roles and hosts to every task of this object.

        When ``create_default_roles`` is true, each role gets an empty
        entry in ``fab.env.roledefs`` unless one already exists. Tasks
        that lack a ``roles`` or ``hosts`` attribute receive the values
        given here.
        """
        for role in (roles if create_default_roles else ()):
            fab.env.roledefs.setdefault(role, [])

        fallbacks = (('roles', roles), ('hosts', hosts))
        for member in self:
            for attr_name, value in fallbacks:
                # Never override a value the task already carries.
                if not hasattr(member, attr_name):
                    setattr(member, attr_name, value)
tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def default(self, *args, **kwargs):
        """
        select {name} infrastructure to run task(s) on
        """
        # NOTE(review): the docstring above is kept verbatim; its ``{name}``
        # placeholder looks like a template filled in elsewhere - confirm.
        # Ask for interactive confirmation (defaulting to "no") before
        # delegating to self.confirm().
        question = (
            'Are you sure you want to select {name} '
            'infrastructure to run task(s) on?'
        ).format(name=self.color(self.name))
        confirmed = console.confirm(question, default=False)
        if not confirmed:
            fab.abort('Aborted')
        self.confirm(*args, **kwargs)
test_tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_skip_unknown_host(self):
        # Checks that a task wrapped with skip_unknown_host is not invoked
        # when executed without a host, and is invoked once with a host.

        mocked_task = mock.Mock()

        @fabricio.tasks.skip_unknown_host
        def task():
            mocked_task()

        with fab.settings(fab.hide('everything')):
            # No host given: the wrapped body must not run.
            fab.execute(task)
            mocked_task.assert_not_called()

            # Host given: the wrapped body must run exactly once.
            fab.execute(task, host='host')
            mocked_task.assert_called_once()
test_tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_infrastructure(self):
        # Exercises tasks.infrastructure used bare, invoked without
        # arguments, and invoked with a custom name.
        class AbortException(Exception):
            pass

        def task():
            pass

        # case name -> decorator under test and the value expected in
        # fab.env.infrastructure after the task has run
        cases = dict(
            default=dict(
                decorator=tasks.infrastructure,
                expected_infrastructure='task',
            ),
            invoked=dict(
                decorator=tasks.infrastructure(),
                expected_infrastructure='task',
            ),
            with_custom_name=dict(
                decorator=tasks.infrastructure(name='infrastructure'),
                expected_infrastructure='infrastructure',
            ),
        )

        with fab.settings(abort_on_prompts=True, abort_exception=AbortException):
            # fab.abort is patched to raise AbortException.
            with mock.patch.object(fab, 'abort', side_effect=AbortException):
                for case, data in cases.items():
                    with self.subTest(case=case):
                        decorator = data['decorator']
                        infrastructure = decorator(task)

                        self.assertTrue(is_task_object(infrastructure.confirm))
                        self.assertTrue(is_task_object(infrastructure.default))

                        # `confirm` is expected to set the infrastructure.
                        fab.execute(infrastructure.confirm)
                        self.assertEqual(data['expected_infrastructure'], fab.env.infrastructure)

                        fab.env.infrastructure = None
                        # First console.confirm call answers True, second False.
                        with mock.patch.object(console, 'confirm', side_effect=[True, False]):
                            fab.execute(infrastructure.default)
                            self.assertEqual(data['expected_infrastructure'], fab.env.infrastructure)
                            # Declining the prompt must abort.
                            with self.assertRaises(AbortException):
                                fab.execute(infrastructure.default)
fabric.py 文件源码 项目:lambda-chef-node-cleanup 作者: awslabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def chef_environment(name, api=None):
    """A Fabric task to set the current Chef environment context.

    Stores ``name`` in ``env['chef_environment']``. It is meant to be used
    together with :func:`~chef.fabric.chef_roledefs`, so that later role
    queries use the selected Chef environment.

    Example::

        from chef.fabric import chef_environment, chef_roledefs
        env.roledefs = chef_roledefs()

    .. code-block:: bash

        $ fab env:production deploy

    The task can be configured slightly via Fabric ``env`` values.

    ``env.chef_environment_task_alias`` sets the task alias, defaulting to "env".
    This value must be set **before** :mod:`chef.fabric` is imported.

    ``env.chef_environment_validate`` sets if :class:`~chef.Environment` names
    should be validated before use. Defaults to True.

    :raises ChefError: if validation is enabled and the environment does
        not exist.

    .. versionadded:: 0.2
    """
    must_validate = env.get('chef_environment_validate', True)
    # The API is only resolved when validation is actually requested.
    if must_validate and not Environment(name, api=_api(api)).exists:
        raise ChefError('Unknown Chef environment: %s' % name)
    env['chef_environment'] = name
__init__.py 文件源码 项目:elasticsearch-fabric 作者: KunihikoKido 项目源码 文件源码 阅读 74 收藏 0 点赞 0 评论 0
def reindex_rethrottle(task_id=None, **kwargs):
    """
    Change the value of requests_per_second of a running reindex task.
    https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html

    The response is printed with ``jsonprint`` and also returned.
    """
    params = dict(kwargs, task_id=task_id)
    response = request("reindex_rethrottle", None, **params)
    jsonprint(response)
    return response
utils.py 文件源码 项目:geonode-devops 作者: pjdufour 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _run_task(task, args=None, kwargs=None):
    """Run ``task`` once per configured target, or once if there are none.

    For each target in ``fabfile.targets`` an environment is built with
    ``_build_env``; targets yielding a falsy environment are skipped, the
    others run the task inside ``fab_settings`` with that environment.
    """
    from fabfile import targets

    if not targets:
        _run_task_core(task, args, kwargs)
        return

    for target in targets:
        target_env = _build_env(target)
        if not target_env:
            continue
        with fab_settings(**target_env):
            _run_task_core(task, args, kwargs)
utils.py 文件源码 项目:geonode-devops 作者: pjdufour 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _run_task_core(task, args, kwargs):
    """Call ``task`` with the given positional and keyword arguments.

    Args:
        task: callable to invoke; nothing happens if it is falsy.
        args: iterable of positional arguments, or None/empty for none.
        kwargs: mapping of keyword arguments, or None/empty for none.

    Both ``args`` and ``kwargs`` are forwarded when both are supplied;
    keyword arguments are no longer dropped whenever positional ones exist.
    """
    if not task:
        return
    task(*(args or ()), **(kwargs or {}))
fabfile.py 文件源码 项目:Simplechaindb 作者: BUAANLSDE 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send_client_confile(confile):
    """Upload ``confile`` and install it as ``~/.bigchaindb`` on the host.

    The file is first uploaded under a temporary name, then moved into
    place, and finally the resulting configuration is displayed.
    """
    staging_name = 'tempfile'
    put(confile, staging_name)
    run('mv %s ~/.bigchaindb' % staging_name)
    print('For this node, bigchaindb show-config says:')
    run('simplechaindb show-config')


# Initialize BigchainDB
# i.e. create the database, the tables,
# the indexes, and the genesis block.
# (The @hosts decorator is used to make this
# task run on only one node. See http://tinyurl.com/h9qqf3t )
__init__.py 文件源码 项目:fabsetup 作者: theno 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def INIT():
        '''Init repo `fabsetup_custom` with custom tasks and config.'''
        # The actual work is done by the decorator @needs_repo_fabsetup_custom;
        # this body only reports completion and gives usage hints.
        lines = (
            green('Initialization finished\n'),
            'List available tasks: ' + blue('fab -l'),
            'Show details of a task: `fab -d <task>`, eg.: ' +
            blue('fab -d setup_webserver'),
        )
        for line in lines:
            print(line)
__init__.py 文件源码 项目:fabsetup 作者: theno 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setup_desktop():
        '''Run setup tasks to set up a nicely configured desktop pc.'''
        # The decorator @needs_repo_fabsetup_custom does the real work.
        # On the next run, the task setup_desktop from
        # fabsetup_custom/fabfile_/__init__.py will be executed instead.
        message = 'Init completed. Now run this task again.'
        print(message)
__init__.py 文件源码 项目:fabsetup 作者: theno 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setup_webserver():
        '''Run setup tasks to set up a nicely configured webserver.'''
        # The decorator @needs_repo_fabsetup_custom does the real work.
        # On the next run, the task setup_webserver from
        # fabsetup_custom/fabfile_/__init__.py will be executed instead.
        message = 'Init completed. Now run this task again.'
        print(message)
fabfile.py 文件源码 项目:ctutlz 作者: theno 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _local_needs_pythons(*args, **kwargs):
    """Run ``local`` and exit with a hint if the command returns code 127.

    All arguments are forwarded to ``local``. Its result is printed; on
    return code 127 a message pointing to the `pythons` task is printed
    and the process exits with status 1.
    """
    with warn_only():
        result = local(*args, **kwargs)
        print(result)
        if result.return_code != 127:
            return
        hint = cyan('missing python version(s), '
                    'run fabric task `pythons`:\n\n    '
                    'fab pythons\n')
        print(hint)
        sys.exit(1)
fabfile.py 文件源码 项目:ctutlz 作者: theno 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test(args='', py=None):
    '''Run unit tests.

    Keyword-Args:
        args: Optional arguments passed to pytest
        py: python version to run the tests against; defaults to the last
            entry of the `envlist` line in tox.ini

    Example:

        fab test:args=-s,py=py27
    '''
    # NOTE: flo() fills {basedir}, {py} and {args} from the local variables
    # of this function, so these names must not be changed.
    basedir = dirname(__file__)

    if py is None:
        # e.g. envlist: 'envlist = py26,py27,py33,py34,py35,py36'
        envlist = local(flo('cd {basedir}  &&  grep envlist tox.ini'),
                        capture=True)
        # Take the value after '=' and use its last comma-separated entry.
        # This also works for a single-entry envlist (no comma) and strips
        # whitespace that may follow a comma.
        py = envlist.split('=', 1)[-1].split(',')[-1].strip()

    with warn_only():
        res = local(flo('cd {basedir}  &&  '
                        "PYTHONPATH='.' .tox/{py}/bin/python -m pytest {args}"))
        print(res)
        if res.return_code == 127:
            # 127: the interpreter inside the tox virtualenv was not found
            print(cyan('missing tox virtualenv, '
                       'run fabric task `tox`:\n\n    '
                       'fab tox\n'))
            sys.exit(1)
packer.py 文件源码 项目:builder 作者: elifesciences 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download_box(pname):
    """just download the vagrant .box for given project.

    Returns the local path of the box under /tmp. An existing file at
    that path is reused instead of being downloaded again.

    Exits with status 1 if the project's box url is not an s3:// url and
    raises AssertionError if the download command reports failure.
    """
    boxurl = box_url(pname)
    if not boxurl.startswith('s3://'):
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('this task only downloads from s3. unhandled url %s' % boxurl)
        raise SystemExit(1)
    dest = join('/tmp', os.path.basename(boxurl))
    if os.path.exists(dest):
        print('file %s already exists, skipping download. move or rename the file to re-download' % dest)
        return dest
    cmd = "aws s3 cp %s %s" % (boxurl, dest)
    # Run the download outside of an `assert` statement: under `python -O`
    # asserts are removed and the download itself would be skipped.
    if local(cmd).return_code != 0:
        raise AssertionError("failed to successfully download %s" % boxurl)
    return dest
packer.py 文件源码 项目:builder 作者: elifesciences 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def install_box(pname):
    """alternative to 'vagrant add boxname'.
    downloads and installs a vagrant box.
    macs appear to have a problem maintaining a connection to S3,
    so this task downloads it for Vagrant and then adds it from the filesystem"""
    # Single-argument print(...) calls are used so the function parses on
    # Python 3 while printing exactly the same text on Python 2.
    if box_installed(pname):
        print('the .box file for %r has already been installed (%s)' % (pname, prj(pname, 'vagrant.box')))
        return
    dest = download_box(pname)
    with settings(warn_only=True):
        cmd = "vagrant box add %s %s" % (prj(pname, 'vagrant.box'), dest)
        retval = local(cmd).return_code
        # Only clean up the download once the box was added successfully.
        if retval == 0 and os.path.exists(dest):
            print('removing downloaded file ...')
            local('rm -i %s' % dest)
decorators.py 文件源码 项目:builder 作者: elifesciences 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def rtask(*roles):
    """role task. the decorated function only becomes a task when the
    BLDR_ROLE env var holds one of the given roles; otherwise the function
    is returned unchanged."""
    def wrapper(func):
        # The environment is read when the decorator is applied.
        active_role = os.getenv('BLDR_ROLE')
        return task(func) if active_role in roles else func
    return wrapper


# pylint: disable=invalid-name
lifecycle.py 文件源码 项目:builder 作者: elifesciences 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def stop_if_running_for(stackname, minimum_minutes='30'):
    """If a EC2 node has been running for a time greater than minimum_minutes, stop it.

    The assumption is that stacks where this command is used are not needed
    for long parts of the day/week, and that whoever needs them will call
    the start task first.

    ``minimum_minutes`` may be given as a string; it is converted with
    ``int()`` before being passed on to ``lifecycle.stop_if_running_for``."""
    threshold = int(minimum_minutes)
    return lifecycle.stop_if_running_for(stackname, threshold)
pyFabricAndCapistrano.py 文件源码 项目:LinuxBashShellScriptForOps 作者: DingGuodong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def with_defaults(func):
    """A decorator that sets all defaults for a task.

    Before ``func`` is called, missing ``env`` settings are filled in
    (sudo usage, git branch, python binary, remote owner/group, pip
    command and the domain/current/releases/shared paths derived from
    ``env.base_dir`` and ``env.app_name``).

    If ``env`` has no ``releases`` entry yet and the releases directory
    exists, the releases are listed remotely and ``env.releases``,
    ``env.current_revision``/``env.current_release`` and, when there is
    more than one release, ``env.previous_revision``/``env.previous_release``
    are set.
    """

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        env.setdefault('use_sudo', True)
        env.setdefault('git_branch', 'master')
        env.setdefault('python_bin', 'python')
        env.setdefault('remote_owner', 'www-data')
        env.setdefault('remote_group', 'www-data')
        env.setdefault('pip_install_command', 'pip install -r requirements.txt')

        env.setdefault('domain_path', "%(base_dir)s/%(app_name)s" %
                       {'base_dir': env.base_dir,
                        'app_name': env.app_name})
        env.setdefault('current_path', "%(domain_path)s/current" %
                       {'domain_path': env.domain_path})
        env.setdefault('releases_path', "%(domain_path)s/releases" %
                       {'domain_path': env.domain_path})
        env.setdefault('shared_path', "%(domain_path)s/shared" %
                       {'domain_path': env.domain_path})
        # Discover the releases only when they are not known yet. The
        # previous `not 'releases' not in env` was a double negation that
        # ran this branch only when the releases were already set.
        if 'releases' not in env:
            if dir_exists(env.releases_path):
                env.releases = sorted(run('ls -x %(releases_path)s' % {'releases_path': env.releases_path}).split())

                if len(env.releases) >= 1:
                    env.current_revision = env.releases[-1]
                    env.current_release = "%(releases_path)s/%(current_revision)s" % \
                                          {'releases_path': env.releases_path,
                                           'current_revision': env.current_revision}
                if len(env.releases) > 1:
                    env.previous_revision = env.releases[-2]
                    env.previous_release = "%(releases_path)s/%(previous_revision)s" % \
                                           {'releases_path': env.releases_path,
                                            'previous_revision': env.previous_revision}
        return func(*args, **kwargs)

    return decorated
test_tasks.py 文件源码 项目:fabricio 作者: renskiy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_tasks(self):
        # Checks that a tasks.Tasks subclass exposes its decorated methods
        # as Fabric tasks with the expected names, aliases, roles and hosts.
        class TestTasks(tasks.Tasks):

            @fab.task(default=True, aliases=['foo', 'bar'])
            def default(self):
                pass

            @fab.task(name='name', alias='alias')
            def task(self):
                pass

            @fab.task
            @fab.serial
            def serial(self):
                pass

            @fab.task
            @fab.parallel
            def parallel(self):
                pass

        roles = ['role_1', 'role_2']
        hosts = ['host_1', 'host_2']
        tasks_list = TestTasks(roles=roles, hosts=hosts)
        self.assertTrue(is_task_module(tasks_list))
        # Task options given to fab.task must survive on the instance.
        self.assertTrue(tasks_list.default.is_default)
        self.assertListEqual(['foo', 'bar'], tasks_list.default.aliases)
        self.assertEqual('name', tasks_list.task.name)
        self.assertListEqual(['alias'], tasks_list.task.aliases)
        # Every task must carry the roles and hosts given to the constructor.
        for task in tasks_list:
            self.assertListEqual(roles, task.roles)
            self.assertListEqual(hosts, task.hosts)
        # The instance must be loadable as a task module, with every task
        # reachable under its name and aliases.
        docstring, new_style, classic, default = load_tasks_from_module(tasks_list)
        self.assertIsNone(docstring)
        self.assertIn('default', new_style)
        self.assertIn('alias', new_style)
        self.assertIn('foo', new_style)
        self.assertIn('bar', new_style)
        self.assertIn('name', new_style)
        self.assertDictEqual({}, classic)
        self.assertIs(tasks_list.default, default)

        # The serial/parallel markers must be present on the wrapped callables.
        self.assertIn('serial', tasks_list.serial.wrapped.__dict__)
        self.assertTrue(tasks_list.serial.wrapped.serial)

        self.assertIn('parallel', tasks_list.parallel.wrapped.__dict__)
        self.assertTrue(tasks_list.parallel.wrapped.parallel)


问题


面经


文章

微信
公众号

扫码关注公众号