def make_session(identity_profile):
session = botocore.session.Session(profile=identity_profile)
try:
session3 = boto3.session.Session(botocore_session=session)
except botocore.exceptions.ProfileNotFound as err:
print(str(err), file=sys.stderr)
if session.available_profiles:
print("Available profiles: %s" %
", ".join(sorted(session.available_profiles)), file=sys.stderr)
print("You can specify a profile by passing it with the -i "
"command line flag.", file=sys.stderr)
else:
print("You have no AWS profiles configured. Please run 'aws "
"configure --profile identity' to get started.", file=sys.stderr)
return None, None, USER_RECOVERABLE_ERROR
return session, session3, None
python类session()的实例源码
def one_mfa(args, credentials):
session, session3, err = make_session(args.identity_profile)
if err:
return err
if "AWSMFA_TESTING_MODE" in os.environ:
use_testing_credentials(args, credentials)
return OK
mfa_args = {}
if args.token_code != 'skip':
serial_number, token_code, err = acquire_code(args, session, session3)
if err is not OK:
return err
mfa_args['SerialNumber'] = serial_number
mfa_args['TokenCode'] = token_code
sts = session3.client('sts')
try:
if args.role_to_assume:
mfa_args.update(
DurationSeconds=min(args.duration, 3600),
RoleArn=args.role_to_assume,
RoleSessionName=args.role_session_name)
response = sts.assume_role(**mfa_args)
else:
mfa_args.update(DurationSeconds=args.duration)
response = sts.get_session_token(**mfa_args)
except botocore.exceptions.ClientError as err:
if err.response["Error"]["Code"] == "AccessDenied":
print(str(err), file=sys.stderr)
return USER_RECOVERABLE_ERROR
else:
raise
print_expiration_time(response['Credentials']['Expiration'])
update_credentials_file(args.aws_credentials,
args.target_profile,
args.identity_profile,
credentials,
response['Credentials'])
return OK
def acquire_code(args, session, session3):
"""returns the user's token serial number, MFA token code, and an
error code."""
serial_number = find_mfa_for_user(args.serial_number, session, session3)
if not serial_number:
print("There are no MFA devices associated with this user.",
file=sys.stderr)
return None, None, USER_RECOVERABLE_ERROR
token_code = args.token_code
if token_code is None:
while token_code is None or len(token_code) != 6:
token_code = getpass.getpass("MFA Token Code: ")
return serial_number, token_code, OK
def rotate(args, credentials):
"""rotate the identity profile's AWS access key pair."""
current_access_key_id = credentials.get(
args.identity_profile, 'aws_access_key_id')
# create new sessions using the MFA credentials
session, session3, err = make_session(args.target_profile)
if err:
return err
iam = session3.resource('iam')
# find the AccessKey corresponding to the identity profile and delete it.
current_access_key = next((key for key
in iam.CurrentUser().access_keys.all()
if key.access_key_id == current_access_key_id))
current_access_key.delete()
# create the new access key pair
iam_service = session3.client('iam')
new_access_key_pair = iam_service.create_access_key()["AccessKey"]
print("Rotating from %s to %s." % (current_access_key.access_key_id,
new_access_key_pair['AccessKeyId']),
file=sys.stderr)
update_credentials_file(args.aws_credentials,
args.identity_profile,
args.identity_profile,
credentials,
new_access_key_pair)
print("%s profile updated." % args.identity_profile, file=sys.stderr)
return OK
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
aws_session_token=None, region_name=None,
botocore_session=None, profile_name=None):
if botocore_session is not None:
self._session = botocore_session
else:
# Create a new default session
self._session = botocore.session.get_session()
# Setup custom user-agent string if it isn't already customized
if self._session.user_agent_name == 'Botocore':
botocore_info = 'Botocore/{0}'.format(
self._session.user_agent_version)
if self._session.user_agent_extra:
self._session.user_agent_extra += ' ' + botocore_info
else:
self._session.user_agent_extra = botocore_info
self._session.user_agent_name = 'Boto3'
self._session.user_agent_version = boto3.__version__
if profile_name is not None:
self._session.set_config_variable('profile', profile_name)
if aws_access_key_id or aws_secret_access_key or aws_session_token:
self._session.set_credentials(
aws_access_key_id, aws_secret_access_key, aws_session_token)
if region_name is not None:
self._session.set_config_variable('region', region_name)
self.resource_factory = ResourceFactory(
self._session.get_component('event_emitter'))
self._setup_loader()
self._register_default_handlers()
def events(self):
"""
The event emitter for a session
"""
return self._session.get_component('event_emitter')
def available_profiles(self):
"""
The profiles available to the session credentials
"""
return self._session.available_profiles
def get_credentials(self):
"""
Return the :class:`botocore.credential.Credential` object
associated with this session. If the credentials have not
yet been loaded, this will attempt to load them. If they
have already been loaded, this will return the cached
credentials.
"""
return self._session.get_credentials()
def client(*args, **kwargs):
return session().client(*args, **kwargs)
def credentials(*args, **kwargs):
return session().get_credentials(*args, **kwargs)
def resource(*args, **kwargs):
return session().resource(*args, **kwargs)
def session():
aws_config = config.get('aws') or {}
profile = aws_config.pop('profile_name', None)
if 'botocore' not in _sessions:
print('[ssha] creating aws session')
_sessions['botocore'] = botocore.session.Session(profile=profile)
resolver = _sessions['botocore'].get_component('credential_provider')
provider = resolver.get_provider('assume-role')
provider.cache = boto3_session_cache.JSONFileCache()
return boto3.Session(botocore_session=_sessions['botocore'], **aws_config)
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
aws_session_token=None, region_name=None,
botocore_session=None, profile_name=None):
if botocore_session is not None:
self._session = botocore_session
else:
# Create a new default session
self._session = botocore.session.get_session()
# Setup custom user-agent string if it isn't already customized
if self._session.user_agent_name == 'Botocore':
botocore_info = 'Botocore/{0}'.format(
self._session.user_agent_version)
if self._session.user_agent_extra:
self._session.user_agent_extra += ' ' + botocore_info
else:
self._session.user_agent_extra = botocore_info
self._session.user_agent_name = 'Boto3'
self._session.user_agent_version = boto3.__version__
if profile_name is not None:
self._session.set_config_variable('profile', profile_name)
if aws_access_key_id or aws_secret_access_key or aws_session_token:
self._session.set_credentials(
aws_access_key_id, aws_secret_access_key, aws_session_token)
if region_name is not None:
self._session.set_config_variable('region', region_name)
self.resource_factory = ResourceFactory(
self._session.get_component('event_emitter'))
self._setup_loader()
self._register_default_handlers()
def events(self):
"""
The event emitter for a session
"""
return self._session.get_component('event_emitter')
def available_profiles(self):
"""
The profiles available to the session credentials
"""
return self._session.available_profiles
def get_credentials(self):
"""
Return the :class:`botocore.credential.Credential` object
associated with this session. If the credentials have not
yet been loaded, this will attempt to load them. If they
have already been loaded, this will return the cached
credentials.
"""
return self._session.get_credentials()
def test_can_create_deployer_from_factory_function():
session = botocore.session.get_session()
d = deployer.create_default_deployer(session)
assert isinstance(d, deployer.Deployer)
def __init__(self, session=None, policy_actions=None):
# type: (Any, Dict[str, str]) -> None
if session is None:
session = botocore.session.get_session()
if policy_actions is None:
policy_actions = load_policy_actions()
self._session = session
self._policy_actions = policy_actions
def test_boto3(pyi_builder):
pyi_builder.test_source(
"""
import boto3
session = boto3.Session(region_name='us-west-2')
# verify all clients
for service in session.get_available_services():
session.client(service)
# verify all resources
for resource in session.get_available_resources():
session.resource(resource)
""")
def test_botocore(pyi_builder):
pyi_builder.test_source(
"""
import botocore
from botocore.session import Session
session = Session()
# verify all services
for service in session.get_available_services():
session.create_client(service, region_name='us-west-2')
""")
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
aws_session_token=None, region_name=None,
botocore_session=None, profile_name=None):
if botocore_session is not None:
self._session = botocore_session
else:
# Create a new default session
self._session = botocore.session.get_session()
# Setup custom user-agent string if it isn't already customized
if self._session.user_agent_name == 'Botocore':
botocore_info = 'Botocore/{0}'.format(
self._session.user_agent_version)
if self._session.user_agent_extra:
self._session.user_agent_extra += ' ' + botocore_info
else:
self._session.user_agent_extra = botocore_info
self._session.user_agent_name = 'Boto3'
self._session.user_agent_version = boto3.__version__
if profile_name is not None:
self._session.set_config_variable('profile', profile_name)
if aws_access_key_id or aws_secret_access_key or aws_session_token:
self._session.set_credentials(
aws_access_key_id, aws_secret_access_key, aws_session_token)
if region_name is not None:
self._session.set_config_variable('region', region_name)
self.resource_factory = ResourceFactory(
self._session.get_component('event_emitter'))
self._setup_loader()
self._register_default_handlers()
def events(self):
"""
The event emitter for a session
"""
return self._session.get_component('event_emitter')
def available_profiles(self):
"""
The profiles available to the session credentials
"""
return self._session.available_profiles
def get_credentials(self):
"""
Return the :class:`botocore.credential.Credential` object
associated with this session. If the credentials have not
yet been loaded, this will attempt to load them. If they
have already been loaded, this will return the cached
credentials.
"""
return self._session.get_credentials()
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
aws_session_token=None, region_name=None,
botocore_session=None, profile_name=None):
if botocore_session is not None:
self._session = botocore_session
else:
# Create a new default session
self._session = botocore.session.get_session()
# Setup custom user-agent string if it isn't already customized
if self._session.user_agent_name == 'Botocore':
botocore_info = 'Botocore/{0}'.format(
self._session.user_agent_version)
if self._session.user_agent_extra:
self._session.user_agent_extra += ' ' + botocore_info
else:
self._session.user_agent_extra = botocore_info
self._session.user_agent_name = 'Boto3'
self._session.user_agent_version = boto3.__version__
if profile_name is not None:
self._session.set_config_variable('profile', profile_name)
if aws_access_key_id or aws_secret_access_key or aws_session_token:
self._session.set_credentials(
aws_access_key_id, aws_secret_access_key, aws_session_token)
if region_name is not None:
self._session.set_config_variable('region', region_name)
self.resource_factory = ResourceFactory(
self._session.get_component('event_emitter'))
self._setup_loader()
self._register_default_handlers()
def events(self):
"""
The event emitter for a session
"""
return self._session.get_component('event_emitter')
def available_profiles(self):
"""
The profiles available to the session credentials
"""
return self._session.available_profiles
def get_credentials(self):
"""
Return the :class:`botocore.credential.Credential` object
associated with this session. If the credentials have not
yet been loaded, this will attempt to load them. If they
have already been loaded, this will return the cached
credentials.
"""
return self._session.get_credentials()
def test_boto3(pyi_builder):
pyi_builder.test_source(
"""
import boto3
session = boto3.Session(region_name='us-west-2')
# verify all clients
for service in session.get_available_services():
session.client(service)
# verify all resources
for resource in session.get_available_resources():
session.resource(resource)
""")
def test_botocore(pyi_builder):
pyi_builder.test_source(
"""
import botocore
from botocore.session import Session
session = Session()
# verify all services
for service in session.get_available_services():
session.create_client(service, region_name='us-west-2')
""")