def handle(self, *args, **options):
    """Forward every message waiting in an SQS queue to a worker endpoint.

    Reads three entries from ``options``:

    * ``url`` -- worker endpoint the messages are delivered to (required).
    * ``queue_name`` -- name of the SQS queue to read from (required).
    * ``retry_limit`` -- delivery attempts per message; converted with
      ``int`` and raised to at least 1.

    Messages are fetched one at a time until a receive call returns
    nothing. Each message is handed to ``_process_message_with_retry``
    and then deleted from the queue.

    Raises:
        CommandError: if ``url`` or ``queue_name`` is empty.
    """
    endpoint = options['url']
    if not endpoint:
        raise CommandError('Worker endpoint url parameter (--url) not found')

    queue_label = options['queue_name']
    if not queue_label:
        raise CommandError('Queue name (--queue) not specified')

    # Always allow at least one delivery attempt, whatever was passed in.
    max_attempts = max(int(options['retry_limit']), 1)

    # NOTE(review): which ConnectionError is caught here (builtin or a
    # library one) depends on the file's imports -- confirm.
    try:
        self.stdout.write('Connect to SQS')
        retry_config = Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
        resource = boto3.resource(
            'sqs',
            region_name=settings.AWS_REGION,
            config=retry_config
        )
        queue = resource.get_queue_by_name(QueueName=queue_label)
        self.stdout.write('> Connected')

        while True:
            # One message per call, waiting up to 20 seconds for it.
            batch = queue.receive_messages(
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20
            )
            if not batch:
                # Nothing arrived within the wait window: stop polling.
                break

            for message in batch:
                self.stdout.write('Deliver message {}'.format(message.message_id))
                delivered = self._process_message_with_retry(
                    endpoint, max_attempts, message
                )
                self.stdout.write(
                    '> Delivered'
                    if delivered
                    else '> Delivery failed (retry-limit reached)'
                )
                # NOTE(review): the message is removed after either outcome;
                # a failed delivery has already used up its retries.
                message.delete()

        self.stdout.write('Message processing finished')
    except ConnectionError:
        self.stdout.write('Connection to {} failed. Message processing failed'.format(endpoint))
评论列表
文章目录