def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
# Pass girder related job information through to
# the signals by adding this information to options['headers']
# This sets defaults for reserved_options based on the class defaults,
# or values defined by the girder_job() dectorator
headers = {
'girder_job_title': self._girder_job_title,
'girder_job_type': self._girder_job_type,
'girder_job_public': self._girder_job_public,
'girder_job_handler': self._girder_job_handler,
'girder_job_other_fields': self._girder_job_other_fields,
}
# Certain keys may show up in either kwargs (e.g. via
# .delay(girder_token='foo') or in options (e.g.
# .apply_async(args=(), kwargs={}, girder_token='foo') For
# those special headers, pop them out of kwargs or options and
# put them in headers so they can be picked up by the
# before_task_publish signal.
for key in self.reserved_headers + self.reserved_options:
if kwargs is not None and key in kwargs:
headers[key] = kwargs.pop(key)
if key in options:
headers[key] = options.pop(key)
if 'headers' in options:
options['headers'].update(headers)
else:
options['headers'] = headers
return super(Task, self).apply_async(
args=args, kwargs=kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, shadow=shadow, **options)
python类signals()的实例源码
def enable_signals():
"""Best effort enabling of metrics, logging, sentry signals for celery."""
try:
from celery import signals
from raven.contrib.celery import CeleryFilter
except ImportError: # pragma: no cover
return
signals.setup_logging.connect(celery_logging_handler)
signals.before_task_publish.connect(before_task_publish)
signals.after_task_publish.connect(after_task_publish)
signals.task_prerun.connect(task_prerun)
signals.task_postrun.connect(task_postrun)
signals.task_retry.connect(task_retry)
signals.task_success.connect(task_success)
signals.task_failure.connect(task_failure)
signals.task_revoked.connect(task_revoked)
# install celery error handler
get_sentry_handler().install()
talisker.sentry.register_client_update(sentry_handler_update)
# de-dup celery errors
log_handler = talisker.sentry.get_log_handler()
for filter in log_handler.filters:
if isinstance(filter, CeleryFilter):
break
else:
log_handler.addFilter(CeleryFilter())
logging.getLogger(__name__).info('enabled talisker celery signals')
def disable_signals():
from celery import signals
get_sentry_handler().uninstall()
signals.setup_logging.disconnect(celery_logging_handler)
signals.before_task_publish.disconnect(before_task_publish)
signals.after_task_publish.disconnect(after_task_publish)
signals.task_prerun.disconnect(task_prerun)
signals.task_postrun.disconnect(task_postrun)
signals.task_retry.disconnect(task_retry)
signals.task_success.disconnect(task_success)
signals.task_failure.disconnect(task_failure)
signals.task_revoked.disconnect(task_revoked)