def cfn_client(self):
    # Lazily create and cache a CloudFormation client built from this object's credentials.
    if self.__cfn_client is None:
        session = Session(
            profile_name=env.get('Profile'),
            region_name=env.get('Region'),
            aws_access_key_id=env.get('AccessKeyId'),
            aws_secret_access_key=env.get('SecretAccessKey')
        )
        self.__cfn_client = session.client('cloudformation')
    return self.__cfn_client
Example source code for the Python Session() class
def cfn_resource(self):
    # Same lazy pattern as cfn_client, but for the CloudFormation resource interface.
    if self.__cfn_resource is None:
        session = Session(
            profile_name=env.get('Profile'),
            region_name=env.get('Region'),
            aws_access_key_id=env.get('AccessKeyId'),
            aws_secret_access_key=env.get('SecretAccessKey')
        )
        self.__cfn_resource = session.resource('cloudformation')
    return self.__cfn_resource
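The two accessors above assume that Session has been imported from boto3.session and that env is a mapping holding profile, region, and credential values; neither is shown in the excerpt. Below is a minimal sketch of that assumed setup, with env built from standard environment variables purely for illustration (the real project likely loads it from its own configuration):

import os
from boto3.session import Session

# Hypothetical stand-in for the `env` mapping used above.
env = {
    'Profile': os.environ.get('AWS_PROFILE'),
    'Region': os.environ.get('AWS_DEFAULT_REGION'),
    'AccessKeyId': os.environ.get('AWS_ACCESS_KEY_ID'),
    'SecretAccessKey': os.environ.get('AWS_SECRET_ACCESS_KEY'),
}

# A region must be resolvable (here or via AWS config) before client creation succeeds.
session = Session(
    profile_name=env.get('Profile'),
    region_name=env.get('Region'),
    aws_access_key_id=env.get('AccessKeyId'),
    aws_secret_access_key=env.get('SecretAccessKey'),
)
cfn_client = session.client('cloudformation')
cfn_resource = session.resource('cloudformation')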
def console(self):
    """
    Open the AWS CloudFormation console in your default web browser,
    using the region resolved by the default boto3 session.
    """
    import webbrowser
    session = boto3.session.Session()
    webbrowser.open('https://%(region)s.console.aws.amazon.com/cloudformation/home?region=%(region)s#/stacks?filter=active' % dict(
        region=session.region_name
    ))
def setup_s3_client():
    """
    :return: Boto3 S3 client (not a session), configured to sign requests with
             Signature Version 4. Uses IAM credentials from the default chain.
    """
    session = Session()
    return session.client('s3', config=botocore.client.Config(signature_version='s3v4'))
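A brief usage sketch of the client returned above, assuming Session and botocore are imported as in the surrounding project; the bucket name is illustrative, not from the source:

# Illustrative only: list keys in an example bucket with the SigV4-signed client.
s3 = setup_s3_client()
response = s3.list_objects_v2(Bucket='example-bucket')
for obj in response.get('Contents', []):
    print(obj['Key'])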
aws-git-backed-static-website-lambda.py source code
Project: aws-git-backed-static-website
Author: alestic
def setup(event):
    # Extract attributes passed in by CodePipeline
    job_id = event['CodePipeline.job']['id']
    job_data = event['CodePipeline.job']['data']
    artifact = job_data['inputArtifacts'][0]
    config = job_data['actionConfiguration']['configuration']
    credentials = job_data['artifactCredentials']
    from_bucket = artifact['location']['s3Location']['bucketName']
    from_key = artifact['location']['s3Location']['objectKey']
    from_revision = artifact['revision']
    #output_artifact = job_data['outputArtifacts'][0]
    #to_bucket = output_artifact['location']['s3Location']['bucketName']
    #to_key = output_artifact['location']['s3Location']['objectKey']
    user_parameters = config['UserParameters']
    # Temporary credentials to access CodePipeline artifact in S3
    key_id = credentials['accessKeyId']
    key_secret = credentials['secretAccessKey']
    session_token = credentials['sessionToken']
    session = Session(aws_access_key_id=key_id,
                      aws_secret_access_key=key_secret,
                      aws_session_token=session_token)
    s3 = session.client('s3',
                        config=botocore.client.Config(signature_version='s3v4'))
    return (job_id, s3, from_bucket, from_key, from_revision,
            user_parameters)
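The following is a minimal sketch of how a Lambda handler might wire setup() into the CodePipeline job lifecycle. The handler name and the processing step are assumptions, but put_job_success_result and put_job_failure_result are the standard boto3 CodePipeline calls for reporting a custom-action result:

import boto3

codepipeline = boto3.client('codepipeline')

def lambda_handler(event, context):
    # Hypothetical wiring; the real project's handler does more work here.
    job_id, s3, from_bucket, from_key, from_revision, user_parameters = setup(event)
    try:
        # ... download and process the input artifact from from_bucket/from_key ...
        codepipeline.put_job_success_result(jobId=job_id)
    except Exception as e:
        codepipeline.put_job_failure_result(
            jobId=job_id,
            failureDetails={'type': 'JobFailed', 'message': str(e)}
        )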
def get_chartwerk_bucket():
    session = Session(
        region_name=app_settings.AWS_REGION,
        aws_access_key_id=app_settings.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=app_settings.AWS_SECRET_ACCESS_KEY
    )
    s3 = session.resource('s3')
    return s3.Bucket(app_settings.AWS_BUCKET)
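A short usage sketch, assuming app_settings supplies valid credentials and the bucket exists; the prefix is purely illustrative:

bucket = get_chartwerk_bucket()
# List objects under an illustrative prefix in the configured bucket.
for obj in bucket.objects.filter(Prefix='charts/'):
    print(obj.key, obj.size)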
def create_client_with_profile(profile_name, region, resource_name='ec2'):
    """ Create a new boto3 client from a named profile in ~/.aws/credentials.
    Args:
        profile_name (str): The name of the profile defined in your
            ~/.aws/credentials file.
        region (str): The AWS region you want to connect to.
        resource_name (str): The AWS service to build a client for.
            default=ec2
    Basic Usage:
        >>> client, err_msg = create_client_with_profile('lab01', 'us-west-2')
    Returns:
        Tuple (botocore.client.EC2, str)
    """
    client = None
    err_msg = ''
    try:
        session = (
            boto3.session.Session(
                profile_name=profile_name, region_name=region
            )
        )
        client = session.client(resource_name)
    except Exception as e:
        err_msg = str(e)
    return client, err_msg
def process_cf_file(self, args):
    try:
        cf_params_local = copy.deepcopy(args[0])
        cluster = args[1]
        elb_name_suffix = args[2]
        env = args[3]
        filename = args[4]
        has_ecs_service = args[5]
        listener_port = args[6]
        region = args[7]
        session = boto3.session.Session()
        if has_ecs_service:
            elb_name = 'ecs-elb-' + cluster
            if elb_name_suffix is not None:
                elb_name = "-".join([elb_name, elb_name_suffix])
            self.populate_ecs_service_params(session, cf_params_local, cluster, elb_name, env, region, listener_port)
        # Skip non-cf files
        ext = filename.split('.')[-1]
        if ext != 'template' and ext != 'yml':
            return
        cf_client = session.client('cloudformation', region_name=region)
        name = filename.split('/')[-1].split('.')[0]
        logging.info("%s: Processing CloudFormation Template" % filename)
        cf_params_local['name'] = name
        parameters = [{'ParameterKey': 'name', 'ParameterValue': name}]
        if name is None or name in filename:
            with open(filename, 'r') as f_h:
                try:
                    cf_template = f_h.read()
                except:
                    logging.exception("%s: Error reading file." % (filename))
                    self.catfile(filename)
                    raise
            validate_response = self.validate_template(cf_client, cf_template, filename)
            service_name = "%s-%s-%s" % (env, name, cluster)
            if elb_name_suffix is not None:
                service_name = "-".join([service_name, elb_name_suffix])
            cf_command = cf_client.create_stack
            existing_stack_id = self.find_existing_stack(cf_client, cf_params_local, service_name)
            if existing_stack_id is not None:
                cf_command = cf_client.update_stack
            self.populate_cf_params(cf_params_local, existing_stack_id, filename, parameters, validate_response)
            logging.info("%s: Updating CloudFormation Stack" % (service_name))
            try:
                cf_response = cf_command(StackName=service_name, TemplateBody=cf_template, Parameters=parameters, Capabilities=["CAPABILITY_IAM"])
                creating_stack_id = cf_response['StackId']
                stack_status = self.wait_for_stack_creation(cf_client, creating_stack_id, service_name)
            except botocore.exceptions.ClientError as e:
                if e.response["Error"]["Message"] == 'No updates are to be performed.':
                    logging.info("%s: No updates to be performed, CF update succeeded." % service_name)
                else:
                    raise
        self.q.put("%s Succeeded" % filename)
        logging.info("%s Succeeded" % filename)
    except Exception as e:
        logging.error("%s: Error executing CloudFormation Stack" % filename)
        logging.exception(e)
        self.q.put("%s Failed" % filename)