def handle_next_request(self):
    """
    Receive a single job request from the transport, run it, and send the response back.

    Returns without doing anything when the transport raises `MessageReceiveTimeout`. Otherwise the request is
    passed to `self.process_job`, the resulting job response is converted to a dict, and that dict (or its
    serialized blob, depending on the `__request_serialized__` meta flag) is sent with
    `self.transport.send_response_message`. If building or serializing the response raises, the
    `server.error.serialization_failure` counter is incremented and the response produced by
    `self.handle_error` is sent instead.
    """
    try:
        request_id, meta, request_message = self.transport.receive_request_message()
    except MessageReceiveTimeout:
        # Nothing was received before the timeout, so there is no work to do
        return

    # `setdefault` records the flag in `meta` (as True) when the caller did not supply it; the same `meta` is
    # consulted again when encoding the response and is passed back to the transport.
    if meta.setdefault('__request_serialized__', True) is not False:
        # The caller is an old client that double-serialized, so be sure to double-deserialize
        # TODO: Remove this and the serializer in version >= 0.25.0
        job_request = self.serializer.blob_to_dict(request_message)
    else:
        # The caller is a new client that did not double-serialize, so do not double-deserialize
        job_request = request_message
    self.job_logger.info('Job request: %s', job_request)

    def encode_response(payload):
        # Match the response serialization behavior to the request serialization behavior
        if meta['__request_serialized__'] is not False:
            # TODO: Remove this and the serializer in version >= 0.25.0
            return self.serializer.dict_to_blob(payload)
        return payload

    # Process and run the Job
    job_response = self.process_job(job_request)

    # Build and send the JobResponse. `response_dict` starts empty so that the error handler below always has
    # something to report, even when the conversion to a dict itself is what failed.
    response_dict = {}
    try:
        response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
        response_message = encode_response(response_dict)
    except Exception as error:
        self.metrics.counter('server.error.serialization_failure').increment()
        job_response = self.handle_error(error, variables={'job_response': response_dict})
        # The replacement response is not guarded: a failure here propagates to the caller
        response_dict = attr.asdict(job_response, dict_factory=UnicodeKeysDict)
        response_message = encode_response(response_dict)

    self.transport.send_response_message(request_id, meta, response_message)
    self.job_logger.info('Job response: %s', response_dict)
评论列表
文章目录