def _send_queue_message(self, msg, msg_type):
if self.enable_sqs:
from_start = time.time() - self.start_time
payload = {msg_type: msg}
payload['TS'] = from_start
payload['PREFIX'] = self.prefix
# update the last seen timestamp for
# the message type
self.last_seen_ts[msg_type] = time.time()
if msg_type in ['OK', 'FAILURE']:
# report the delta between the OK/FAILURE and
# last TASK
if 'TASK' in self.last_seen_ts:
from_task = \
self.last_seen_ts[msg_type] - self.last_seen_ts['TASK']
payload['delta'] = from_task
for output in ['stderr', 'stdout']:
if output in payload[msg_type]:
# only keep the last 1000 characters
# of stderr and stdout
# Some modules set the value of stdout or stderr to booleans in
# which case the len will fail. Check to see if there is content
# before trying to clip it.
if payload[msg_type][output] and len(payload[msg_type][output]) > 1000:
payload[msg_type][output] = "(clipping) ... " \
+ payload[msg_type][output][-1000:]
if 'stdout_lines' in payload[msg_type]:
# only keep the last 20 or so lines to avoid payload size errors
if len(payload[msg_type]['stdout_lines']) > 20:
payload[msg_type]['stdout_lines'] = ['(clipping) ... '] + payload[msg_type]['stdout_lines'][-20:]
while True:
try:
self.sqs.send_message(self.queue, json.dumps(payload))
break
except socket.gaierror as e:
print 'socket.gaierror will retry: ' + e
time.sleep(1)
except Exception as e:
raise e
评论列表
文章目录