python类shared_task()的实例源码

celery.py 文件源码 项目:django-tmpl 作者: jarrekk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def shared_task_email(func):
    """
    Replacement for @shared_task decorator that emails admins if an exception is raised.
    """
    @wraps(func)
    def new_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except:
            subject = "Celery task failure"
            message = traceback.format_exc()
            mail_admins(subject, message)
            raise
    return shared_task(new_func)
tasks.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def shared_task(f):
        f.delay = f
        return f
tasks.py 文件源码 项目:django-remote-submission 作者: ornl-ndav 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def shared_task(func):
        """Naive wrapper in case Celery does not exist."""
        def delay(*args, **kwargs):
            return func(*args, **kwargs)

        func.delay = delay
        return func
tasks.py 文件源码 项目:api-django 作者: lafranceinsoumise 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_geocoder(model):
    def geocode_model(pk):
        try:
            item = model.objects.get(pk=pk)
        except model.DoesNotExist:
            return

        if geocode_element(item):
            item.save()

    geocode_model.__name__ = "geocode_{}".format(model.__name__.lower())

    return shared_task(geocode_model)


问题


面经


文章

微信
公众号

扫码关注公众号