def delete_sqs_queue(connection, module):
queue_name = module.params.get('name')
result = dict(
region=module.params.get('region'),
name=queue_name,
)
try:
queue = connection.get_queue(queue_name)
if queue:
if not module.check_mode:
connection.delete_queue(queue)
result['changed'] = True
else:
result['changed'] = False
except BotoServerError:
result['msg'] = 'Failed to delete sqs queue due to error: ' + traceback.format_exc()
module.fail_json(**result)
else:
module.exit_json(**result)
python类sqs()的实例源码
def create_or_update_sqs_queue(connection, module):
queue_name = module.params.get('name')
queue_attributes = dict(
default_visibility_timeout=module.params.get('default_visibility_timeout'),
message_retention_period=module.params.get('message_retention_period'),
maximum_message_size=module.params.get('maximum_message_size'),
delivery_delay=module.params.get('delivery_delay'),
receive_message_wait_time=module.params.get('receive_message_wait_time'),
policy=module.params.get('policy'),
redrive_policy=module.params.get('redrive_policy')
)
result = dict(
region=module.params.get('region'),
name=queue_name,
)
result.update(queue_attributes)
try:
queue = connection.get_queue(queue_name)
if queue:
# Update existing
result['changed'] = update_sqs_queue(queue, check_mode=module.check_mode, **queue_attributes)
else:
# Create new
if not module.check_mode:
queue = connection.create_queue(queue_name)
update_sqs_queue(queue, **queue_attributes)
result['changed'] = True
except BotoServerError:
result['msg'] = 'Failed to create/update sqs queue due to error: ' + traceback.format_exc()
module.fail_json(**result)
else:
module.exit_json(**result)
def main():
argument_spec = ec2_argument_spec()
argument_spec.update(dict(
state=dict(default='present', choices=['present', 'absent']),
name=dict(required=True, type='str'),
default_visibility_timeout=dict(type='int'),
message_retention_period=dict(type='int'),
maximum_message_size=dict(type='int'),
delivery_delay=dict(type='int'),
receive_message_wait_time=dict(type='int'),
policy=dict(type='dict', required=False),
redrive_policy=dict(type='dict', required=False),
))
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True)
if not HAS_BOTO:
module.fail_json(msg='boto required for this module')
region, ec2_url, aws_connect_params = get_aws_connection_info(module)
if not region:
module.fail_json(msg='region must be specified')
try:
connection = connect_to_aws(boto.sqs, region, **aws_connect_params)
except (NoAuthHandlerFound, AnsibleAWSError) as e:
module.fail_json(msg=str(e))
state = module.params.get('state')
if state == 'present':
create_or_update_sqs_queue(connection, module)
elif state == 'absent':
delete_sqs_queue(connection, module)
def get_sqs_conn(conf):
region = conf.get('SQS_REGION')
if region:
conn = boto.sqs.connect_to_region(region, **aws_creds(conf))
if not conn:
raise ValueErrorRetry("Could not establish SQS connection to region %r" % (region,))
else:
conn = boto.connect_sqs(**aws_creds(conf))
return conn
def parse_sqs_url(url):
if url.startswith('sqs://'):
return url[6:]
def get_sqs_work_queue_name(conf):
qname = conf.get('WORK_QUEUE')
if not qname:
raise ValueError("WORK_QUEUE not defined in configuration")
qname = parse_sqs_url(qname)
if not qname:
raise ValueError("WORK_QUEUE must be an sqs:// URL")
return qname
def write_sqs_queue(string, queue):
m = boto.sqs.message.Message()
m.set_body(string)
queue.write(m)
def get_sqs_conn(conf):
region = conf.get('SQS_REGION')
if region:
conn = boto.sqs.connect_to_region(region, **aws_creds(conf))
if not conn:
raise ValueErrorRetry("Could not establish SQS connection to region %r" % (region,))
else:
conn = boto.connect_sqs(**aws_creds(conf))
return conn
def get_sqs_work_queue_name(conf):
qname = conf.get('WORK_QUEUE')
if not qname:
raise ValueError("WORK_QUEUE not defined in configuration")
qname = parse_sqs_url(qname)
if not qname:
raise ValueError("WORK_QUEUE must be an sqs:// URL")
return qname
def write_sqs_queue(string, queue):
m = boto.sqs.message.Message()
m.set_body(string)
queue.write(m)
def __init__(self):
self.enable_sqs = 'ANSIBLE_ENABLE_SQS' in os.environ
if not self.enable_sqs:
return
# make sure we got our imports
if not boto:
raise ImportError(
"The sqs callback module requires the boto Python module, "
"which is not installed or was not found."
)
self.start_time = time.time()
if not 'SQS_REGION' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_REGION ' \
'not defined in environment'
sys.exit(1)
self.region = os.environ['SQS_REGION']
try:
self.sqs = boto.sqs.connect_to_region(self.region)
except NoAuthHandlerFound:
print 'ANSIBLE_ENABLE_SQS enabled but cannot connect ' \
'to AWS due invalid credentials'
sys.exit(1)
if not 'SQS_NAME' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_NAME not ' \
'defined in environment'
sys.exit(1)
self.name = os.environ['SQS_NAME']
self.queue = self.sqs.create_queue(self.name)
if 'SQS_MSG_PREFIX' in os.environ:
self.prefix = os.environ['SQS_MSG_PREFIX']
else:
self.prefix = ''
self.last_seen_ts = {}
def _send_queue_message(self, msg, msg_type):
if self.enable_sqs:
from_start = time.time() - self.start_time
payload = {msg_type: msg}
payload['TS'] = from_start
payload['PREFIX'] = self.prefix
# update the last seen timestamp for
# the message type
self.last_seen_ts[msg_type] = time.time()
if msg_type in ['OK', 'FAILURE']:
# report the delta between the OK/FAILURE and
# last TASK
if 'TASK' in self.last_seen_ts:
from_task = \
self.last_seen_ts[msg_type] - self.last_seen_ts['TASK']
payload['delta'] = from_task
for output in ['stderr', 'stdout']:
if output in payload[msg_type]:
# only keep the last 1000 characters
# of stderr and stdout
# Some modules set the value of stdout or stderr to booleans in
# which case the len will fail. Check to see if there is content
# before trying to clip it.
if payload[msg_type][output] and len(payload[msg_type][output]) > 1000:
payload[msg_type][output] = "(clipping) ... " \
+ payload[msg_type][output][-1000:]
if 'stdout_lines' in payload[msg_type]:
# only keep the last 20 or so lines to avoid payload size errors
if len(payload[msg_type]['stdout_lines']) > 20:
payload[msg_type]['stdout_lines'] = ['(clipping) ... '] + payload[msg_type]['stdout_lines'][-20:]
while True:
try:
self.sqs.send_message(self.queue, json.dumps(payload))
break
except socket.gaierror as e:
print 'socket.gaierror will retry: ' + e
time.sleep(1)
except Exception as e:
raise e
def __init__(self):
self.enable_sqs = 'ANSIBLE_ENABLE_SQS' in os.environ
if not self.enable_sqs:
return
# make sure we got our imports
if not boto:
raise ImportError(
"The sqs callback module requires the boto Python module, "
"which is not installed or was not found."
)
self.start_time = time.time()
if not 'SQS_REGION' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_REGION ' \
'not defined in environment'
sys.exit(1)
self.region = os.environ['SQS_REGION']
try:
self.sqs = boto.sqs.connect_to_region(self.region)
except NoAuthHandlerFound:
print 'ANSIBLE_ENABLE_SQS enabled but cannot connect ' \
'to AWS due invalid credentials'
sys.exit(1)
if not 'SQS_NAME' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_NAME not ' \
'defined in environment'
sys.exit(1)
self.name = os.environ['SQS_NAME']
self.queue = self.sqs.create_queue(self.name)
if 'SQS_MSG_PREFIX' in os.environ:
self.prefix = os.environ['SQS_MSG_PREFIX']
else:
self.prefix = ''
self.last_seen_ts = {}
def _send_queue_message(self, msg, msg_type):
if self.enable_sqs:
from_start = time.time() - self.start_time
payload = {msg_type: msg}
payload['TS'] = from_start
payload['PREFIX'] = self.prefix
# update the last seen timestamp for
# the message type
self.last_seen_ts[msg_type] = time.time()
if msg_type in ['OK', 'FAILURE']:
# report the delta between the OK/FAILURE and
# last TASK
if 'TASK' in self.last_seen_ts:
from_task = \
self.last_seen_ts[msg_type] - self.last_seen_ts['TASK']
payload['delta'] = from_task
for output in ['stderr', 'stdout']:
if output in payload[msg_type]:
# only keep the last 1000 characters
# of stderr and stdout
# Some modules set the value of stdout or stderr to booleans in
# which case the len will fail. Check to see if there is content
# before trying to clip it.
if payload[msg_type][output] and len(payload[msg_type][output]) > 1000:
payload[msg_type][output] = "(clipping) ... " \
+ payload[msg_type][output][-1000:]
if 'stdout_lines' in payload[msg_type]:
# only keep the last 20 or so lines to avoid payload size errors
if len(payload[msg_type]['stdout_lines']) > 20:
payload[msg_type]['stdout_lines'] = ['(clipping) ... '] + payload[msg_type]['stdout_lines'][-20:]
while True:
try:
self.sqs.send_message(self.queue, json.dumps(payload))
break
except socket.gaierror as e:
print 'socket.gaierror will retry: ' + e
time.sleep(1)
except Exception as e:
raise e
def __init__(self):
self.enable_sqs = 'ANSIBLE_ENABLE_SQS' in os.environ
if not self.enable_sqs:
return
# make sure we got our imports
if not boto:
raise ImportError(
"The sqs callback module requires the boto Python module, "
"which is not installed or was not found."
)
self.start_time = time.time()
if not 'SQS_REGION' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_REGION ' \
'not defined in environment'
sys.exit(1)
self.region = os.environ['SQS_REGION']
try:
self.sqs = boto.sqs.connect_to_region(self.region)
except NoAuthHandlerFound:
print 'ANSIBLE_ENABLE_SQS enabled but cannot connect ' \
'to AWS due invalid credentials'
sys.exit(1)
if not 'SQS_NAME' in os.environ:
print 'ANSIBLE_ENABLE_SQS enabled but SQS_NAME not ' \
'defined in environment'
sys.exit(1)
self.name = os.environ['SQS_NAME']
self.queue = self.sqs.create_queue(self.name)
if 'SQS_MSG_PREFIX' in os.environ:
self.prefix = os.environ['SQS_MSG_PREFIX']
else:
self.prefix = ''
self.last_seen_ts = {}