python类resource()的实例源码

aws.py 文件源码 项目:two1-deep-learning 作者: 21dotco 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def spin_up(data):
    """Launch exactly one EC2 instance and return its instance id.

    The AMI, instance type, key pair, IAM instance profile and security
    group all come from ``settings``.  ``data`` is a mapping whose items are
    substituted into ``USERDATA_TEMPLATE`` to build the instance user data.
    """
    ec2 = boto3.resource('ec2')

    launch_params = dict(
        ImageId=settings.EC2_AMI_ID,
        InstanceType=settings.EC2_INSTANCE_TYPE,
        KeyName=settings.EC2_SSH_KEYPAIR_ID,
        # Exactly one instance is requested.
        MinCount=1,
        MaxCount=1,
        IamInstanceProfile={'Arn': settings.EC2_IAM_INSTANCE_PROFILE_ARN},
        # A shutdown issued from inside the instance terminates it.
        InstanceInitiatedShutdownBehavior='terminate',
        SecurityGroupIds=[settings.EC2_SECURITY_GROUP_NAME],
        UserData=USERDATA_TEMPLATE.format(**data),
    )
    launched = ec2.create_instances(**launch_params)[0]

    instance_id = launched.id
    logger.info('Spinning up instance with id {} at {}'.format(instance_id, launched.launch_time))
    return instance_id
inject.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def bucket_load(self, *args, **kwargs):
    """
    Calls s3.Client.list_buckets() to update the attributes of the Bucket
    resource.
    """
    # The docstring above is phrased this way to match what the autogenerated
    # docs produce.

    # HeadBucket does not return the bucket's attributes, so list every
    # bucket and pick the entry whose name matches this resource.
    listing = self.meta.client.list_buckets()
    candidates = (entry for entry in listing['Buckets']
                  if entry['Name'] == self.name)
    found = next(candidates, None)
    if found is None:
        # No listed bucket carries our name: report it as a 404.
        raise ClientError({'Error': {'Code': '404', 'Message': 'NotFound'}},
                          'ListBuckets')
    self.meta.data = found
test_deploy_unit.py 文件源码 项目:ardy 作者: avara1986 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _create_lambda_from_s3(self):
    """Create a lambda whose code is read from an S3 object and verify the reply.

    A mock zip is uploaded to the ``test_bucket`` bucket, ``lambda_conf["Code"]``
    is pointed at that object, and the response of ``remote_create_lambada``
    is checked for HTTP status 201 and for echoing the configured fields.

    Returns the response of ``remote_create_lambada``.
    """
    archive = MockZipFile.create_zip("test")
    target_bucket = "test_bucket"

    bucket = boto3.resource('s3').Bucket(target_bucket)
    bucket.create()
    bucket.put_object(Key=archive, Body=MockZipFile.read_file(archive))

    self.lambda_conf["Code"] = dict(S3Bucket=target_bucket, S3Key=archive)

    response = self.deploy.remote_create_lambada(**self.lambda_conf)
    status_code = response["ResponseMetadata"]["HTTPStatusCode"]
    self.assertEqual(status_code, 201)
    # The response must echo back the identifying fields of the configuration.
    for field in ("FunctionName", "Role", "Runtime", "Handler"):
        self.assertEqual(response[field], self.lambda_conf[field])

    return response
base.py 文件源码 项目:zappa-django-utils 作者: Miserlou 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def close(self, *args, **kwargs):
    """
    Engine closed, copy file to DB

    After the parent ``close`` has run, the local file named by
    ``settings_dict['NAME']`` is uploaded to the S3 object
    ``settings_dict['REMOTE_NAME']`` in bucket ``settings_dict['BUCKET']``.
    The signature version comes from ``settings_dict['SIGNATURE_VERSION']``
    and defaults to ``"s3v4"``.

    Upload failures are logged and swallowed (best effort); the success
    message is only logged when the upload actually went through.
    """
    super(DatabaseWrapper, self).close(*args, **kwargs)

    signature_version = self.settings_dict.get("SIGNATURE_VERSION", "s3v4")
    s3 = boto3.resource(
        's3',
        config=botocore.client.Config(signature_version=signature_version))

    try:
        with open(self.settings_dict['NAME'], 'rb') as f:
            payload = f.read()

        s3_object = s3.Object(self.settings_dict['BUCKET'],
                              self.settings_dict['REMOTE_NAME'])
        # Operations take keyword arguments only; the former stray
        # positional 'rb' argument has been dropped.
        s3_object.put(Body=payload)
    except Exception:
        # Keep close() from raising because of S3, but leave a traceback
        # in the log instead of a bare print.
        logging.exception("Failed to save to remote DB")
        return

    logging.debug("Saved to remote DB!")
cluster_auto_start_daemon.py 文件源码 项目:sm-engine-ansible 作者: METASPACE2020 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, ansible_config_path, aws_key_name=None, interval=60,
             qname='sm_annotate', debug=False):
    """Read the ansible configuration and prepare the EC2 resource.

    :param ansible_config_path: path of the YAML ansible configuration file
    :param aws_key_name: key name to use; falls back to the configuration's
        ``aws_key_name`` entry when empty or ``None``
    :param interval: stored as ``self.interval``, capped at 1200
    :param qname: stored as ``self.qname``
    :param debug: stored as ``self.debug``
    """
    with open(ansible_config_path) as fp:
        # safe_load builds plain Python objects only; yaml.load without an
        # explicit Loader is unsafe and rejected by recent PyYAML releases.
        self.ansible_config = yaml.safe_load(fp)

    instances_config = self.ansible_config['cluster_configuration']['instances']

    self.interval = min(interval, 1200)
    self.aws_key_name = aws_key_name or self.ansible_config['aws_key_name']
    self.master_hostgroup = instances_config['master']['hostgroup']
    self.slave_hostgroup = instances_config['slave']['hostgroup']
    self.stage = self.ansible_config['stage']
    self.qname = qname
    self.debug = debug

    self._setup_logger()
    self.ec2 = boto3.resource('ec2', self.ansible_config['aws_region'])
func.py 文件源码 项目:cfpp 作者: dcoker 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def handler(event, context):
    """Count the bytes and newline characters of an S3 object.

    ``event["URL"]`` is parsed as a URL: its network location is used as the
    bucket name and its path (without leading slashes) as the object key.
    The body is read in chunks of ``BUF_SZ``.

    Returns a dict with the keys ``url``, ``lines`` and ``bytes``.
    """
    s3 = boto3.resource('s3')
    location = urllib2.urlparse.urlparse(event["URL"])
    body = s3.Object(location.netloc, location.path.lstrip('/')).get()['Body']

    total_lines = 0
    total_bytes = 0
    while True:
        chunk = body.read(BUF_SZ)
        if not chunk:
            # An empty read marks the end of the stream.
            break
        total_bytes += len(chunk)
        total_lines += chunk.count('\n')

    return {
        'url': event["URL"],
        'lines': total_lines,
        'bytes': total_bytes
    }
submit_split_job.py 文件源码 项目:Falco 作者: VCCRI 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_mapper_number(manifest_file):
    """Return the number of lines in the manifest.

    ``manifest_file`` is either a local path or an ``s3://bucket/key`` URL.
    An S3 manifest is first downloaded into a temporary directory that is
    removed again before returning.  Every file opened here is closed
    deterministically.
    """
    if manifest_file.startswith("s3://"):
        s3 = boto3.resource("s3")

        # Drop surrounding whitespace/slashes and the "s3://" prefix, then
        # split into bucket name and object key.
        bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)

        with tempfile.TemporaryDirectory() as tmpdirname:
            local_manifest = tmpdirname + "/manifest"
            s3.meta.client.download_file(bucket_name, key_prefix, local_manifest)

            with open(local_manifest) as manifest:
                return sum(1 for _ in manifest)

    with open(manifest_file) as manifest:
        return sum(1 for _ in manifest)
submit_download_job.py 文件源码 项目:Falco 作者: VCCRI 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def set_mapper_number(manifest_file):
    """Return the number of lines in the manifest.

    ``manifest_file`` is either a local path or an ``s3://bucket/key`` URL.
    An S3 manifest is first downloaded into a temporary directory that is
    removed again before returning.  Every file opened here is closed
    deterministically.
    """
    if manifest_file.startswith("s3://"):
        s3 = boto3.resource("s3")

        # Drop surrounding whitespace/slashes and the "s3://" prefix, then
        # split into bucket name and object key.
        bucket_name, key_prefix = manifest_file.strip().strip("/")[5:].split("/", 1)

        with tempfile.TemporaryDirectory() as tmpdirname:
            local_manifest = tmpdirname + "/manifest"
            s3.Object(bucket_name, key_prefix).download_file(local_manifest)

            with open(local_manifest) as manifest:
                return sum(1 for _ in manifest)

    with open(manifest_file) as manifest:
        return sum(1 for _ in manifest)
fastq_splitter.py 文件源码 项目:Falco 作者: VCCRI 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def upload_split(file_name):
    """Store ``file_name`` in the configured output location and print its base name.

    When ``parser_result.output_dir`` starts with ``s3://`` the file is
    uploaded to that bucket/folder and the local copy is removed; otherwise
    it is put into the HDFS directory (created first) via the ``hdfs`` CLI.
    """
    output_dir = parser_result.output_dir
    base_file_name = file_name.rsplit("/", 1)[-1]

    if not output_dir.startswith("s3://"):
        # HDFS target: make sure the directory exists, then copy the file in.
        for hdfs_args in (["-mkdir", "-p", output_dir],
                          ["-put", file_name, output_dir]):
            subprocess.call(["hdfs", "dfs"] + hdfs_args)
    else:
        # Strip the "s3://" prefix and split into bucket and folder.
        bucket_name, folder_name = output_dir[5:].split("/", 1)
        key_name = "/".join([folder_name.rstrip("/"), base_file_name])

        s3 = boto3.resource("s3", region_name=parser_result.s3_region)
        s3.Bucket(bucket_name).upload_file(file_name, key_name)

        os.remove(file_name)

    print(base_file_name)
sra_downloader.py 文件源码 项目:Falco 作者: VCCRI 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def upload_split(file_name, print_output_name=True):
    """Store ``file_name`` in the configured output location.

    When ``parser_result.output_dir`` starts with ``s3://`` the file is
    uploaded to that bucket/folder and the local copy is removed; otherwise
    it is put into the HDFS directory (created first) via the ``hdfs`` CLI.
    The base name of the file is printed unless ``print_output_name`` is false.
    """
    output_dir = parser_result.output_dir
    base_file_name = file_name.rsplit("/", 1)[-1]

    if not output_dir.startswith("s3://"):
        # HDFS target: make sure the directory exists, then copy the file in.
        for hdfs_args in (["-mkdir", "-p", output_dir],
                          ["-put", file_name, output_dir]):
            subprocess.call(["hdfs", "dfs"] + hdfs_args)
    else:
        # Strip the "s3://" prefix and split into bucket and folder.
        bucket_name, folder_name = output_dir[5:].split("/", 1)
        key_name = "/".join([folder_name.rstrip("/"), base_file_name])

        s3 = boto3.resource("s3", region_name=parser_result.s3_region)
        s3.Bucket(bucket_name).upload_file(file_name, key_name)

        os.remove(file_name)

    if print_output_name:
        print(base_file_name)
custom-resource.py 文件源码 项目:aws-waf-security-automation 作者: cerbo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def remove_s3_bucket_lambda_event(bucket_name, lambda_function_arn):
    """Remove every bucket notification that targets ``lambda_function_arn``.

    The bucket's notification configuration is read, topic and queue
    configurations are carried over unchanged, Lambda configurations whose
    ``LambdaFunctionArn`` equals ``lambda_function_arn`` are dropped, and the
    result is written back.  Any failure is printed and swallowed.
    """
    s3_client = boto3.client('s3')
    try:
        notification_conf = s3_client.get_bucket_notification_configuration(Bucket=bucket_name)

        new_conf = {}
        # Topic and queue notifications are preserved as they are.
        for section in ('TopicConfigurations', 'QueueConfigurations'):
            if section in notification_conf:
                new_conf[section] = notification_conf[section]

        if 'LambdaFunctionConfigurations' in notification_conf:
            # Remove all references for Log Parser event.
            new_conf['LambdaFunctionConfigurations'] = [
                lfc for lfc in notification_conf['LambdaFunctionConfigurations']
                if lfc['LambdaFunctionArn'] != lambda_function_arn
            ]

        s3_client.put_bucket_notification_configuration(
            Bucket=bucket_name, NotificationConfiguration=new_conf)

    except Exception as e:  # "as" form parses on Python 2.6+ and Python 3
        print(e)
        print("[ERROR] Error to remove S3 Bucket lambda event")
ses_tests.py 文件源码 项目:aws-certificate-management 作者: ImmobilienScout24 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_bucket_policy(cls):
    """Attach a policy to ``cls.s3_bucket`` that lets SES put objects into it.

    The single ``Allow`` statement grants ``s3:PutObject`` on every key of
    the bucket to the ``ses.amazonaws.com`` service, conditioned on
    ``aws:Referer`` being equal to the caller's account id (looked up via STS).
    """
    sts_client = boto3.client('sts', region_name='eu-west-1')
    account_id = sts_client.get_caller_identity()['Account']

    ses_write_statement = {
        "Sid": "GiveSESPermissionToWriteEmail",
        "Effect": "Allow",
        "Principal": {"Service": "ses.amazonaws.com"},
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::{0}/*".format(cls.s3_bucket),
        "Condition": {"StringEquals": {"aws:Referer": account_id}},
    }
    policy_document = {
        "Version": "2008-10-17",
        "Statement": [ses_write_statement],
    }

    bucket_policy = boto3.resource('s3').BucketPolicy(cls.s3_bucket)
    bucket_policy.put(Policy=json.dumps(policy_document))
amazon.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create(self, image_id, flavor_id,
           network_id, name=None, number=1, **kargs):
    """Create ``number`` instances and return the ``create_instances`` result.

    ``image_id``, ``flavor_id`` and ``network_id`` are passed as ``ImageId``,
    ``InstanceType`` and ``SubnetId``.  ``name`` is used as the ``Name`` of
    the IAM instance profile argument; when it is ``None`` the current
    timestamp rendered as text is used.  Extra keyword arguments are accepted
    and ignored.
    """
    profile_name = six.text_type(datetime.now()) if name is None else name
    return self.resource.create_instances(
        ImageId=image_id,
        InstanceType=flavor_id,
        SubnetId=network_id,
        MinCount=number,
        MaxCount=number,
        IamInstanceProfile={'Arn': '', 'Name': profile_name},
    )
iam.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_role(self):
    """Method to delete a role for a stack

    The role's in-line policy is removed first, then the role itself.  A
    ``NoSuchEntity`` error while removing the policy means there is nothing
    to delete and the method returns quietly; any other client error (for
    example a permission problem) is re-raised instead of being mistaken for
    a missing role.

    Returns:
        None

    Raises:
        botocore.exceptions.ClientError: the policy removal failed for a
            reason other than ``NoSuchEntity``.
        Exception: the role deletion did not answer with HTTP status 200.
    """
    # Remove Role in-line policy
    try:
        iam = boto3.resource('iam')
        role_policy = iam.RolePolicy(self.role_name, self.policy_name)
        role_policy.delete()
    except botocore.exceptions.ClientError as e:
        error_code = e.response.get("Error", {}).get("Code")
        if error_code != "NoSuchEntity":
            raise
        print("   No Role found. Skipping")
        return

    # Remove Role
    response = self.client.delete_role(RoleName=self.role_name)

    if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
        raise Exception("Failed to delete role.")
controllers.py 文件源码 项目:bitstore 作者: datahq 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_s3_client():
    """Build and return the S3 client used for storage.

    Credentials are read from ``config``; the optional ``S3_ENDPOINT_URL``
    environment variable selects a custom endpoint.  When such an endpoint is
    set, the storage bucket is additionally created and given a
    ``public-read`` ACL.  A failure of that bucket setup is logged with its
    traceback and does not prevent the client from being returned.
    """
    endpoint_url = os.environ.get("S3_ENDPOINT_URL")
    # Shared by the client and the resource so both are built from identical
    # connection settings.
    connection_kwargs = dict(
        # region_name='us-east-1',
        aws_access_key_id=config['STORAGE_ACCESS_KEY_ID'],
        aws_secret_access_key=config['STORAGE_SECRET_ACCESS_KEY'],
        config=Config(signature_version='s3v4'),
        endpoint_url=endpoint_url,
    )
    s3_client = boto3.client('s3', **connection_kwargs)

    if endpoint_url:
        try:
            s3 = boto3.resource('s3', **connection_kwargs)
            s3.create_bucket(Bucket=config['STORAGE_BUCKET_NAME'])
            bucket = s3.Bucket(config['STORAGE_BUCKET_NAME'])
            bucket.Acl().put(ACL='public-read')
        except Exception:
            # Best effort only; KeyboardInterrupt/SystemExit still propagate.
            logging.exception('Failed to create the bucket')
    return s3_client
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def bucket_load(self, *args, **kwargs):
    """
    Calls s3.Client.list_buckets() to update the attributes of the Bucket
    resource.
    """
    # The docstring above is phrased this way to match what the autogenerated
    # docs produce.

    # HeadBucket does not return the bucket's attributes, so list every
    # bucket and pick the entry whose name matches this resource.
    listing = self.meta.client.list_buckets()
    candidates = (entry for entry in listing['Buckets']
                  if entry['Name'] == self.name)
    found = next(candidates, None)
    if found is None:
        # No listed bucket carries our name: report it as a 404.
        raise ClientError({'Error': {'Code': '404', 'Message': 'NotFound'}},
                          'ListBuckets')
    self.meta.data = found
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def upload_file(self, Filename, Bucket, Key, ExtraArgs=None,
                Callback=None, Config=None):
    """Upload a file to an S3 object.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.meta.client.upload_file('/tmp/hello.txt', 'mybucket', 'hello.txt')

    Behaves like S3Transfer's upload_file() method, except that the
    parameters are capitalized. Detailed examples can be found at
    :ref:`S3Transfer's Usage <ref_s3transfer_usage>`.
    """
    # The arguments are forwarded under S3Transfer's lower-case names.
    transfer_kwargs = {
        'filename': Filename,
        'bucket': Bucket,
        'key': Key,
        'extra_args': ExtraArgs,
        'callback': Callback,
    }
    with S3Transfer(self, Config) as transfer:
        outcome = transfer.upload_file(**transfer_kwargs)
    return outcome
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def download_file(self, Bucket, Key, Filename, ExtraArgs=None,
                  Callback=None, Config=None):
    """Download an S3 object to a file.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.meta.client.download_file('mybucket', 'hello.txt', '/tmp/hello.txt')

    Behaves like S3Transfer's download_file() method, except that the
    parameters are capitalized. Detailed examples can be found at
    :ref:`S3Transfer's Usage <ref_s3transfer_usage>`.
    """
    # The arguments are forwarded under S3Transfer's lower-case names.
    transfer_kwargs = {
        'bucket': Bucket,
        'key': Key,
        'filename': Filename,
        'extra_args': ExtraArgs,
        'callback': Callback,
    }
    with S3Transfer(self, Config) as transfer:
        outcome = transfer.download_file(**transfer_kwargs)
    return outcome
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def bucket_upload_file(self, Filename, Key,
                       ExtraArgs=None, Callback=None, Config=None):
    """Upload a file to an S3 object.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.Bucket('mybucket').upload_file('/tmp/hello.txt', 'hello.txt')

    Behaves like S3Transfer's upload_file() method, except that the
    parameters are capitalized. Detailed examples can be found at
    :ref:`S3Transfer's Usage <ref_s3transfer_usage>`.
    """
    # Delegate to the client's upload_file, supplying this bucket's name.
    client = self.meta.client
    request = dict(Filename=Filename, Bucket=self.name, Key=Key,
                   ExtraArgs=ExtraArgs, Callback=Callback, Config=Config)
    return client.upload_file(**request)
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def object_upload_file(self, Filename,
                       ExtraArgs=None, Callback=None, Config=None):
    """Upload a file to an S3 object.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.Object('mybucket', 'hello.txt').upload_file('/tmp/hello.txt')

    Behaves like S3Transfer's upload_file() method, except that the
    parameters are capitalized. Detailed examples can be found at
    :ref:`S3Transfer's Usage <ref_s3transfer_usage>`.
    """
    # Delegate to the client's upload_file, supplying this object's
    # bucket name and key.
    client = self.meta.client
    request = dict(Filename=Filename, Bucket=self.bucket_name, Key=self.key,
                   ExtraArgs=ExtraArgs, Callback=Callback, Config=Config)
    return client.upload_file(**request)
inject.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def object_download_file(self, Filename,
                         ExtraArgs=None, Callback=None, Config=None):
    """Download an S3 object to a file.

    Usage::

        import boto3
        s3 = boto3.resource('s3')
        s3.Object('mybucket', 'hello.txt').download_file('/tmp/hello.txt')

    Behaves like S3Transfer's download_file() method, except that the
    parameters are capitalized. Detailed examples can be found at
    :ref:`S3Transfer's Usage <ref_s3transfer_usage>`.
    """
    # Delegate to the client's download_file, supplying this object's
    # bucket name and key.
    client = self.meta.client
    request = dict(Bucket=self.bucket_name, Key=self.key, Filename=Filename,
                   ExtraArgs=ExtraArgs, Callback=Callback, Config=Config)
    return client.download_file(**request)
__init__.py 文件源码 项目:sbds 作者: steemit 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_checkpoints_from_s3_path(path):
    """Return the checkpoints found under an S3 path, sorted by ``start``.

    ``path`` is split into bucket and key prefix; every object key under that
    prefix that matches ``S3_KEY_PATTERN`` is parsed with
    ``parse_checkpoint_s3_path``.  Keys that raise ``ValueError`` while being
    parsed are skipped.
    """
    import boto3
    s3_resource = boto3.resource('s3')
    bucket_name, key_name = split_s3_bucket_key(path)
    listed = s3_resource.Bucket(bucket_name).objects.filter(Prefix=key_name)
    matching_keys = fnmatch.filter([obj.key for obj in listed], S3_KEY_PATTERN)

    checkpoints = []
    for key in matching_keys:
        try:
            checkpoint = parse_checkpoint_s3_path(os.path.join(bucket_name, key))
        except ValueError:
            # Not a parseable checkpoint path; ignore it.
            continue
        checkpoints.append(checkpoint)

    checkpoints.sort(key=lambda cp: cp.start)
    return checkpoints
test_taar_locale.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_generate_dictionary(spark, multi_locales_df):
    """generate_dictionary must report only the locales above the frequency cut."""
    s3 = boto3.resource('s3', region_name='us-west-2')
    s3.create_bucket(Bucket=taar_utils.AMO_DUMP_BUCKET)

    # Store the data in the mocked bucket.
    amo_dump = s3.Object(taar_utils.AMO_DUMP_BUCKET, key=taar_utils.AMO_DUMP_KEY)
    amo_dump.put(Body=json.dumps(FAKE_AMO_DUMP))

    multi_locales_df.createOrReplaceTempView("longitudinal")

    # The "en-US" locale must not be reported: we set it to a low
    # frequency on |multi_locale_df|.
    expected = {"it-IT": ["test-guid-0001"]}
    actual = taar_locale.generate_dictionary(spark, 5)

    assert actual == expected
test_utils.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_write_csv_to_s3(generate_data):
    """A single-row dataset is written as a header line plus one data line."""
    bucket, key = 'test-bucket', 'test.csv'

    s3 = boto3.resource('s3', region_name='us-west-2')
    s3.create_bucket(Bucket=bucket)

    utils.write_csv_to_s3(generate_data(["foo"]), bucket, key)

    raw = s3.Object(bucket, key).get()['Body'].read()
    lines = raw.decode('utf-8').rstrip().split('\n')

    # header + 1x row = 2
    assert len(lines) == 2
test_utils.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_write_csv_to_s3_no_header(generate_data):
    """With ``header=False`` the default dataset produces exactly one line."""
    bucket, key = 'test-bucket', 'test.csv'

    s3 = boto3.resource('s3', region_name='us-west-2')
    s3.create_bucket(Bucket=bucket)

    utils.write_csv_to_s3(generate_data(), bucket, key, header=False)

    raw = s3.Object(bucket, key).get()['Body'].read()
    lines = raw.decode('utf-8').rstrip().split('\n')

    assert len(lines) == 1
test_utils.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_write_csv_to_s3_existing(generate_data):
    """Writing twice to the same key leaves only the content of the second write."""
    bucket, key = 'test-bucket', 'test.csv'

    s3 = boto3.resource('s3', region_name='us-west-2')
    s3.create_bucket(Bucket=bucket)

    # Two consecutive writes to the same key, the second with two rows.
    for rows in (["foo"], ["foo", "bar"]):
        utils.write_csv_to_s3(generate_data(rows), bucket, key)

    raw = s3.Object(bucket, key).get()['Body'].read()
    lines = raw.decode('utf-8').rstrip().split('\n')

    # header + 2x row = 3
    assert len(lines) == 3
model.py 文件源码 项目:myreco 作者: dutradda 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _put_file_on_s3(cls, stream, store_items_model, session, store_id):
    """Upload ``stream`` to the store's S3 bucket as ``<model key>.zip``.

    The store identified by ``store_id`` is loaded through the ``stores``
    model; its ``configuration['aws']`` entry provides the bucket name and,
    optionally, the access key pair.  Start and end of the upload are logged.
    """
    model_key = store_items_model.__key__
    cls._logger.info("Started put file on S3 for '{}'".format(model_key))

    stores = cls._run_coro(
        cls.get_model('stores').get(session, [{'id': store_id}]),
        session
    )
    aws_config = stores[0]['configuration']['aws']

    bucket_name = aws_config['s3']['bucket']
    # The credentials may be absent; .get() then hands None to boto3.
    credentials = dict(
        aws_access_key_id=aws_config.get('access_key_id'),
        aws_secret_access_key=aws_config.get('secret_access_key'),
    )
    object_key = '{}.zip'.format(model_key)

    bucket = boto3.resource('s3', **credentials).Bucket(bucket_name)
    bucket.put_object(Body=stream, Key=object_key)

    cls._logger.info("Finished put file on S3 for '{}'".format(model_key))
deploy_lambda.py 文件源码 项目:github-snooze-button 作者: tdsmith 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_or_get_lambda_role():
    """Return the Lambda execution role for github-snooze-button, creating it if needed.

    All IAM roles are scanned for one with the expected path and name.  When
    none exists, the role is created with ``LAMBDA_ROLE_TRUST_POLICY`` and
    the ``AWSLambdaBasicExecutionRole`` managed policy is attached to it.

    Args: None
    Returns: the existing or newly created IAM role resource.
    """
    lambda_role_path = "/tdsmith/github-snooze-button/"
    lambda_role_name = "snooze_lambda_role"

    iam = boto3.resource("iam")
    existing = next(
        (candidate for candidate in iam.roles.all()
         if candidate.path == lambda_role_path
         and candidate.name == lambda_role_name),
        None)
    if existing is not None:
        return existing

    created = iam.create_role(
        Path=lambda_role_path,
        RoleName=lambda_role_name,
        AssumeRolePolicyDocument=LAMBDA_ROLE_TRUST_POLICY)
    created.attach_policy(
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole")
    return created
test_repository_listener.py 文件源码 项目:github-snooze-button 作者: tdsmith 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_poll(self, config, trivial_message):
    """poll() must drain the queue and invoke the registered callback."""
    self._test_poll_was_polled = False

    def mark_polled(event, message):
        # Record that the listener dispatched a message to us.
        self._test_poll_was_polled = True

    responses.add(responses.POST, "https://api.github.com/repos/tdsmith/test_repo/hooks")
    listener_kwargs = dict(config["tdsmith/test_repo"])
    repo_listener = snooze.RepositoryListener(
        events=snooze.LISTEN_EVENTS,
        callbacks=[mark_polled], **listener_kwargs)

    sqs = boto3.resource("sqs", region_name="us-west-2")
    all_queues = list(sqs.queues.all())
    sqs_queue = all_queues[0]

    def pending_messages():
        return int(sqs_queue.attributes["ApproximateNumberOfMessages"])

    sqs_queue.send_message(MessageBody=trivial_message)
    assert pending_messages() > 0

    repo_listener.poll()
    # Refresh the cached queue attributes before checking them again.
    sqs_queue.reload()
    assert pending_messages() == 0
    assert self._test_poll_was_polled
setup_helper_handler.py 文件源码 项目:aws-ops-automator 作者: awslabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create_request(self):
        """
        Handles create request from cloudformation custom resource
        :return: 
        """

        try:
            self._setup()
            self.physical_resource_id = self.__class__.__name__.lower()
            if allow_send_metrics():
                self._send_create_metrics()
            return True

        except Exception as ex:
            self.response["Reason"] = str(ex)
            return False


问题


面经


文章

微信
公众号

扫码关注公众号