Python 类 MetadataRetrievalError() 的实例源码

utils.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retrieve_uri(self, relative_uri):
        """Fetch parsed JSON from the ECS metadata endpoint.

        The relative URI is expanded with ``self._full_url`` and requested
        through ``self._get_response``.  Each ``MetadataRetrievalError`` is
        logged at debug level and followed by a ``self.SLEEP_TIME`` pause;
        the error is re-raised once ``self.RETRY_ATTEMPTS`` failures have
        accumulated.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        :raises MetadataRetrievalError: When every attempt has failed.

        """
        target = self._full_url(relative_uri)
        request_headers = {'Accept': 'application/json'}
        failures = 0
        while True:
            try:
                payload = self._get_response(
                    target, request_headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as err:
                logger.debug("Received error when attempting to retrieve "
                             "ECS metadata: %s", err, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                failures += 1
                if failures < self.RETRY_ATTEMPTS:
                    continue
                # Retry budget exhausted: propagate the last error.
                raise
            else:
                return payload
utils.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _create_fetcher(self, relative_uri):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_uri(relative_uri)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving ECS metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _create_fetcher(self, full_uri, headers):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_full_uri(
                    full_uri, headers=headers)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving container metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retrieve_uri(self, relative_uri):
        """Fetch parsed JSON from the ECS metadata endpoint.

        The relative URI is expanded with ``self._full_url`` and requested
        through ``self._get_response``.  Each ``MetadataRetrievalError`` is
        logged at debug level and followed by a ``self.SLEEP_TIME`` pause;
        the error is re-raised once ``self.RETRY_ATTEMPTS`` failures have
        accumulated.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        :raises MetadataRetrievalError: When every attempt has failed.

        """
        target = self._full_url(relative_uri)
        request_headers = {'Accept': 'application/json'}
        failures = 0
        while True:
            try:
                payload = self._get_response(
                    target, request_headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as err:
                logger.debug("Received error when attempting to retrieve "
                             "ECS metadata: %s", err, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                failures += 1
                if failures < self.RETRY_ATTEMPTS:
                    continue
                # Retry budget exhausted: propagate the last error.
                raise
            else:
                return payload
utils.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _create_fetcher(self, relative_uri):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_uri(relative_uri)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving ECS metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:jepsen-training-vpc 作者: bloomberg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def retrieve_uri(self, relative_uri):
        """Fetch parsed JSON from the ECS metadata endpoint.

        The relative URI is expanded with ``self._full_url`` and requested
        through ``self._get_response``.  Each ``MetadataRetrievalError`` is
        logged at debug level and followed by a ``self.SLEEP_TIME`` pause;
        the error is re-raised once ``self.RETRY_ATTEMPTS`` failures have
        accumulated.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        :raises MetadataRetrievalError: When every attempt has failed.

        """
        target = self._full_url(relative_uri)
        request_headers = {'Accept': 'application/json'}
        failures = 0
        while True:
            try:
                payload = self._get_response(
                    target, request_headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as err:
                logger.debug("Received error when attempting to retrieve "
                             "ECS metadata: %s", err, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                failures += 1
                if failures < self.RETRY_ATTEMPTS:
                    continue
                # Retry budget exhausted: propagate the last error.
                raise
            else:
                return payload
utils.py 文件源码 项目:jepsen-training-vpc 作者: bloomberg 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:jepsen-training-vpc 作者: bloomberg 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _create_fetcher(self, relative_uri):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_uri(relative_uri)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving ECS metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:AWS-AutoTag 作者: cpollard0 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:AWS-AutoTag 作者: cpollard0 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _create_fetcher(self, full_uri, headers):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_full_uri(
                    full_uri, headers=headers)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving container metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:tf_aws_ecs_instance_draining_on_scale_in 作者: terraform-community-modules 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def retrieve_uri(self, relative_uri):
        """Fetch parsed JSON from the ECS metadata endpoint.

        The relative URI is expanded with ``self._full_url`` and requested
        through ``self._get_response``.  Each ``MetadataRetrievalError`` is
        logged at debug level and followed by a ``self.SLEEP_TIME`` pause;
        the error is re-raised once ``self.RETRY_ATTEMPTS`` failures have
        accumulated.

        :type relative_uri: str
        :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

        :return: The parsed JSON response.

        :raises MetadataRetrievalError: When every attempt has failed.

        """
        target = self._full_url(relative_uri)
        request_headers = {'Accept': 'application/json'}
        failures = 0
        while True:
            try:
                payload = self._get_response(
                    target, request_headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as err:
                logger.debug("Received error when attempting to retrieve "
                             "ECS metadata: %s", err, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                failures += 1
                if failures < self.RETRY_ATTEMPTS:
                    continue
                # Retry budget exhausted: propagate the last error.
                raise
            else:
                return payload
utils.py 文件源码 项目:tf_aws_ecs_instance_draining_on_scale_in 作者: terraform-community-modules 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_response(self, full_url, headers, timeout):
        try:
            response = self._session.get(full_url, headers=headers,
                                         timeout=timeout)
            if response.status_code != 200:
                raise MetadataRetrievalError(
                    error_msg="Received non 200 response (%s) from ECS metadata: %s"
                    % (response.status_code, response.text))
            try:
                return json.loads(response.text)
            except ValueError:
                raise MetadataRetrievalError(
                    error_msg=("Unable to parse JSON returned from "
                               "ECS metadata: %s" % response.text))
        except RETRYABLE_HTTP_ERRORS as e:
            error_msg = ("Received error when attempting to retrieve "
                         "ECS metadata: %s" % e)
            raise MetadataRetrievalError(error_msg=error_msg)
credentials.py 文件源码 项目:tf_aws_ecs_instance_draining_on_scale_in 作者: terraform-community-modules 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _create_fetcher(self, relative_uri):
        def fetch_creds():
            try:
                response = self._fetcher.retrieve_uri(relative_uri)
            except MetadataRetrievalError as e:
                logger.debug("Error retrieving ECS metadata: %s", e,
                             exc_info=True)
                raise CredentialRetrievalError(provider=self.METHOD,
                                               error_msg=str(e))
            return {
                'access_key': response['AccessKeyId'],
                'secret_key': response['SecretAccessKey'],
                'token': response['Token'],
                'expiry_time': response['Expiration'],
            }
        return fetch_creds
utils.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _retrieve_credentials(self, full_url, extra_headers=None):
        headers = {'Accept': 'application/json'}
        if extra_headers is not None:
            headers.update(extra_headers)
        attempts = 0
        while True:
            try:
                return self._get_response(full_url, headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as e:
                logger.debug("Received error when attempting to retrieve "
                             "container metadata: %s", e, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                attempts += 1
                if attempts >= self.RETRY_ATTEMPTS:
                    raise
utils.py 文件源码 项目:AWS-AutoTag 作者: cpollard0 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _retrieve_credentials(self, full_url, extra_headers=None):
        headers = {'Accept': 'application/json'}
        if extra_headers is not None:
            headers.update(extra_headers)
        attempts = 0
        while True:
            try:
                return self._get_response(full_url, headers, self.TIMEOUT_SECONDS)
            except MetadataRetrievalError as e:
                logger.debug("Received error when attempting to retrieve "
                             "container metadata: %s", e, exc_info=True)
                self._sleep(self.SLEEP_TIME)
                attempts += 1
                if attempts >= self.RETRY_ATTEMPTS:
                    raise


问题


面经


文章

微信
公众号

扫码关注公众号