python类signals()的实例源码

app.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
    """Publish the task, forwarding girder job information as headers.

    Girder related job information is passed through to the signals by
    placing it in ``options['headers']``. Defaults come from the
    ``_girder_job_*`` attributes on ``self`` (class defaults, or values
    defined by the girder_job() decorator).

    Certain keys may show up either in ``kwargs`` (e.g. via
    ``.delay(girder_token='foo')``) or in ``options`` (e.g.
    ``.apply_async(args=(), kwargs={}, girder_token='foo')``). Every key
    listed in ``self.reserved_headers + self.reserved_options`` is removed
    from both places and put in the headers, so it can be picked up by the
    before_task_publish signal. When a key is present in both, the value
    from ``options`` wins.

    The caller's ``kwargs`` mapping and any caller supplied ``headers``
    mapping are copied, never modified in place.

    :param args: positional arguments for the task, forwarded unchanged.
    :param kwargs: keyword arguments for the task; reserved keys are
        stripped from the forwarded copy.
    :param task_id: forwarded unchanged to the parent implementation.
    :param producer: forwarded unchanged to the parent implementation.
    :param link: forwarded unchanged to the parent implementation.
    :param link_error: forwarded unchanged to the parent implementation.
    :param shadow: forwarded unchanged to the parent implementation.
    :param options: remaining publish options; reserved keys are moved
        into ``options['headers']``.
    :returns: whatever the parent class ``apply_async`` returns.
    """
    headers = {
        'girder_job_title': self._girder_job_title,
        'girder_job_type': self._girder_job_type,
        'girder_job_public': self._girder_job_public,
        'girder_job_handler': self._girder_job_handler,
        'girder_job_other_fields': self._girder_job_other_fields,
    }

    # Pop reserved keys from a shallow copy: a caller that reuses its
    # kwargs dict for another call must not find those keys gone.
    if kwargs is not None:
        kwargs = dict(kwargs)

    for key in self.reserved_headers + self.reserved_options:
        if kwargs is not None and key in kwargs:
            headers[key] = kwargs.pop(key)
        if key in options:
            headers[key] = options.pop(key)

    # Merge into a new dict so a caller supplied headers mapping is left
    # untouched; ``or {}`` also tolerates an explicit ``headers=None``.
    # Girder derived headers override caller supplied ones.
    merged_headers = dict(options.get('headers') or {})
    merged_headers.update(headers)
    options['headers'] = merged_headers

    return super(Task, self).apply_async(
        args=args, kwargs=kwargs, task_id=task_id, producer=producer,
        link=link, link_error=link_error, shadow=shadow, **options)
celery.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def enable_signals():
    """Hook talisker's metrics, logging and sentry receivers into celery.

    This is best effort: when celery, or raven's celery integration,
    cannot be imported the function returns without connecting anything.
    """
    try:
        from celery import signals
        from raven.contrib.celery import CeleryFilter
    except ImportError:  # pragma: no cover
        return

    # (celery signal, receiver) pairs, connected in this order.
    receivers = (
        (signals.setup_logging, celery_logging_handler),
        (signals.before_task_publish, before_task_publish),
        (signals.after_task_publish, after_task_publish),
        (signals.task_prerun, task_prerun),
        (signals.task_postrun, task_postrun),
        (signals.task_retry, task_retry),
        (signals.task_success, task_success),
        (signals.task_failure, task_failure),
        (signals.task_revoked, task_revoked),
    )
    for sig, receiver in receivers:
        sig.connect(receiver)

    # Install the celery error handler and register the update callback.
    get_sentry_handler().install()
    talisker.sentry.register_client_update(sentry_handler_update)

    # De-dup celery errors: attach a CeleryFilter to the sentry log handler
    # unless one is already among its filters.
    sentry_log_handler = talisker.sentry.get_log_handler()
    has_celery_filter = any(
        isinstance(existing, CeleryFilter)
        for existing in sentry_log_handler.filters
    )
    if not has_celery_filter:
        sentry_log_handler.addFilter(CeleryFilter())

    logging.getLogger(__name__).info('enabled talisker celery signals')
celery.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def disable_signals():
    """Uninstall the sentry handler and disconnect the celery receivers.

    The receivers disconnected here are the same (signal, receiver) pairs
    that enable_signals() connects. Unlike enable_signals(), the celery
    import is not guarded, so a missing celery raises ImportError.
    """
    from celery import signals

    # The sentry handler is removed before any receiver is disconnected.
    get_sentry_handler().uninstall()

    # (celery signal, receiver) pairs, disconnected in this order.
    receivers = (
        (signals.setup_logging, celery_logging_handler),
        (signals.before_task_publish, before_task_publish),
        (signals.after_task_publish, after_task_publish),
        (signals.task_prerun, task_prerun),
        (signals.task_postrun, task_postrun),
        (signals.task_retry, task_retry),
        (signals.task_success, task_success),
        (signals.task_failure, task_failure),
        (signals.task_revoked, task_revoked),
    )
    for sig, receiver in receivers:
        sig.disconnect(receiver)


问题


面经


文章

微信
公众号

扫码关注公众号