def execute_cleanup_tasks(ctx, cleanup_tasks, dry_run=False):
"""Execute several cleanup tasks as part of the cleanup.
REQUIRES: ``clean(ctx, dry_run=False)`` signature in cleanup tasks.
:param ctx: Context object for the tasks.
:param cleanup_tasks: Collection of cleanup tasks (as Collection).
:param dry_run: Indicates dry-run mode (bool)
"""
executor = Executor(cleanup_tasks, ctx.config)
for cleanup_task in cleanup_tasks.tasks:
print("CLEANUP TASK: %s" % cleanup_task)
executor.execute((cleanup_task, dict(dry_run=dry_run)))
python类Collection()的实例源码
def execute_cleanup_tasks(ctx, cleanup_tasks, dry_run=False):
"""Execute several cleanup tasks as part of the cleanup.
REQUIRES: ``clean(ctx, dry_run=False)`` signature in cleanup tasks.
:param ctx: Context object for the tasks.
:param cleanup_tasks: Collection of cleanup tasks (as Collection).
:param dry_run: Indicates dry-run mode (bool)
"""
executor = Executor(cleanup_tasks, ctx.config)
for cleanup_task in cleanup_tasks.tasks:
print("CLEANUP TASK: %s" % cleanup_task)
executor.execute((cleanup_task, dict(dry_run=dry_run)))
def tasks(self):
"""Function that turns a collection of tasks
suitable for pyinvoke_
Example::
from app.web import ac
ns = Collection()
ns.add_collection(ac.tasks())
.. _pyinvoke: http://www.pyinvoke.org/
"""
from invoke import task, Collection
@task
def list(ctx):
"""Show all clients in the database"""
from json import dumps
with (self.app or current_app).app_context():
print(dumps([
dict(c) for c in self.client_class.all()
]))
@task
def show(ctx, clientKey):
"""Lookup one client from the database"""
from json import dumps
with (self.app or current_app).app_context():
print(dumps(dict(self.client_class.load(clientKey))))
@task
def install(ctx, data):
"""Add a given client from the database"""
from json import loads
with (self.app or current_app).app_context():
client = loads(data)
self.client_class.save(client)
print("Added")
@task()
def uninstall(ctx, clientKey):
"""Remove a given client from the database"""
with (self.app or current_app).app_context():
self.client_class.delete(clientKey)
print("Deleted")
ns = Collection('clients')
ns.add_task(list)
ns.add_task(show)
ns.add_task(install)
ns.add_task(uninstall)
return ns