Python 类 Session() 的实例源码

fabricawscfn.py 文件源码 项目:fabric-aws-cloudformation 作者: crossroad0201 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cfn_client(self):
        """Return the CloudFormation client, creating it on first use.

        On the first call a ``Session`` is built from the ``Profile``,
        ``Region``, ``AccessKeyId`` and ``SecretAccessKey`` entries of
        ``env`` and its ``cloudformation`` client is stored on the instance;
        later calls return that stored client.
        """
        # Fast path: a client was already created by an earlier call.
        if self.__cfn_client is not None:
            return self.__cfn_client

        session_options = {
            'profile_name': env.get('Profile'),
            'region_name': env.get('Region'),
            'aws_access_key_id': env.get('AccessKeyId'),
            'aws_secret_access_key': env.get('SecretAccessKey'),
        }
        aws_session = Session(**session_options)
        self.__cfn_client = aws_session.client('cloudformation')
        return self.__cfn_client
fabricawscfn.py 文件源码 项目:fabric-aws-cloudformation 作者: crossroad0201 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cfn_resource(self):
        """Return the CloudFormation service resource, creating it on first use.

        On the first call a ``Session`` is built from the ``Profile``,
        ``Region``, ``AccessKeyId`` and ``SecretAccessKey`` entries of
        ``env`` and its ``cloudformation`` resource is stored on the
        instance; later calls return that stored resource.
        """
        already_built = self.__cfn_resource is not None
        if not already_built:
            aws_session = Session(
                profile_name=env.get('Profile'),
                region_name=env.get('Region'),
                aws_access_key_id=env.get('AccessKeyId'),
                aws_secret_access_key=env.get('SecretAccessKey'),
            )
            self.__cfn_resource = aws_session.resource('cloudformation')
        return self.__cfn_resource
fabricawscfn.py 文件源码 项目:fabric-aws-cloudformation 作者: crossroad0201 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def console(self):
        """
        Open the CloudFormation stack list of the AWS Console in a Web browser.

        The region used in the URL is the ``region_name`` of a boto3 session
        created with no arguments.
        """
        import webbrowser

        current_region = boto3.session.Session().region_name
        stacks_url = 'https://%(region)s.console.aws.amazon.com/cloudformation/home?region=%(region)s#/stacks?filter=active' % {
            'region': current_region,
        }
        webbrowser.open(stacks_url)
sync-catalog.py 文件源码 项目:aws-pipeline-to-service-catalog 作者: awslabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup_s3_client():
    """
    Build an S3 client that signs requests with Signature Version 4.

    The client comes from a ``Session`` created with no arguments, so
    credentials are whatever that session resolves on its own (the original
    note says IAM credentials).

    :return: S3 client configured with ``signature_version='s3v4'``
    """
    default_session = Session()
    sigv4_config = botocore.client.Config(signature_version='s3v4')
    return default_session.client('s3', config=sigv4_config)
aws-git-backed-static-website-lambda.py 文件源码 项目:aws-git-backed-static-website 作者: alestic 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def setup(event):
    # Extract attributes passed in by CodePipeline
    job_id = event['CodePipeline.job']['id']
    job_data = event['CodePipeline.job']['data']
    artifact = job_data['inputArtifacts'][0]
    config = job_data['actionConfiguration']['configuration']
    credentials = job_data['artifactCredentials']
    from_bucket = artifact['location']['s3Location']['bucketName']
    from_key = artifact['location']['s3Location']['objectKey']
    from_revision = artifact['revision']
    #output_artifact = job_data['outputArtifacts'][0]
    #to_bucket = output_artifact['location']['s3Location']['bucketName']
    #to_key = output_artifact['location']['s3Location']['objectKey']
    user_parameters = config['UserParameters']

    # Temporary credentials to access CodePipeline artifact in S3
    key_id = credentials['accessKeyId']
    key_secret = credentials['secretAccessKey']
    session_token = credentials['sessionToken']
    session = Session(aws_access_key_id=key_id,
                      aws_secret_access_key=key_secret,
                      aws_session_token=session_token)
    s3 = session.client('s3',
                        config=botocore.client.Config(signature_version='s3v4'))

    return (job_id, s3, from_bucket, from_key, from_revision,
            user_parameters)
aws.py 文件源码 项目:django-chartwerk 作者: DallasMorningNews 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_chartwerk_bucket():
    """Return the S3 ``Bucket`` resource named by ``app_settings.AWS_BUCKET``.

    The bucket is obtained through a ``Session`` configured with the region
    and access key pair taken from ``app_settings``.
    """
    session_kwargs = {
        'region_name': app_settings.AWS_REGION,
        'aws_access_key_id': app_settings.AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': app_settings.AWS_SECRET_ACCESS_KEY,
    }
    s3_resource = Session(**session_kwargs).resource('s3')
    return s3_resource.Bucket(app_settings.AWS_BUCKET)
ec2_vpc_peer.py 文件源码 项目:ld-ansible-modules 作者: linuxdynasty 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_client_with_profile(profile_name, region, resource_name='ec2'):
    """ Build a boto3 client from a named profile, reporting errors as text.

    Args:
        profile_name (str): Name of the profile to load, as configured in
            ~/.aws/credentials.
        region (str): The aws region the client should connect to.
        resource_name (str): Name of the aws service to create a client for.
            default=ec2

    Basic Usage:
        client, err_msg = create_client_with_profile('lab01', 'us-west-2')

    Returns:
        Tuple (client, str): the client and an empty string on success, or
        None and the error text when creating the session or client failed.
    """
    try:
        profile_session = boto3.session.Session(
            profile_name=profile_name, region_name=region
        )
        client = profile_session.client(resource_name)
    except Exception as exc:
        # Any failure is handed back to the caller as a message rather
        # than raised.
        return None, str(exc)

    return client, ''
cfUpdate.py 文件源码 项目:ecs-deploy 作者: OpenWhere 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_cf_file(self, args):
        """Create or update the CloudFormation stack for one template file.

        ``args`` is a positional sequence:

            0. CloudFormation parameter dict (deep-copied here, so the
               caller's dict is not modified by this method)
            1. cluster name
            2. ELB name suffix, or None
            3. environment name
            4. template filename
            5. flag: populate ECS service parameters first
            6. listener port
            7. AWS region name

        Files whose extension is neither ``template`` nor ``yml`` are
        skipped without reporting anything. Otherwise the template is read
        and validated, and the stack named ``<env>-<name>-<cluster>[-<suffix>]``
        is created, or updated when ``find_existing_stack`` returns an id.

        The outcome is put on ``self.q`` as ``"<filename> Succeeded"`` or
        ``"<filename> Failed"``. Exceptions are logged and not propagated.
        """
        # Bound before the ``try`` so the failure handler can always format
        # its messages, even if ``args`` is too short to hold a filename.
        filename = '<unknown>'
        try:
            cf_params_local = copy.deepcopy(args[0])
            cluster = args[1]
            elb_name_suffix = args[2]
            env = args[3]
            filename = args[4]
            has_ecs_service = args[5]
            listener_port = args[6]
            region = args[7]
            session = boto3.session.Session()
            if has_ecs_service:
                elb_name = 'ecs-elb-' + cluster
                if elb_name_suffix is not None:
                    elb_name = "-".join([elb_name, elb_name_suffix])
                self.populate_ecs_service_params(session, cf_params_local, cluster, elb_name, env, region, listener_port)
            # Skip non-cf files
            ext = filename.split('.')[-1]
            if ext != 'template' and ext != 'yml':
                return
            cf_client = session.client('cloudformation', region_name=region)
            # Base name of the file without directories or extension.
            name = filename.split('/')[-1].split('.')[0]
            logging.info("%s: Processing CloudFormation Template" % filename)
            cf_params_local['name'] = name
            parameters = [{'ParameterKey': 'name', 'ParameterValue': name}]
            # ``name`` is cut out of ``filename`` above, so this condition
            # holds for every file that reaches this point.
            if name is None or name in filename:
                with open(filename, 'r') as f_h:
                    try:
                        cf_template = f_h.read()
                    except:
                        # Log, dump the file via catfile, then re-raise so
                        # the outer handler reports the failure.
                        logging.exception("%s: Error reading file." % (filename))
                        self.catfile(filename)
                        raise
                    validate_response = self.validate_template(cf_client, cf_template, filename)

                    service_name = "%s-%s-%s" % (env, name, cluster)
                    if elb_name_suffix is not None:
                        service_name = "-".join([service_name, elb_name_suffix])
                    # Create by default; switch to update when the stack
                    # already exists.
                    cf_command = cf_client.create_stack
                    existing_stack_id = self.find_existing_stack(cf_client, cf_params_local, service_name)
                    if existing_stack_id is not None:
                        cf_command = cf_client.update_stack
                    self.populate_cf_params(cf_params_local, existing_stack_id, filename, parameters, validate_response)
                    logging.info("%s: Updating CloudFormation Stack" % (service_name))
                    try:
                        cf_response = cf_command(StackName=service_name, TemplateBody=cf_template, Parameters=parameters, Capabilities=["CAPABILITY_IAM"])
                        self.wait_for_stack_creation(cf_client, cf_response['StackId'], service_name)
                    except botocore.exceptions.ClientError as e:
                        # An update with nothing to change is reported by the
                        # API as an error; it is treated as success here.
                        if e.response["Error"]["Message"] == 'No updates are to be performed.':
                            logging.info("%s: No updates to be performed, CF update succeeded." % service_name)
                        else:
                            raise
                    self.q.put("%s Succeeded" % filename)
                    logging.info("%s Succeeded" % filename)
        except Exception as e:
            logging.error("%s: Error executing CloudFormation Stack" % filename)
            logging.exception(e)
            self.q.put("%s Failed" % filename)


问题


面经


文章

微信
公众号

扫码关注公众号