python类Session()的实例源码

orcaserver.py 文件源码 项目:orca 作者: bdastur 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def rundeck_list_groups():
    '''
    Return the list of groups from all available profiles
    '''
    resp_obj = {}

    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()
    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        iamclient = session.client('iam')

        try:
            groupinfo = iamclient.list_groups()
        except botocore.exceptions.ClientError:
            groupinfo['Groups'] = []

        for group in groupinfo['Groups']:
            grouptext = "(%s) %s" % (profile, group['GroupName'])
            resp_obj[grouptext] = group['GroupName']

    return jsonify(resp_obj)
orcaserver.py 文件源码 项目:orca 作者: bdastur 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rundeck_list_iam_policies():
    '''
    Return the list of profiles from all available profiles
    '''
    resp_obj = {}

    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()

    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        iamclient = session.client('iam')

        try:
            policyinfo = iamclient.list_policies()
        except botocore.exceptions.ClientError:
            policyinfo['Policies'] = []

        for policy in policyinfo['Policies']:
            policytext = "(%s) %s" % (profile, policy['PolicyName'])
            resp_obj[policytext] = policy['PolicyName']

    return jsonify(resp_obj)
dns.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def find_existing_record(env, zone_id, dns_name, check_key=None, check_value=None):
    """Check if a specific DNS record exists.

    Args:
        env (str): Deployment environment.
        zone_id (str): Route53 zone id.
        dns_name (str): FQDN of application's dns entry to add/update.
        check_key(str): Key to look for in record. Example: "Type"
        check_value(str): Value to look for with check_key. Example: "CNAME"

    Returns:
        json: Found Record. Returns None if no record found

    """
    client = boto3.Session(profile_name=env).client('route53')
    pager = client.get_paginator('list_resource_record_sets')
    existingrecord = None
    for rset in pager.paginate(HostedZoneId=zone_id):
        for record in rset['ResourceRecordSets']:
            if check_key:
                if record['Name'].rstrip('.') == dns_name and record.get(check_key) == check_value:
                    LOG.info("Found existing record: %s", record)
                    existingrecord = record
                    break
    return existingrecord
dns.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_existing_cname(env, zone_id, dns_name):
    """Delete an existing CNAME record.

    This is used when updating to multi-region for deleting old records. The
    record can not just be upserted since it changes types.

    Args:
        env (str): Deployment environment.
        zone_id (str): Route53 zone id.
        dns_name (str): FQDN of application's dns entry to add/update.
    """
    client = boto3.Session(profile_name=env).client('route53')
    startrecord = None
    newrecord_name = dns_name
    startrecord = find_existing_record(env, zone_id, newrecord_name, check_key='Type', check_value='CNAME')
    if startrecord:
        LOG.info("Deleting old record: %s", newrecord_name)
        _response = client.change_resource_record_sets(
            HostedZoneId=zone_id, ChangeBatch={'Changes': [{
                'Action': 'DELETE',
                'ResourceRecordSet': startrecord
            }]})
        LOG.debug('Response from deleting %s: %s', dns_name, _response)
elb.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def find_elb_dns_zone_id(name='', env='dev', region='us-east-1'):
    """Get an application's AWS elb dns zone id.

    Args:
        name (str): ELB name
        env (str): Environment/account of ELB
        region (str): AWS Region

    Returns:
        str: elb DNS zone ID

    """
    LOG.info('Find %s ELB DNS Zone ID in %s [%s].', name, env, region)
    client = boto3.Session(profile_name=env).client('elb', region_name=region)
    elbs = client.describe_load_balancers(LoadBalancerNames=[name])
    return elbs['LoadBalancerDescriptions'][0]['CanonicalHostedZoneNameID']
get_sns_subscriptions.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_sns_subscriptions(app_name, env, region):
    """List SNS lambda subscriptions.

    Returns:
        list: List of Lambda subscribed SNS ARNs.

    """
    session = boto3.Session(profile_name=env, region_name=region)
    sns_client = session.client('sns')

    lambda_alias_arn = get_lambda_alias_arn(app=app_name, account=env, region=region)

    lambda_subscriptions = []
    subscriptions = sns_client.list_subscriptions()

    for subscription in subscriptions['Subscriptions']:
        if subscription['Protocol'] == "lambda" and subscription['Endpoint'] == lambda_alias_arn:
            lambda_subscriptions.append(subscription['SubscriptionArn'])

    if not lambda_subscriptions:
        LOG.debug('SNS subscription for function %s not found', lambda_alias_arn)

    return lambda_subscriptions
api_gateway_event.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, app='', env='', region='', rules={}, prop_path=''):
        self.log = logging.getLogger(__name__)
        self.generated = get_details(app=app, env=env)
        self.trigger_settings = rules
        self.app_name = self.generated.app_name()
        self.env = env
        self.account_id = get_env_credential(env=self.env)['accountId']
        self.region = region
        self.properties = get_properties(properties_file=prop_path, env=self.env)

        session = boto3.Session(profile_name=env, region_name=region)
        self.client = session.client('apigateway')
        self.lambda_client = session.client('lambda')
        self.api_version = self.lambda_client.meta.service_model.api_version

        self.api_id = self.find_api_id()
        self.resource_id, self.parent_id = self.find_resource_ids()
destroy_cloudwatch_event.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def destroy_cloudwatch_event(app='', env='dev', region=''):
    """Destroy Cloudwatch event subscription.

    Args:
        app (str): Spinnaker Application name.
        env (str): Deployment environment.
        region (str): AWS region.
    Returns:
        bool: True upon successful completion.
    """

    session = boto3.Session(profile_name=env, region_name=region)
    cloudwatch_client = session.client('events')

    event_rules = get_cloudwatch_event_rule(app_name=app, account=env, region=region)

    for rule in event_rules:
        cloudwatch_client.remove_targets(Rule=rule, Ids=[app])

    return True
destroy_cloudwatch_log_event.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def destroy_cloudwatch_log_event(app='', env='dev', region=''):
    """Destroy Cloudwatch log event.

    Args:
        app (str): Spinnaker Application name.
        env (str): Deployment environment.
        region (str): AWS region.
    Returns:
        bool: True upon successful completion.
    """

    session = boto3.Session(profile_name=env, region_name=region)
    cloudwatch_client = session.client('logs')

    # FIXME: see below
    # TODO: Log group name is required, where do we get it if it is not in application-master-env.json?
    cloudwatch_client.delete_subscription_filter(logGroupName='/aws/lambda/awslimitchecker', filterName=app)

    return True
datapipeline.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, app=None, env=None, region='us-east-1', prop_path=None):
        """AWS Data Pipeline object.

        Args:
            app (str): Application name
            env (str): Environment/Account
            region (str): AWS Region
            prop_path (str): Path of environment property file
        """
        self.app_name = app
        self.env = env
        self.region = region
        self.properties = get_properties(prop_path)
        self.datapipeline_data = self.properties[self.env]['datapipeline']
        generated = get_details(app=self.app_name)
        self.group = generated.data['project']

        session = boto3.Session(profile_name=self.env, region_name=self.region)
        self.client = session.client('datapipeline')
        self.pipeline_id = None
handler.py 文件源码 项目:aws-tailor 作者: alanwill 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def handler(event, context):
    log.debug("Received event {}".format(json.dumps(event)))

    stage = event['ResourceProperties']['Stage']
    topicNamePrefix = event['ResourceProperties']['TopicNamePrefix']
    topicName = topicNamePrefix + '-' + stage
    requestType = event['RequestType']

    # Initialize a Session object in order to look up Config regions
    boto3Session = boto3.Session()

    # All Config regions
    snsRegions = boto3Session.get_available_regions(
        service_name='sns',
        partition_name='aws',
    )

    if "Create" in requestType:
        create_topics(snsRegions, topicName, context, event, stage)

    elif "Update" in requestType:
        create_topics(snsRegions, topicName, context, event, stage)

    elif "Delete" in requestType:
        delete_topics(snsRegions, topicName, context, event)
importer.py 文件源码 项目:gateway_manager 作者: binarydud 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main(region, profile='default'):
    session = boto3.Session(
        profile_name=profile,
        region_name=region
    )
    apex_json = json.load(open('project.json'))
    raml = ramlfications.parse('api_schema.raml')
    gateway = ApiGateway(raml, apex_json, session)
    print 'Creating Api Gateway'
    gateway.create()
    gateway.load()
    print 'Creating Authorizers'
    gateway.create_authorizers()
    print 'Creating Resources'
    gateway.create_resources()
    print 'Deploying Stage'
    print gateway.create_deployment()
__init__.py 文件源码 项目:aws-sudo 作者: voytek-solutions 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_session(profile_config):
    session_profile = profile_config['profile_name']

    if 'source_profile' in profile_config:
        session_profile = profile_config['source_profile']

    if 'region' in profile_config:
        os.putenv('AWS_DEFAULT_REGION', profile_config['region'])
        os.putenv('AWS_REGION', profile_config['region'])

    # Create a session using profile or EC2 Instance Role
    # To use Instance Role set `source_profile` to empty string in aws profile
    # configuration file
    session = boto3.Session(profile_name=session_profile)

    return session
consumer.py 文件源码 项目:squishy 作者: tmehlinger 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, queue_url, worker, session=None,
                 use_short_polling=False, polling_timeout=10,
                 polling_count=10):
        self.use_short_polling = use_short_polling
        self.polling_timeout = polling_timeout
        self.polling_count = polling_count

        if not session:
            region_name = self.parse_region_name(queue_url)
            session = Session(region_name=region_name)

        self.session = session
        self.sqs = self.session.resource('sqs')
        self.queue = self.sqs.Queue(url=queue_url)

        self.logger = get_logger(__name__)

        self.should_stop = Event()
        self.poller_thread = Thread(group=None, target=self._poll_messages)
        self.worker = worker
botocore_tests.py 文件源码 项目:apm-agent-python 作者: elastic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_botocore_instrumentation(mock_make_request, elasticapm_client):
    mock_response = mock.Mock()
    mock_response.status_code = 200
    mock_make_request.return_value = (mock_response, {})

    elasticapm_client.begin_transaction("transaction.test")
    with capture_span("test_pipeline", "test"):
        session = boto3.Session(aws_access_key_id='foo',
                                aws_secret_access_key='bar',
                                region_name='us-west-2')
        ec2 = session.client('ec2')
        ec2.describe_instances()
    elasticapm_client.end_transaction("MyView")

    transactions = elasticapm_client.instrumentation_store.get_all()
    spans = transactions[0]['spans']
    assert 'ec2:DescribeInstances' in map(lambda x: x['name'], spans)
task_configuration.py 文件源码 项目:aws-ops-automator 作者: awslabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate_regions(self, regions, action_name, ):
        action_properties = actions.get_action_properties(action_name)
        service_name = action_properties[actions.ACTION_SERVICE]

        if self.service_is_regional(service_name) and action_properties.get(actions.ACTION_MULTI_REGION, True):
            if regions is None or len(regions) == 0:
                return [boto3.Session().region_name]
            else:
                available_regions = self.service_regions(service_name)
                if len(regions) == 1 and list(regions)[0] == "*":
                    return available_regions

                for region in regions:
                    if region not in available_regions:
                        raise ValueError(MSG_BAD_REGION.format(region, service_name, ",".join(available_regions)))
                return list(regions)
        else:
            if regions is not None and len(regions) != 0:
                msg = WARN_NOT_REGIONAL_SERVICE.format(",".join(regions), service_name, action_name)
                self._warn(msg)
        return []
aws_service.py 文件源码 项目:aws-ops-automator 作者: awslabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_session(role_arn=None, sts_client=None):
        """
        Created a session for the specified role
        :param role_arn: Role arn
        :param sts_client: Optional sts client, if not specified a (cache) sts client instance is used
        :return: Session for the specified role
        """

        if role_arn is not None:
            sts = sts_client if sts_client is not None else boto3.client("sts")
            account = AwsService.account_from_role_arn(role_arn)
            token = sts.assume_role(RoleArn=role_arn, RoleSessionName="{}-{}".format(account, str(uuid.uuid4())))
            credentials = token["Credentials"]
            return boto3.Session(aws_access_key_id=credentials["AccessKeyId"],
                                 aws_secret_access_key=credentials["SecretAccessKey"],
                                 aws_session_token=credentials["SessionToken"])
        else:
            return boto3.Session()
time_service.py 文件源码 项目:aws-ops-automator 作者: awslabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def describe(self, as_tuple=None, **kwargs):
        """
        This method is to retrieve a pseudo UTC time resource, method parameters are only used signature compatibility
        :param as_tuple: Set to true to return results as immutable named dictionaries instead of dictionaries
        :return: Pseudo time resource
        """

        def use_tuple():
            return (as_tuple is not None and as_tuple) or (as_tuple is None and self._as_tuple)

        region = kwargs.get("region")
        result = {
            "Time": datetime.datetime.now(pytz.timezone("UTC")),
            "AwsAccount": self.aws_account,
            "Region": region if region else boto3.Session().region_name
        }

        return [as_namedtuple("Time", result)] if use_tuple() else [result]
sync.py 文件源码 项目:aws-acl-helper 作者: brandond 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_session(config):
    if config.profile_name not in _session_cache:
        print('Creating new Boto3 Session for profile {0}'.format(config.profile_name))
        _session_cache[config.profile_name] = boto3.Session(profile_name=config.profile_name)

    session = _session_cache[config.profile_name]

    if config.role_arn:
        if config.role_arn not in _session_cache:
            sts_client = session.client('sts')
            role_session_name = '{0}.session-{1}'.format(__name__, time.time())

            print('Assuming role {0}'.format(config.role_arn))
            assumed_role = sts_client.assume_role(RoleArn=config.role_arn,
                                                  ExternalId=config.external_id,
                                                  RoleSessionName=role_session_name)
            _session_cache[config.role_arn] = boto3.Session(aws_access_key_id=assumed_role['Credentials']['AccessKeyId'],
                                                            aws_secret_access_key=assumed_role['Credentials']['SecretAccessKey'],
                                                            aws_session_token=assumed_role['Credentials']['SessionToken'])

        session = _session_cache[config.role_arn]

    return session
test_plan.py 文件源码 项目:creds 作者: jonhadfield 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_execute_plan_to_create_user_with_downloaded_yaml():
    """ Create a new user from downloaded YAML file """
    delete_test_user_and_group()
    session = Session()
    s3_client = session.client('s3')
    test_user_1 = open(os.path.join(os.path.dirname(__file__), 'test_user_1.yml')).read()
    s3_client.create_bucket(Bucket='test')
    s3_client.put_object(Bucket='test', Key='test.yml',Body=test_user_1)
    response = s3_client.get_object(Bucket='test', Key='test.yml')
    contents = response['Body'].read()
    yaml_content = yaml.load(contents)
    current_users = Users.from_passwd()
    provided_users = Users.from_dict(yaml_content)
    plan = create_plan(existing_users=current_users, proposed_users=provided_users, manage_home=False,
                       protected_users=['travis', 'couchdb', 'ubuntu', 'nginx', 'hadfielj', 'vagrant', CURRENT_USER])
    execute_plan(plan=plan)
    updated_users = Users.from_passwd()
    updated_user = updated_users.describe_users(users_filter=dict(name='dummy'))
    assert len(updated_user) == 1
    assert updated_user[0].name == 'dummy'
    assert updated_user[0].gecos == '\"dummy (user) test\"'
    assert not updated_user[0].public_keys
    assert updated_user[0].sudoers_entry == 'ALL=(ALL:ALL) ALL'
    delete_test_user_and_group()
engines.py 文件源码 项目:aq 作者: lebinh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, options=None):
        self.options = options if options else {}
        self.debug = options.get('--debug', False)

        self.profile = options.get('--profile', None)
        self.region = options.get('--region', None)
        self.table_cache_ttl = int(options.get('--table-cache-ttl', 300))
        self.last_refresh_time = defaultdict(int)

        self.boto3_session = boto3.Session(profile_name=self.profile)
        # dash (-) is not allowed in database name so we use underscore (_) instead in region name
        # throughout this module region name will *always* use underscore
        if self.region:
            self.default_region = self.region.replace('-', '_')
        elif self.boto3_session.region_name:
            self.default_region = self.boto3_session.region_name.replace('-', '_')
        else:
            self.default_region = DEFAULT_REGION

        self.boto3_session = boto3.Session(profile_name=self.profile, region_name=self.default_region.replace('_', '-'))
        self.db = self.init_db()
        # attach the default region too
        self.attach_region(self.default_region)
admin.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def records(account_id):
    """Fetch locks data
    """
    s = boto3.Session()
    table = s.resource('dynamodb').Table('Sphere11.Dev.ResourceLocks')
    results = table.scan()

    for r in results['Items']:
        if 'LockDate' in r:
            r['LockDate'] = datetime.fromtimestamp(r['LockDate'])
        if 'RevisionDate' in r:
            r['RevisionDate'] = datetime.fromtimestamp(r['RevisionDate'])

    print(tabulate.tabulate(
        results['Items'],
        headers="keys",
        tablefmt='fancy_grid'))
admin.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def config_status():
    """ Check config status in an account.
    """
    s = boto3.Session()
    client = s.client('config')
    channels = client.describe_delivery_channel_status()[
        'DeliveryChannelsStatus']
    for c in channels:
        print(yaml.safe_dump({
            c['name']: dict(
                snapshot=str(
                    c['configSnapshotDeliveryInfo'].get('lastSuccessfulTime')),
                history=str(
                    c['configHistoryDeliveryInfo'].get('lastSuccessfulTime')),
                stream=str(
                    c['configStreamDeliveryInfo'].get('lastStatusChangeTime'))
            ),
        }, default_flow_style=False))
replay.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, msg_file, config, msg_plain=False, json_dump_file=None):
        if not os.path.exists(msg_file):
            raise RuntimeError("File does not exist: %s" % msg_file)
        logger.debug('Reading message from: %s', msg_file)
        with open(msg_file, 'r') as fh:
            raw = fh.read()
        logger.debug('Read %d byte message', len(raw))
        if msg_plain:
            raw = raw.strip()
        else:
            logger.debug('base64-decoding and zlib decompressing message')
            raw = zlib.decompress(base64.b64decode(raw))
            if json_dump_file is not None:
                with open(json_dump_file, 'w') as fh:
                    fh.write(raw)
        self.data = json.loads(raw)
        logger.debug('Loaded message JSON')
        self.config = config
        self.session = boto3.Session()
buckets.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ls(args):
    """
    List S3 buckets. See also "aws s3 ls". Use "aws s3 ls NAME" to list bucket contents.
    """
    table = []
    for bucket in filter_collection(resources.s3.buckets, args):
        bucket.LocationConstraint = clients.s3.get_bucket_location(Bucket=bucket.name)["LocationConstraint"]
        cloudwatch = resources.cloudwatch
        bucket_region = bucket.LocationConstraint or "us-east-1"
        if bucket_region != cloudwatch.meta.client.meta.region_name:
            cloudwatch = boto3.Session(region_name=bucket_region).resource("cloudwatch")
        data = get_cloudwatch_metric_stats("AWS/S3", "NumberOfObjects", start_time=datetime.utcnow()-timedelta(days=2),
                                           end_time=datetime.utcnow(), period=3600, BucketName=bucket.name,
                                           StorageType="AllStorageTypes", resource=cloudwatch)
        bucket.NumberOfObjects = int(data["Datapoints"][-1]["Average"]) if data["Datapoints"] else None
        data = get_cloudwatch_metric_stats("AWS/S3", "BucketSizeBytes", start_time=datetime.utcnow()-timedelta(days=2),
                                           end_time=datetime.utcnow(), period=3600, BucketName=bucket.name,
                                           StorageType="StandardStorage", resource=cloudwatch)
        bucket.BucketSizeBytes = format_number(data["Datapoints"][-1]["Average"]) if data["Datapoints"] else None
        table.append(bucket)
    page_output(tabulate(table, args))
audit.py 文件源码 项目:aegea 作者: kislyuk 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def audit_2_3(self):
        """2.3 Ensure the S3 bucket CloudTrail logs to is not publicly accessible (Scored)"""
        raise NotImplementedError()
        import boto3
        s3 = boto3.session.Session(region_name="us-east-1").resource("s3")
        # s3 = boto3.resource("s3")
        # for trail in self.trails:
        #    for grant in s3.Bucket(trail["S3BucketName"]).Acl().grants:
        #    print(s3.Bucket(trail["S3BucketName"]).Policy().policy)
        for bucket in s3.buckets.all():
            print(bucket)
            try:
                print("    Policy:", bucket.Policy().policy)
            except:
                pass
            for grant in bucket.Acl().grants:
                try:
                    print("    Grant:", grant)
                except:
                    pass
metadb.py 文件源码 项目:boss 作者: jhuapl-boss 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        """
        Initialize the data base
        Returns:

        """
        self.__local_dynamo = os.environ.get('USING_DJANGO_TESTRUNNER') is not None
        if not self.__local_dynamo:
            session = get_session()
            dynamodb = session.resource('dynamodb')
            if 'test' in sys.argv:
                # TODO: This needs to be made more robust. Parameters should be mocked, not assumed.
                tablename = 'intTest.' + config["aws"]["meta-db"]
            else:
                tablename = config["aws"]["meta-db"]
        else:
            tablename = config["aws"]["meta-db"]
            session = boto3.Session(aws_access_key_id='foo', aws_secret_access_key='foo')
            dynamodb = session.resource('dynamodb', region_name='us-east-1', endpoint_url='http://localhost:8000')

        self.table = dynamodb.Table(tablename)
main.py 文件源码 项目:prbuilds 作者: guardian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def github_webhook_to_sqs(event, context):

    """ When called, dumps the content of event.body into an sqs queue """

    if "body" not in event:

        return { "message" : "Hook was called with no body" }

    try:

        sqs = boto3.Session(
            aws_access_key_id=AWS_USER,
            aws_secret_access_key=AWS_KEY,
            region_name='eu-west-1'
        ).resource('sqs')

        queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)

        queue.send_message(MessageBody=event["body"])

        return { "message" : "Message posted to %s" % QUEUE_NAME }

    except:

        return { "message" : "Message posting failed" }
awsumepy.py 文件源码 项目:awsume 作者: trek10inc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_boto_sts_client(profileName=None, secretAccessKey=None, accessKeyId=None, sessionToken=None):
    """
    profileName - the name of the profile to create the client with;
    secretAccessKey - secret access key that can be passed into the session;
    accessKeyId - access key id that can be passed into the session;
    sessionToken - session token that can be passed into the session;
    return a boto3 session client
    """
    log.info('Creating a Boto3 STS client')
    log.debug('profile_name=' + str(profileName))
    log.debug('aws_access_key_id=' + str(accessKeyId))
    log.debug('aws_secret_access_key=' + str(secretAccessKey))
    log.debug('aws_session_token=' + str(sessionToken))
    #establish the boto session with given credentials
    botoSession = boto3.Session(profile_name=profileName,
                                aws_access_key_id=accessKeyId,
                                aws_secret_access_key=secretAccessKey,
                                aws_session_token=sessionToken)
    #create an sts client, always defaulted to us-east-1
    return botoSession.client('sts', region_name='us-east-1')
awsumepy.py 文件源码 项目:awsume 作者: trek10inc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_valid_awsume_session(awsumeSession):
    """
    awsumeSession - the session to validate;
    return whether or not the session is valid;
    if credentials are expired, or don't exist, they're invalid;
    else they are valid
    """
    log.info('Checking if the session is valid')

    #check if the session has an expiration
    if awsumeSession.get('Expiration'):
        #check if the session is expired
        if awsumeSession.get('Expiration') > datetime.datetime.now().replace():
            return True
        log.debug('Session is expired')
        return False
    return False


问题


面经


文章

微信
公众号

扫码关注公众号