python类Config()的实例源码

connection.py 文件源码 项目:antenna 作者: mozilla-services 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _build_client(self):
    """Create and return an S3 client built from this object's configuration.

    Explicit credentials are passed to the boto3 session only when both
    ``access_key`` and ``secret_access_key`` are configured; otherwise the
    session is created without them and boto3 resolves credentials itself.
    http://boto3.readthedocs.io/en/latest/guide/configuration.html#configuring-credentials
    """
    credentials = {}
    if self.config('access_key') and self.config('secret_access_key'):
        credentials.update(
            aws_access_key_id=self.config('access_key'),
            aws_secret_access_key=self.config('secret_access_key'),
        )
    session = boto3.session.Session(**credentials)

    client_options = dict(
        service_name='s3',
        region_name=self.config('region'),
        # NOTE(willkg): We use path-style because that lets us have dots in
        # our bucket names and use SSL.
        config=Config(s3={'addressing_style': 'path'}),
    )
    # Only override the endpoint when one is configured.
    if self.config('endpoint_url'):
        client_options['endpoint_url'] = self.config('endpoint_url')

    return session.client(**client_options)
conftest.py 文件源码 项目:antenna 作者: mozilla-services 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
    """Create and return an S3 client from this object's connection settings.

    Credentials are passed to the boto3 session only when both
    ``self.access_key`` and ``self.secret_access_key`` are truthy; the
    endpoint URL is overridden only when ``self.endpointurl`` is truthy.
    """
    session_kwargs = {}
    if self.access_key and self.secret_access_key:
        session_kwargs['aws_access_key_id'] = self.access_key
        session_kwargs['aws_secret_access_key'] = self.secret_access_key

    session = boto3.session.Session(**session_kwargs)

    client_kwargs = {
        'service_name': 's3',
        'region_name': self.region,
        # The option key is 'addressing_style'; the previous spelling
        # ('addression_style') is not a recognised s3 option, so path-style
        # addressing was never actually requested.
        'config': Config(s3={'addressing_style': 'path'})
    }
    if self.endpointurl:
        client_kwargs['endpoint_url'] = self.endpointurl

    return session.client(**client_kwargs)
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, handle_task=lambda t, i: None, **kwargs):
    """Set up the attributes expected on this object.

    Will not be called if used as a mixin; in that case this only documents
    the variables that are expected to exist.

    Args:
        handle_task (callable) : Callable to process task input and send
                                 success or failure
        kwargs : Arguments for heaviside.utils.create_session
    """
    # create_session returns a pair; only the session itself is needed here.
    boto_session, _unused = create_session(**kwargs)

    # DP NOTE: read_timeout is needed so that the long poll for tasking
    #          doesn't timeout client side before AWS returns that there
    #          is no work
    long_poll_config = Config(read_timeout=70)
    self.client = boto_session.client('stepfunctions', config=long_poll_config)
    self.log = logging.getLogger(__name__)

    # Identity of the activity; both start out unset.
    self.name = None
    self.arn = None

    self.handle_task = handle_task

    # Initial polling state.
    self.max_concurrent = 0
    self.poll_delay = 1
    self.polling = False
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, name, target=None, **kwargs):
    """Initialise the process and its Step Functions client.

    Args:
        name (string): Name of the activity to monitor
                       The activity's ARN is looked up in AWS using the
                       provided AWS credentials
        target (string|callable): Function to pass to TaskProcess as the
                                  target. If string, the class / function
                                  will be imported
        kwargs (dict): Same arguments as utils.create_session()
    """
    super(ActivityProcess, self).__init__(name=name)
    self.name = name

    # Keep the raw session arguments alongside the session built from them.
    self.credentials = kwargs
    self.session, self.account_id = create_session(**kwargs)

    stepfunctions_config = Config(read_timeout=70)
    self.client = self.session.client('stepfunctions',
                                      config=stepfunctions_config)
    self.log = logging.getLogger(__name__)

    self.max_concurrent = 0
    self.poll_delay = 1

    # A string target is resolved to the object it names; anything else is
    # stored unchanged.
    self.target = (TaskProcess.resolve_function(target)
                   if isinstance(target, str) else target)
controllers.py 文件源码 项目:bitstore 作者: datahq 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def get_s3_client():
    """Return an S3 client configured from the module-level ``config`` mapping.

    The endpoint is taken from the ``S3_ENDPOINT_URL`` environment variable.
    When that variable is set, the storage bucket is additionally created
    and given a ``public-read`` ACL on a best-effort basis: any failure in
    that step is logged and otherwise ignored, and the client is still
    returned.
    """
    endpoint_url = os.environ.get("S3_ENDPOINT_URL")
    # Connection settings shared by the client and the resource below.
    connection_kwargs = {
        'aws_access_key_id': config['STORAGE_ACCESS_KEY_ID'],
        'aws_secret_access_key': config['STORAGE_SECRET_ACCESS_KEY'],
        'endpoint_url': endpoint_url,
    }
    s3_client = boto3.client('s3',
                             # region_name='us-east-1',
                             config=Config(signature_version='s3v4'),
                             **connection_kwargs)
    if endpoint_url:
        try:
            s3 = boto3.resource('s3',
                                config=Config(signature_version='s3v4'),
                                **connection_kwargs)
            s3.create_bucket(Bucket=config['STORAGE_BUCKET_NAME'])
            bucket = s3.Bucket(config['STORAGE_BUCKET_NAME'])
            bucket.Acl().put(ACL='public-read')
        except Exception:
            # Best effort only: log and carry on. Catching Exception (rather
            # than a bare except) lets KeyboardInterrupt/SystemExit through.
            logging.exception('Failed to create the bucket')
    return s3_client
s3.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def bucket_client(session, b, kms=False):
    """Return an S3 client for the region recorded on bucket ``b``.

    The region comes from ``b['Location']['LocationConstraint']``; a missing
    ``Location`` entry or a falsy constraint falls back to ``'us-east-1'``.
    """
    location = b.get('Location')
    region = ('us-east-1' if location is None
              else location['LocationConstraint'] or 'us-east-1')

    config_options = {'read_timeout': 200, 'connect_timeout': 120}
    if kms:
        # Need v4 signature for aws:kms crypto, else let the sdk decide
        # based on region support.
        config_options['signature_version'] = 's3v4'

    return session.client(
        's3', region_name=region, config=Config(**config_options))
s3.py 文件源码 项目:odooku-compat 作者: adaptivdesign 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def client(self):
    """Return the S3 client cached on ``self._local``, creating it on first use.

    When ``self._dev_url`` is set, a botocore client pointing at that URL
    is created with placeholder credentials, path-style addressing and the
    's3' signature version. Otherwise a regular boto3 client is created
    with the configured access keys.
    """
    if hasattr(self._local, 'client'):
        return self._local.client

    _logger.info("Creating new S3 Client")
    if not self._dev_url:
        self._local.client = boto3.client(
            's3',
            aws_access_key_id=self._aws_access_key_id,
            aws_secret_access_key=self._aws_secret_access_key
        )
        return self._local.client

    _logger.warning("S3 dev mode enabled")
    dev_session = botocore.session.get_session()
    dev_config = Config(
        s3={'addressing_style': 'path'},
        signature_version='s3'
    )
    self._local.client = dev_session.create_client(
        's3',
        aws_access_key_id='-',
        aws_secret_access_key='-',
        endpoint_url=self._dev_url,
        config=dev_config
    )
    return self._local.client
boto3.py 文件源码 项目:fleece 作者: racker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def client(*args, **kwargs):
    """
    Create a low-level service client by name using the default session.

    Socket level timeouts are preconfigured according to the defaults set via
    the `fleece.boto3.set_default_timeout()` function. They can be overridden
    for a single client with the `timeout`, `connect_timeout` or
    `read_timeout` keyword arguments, which are consumed here and not
    forwarded to boto3.
    """
    # The generic timeout is the fallback when no specific default is set.
    fallback = kwargs.pop('timeout', DEFAULT_TIMEOUT)
    timeouts = {
        'connect_timeout': kwargs.pop('connect_timeout',
                                      DEFAULT_CONNECT_TIMEOUT or fallback),
        'read_timeout': kwargs.pop('read_timeout',
                                   DEFAULT_READ_TIMEOUT or fallback),
    }
    return real_boto3.client(*args, config=Config(**timeouts), **kwargs)
boto3.py 文件源码 项目:fleece 作者: racker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def resource(*args, **kwargs):
    """
    Create a resource service client by name using the default session.

    Socket level timeouts are preconfigured according to the defaults set via
    the `fleece.boto3.set_default_timeout()` function. They can be overridden
    for a single resource with the `timeout`, `connect_timeout` or
    `read_timeout` keyword arguments, which are consumed here and not
    forwarded to boto3.
    """
    # The generic timeout is the fallback when no specific default is set.
    fallback = kwargs.pop('timeout', DEFAULT_TIMEOUT)
    timeouts = {
        'connect_timeout': kwargs.pop('connect_timeout',
                                      DEFAULT_CONNECT_TIMEOUT or fallback),
        'read_timeout': kwargs.pop('read_timeout',
                                   DEFAULT_READ_TIMEOUT or fallback),
    }
    return real_boto3.resource(*args, config=Config(**timeouts), **kwargs)
backend.py 文件源码 项目:odooku 作者: odooku 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def client(self):
    """Return the S3 client cached on ``self._local``, creating it on first use.

    The client is built from the region, credentials, endpoint URL,
    addressing style and signature version stored on this object.
    """
    local = self._local
    if not hasattr(local, 'client'):
        _logger.info("Creating new S3 Client")
        options = dict(
            region_name=self._aws_region,
            aws_access_key_id=self._aws_access_key_id,
            aws_secret_access_key=self._aws_secret_access_key,
            endpoint_url=self._endpoint_url,
            config=Config(
                s3={'addressing_style': self._addressing_style},
                signature_version=self._signature_version
            ),
        )
        local.client = boto3.client('s3', **options)

    return local.client
traildb.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def process_trail_set(
        object_set, map_records, reduce_results=None, trail_bucket=None):
    """Download, decode and map each trail object, optionally folding results.

    Every entry of ``object_set`` is fetched from ``trail_bucket`` by its
    ``'Key'``, gunzipped, parsed as JSON, and its ``'Records'`` list passed
    to ``map_records``. When ``reduce_results`` is given it is called as
    ``reduce_results(mapped, previous)`` and its result carried forward.

    Returns the last reduction result, or None when nothing was reduced.
    """
    # Session settings come from the module-level ``options`` object.
    factory = SessionFactory(
        options.region, options.profile, options.assume_role)
    s3_client = factory().client(
        's3', config=Config(signature_version='s3v4'))

    accumulated = None
    for entry in object_set:
        response = s3_client.get_object(Key=entry['Key'], Bucket=trail_bucket)
        raw = response['Body'].read()
        # NOTE(review): wrapping the raw body in StringIO presumably relies
        # on Python 2 byte strings - confirm before running on Python 3.
        records = json.load(GzipFile(fileobj=StringIO(raw)))['Records']
        mapped = map_records(records)
        if reduce_results:
            accumulated = reduce_results(mapped, accumulated)
    return accumulated
traildb.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setup_parser():
    """Build and return the command-line argument parser.

    ``--bucket`` and ``--account`` are required. ``--profile`` defaults to
    the ``AWS_PROFILE`` environment variable, ``--assume`` is stored as
    ``assume_role``, and ``--field`` may be repeated to collect several of
    the allowed record fields.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", required=True)
    parser.add_argument("--prefix", default="")
    parser.add_argument("--account", required=True)
    parser.add_argument("--user")
    parser.add_argument("--event")
    parser.add_argument("--source")
    parser.add_argument("--not-source")
    parser.add_argument("--day")
    parser.add_argument("--month")
    parser.add_argument("--tmpdir", default="/tmp/traildb")
    parser.add_argument("--region", default="us-east-1")
    parser.add_argument("--output", default="results.db")
    parser.add_argument(
        "--profile", default=os.environ.get('AWS_PROFILE'),
        help="AWS Account Config File Profile to utilize")
    parser.add_argument(
        "--assume", default=None, dest="assume_role",
        help="Role to assume")
    parser.add_argument(
        '--field', action='append',
        # Help text spelling corrected ("additonal" -> "additional").
        help='additional fields that can be added to each record',
        choices=['userIdentity', 'requestParameters', 'responseElements'])
    return parser
deploy_ui.py 文件源码 项目:serverless-image-handler 作者: awslabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def DeleteImageHandlerUI(deploy_config):
    """Delete every UI object under the configured prefix, then the prefix key.

    Expected dict entries:
        deploy_config['UIBucket']
        deploy_config['UIPrefix']

    Any error raised while talking to S3 is logged and re-raised.
    """
    bucket = deploy_config['UIBucket']
    prefix = deploy_config['UIPrefix']
    log.info("Deleting Serverless Image Handler UI from %s/%s", bucket, prefix)
    try:
        s3 = boto3.client("s3", config=Config(signature_version='s3v4'))
        log.info("Listing UI objects in %s/%s", bucket, prefix)
        # Paginate so prefixes holding more objects than one listing
        # response returns are fully cleaned up.
        paginator = s3.get_paginator('list_objects')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # 'Contents' is absent when nothing matches the prefix; treat
            # that as "nothing to delete" instead of failing with KeyError.
            for s3object in page.get('Contents', []):
                log.info("Deleting %s/%s", bucket, s3object['Key'])
                s3.delete_object(Bucket=bucket, Key=s3object['Key'])
        log.info("Deleting %s/%s", bucket, prefix)
        s3.delete_object(Bucket=bucket, Key=prefix)

    except Exception as e:
        log.error("Error deleting UI. Error: %s", e)
        raise
test_upload.py 文件源码 项目:s3transfer 作者: boto 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_sigv4_progress_callbacks_invoked_once(self):
    """Uploading with a sigv4 client reports exactly the file's size in bytes."""
    # Swap in a client (and a manager built on it) that signs with sigv4.
    sigv4_client_kwargs = {'config': Config(signature_version='s3v4')}
    self.reset_stubber_with_new_client(sigv4_client_kwargs)
    self.client.meta.events.register(
        'before-parameter-build.s3.*', self.collect_body)
    self._manager = TransferManager(self.client, self.config)

    # Queue the stubbed PutObject response.
    self.add_put_object_response_with_default_expected_params()

    recorder = RecordingSubscriber()
    upload_future = self.manager.upload(
        self.filename, self.bucket, self.key, subscribers=[recorder])
    upload_future.result()
    self.assert_expected_client_calls_were_correct()

    # The progress callbacks must add up to the content length.
    bytes_seen = recorder.calculate_bytes_seen()
    self.assertEqual(bytes_seen, len(self.content))
test_s3transfer.py 文件源码 项目:s3transfer 作者: boto 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_callback_called_once_with_sigv4(self):
    """Regression test for #98: the progress callback was being invoked
    twice when using signature version 4."""
    self.amount_seen = 0
    guard = threading.Lock()

    def progress_callback(amount):
        # Accumulate under the lock so concurrent callbacks do not race.
        with guard:
            self.amount_seen += amount

    expected_size = 10 * 1024 * 1024
    sigv4_client = self.session.create_client(
        's3', self.region,
        config=Config(signature_version='s3v4'))
    transfer = s3transfer.S3Transfer(sigv4_client)
    local_path = self.files.create_file_with_size(
        '10mb.txt', filesize=expected_size)
    transfer.upload_file(local_path, self.bucket_name,
                         '10mb.txt', callback=progress_callback)
    self.addCleanup(self.delete_object, '10mb.txt')

    # Reported progress must equal the file size, not double it.
    self.assertEqual(self.amount_seen, expected_size)
client.py 文件源码 项目:col-aws-clients 作者: collectrium 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(
        self,
        region_name,
        aws_access_key_id,
        aws_secret_access_key
):
    """Initialise the base client for the 'lambda' service.

    The client configuration uses a read timeout of 300.

    :param region_name: AWS region name
    :param aws_access_key_id:  AWS credentials
    :param aws_secret_access_key: AWS credentials
    """
    base_options = dict(
        service='lambda',
        region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        config=Config(read_timeout=300),
    )
    super(LambdaClient, self).__init__(**base_options)
activities.py 文件源码 项目:heaviside 作者: jhuapl-boss 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, name, arn=None, worker=None, **kwargs):
    """Bind this object to a Step Functions activity.

    Args:
        name (String): Name of the Activity to monitor
        arn (String): Full ARN of Activity to monitor
                      If not given, it is looked up
                      If given, the actual ARN and Name are compared
        worker (callable): Stored on the instance as ``self.worker``
        kwargs : Arguments to heaviside.utils.create_session

    Raises:
        Exception: if the given ARN is rejected by AWS, or if the activity
                   it identifies has a different name
    """
    self.name = name
    self.arn = arn
    self.worker = worker
    self.token = None

    self.session, self.account_id = create_session(**kwargs)
    stepfunctions_config = Config(read_timeout=70)
    self.client = self.session.client('stepfunctions',
                                      config=stepfunctions_config)
    self.log = logging.getLogger(__name__)

    self.max_concurrent = 0
    self.poll_delay = 1

    if self.arn is None:
        # No ARN supplied: resolve it from the activity name.
        self.arn = self.lookup_activity_arn(name)
        return

    # An ARN was supplied: confirm it exists and matches the given name.
    try:
        description = self.client.describe_activity(activityArn=self.arn)
    except ClientError:
        raise Exception("ARN {} is not valid".format(self.arn))
    else:
        if description['name'] != name:
            raise Exception("Name of {} is not {}".format(self.arn, self.name))
lambda_function.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    """Merge newly found open security groups into the list stored in S3.

    Downloads the existing list (one group id per line) to
    ``/tmp/OpenSGs.txt``, appends the id of every security group that has an
    ingress rule whose CIDR contains ``0.0.0.0/0``, de-duplicates the ids
    and uploads the result back to the same bucket/key.

    Relies on module-level names defined outside this function:
    ``bucketregion``, ``bucketname``, ``key``, ``ec2`` and ``s3``.
    """
    s3resource = boto3.resource('s3', config=Config(signature_version='s3v4'), region_name=bucketregion)
    s3resource.meta.client.download_file(bucketname, key, '/tmp/OpenSGs.txt')

    # Read through a context manager so the handle is always closed.
    with open('/tmp/OpenSGs.txt', 'r') as sg_file:
        all_open_sgs = sg_file.read().split('\n')

    #print("Existing SGS are: %s" % all_open_sgs)
    response = ec2.describe_security_groups()
    for sg in response['SecurityGroups']:
        for ingress in sg['IpPermissions']:
            # Named ip_range to avoid shadowing the builtin ``range``.
            for ip_range in ingress['IpRanges']:
                cidr = ip_range['CidrIp']
                if '0.0.0.0/0' in cidr:
                    print(cidr)
                    all_open_sgs.append(sg['GroupId'])

    # Creates array of unique values to remove duplicate SGs
    all_unique_sgs = list(set(all_open_sgs))

    # Convert the List to a String to avoid S3 errors
    string_of_sgs = '\n'.join(all_unique_sgs)

    # Upload the txt file to S3
    s3.put_object(
        Body=string_of_sgs,
        Bucket=bucketname,
        Key=key
    )
    return 'File Has Been Uploaded To S3'
s3.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_BucketVersioningConfiguration(self, resource, item_value):
    """Copy a versioning status from ``item_value`` onto ``resource``.

    Nothing is written unless the status is 'Enabled' or 'Suspended'
    (Config defaults versioning to 'Off' for a null value). When
    ``isMfaDeleteEnabled`` is truthy, its title-cased value is stored
    under 'MFADelete'.
    """
    status = item_value['status']
    if status in ('Enabled', 'Suspended'):
        versioning = {'Status': status}
        resource['Versioning'] = versioning
        mfa_delete = item_value['isMfaDeleteEnabled']
        if mfa_delete:
            versioning['MFADelete'] = mfa_delete.title()
s3.py 文件源码 项目:Cloud-Custodian 作者: jtroberts83 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def bucket_client(session, b, kms=False):
    """Return an S3 client for the region that ``get_region`` reports for ``b``.

    Timeouts are always set; the signature version is pinned only when
    ``kms`` is truthy.
    """
    config_options = {'read_timeout': 200, 'connect_timeout': 120}
    region = get_region(b)

    if kms:
        # Need v4 signature for aws:kms crypto, else let the sdk decide
        # based on region support.
        config_options['signature_version'] = 's3v4'

    return session.client(
        's3', region_name=region, config=Config(**config_options))
aws.py 文件源码 项目:cli 作者: madcore-ai 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, identity_pool_id=None):
    """Create the unsigned Cognito identity client and the no-auth Lambda client.

    ``identity_pool_id`` falls back to ``const.AWS_IDENTITY_POOL_ID`` when
    it is not given (or is otherwise falsy).
    """
    # region name for the base account
    self.region_name = 'eu-west-1'

    self.identity_pool_id = identity_pool_id or const.AWS_IDENTITY_POOL_ID

    unsigned_config = Config(signature_version=UNSIGNED)
    self.cognito_client = boto3.client(
        'cognito-identity',
        region_name=self.region_name,
        config=unsigned_config,
    )
    self.lambda_client_no_auth = self.create_aws_lambda_client()
package.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _run_main(self, parsed_args, parsed_globals):
    """Export the template and write the result to the requested output.

    Creates a sigv4 S3 client and an ``S3Uploader`` (stored on
    ``self.s3_uploader``), exports the template via ``self._export`` and
    hands the result to ``self.write_output``. When an output file was
    requested, a confirmation message is written to stdout as well.

    Raises ``exceptions.InvalidTemplatePathError`` when the template file
    does not exist. Returns 0.
    """
    s3_client = self._session.create_client(
        "s3",
        config=Config(signature_version='s3v4'),
        region_name=parsed_globals.region,
        verify=parsed_globals.verify_ssl)

    template_path = parsed_args.template_file
    if not os.path.isfile(template_path):
        raise exceptions.InvalidTemplatePathError(
            template_path=template_path)

    self.s3_uploader = S3Uploader(
        s3_client,
        parsed_args.s3_bucket,
        parsed_globals.region,
        parsed_args.s3_prefix,
        parsed_args.kms_key_id,
        parsed_args.force_upload)

    output_file = parsed_args.output_template_file
    exported_str = self._export(template_path, parsed_args.use_json)

    sys.stdout.write("\n")
    self.write_output(output_file, exported_str)

    if output_file:
        # Tell the user where the packaged template ended up.
        sys.stdout.write(self.MSG_PACKAGED_TEMPLATE_WRITTEN.format(
            output_file_name=output_file,
            output_file_path=os.path.abspath(output_file)))

    sys.stdout.flush()
    return 0
globalargs.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _update_default_client_config(session, arg_name, arg_value):
    """Set one option on the session's default client config.

    A ``Config`` carrying only ``arg_name=arg_value`` is created and, when
    the session already has a default config, merged into it. The result
    is installed as the new default.
    """
    existing = session.get_default_client_config()
    override = Config(**{arg_name: arg_value})
    merged = override if existing is None else existing.merge(override)
    session.set_default_client_config(merged)
subcommands.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_clients(self):
    """Create ``self._client`` and ``self._source_client`` from the parameters.

    Both clients start out with identical settings. For an 's3s3' transfer
    with a source region set, the source client is recreated against that
    region with no endpoint override. A sigv4 config is used when the
    'sse' parameter is 'aws:kms'.
    """
    params = self.parameters
    client_config = (Config(signature_version='s3v4')
                     if params.get('sse') == 'aws:kms' else None)

    shared = dict(
        region=params['region'],
        endpoint_url=params['endpoint_url'],
        verify=params['verify_ssl'],
        config=client_config,
    )
    self._client = get_client(self.session, **shared)
    self._source_client = get_client(self.session, **shared)

    if params['source_region'] and params['paths_type'] == 's3s3':
        self._source_client = get_client(
            self.session,
            region=params['source_region'],
            endpoint_url=None,
            verify=params['verify_ssl'],
            config=client_config
        )
assume_role.py 文件源码 项目:awslogin 作者: byu-oit 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def assume_role(account_role, samlAssertion):
    """Call STS AssumeRoleWithSAML for ``account_role`` and return the response.

    The STS client is created with request signing disabled, and the
    requested session duration is 3600 seconds.
    """
    unsigned = client.Config(signature_version=botocore.UNSIGNED)
    sts = boto3.client('sts', config=unsigned)
    request = dict(
        RoleArn=account_role.role_arn,
        PrincipalArn=account_role.principal_arn,
        SAMLAssertion=samlAssertion,
        DurationSeconds=3600,
    )
    return sts.assume_role_with_saml(**request)
iam_generator_deploy.py 文件源码 项目:aws-iam-generator 作者: awslabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def boto3_agent_from_sts(agent_service, agent_type, region, credentials=None):
    """Build a boto3 client or resource, optionally from temporary credentials.

    Args:
        agent_service: Service name passed to boto3 (first positional arg).
        agent_type: "client" or "resource"; selects which kind of agent
            is created.
        region: Region name for the agent.
        credentials: Optional mapping with 'accessKeyId', 'secretAccessKey'
            and 'sessionToken'. When omitted or empty, no explicit
            credentials are passed.

    Returns:
        The boto3 client or resource, or None when ``agent_type`` is
        neither "client" nor "resource".
    """
    # A None default replaces the mutable `{}` default, which would be
    # shared across calls; an omitted argument still means "no credentials".
    session = boto3.session.Session()

    # Generate our kwargs to pass
    kw_args = {
        "region_name": region,
        "config": Config(signature_version='s3v4')
    }

    if credentials:
        kw_args["aws_access_key_id"] = credentials['accessKeyId']
        kw_args["aws_secret_access_key"] = credentials['secretAccessKey']
        kw_args["aws_session_token"] = credentials['sessionToken']

    # Build our agent depending on how we're called.
    if agent_type == "client":
        return session.client(agent_service, **kw_args)
    if agent_type == "resource":
        return session.resource(agent_service, **kw_args)
    return None
swf.py 文件源码 项目:floto 作者: babbel 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def open_session(self, session_parameter):
    """Return an 'swf' client from a new session built with ``session_parameter``.

    The client uses a connect timeout of 50 and a read timeout of 70.
    """
    timeouts = Config(connect_timeout=50, read_timeout=70)
    return boto3.session.Session(**session_parameter).client(
        'swf', config=timeouts)
test_storage.py 文件源码 项目:django-buckets 作者: Cadasta 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def get_boto_resource(storage):
    """Return a sigv4 boto3 S3 resource using the credentials and region of ``storage``."""
    options = dict(
        aws_access_key_id=storage.access_key,
        aws_secret_access_key=storage.secret_key,
        region_name=storage.region,
        config=Config(signature_version='s3v4'),
    )
    return boto3.resource('s3', **options)
storage.py 文件源码 项目:django-buckets 作者: Cadasta 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_boto_ressource(self):
    """Return a sigv4 boto3 S3 resource using this object's credentials and region."""
    options = dict(
        aws_access_key_id=self.access_key,
        aws_secret_access_key=self.secret_key,
        region_name=self.region,
        config=Config(signature_version='s3v4'),
    )
    return boto3.resource('s3', **options)
storage.py 文件源码 项目:django-buckets 作者: Cadasta 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_signed_url(self, key):
    """Return presigned-POST data for uploading to a fresh, unused S3 key.

    The new key keeps the directory part and the file extension of ``key``
    but replaces the file name with a value from ``random_id()``, retrying
    until ``self.exists`` reports the candidate as unused. When
    ``self.max_size`` is set, a ``content-length-range`` condition limits
    the upload size.

    Returns:
        The result of ``generate_presigned_post`` for the chosen key.
    """
    # Everything up to and including the last '/', or '' when there is none.
    prefix = key[:key.rfind('/') + 1]
    filename = key[len(prefix):]

    # Take the extension from the file name only. Slicing the whole key at
    # rfind('.') returned the key's last character when there was no dot
    # (rfind -> -1) and a path fragment when the only dot was in a
    # directory name.
    dot = filename.rfind('.')
    ext = filename[dot:] if dot != -1 else ''

    # Draw random names until one is not already present.
    while True:
        s3_key = prefix + random_id() + ext
        if not self.exists(s3_key):
            break

    conditions = []
    if self.max_size:
        conditions.append(["content-length-range", 0, self.max_size])

    params = {
        'Bucket': self.bucket_name,
        'Key': s3_key,
        'Conditions': conditions
    }
    client = boto3.client(
        's3',
        aws_access_key_id=self.access_key,
        aws_secret_access_key=self.secret_key,
        region_name=self.region,
        config=Config(signature_version='s3v4')
    )

    return client.generate_presigned_post(**params)


问题


面经


文章

微信
公众号

扫码关注公众号