import logging
import sys
import traceback

from rq import get_current_job

# db, JobError, callback_xloader_hook and xloader_data_into_datastore_ are
# assumed to be provided by the surrounding ckanext-xloader module.


def xloader_data_into_datastore(input):
    '''This is the func that is queued. It is a wrapper for
    xloader_data_into_datastore_, and makes sure it finishes by calling
    xloader_hook to update the task_status with the result.

    Errors are stored in task_status and the job log, and this method returns
    'error' to let RQ know too. Should the task_status update fail, we also
    return 'error'.
    '''
    # First flag that this task is running, to indicate the job is not
    # stillborn, for when xloader_submit is deciding whether another job would
    # be a duplicate or not
    job_dict = dict(metadata=input['metadata'],
                    status='running')
    callback_xloader_hook(result_url=input['result_url'],
                          api_key=input['api_key'],
                          job_dict=job_dict)

    job_id = get_current_job().id
    errored = False
    try:
        xloader_data_into_datastore_(input, job_dict)
        job_dict['status'] = 'complete'
        db.mark_job_as_completed(job_id, job_dict)
    except JobError as e:
        db.mark_job_as_errored(job_id, str(e))
        job_dict['status'] = 'error'
        job_dict['error'] = str(e)
        log = logging.getLogger(__name__)
        log.error('xloader error: {}'.format(e))
        errored = True
    except Exception as e:
        # store the last traceback frame plus the exception repr as the
        # job's error (sys.exc_info() replaces the Python 2-only
        # sys.exc_traceback)
        db.mark_job_as_errored(
            job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
        job_dict['status'] = 'error'
        job_dict['error'] = str(e)
        log = logging.getLogger(__name__)
        log.error('xloader error: {}'.format(e))
        errored = True
    finally:
        # job_dict is documented in xloader_hook's docstring
        is_saved_ok = callback_xloader_hook(result_url=input['result_url'],
                                            api_key=input['api_key'],
                                            job_dict=job_dict)
        errored = errored or not is_saved_ok
    return 'error' if errored else None
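

# A minimal sketch (not part of the original module) of how a job function
# like this might be enqueued with RQ. The queue name, Redis connection and
# the example input dict below are assumptions for illustration only;
# ckanext-xloader's own submit logic builds the real input dict
# (metadata, result_url, api_key).
if __name__ == '__main__':
    from redis import Redis
    from rq import Queue

    example_input = {
        'metadata': {'resource_id': 'hypothetical-resource-id'},
        'result_url': 'http://localhost:5000/api/3/action/xloader_hook',
        'api_key': 'hypothetical-api-key',
    }
    queue = Queue('xloader', connection=Redis())
    # enqueue() serialises the call; an RQ worker listening on this queue
    # picks it up and runs xloader_data_into_datastore(example_input), whose
    # 'error'/None return value becomes the RQ job result.
    job = queue.enqueue(xloader_data_into_datastore, example_input)
    print('enqueued job', job.id)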