def __handle_processed_messages(self):
    """
    Perform any final housekeeping once all of the messages in the queue have been processed.

    For every key in ``self._targets`` that has at least one target, a
    ``handle_scanning_order_from_pubsub`` signature is built for the
    de-duplicated targets, routed to ``config.celery_priority_queue_name``,
    and appended to ``self._tasks``; a summary line is appended to
    ``self._messages``. If ``self._tasks`` is non-empty afterwards, all of its
    signatures are dispatched together through ``group(...).apply_async()``.

    :return: None
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably this avoids a circular import with tasknode -- confirm.
    from tasknode.tasks import handle_scanning_order_from_pubsub

    logger.debug(
        "Now handling all processed messages (%s keys in targets)."
        % (len(self._targets),)
    )

    # items() rather than iteritems(): iteritems() only exists on Python 2,
    # while items() is available on both Python 2 and Python 3.
    for org_uuid, org_targets in self._targets.items():
        if not org_targets:
            # Nothing was collected for this key, so no task is needed.
            continue
        # Collapse duplicates so each target is only passed along once.
        targets = list(set(org_targets))
        task_sig = handle_scanning_order_from_pubsub.si(
            org_uuid=org_uuid,
            targets=targets,
        )
        task_sig.options["queue"] = config.celery_priority_queue_name
        self._tasks.append(task_sig)
        self._messages.append(
            "Total of %s targets defined for organization %s."
            % (len(targets), org_uuid)
        )

    logger.debug(
        "Total number of tasks to kick off is %s."
        % (len(self._tasks),)
    )

    if self._tasks:
        # Dispatch every accumulated signature as one group.
        canvas_sig = group(self._tasks)
        canvas_sig.apply_async()
        logger.debug("Tasks kicked off.")
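# NOTE: stray web-page navigation text ("Comment list", "Article contents")
# was captured here during extraction; it is not part of this module.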