python类client()的实例源码

r53_cleanup.py 文件源码 项目:zinc 作者: PressLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def destroy(self, dry_run=False):
        changes = []
        for record in self.records['ResourceRecordSets']:
            if record['Name'] == self.zone['Name'] and record['Type'] in ['NS', 'SOA']:
                continue
            changes.append({
                'Action': 'DELETE',
                'ResourceRecordSet': record
            })

        print('{} {} ({})'.format('Deleting' if dry_run else 'Will delete',
                                  self.zone['Name'], self.zone_id))
        if not dry_run:
            if changes:
                client.change_resource_record_sets(HostedZoneId=self.zone_id,
                                                   ChangeBatch={
                                                       'Changes': changes
                                                   })
            client.delete_hosted_zone(Id=self.zone_id)
ses_tests.py 文件源码 项目:aws-certificate-management 作者: ImmobilienScout24 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_bucket_policy(cls):
        sts_client = boto3.client('sts', region_name='eu-west-1')
        account_id = sts_client.get_caller_identity()['Account']
        policy_document = {
            "Version": "2008-10-17",
            "Statement": [
                {
                    "Sid": "GiveSESPermissionToWriteEmail",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "ses.amazonaws.com"
                    },
                    "Action": "s3:PutObject",
                    "Resource": "arn:aws:s3:::{0}/*".format(cls.s3_bucket),
                    "Condition": {
                        "StringEquals": {
                            "aws:Referer": account_id
                        }
                    }
                }
            ]
        }
        s3 = boto3.resource('s3')
        policy = s3.BucketPolicy(cls.s3_bucket)
        policy.put(Policy=json.dumps(policy_document))
bootstrap.py 文件源码 项目:api-manager 作者: edx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_apigw_custom_domain_name(domain_name, cert_name, cert_body, cert_pk, cert_chain):
    """Creates an api gateway custom domain entity"""

    client = boto3.client('apigateway', region_name=args.aws_region)

    try:
        response = client.create_domain_name(
            domainName=domain_name,
            certificateName=cert_name,
            certificateBody=cert_body,
            certificatePrivateKey=cert_pk,
            certificateChain=cert_chain
        )
    except Exception, e:
        raise e

    return response
bootstrap.py 文件源码 项目:api-manager 作者: edx 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def bootstrap_api(stage_name):
    """
    Upload a bootstrap Swagger document to a new API Gateway object and set it live
    with environment-specific variables.
    """

    client = boto3.client('apigateway', region_name=args.aws_region)

    # bootstrap.json is relative to me; where am I?
    my_dir = os.path.dirname(os.path.realpath(__file__))

    bootstrap_swagger = open(my_dir + '/bootstrap.json', 'r')

    response = client.import_rest_api(body=bootstrap_swagger.read())
    logging.info('New bootstrap API ID "%s" created', response['id'])

    client.create_deployment(
        restApiId=response['id'],
        stageName=stage_name)
    logging.info('API ID "%s" deployed to stage "%s"', response['id'], stage_name)

    return response['id']
update_job_definition.py 文件源码 项目:pfb-network-connectivity 作者: azavea 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():

    parser = argparse.ArgumentParser(description=help)
    parser.add_argument('job_definition_filename', type=str)
    parser.add_argument('image_url', type=str)

    parser.add_argument('--deregister', action='store_true',
                        help='Deregister old verison of the job definition after updating')
    args = parser.parse_args()

    path_to_config_json = os.path.join('.', 'job-definitions',
                                       args.job_definition_filename)
    with open(path_to_config_json, 'r') as json_file:
        job_definition = json.load(json_file)
        job_definition['containerProperties']['image'] = args.image_url

        client = boto3.client('batch')
        response = client.register_job_definition(**job_definition)

        if args.deregister:
            old_revision = int(response['revision']) - 1
            old_job_definition = '{}:{}'.format(response['jobDefinitionName'], old_revision)
            client.deregister_job_definition(jobDefinition=old_job_definition)

        print('{}:{}'.format(response['jobDefinitionName'], response['revision']), end='')
aws_batch.py 文件源码 项目:pfb-network-connectivity 作者: azavea 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_latest_job_definition(job_definition_name):
    """ Get the latest revision of an AWS Batch job definition

    Raises NoActiveJobDefinitionRevision if no current active revision for the
        requested job definition

    """
    client = boto3.client('batch')
    response = client.describe_job_definitions(jobDefinitionName=job_definition_name,
                                               status='ACTIVE')
    job_definitions = response.get('jobDefinitions', [])
    while(response.get('nextToken') is not None):
        response = client.describe_job_definitions(jobDefinitionName=job_definition_name,
                                                   status='ACTIVE',
                                                   nextToken=response['nextToken'])
        job_definitions.extend(response.get('jobDefinitions', []))
    sorted_definitions = sorted(job_definitions, key=lambda job: job['revision'])
    try:
        return sorted_definitions.pop()
    except IndexError:
        raise NoActiveJobDefinitionRevision(job_definition=job_definition_name)
models.py 文件源码 项目:pfb-network-connectivity 作者: azavea 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cancel(self, reason=None):
        """ Cancel the analysis job, if its running """
        if not reason:
            reason = 'AnalysisJob terminated by user at {}'.format(datetime.utcnow())

        if self.status in self.Status.ACTIVE_STATUSES:
            logger.info('Cancelling job: {}'.format(self))
            old_status = self.status
            self.update_status(self.Status.CANCELLED)
            if self.batch_job_id is not None:
                try:
                    client = boto3.client('batch')
                    client.terminate_job(jobId=self.batch_job_id, reason=reason)
                except:
                    self.update_status(old_status,
                                       'REVERTED',
                                       'Reverted due to failure cancelling job in AWS Batch')
                    raise
fabfile.py 文件源码 项目:sync-buckets-state-machine 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_bucket(bucket):
    s3 = boto3.client('s3', region_name=AWS_DEFAULT_REGION)

    print('Checking bucket: ' + bucket)
    try:
        s3.head_bucket(Bucket=bucket)
    except ClientError:
        print('Creating bucket: ' + bucket)
        args = {
            'Bucket': bucket
        }
        if AWS_DEFAULT_REGION != 'us-east-1':
            args['CreateBucketConfiguration'] = {
                'LocationConstraint': AWS_DEFAULT_REGION
            }
        s3.create_bucket(**args)
        waiter = s3.get_waiter('bucket_exists')
        waiter.wait(Bucket=bucket)
fabfile.py 文件源码 项目:sync-buckets-state-machine 作者: awslabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_timestamp_from_s3_object(bucket, key):
    s3 = boto3.client('s3', region_name=AWS_DEFAULT_REGION)

    try:
        response = s3.get_object(
            Bucket=bucket,
            Key=key
        )
        timestamp = response['LastModified']  # We assume this is UTC.
    except ClientError:
        timestamp = datetime(1970, 1, 1, tzinfo=None)

    return (timestamp.replace(tzinfo=None) - datetime(1970, 1, 1, tzinfo=None)).total_seconds()


# IAM
fabfile.py 文件源码 项目:sync-buckets-state-machine 作者: awslabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_arn_from_policy_name(policy_name):
    iam = boto3.client('iam', region_name=AWS_DEFAULT_REGION)

    args = {
        'Scope': 'All'
    }
    while True:
        response = iam.list_policies(**args)
        for p in response['Policies']:
            if p['PolicyName'] == policy_name:
                return p['Arn']
        if response['IsTruncated']:
            args['Marker'] = response['Marker']
        else:
            return None


# Lambda
fabfile.py 文件源码 项目:sync-buckets-state-machine 作者: awslabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def execute_cfn_change_set(change_set_id):
    cfn = boto3.client('cloudformation', region_name=AWS_DEFAULT_REGION)

    print('Executing CloudFormation change set...')
    cfn.execute_change_set(ChangeSetName=change_set_id)

    while True:
        response = get_cfn_stack_info()
        if response is None:
            status = 'UNKNOWN'
        else:
            status = response.get('StackStatus', 'UNKNOWN')

        print('Status: ' + status)
        if 'StatusReason' in response:
            print('Reason: ' + response['StatusReason'])
        if status.endswith('FAILED') or status == 'ROLLBACK_COMPLETE':
            exit(1)
        elif status == 'UNKNOWN':
            print('Stack info:\n' + json.dumps(response, sort_keys=True, indent=4, default=str))
        elif status.endswith('COMPLETE'):
            return

        time.sleep(SLEEP_TIME)
amazon.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create(self, name, cidr, **kargs):

        # step1: create vpc
        vpc = self.client.create_vpc(
            CidrBlock=cidr,
            InstanceTenancy='default'
        ).get('Vpc')
        # step 2: create subnet
        subnet = self.client.create_subnet(
            VpcId=vpc.get('VpcId'),
            CidrBlock=cidr
        ).get('Subnet')

        result = {'name': subnet['SubnetId'],
                  'description': None,
                  'id': subnet['SubnetId'],
                  'cidr': subnet['CidrBlock'],
                  'cloud': PROVIDER,
                  'gateway_ip': None,
                  'security_group': None,
                  'allocation_pools': None,
                  'dns_nameservers': None
                  }

        return result
amazon.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def show(self, subnet_id):
        subnet = self.client.describe_subnets(
            SubnetIds=[subnet_id]).get('Subnets')[0]

        result = {'name': subnet['SubnetId'],
                  'description': None,
                  'id': subnet['SubnetId'],
                  'cidr': subnet['CidrBlock'],
                  'cloud': PROVIDER,
                  'gateway_ip': None,
                  'security_group': None,
                  'allocation_pools': None,
                  'dns_nameservers': None
                  }

        return result
amazon.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def list(self, **search_opts):
        subnets = self.client.describe_subnets(**search_opts).get('Subnets')
        result = []
        for subnet in subnets:
            sub = {'name': subnet['SubnetId'],
                   'description': None,
                   'id': subnet['SubnetId'],
                   'cidr': subnet['CidrBlock'],
                   'cloud': PROVIDER,
                   'gateway_ip': None,
                   'security_group': None,
                   'allocation_pools': None,
                   'dns_nameservers': None
                   }
            result.append(sub)

        return result
amazon.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def copy_object(self, container, obj, metadata=None,
                    destination=None, **kwargs):
        copysource = {
            'Bucket': container,
            'Key': obj
        }

        if destination:
            metadata_directive = 'COPY'
            dst_container, dst_obj = destination.strip('/').split('/')
        else:
            metadata_directive = 'REPLACE'
            dst_container, dst_obj = container, obj
        if not metadata:
            metadata = {}
        return self.client.copy_object(Bucket=dst_container, Key=dst_obj,
                                       Metadata=metadata,
                                       MetadataDirective=metadata_directive,
                                       CopySource=copysource)
checker.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, region='us-east-1'):
        self.check_categories = ['S3','IAM', 'VPC', 'CloudWatch', 'CloudTrail']
        self.ec2 = boto3.resource("ec2", region_name=region)
        self.ec2_client = boto3.client("ec2", region_name=region)
        self.cloudwatch = boto3.resource("cloudwatch", region_name=region)
        self.cloudwatch_client = boto3.client("cloudwatch", region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
        self.iam = boto3.resource("iam", region_name=region)
        self.iam_client = boto3.client("iam", region_name=region)
        self.s3 = boto3.resource("s3", region_name=region)

        self.results = []
        self.results_dict = {}
checker.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check_vpcs(self):
        #collect vpc ids
        regions = get_regions()
        for region in regions:
            ec2 = boto3.resource('ec2', region_name=region)
            ec2_client = boto3.client('ec2', region_name=region)
            ids = [ x.id for x in ec2.vpcs.all() ]
            flowlogs = self.get_flowlogs_by_vpc_id(ec2_client)

            for vpc_id in ids:
                vpc_dict = flowlogs.get(vpc_id, None)
                self.append_collection(
                    misc_checks.VPCFlowLogCheck(vpc_id, vpc_dict)
                    )
checker.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_regions():
    client = boto3.client('ec2', region_name='us-east-1')
    regions = [ x['RegionName'] for x in client.describe_regions()['Regions']]
    return regions
delete_stack.py 文件源码 项目:aws-consolidated-admin 作者: awslabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    sess = boto3.session.Session(
        aws_access_key_id=event['Credentials']['AccessKeyId'],
        aws_secret_access_key=decrypt(
            event['Credentials']['SecretAccessKeyCiphertext']),
        aws_session_token=event['Credentials']['SessionToken'],
        region_name=event['Region'])

    cfn = sess.client('cloudformation')

    resp = cfn.delete_stack(StackName=event['Stack']['StackId'])

    return {
        'RequestId': resp['ResponseMetadata']['RequestId']
    }


问题


面经


文章

微信
公众号

扫码关注公众号