def shared_task_email(func):
    """
    Replacement for @shared_task decorator that emails admins if an exception is raised.

    The wrapped function is called with the arguments it was given. If it
    raises, the formatted traceback is sent through ``mail_admins`` and the
    same exception is re-raised, so the failure is still visible to the caller.

    Returns whatever ``shared_task`` returns for the wrapping function.
    """
    @wraps(func)
    def new_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit propagate untouched instead of being reported to
            # the admins as task failures.
            subject = "Celery task failure"
            message = traceback.format_exc()
            mail_admins(subject, message)
            # Re-raise the original exception after the notification.
            raise
    return shared_task(new_func)
# Source-code examples for Python's shared_task()
def shared_task(f):
    """Return ``f`` unchanged, with ``f.delay`` set to ``f`` itself.

    Calling ``f.delay(...)`` therefore runs the function directly.
    """
    setattr(f, "delay", f)
    return f
def shared_task(func):
    """Naive wrapper in case Celery does not exist.

    Attaches a ``delay`` attribute to ``func`` that calls ``func`` with the
    same positional and keyword arguments and returns its result, then
    returns ``func`` itself.
    """
    def delay(*call_args, **call_kwargs):
        # Run the function immediately and hand back its result.
        outcome = func(*call_args, **call_kwargs)
        return outcome

    setattr(func, "delay", delay)
    return func
def create_geocoder(model):
    """Build a geocoding task for ``model`` and register it via ``shared_task``.

    The generated function takes a primary key, loads the matching instance
    through ``model.objects.get`` and passes it to ``geocode_element``; the
    instance is saved only when that call returns a truthy value. A missing
    instance (``model.DoesNotExist``) is ignored silently.

    The function is named ``geocode_<lowercased model name>`` before being
    handed to ``shared_task``, whose return value is returned.
    """
    def geocode_model(pk):
        try:
            instance = model.objects.get(pk=pk)
        except model.DoesNotExist:
            # Nothing to geocode if the row no longer exists.
            return
        else:
            if geocode_element(instance):
                instance.save()

    task_name = "geocode_{}".format(model.__name__.lower())
    geocode_model.__name__ = task_name
    return shared_task(geocode_model)