def apply_async(self, *args, **kwargs):
    """
    Override the default Celery Task apply_async to allow for the passing of
    tags to tasks.

    Tags given through the ``tags`` keyword argument are combined with the
    tags derived from the task's own keyword arguments. When the combined
    list is non-empty it is stored under ``headers["tags"]``, and the tags
    are incremented unless the call carries a positive ``retries`` count.

    :param args: Positional arguments. The second one, when present, is taken
        as the task's keyword arguments.
    :param kwargs: Keyword arguments. ``tags`` (a list or None), ``headers``,
        ``retries``, ``kwargs`` and ``chord`` are inspected here; everything
        is forwarded to the parent implementation.
    :return: The results of calling super.apply_async.
    :raises ValueError: If ``tags`` is neither None nor a list.
    """
    tags = kwargs.get("tags")
    # Validate before the first use of tags so a bad value is reported with
    # the intended ValueError rather than an AttributeError from .extend().
    if tags is not None and not isinstance(tags, list):
        raise ValueError(
            "Got an unexpected value for the tags keyword argument to apply_async: %s."
            % (tags,)
        )
    # Work on a copy so the caller's list is never modified.
    tags = list(tags) if tags is not None else []

    # The task's keyword arguments may arrive positionally (as ``delay`` does)
    # or through the ``kwargs`` keyword; accept both instead of requiring
    # exactly two positional arguments.
    if len(args) > 1:
        task_kwargs = args[1]
    else:
        # NOTE(review): assumes the tag helper accepts an empty dict when no
        # task kwargs were supplied -- confirm against its implementation.
        task_kwargs = kwargs.get("kwargs", {})
    tags.extend(self.__get_tags_from_task_kwargs(task_kwargs))
    if "tags" in kwargs:
        # Forward the merged list, as before, without touching the caller's.
        kwargs["tags"] = tags

    # Drop the producer from the chord options when one is present; a missing
    # or None chord/options entry is not an error.
    try:
        del kwargs["chord"]["options"]["producer"]
    except (TypeError, KeyError):
        pass

    if tags:
        # ``retries`` may be absent or None; both count as "not a retry".
        if (kwargs.get("retries") or 0) > 0:
            logger.debug(
                "Not incrementing tags %s as apply_async resulted from retry.",
                tags,
            )
        else:
            self.__increment_tags(tags)
        # Copy the headers (tolerating None) so the caller's dict is untouched.
        headers = dict(kwargs.get("headers") or {})
        headers["tags"] = tags
        kwargs["headers"] = headers
    return super(WebSightBaseTask, self).apply_async(*args, **kwargs)
评论列表
文章目录