def _consume_record(self, record):
    """De-serialize the message and execute the incoming job.

    The record value is loaded with ``dill``. A record that cannot be
    loaded, or whose payload is not a well-formed ``Job`` (iterable
    ``args``, mapping ``kwargs``, callable ``func``), is logged with a
    warning and skipped.

    The effective timeout is ``self._timeout`` when it is truthy and
    ``job.timeout`` otherwise. With no timeout the function is called
    directly; with a timeout it is submitted to ``self._pool`` and the
    result is awaited for at most that many seconds.

    The outcome is reported through ``self._exec_callback`` with status
    ``'success'``, ``'failure'`` or ``'timeout'``. Exceptions raised by the
    job itself are logged and are not re-raised here.

    :param record: Record fetched from the Kafka topic.
    :type record: kafka.consumer.fetcher.ConsumerRecord
    """
    # Imported locally because the ``collections.Iterable`` and
    # ``collections.Mapping`` aliases were removed in Python 3.10; the ABCs
    # live in ``collections.abc``.
    try:
        from collections.abc import Iterable, Mapping
    except ImportError:  # Python 2 has no ``collections.abc``
        from collections import Iterable, Mapping

    rec = rec_repr(record)
    self._logger.info('Processing {} ...'.format(rec))

    # SECURITY: ``dill.loads`` can execute arbitrary code while unpickling.
    # Only consume from topics whose producers are fully trusted.
    # noinspection PyBroadException
    try:
        job = dill.loads(record.value)
    except Exception:
        self._logger.warning('{} unloadable. Skipping ...'.format(rec))
        return

    # Simple check for job validity
    if not (isinstance(job, Job)
            and isinstance(job.args, Iterable)
            and isinstance(job.kwargs, Mapping)
            and callable(job.func)):
        self._logger.warning('{} malformed. Skipping ...'.format(rec))
        return

    func, args, kwargs = job.func, job.args, job.kwargs
    self._logger.info('Running Job {}: {} ...'.format(
        job.id, func_repr(func, args, kwargs)
    ))

    # Pre-bound so the timeout handler below can always reference it.
    timeout = None
    try:
        # The worker-level timeout, when set, takes precedence over the
        # timeout carried by the job.
        timeout = self._timeout or job.timeout
        if timeout is None:
            res = func(*args, **kwargs)
        else:
            run = self._pool.apply_async(func, args, kwargs)
            res = run.get(timeout)
    except mp.TimeoutError:
        # Report the timeout that was actually enforced, which may differ
        # from ``job.timeout`` when the worker-level timeout is in effect.
        self._logger.error('Job {} timed out after {} seconds.'
                           .format(job.id, timeout))
        self._exec_callback('timeout', job, None, None, None)
    except Exception as e:
        self._logger.exception('Job {} failed: {}'.format(job.id, e))
        self._exec_callback('failure', job, None, e, tb.format_exc())
    else:
        self._logger.info('Job {} returned: {}'.format(job.id, res))
        self._exec_callback('success', job, res, None, None)
评论列表
文章目录