def _perform_action_on_nodes(
self, system_ids, action_class, concurrency=2):
"""Perform a node action on the identified nodes.
This is *asynchronous*.
:param system_ids: An iterable of `Node.system_id` values.
:param action_class: A value from `ACTIONS_DICT`.
:param concurrency: The number of actions to run concurrently.
:return: A `dict` mapping `system_id` to results, where the result can
be a string (see `_perform_action_on_node`), or a `Failure` if
something went wrong.
"""
# We're going to be making the same call for every specified node, so
# bundle up the common bits here to keep the noise down later on.
perform = partial(
deferToDatabase, self._perform_action_on_node,
action_class=action_class)
# The results will be a `system_id` -> `result` mapping, where
# `result` can be a string like "done" or "not_actionable", or a
# Failure instance.
results = {}
# Convenient callback.
def record(result, system_id):
results[system_id] = result
# A *lazy* list of tasks to be run. It's very important that each task
# is only created at the moment it's needed. Each task records its
# outcome via `record`, be that success or failure.
tasks = (
perform(system_id).addBoth(record, system_id)
for system_id in system_ids
)
# Create `concurrency` co-iterators. Each draws work from `tasks`.
deferreds = (coiterate(tasks) for _ in range(concurrency))
# Capture the moment when all the co-iterators have finished.
done = DeferredList(deferreds, consumeErrors=True)
# Return only the `results` mapping; ignore the result from `done`.
return done.addCallback(lambda _: results)