python类HttpError()的实例源码

compute.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_global_operation(self, project_id, operation_id):
        """Get the Operations Status
        Args:
            project_id (str): The project id.
            operation_id (str): The operation id.

        Returns:
            dict: Global Operation status and info.
            https://cloud.google.com/compute/docs/reference/latest/globalOperations/get

        Raises:
            ApiNotEnabledError: Returns if the api is not enabled.
            ApiExecutionError: Returns if the api is not executable.

        """
        try:
            return self.repository.global_operations.get(
                project_id, operation_id)
        except (errors.HttpError, HttpLib2Error) as e:
            api_not_enabled, details = _api_not_enabled(e)
            if api_not_enabled:
                raise api_errors.ApiNotEnabledError(details, e)
            raise api_errors.ApiExecutionError(project_id, e)
compute.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_project(self, project_id):
        """Returns the specified Project resource.

        Args:
            project_id (str): The project id.

        Returns:
            dict: A Compute Project resource dict.
            https://cloud.google.com/compute/docs/reference/latest/projects/get
        """
        try:
            return self.repository.projects.get(project_id)
        except (errors.HttpError, HttpLib2Error) as e:
            api_not_enabled, details = _api_not_enabled(e)
            if api_not_enabled:
                raise api_errors.ApiNotEnabledError(details, e)
            raise api_errors.ApiExecutionError(project_id, e)
compute.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_api_enabled(self, project_id):
        """Checks if the Compute API is enabled for the specified project.

        Args:
            project_id (str): The project id.

        Returns:
            bool: True if the API is enabled, else False.
        """
        try:
            result = self.repository.projects.get(project_id, fields='name')
            return bool('name' in result)  # True if name, otherwise False.
        except (errors.HttpError, HttpLib2Error) as e:
            api_not_enabled, _ = _api_not_enabled(e)
            if api_not_enabled:
                return False
            raise api_errors.ApiExecutionError(project_id, e)
admin_directory.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_group_members(self, group_key):
        """Get all the members for specified groups.

        Args:
            group_key (str): The group's unique id assigned by the Admin API.

        Returns:
            list: A list of member objects from the API.

        Raises:
            api_errors.ApiExecutionError: If group member retrieval fails.
        """
        try:
            paged_results = self.repository.members.list(group_key)
            return api_helpers.flatten_list_results(paged_results, 'members')
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(group_key, e)
admin_directory.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_groups(self, customer_id='my_customer'):
        """Get all the groups for a given customer_id.

        A note on customer_id='my_customer'. This is a magic string instead
        of using the real customer id. See:

        https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups#get_all_domain_groups

        Args:
            customer_id (str): The customer id to scope the request to.

        Returns:
            list: A list of group objects returned from the API.

        Raises:
            api_errors.ApiExecutionError: If groups retrieval fails.
        """
        try:
            paged_results = self.repository.groups.list(customer=customer_id)
            return api_helpers.flatten_list_results(paged_results, 'groups')
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError('groups', e)
bigquery.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_datasets_for_projectid(self, project_id):
        """Return BigQuery datasets stored in the requested project_id.

        Args:
            project_id (str): String representing the project id.

        Returns:
            list: A list of datasetReference objects for a given project_id.

            [{'datasetId': 'dataset-id',
              'projectId': 'project-id'},
             {...}]
        """
        try:
            results = self.repository.datasets.list(
                resource=project_id,
                fields='datasets/datasetReference,nextPageToken',
                all=True)
            flattened = api_helpers.flatten_list_results(results, 'datasets')
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(project_id, e)

        datasets = [result.get('datasetReference') for result in flattened
                    if 'datasetReference' in result]
        return datasets
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_app(self, project_id):
        """Gets information about an application.

        Args:
            project_id (str): The id of the project.

        Returns:
            dict: The response of retrieving the AppEngine app.
        """
        try:
            return self.repository.apps.get(project_id)
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                # TODO: handle error more gracefully
                # application not found
                return {}
            raise api_errors.ApiExecutionError(project_id, e)
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_service(self, project_id, service_id):
        """Gets information about a specific service.

        Args:
            project_id (str): The id of the project.
            service_id (str): The id of the service to query.

        Returns:
            dict: A Service resource dict for a given project_id and
                service_id.
        """
        try:
            return self.repository.app_services.get(
                project_id, target=service_id)
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                return {}
            raise api_errors.ApiExecutionError(project_id, e)
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def list_services(self, project_id):
        """Lists services of a project.

        Args:
            project_id (str): The id of the project.

        Returns:
            list: A list of Service resource dicts for a project_id.
        """
        try:
            paged_results = self.repository.app_services.list(project_id)
            return api_helpers.flatten_list_results(paged_results, 'services')
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                return []
            raise api_errors.ApiExecutionError(project_id, e)
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def list_versions(self, project_id, service_id):
        """Lists versions of a given service.

        Args:
            project_id (str): The id of the project.
            service_id (str): The id of the service to query.

        Returns:
            list: A list of Version resource dicts for a given Service.
        """
        try:
            paged_results = self.repository.service_versions.list(
                project_id, services_id=service_id)
            return api_helpers.flatten_list_results(paged_results, 'versions')
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                return []
            raise api_errors.ApiExecutionError(project_id, e)
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_instance(self, project_id, service_id, version_id, instances_id):
        """Gets information about a specific instance of a service.

        Args:
            project_id (str): The id of the project.
            service_id (str): The id of the service to query.
            version_id (str): The id of the version to query.
            instances_id (str): The id of the instance to query.

        Returns:
            dict: An Instance resource dict for a given project_id,
                service_id and version_id.
        """
        try:
            return self.repository.version_instances.get(
                project_id, target=instances_id, services_id=service_id,
                versions_id=version_id)
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                return {}
            raise api_errors.ApiExecutionError(project_id, e)
appengine.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def list_instances(self, project_id, service_id, version_id):
        """Lists instances of a given service and version.

        Args:
            project_id (str): The id of the project.
            service_id (str): The id of the service to query.
            version_id (str): The id of the version to query.

        Returns:
            list: A list of Instance resource dicts for a given Version.
        """
        try:
            paged_results = self.repository.version_instances.list(
                project_id, services_id=service_id, versions_id=version_id)
            return api_helpers.flatten_list_results(paged_results, 'instances')
        except (errors.HttpError, HttpLib2Error) as e:
            if isinstance(e, errors.HttpError) and e.resp.status == 404:
                return []
            raise api_errors.ApiExecutionError(project_id, e)
cloud_resource_manager.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_project(self, project_id):
        """Get all the projects from organization.

        Args:
            project_id (str): The project id (not project number).

        Returns:
            dict: The project response object.

        Raises:
            ApiExecutionError: An error has occurred when executing the API.
        """
        try:
            return self.repository.projects.get(project_id)
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(project_id, e)
cloud_resource_manager.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_org_iam_policies(self, resource_name, org_id):
        """Get all the iam policies of an org.

        Args:
            resource_name (str): The resource type.
            org_id (int): The org id number.

        Returns:
            dict: Organization IAM policy for given org_id.
            https://cloud.google.com/resource-manager/reference/rest/Shared.Types/Policy

        Raises:
            ApiExecutionError: An error has occurred when executing the API.
        """
        resource_id = 'organizations/%s' % org_id
        try:
            iam_policy = (
                self.repository.organizations.get_iam_policy(resource_id))
            return {'org_id': org_id,
                    'iam_policy': iam_policy}
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(resource_name, e)
cloud_resource_manager.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_folder(self, folder_name):
        """Get a folder.

        Args:
            folder_name (str): The unique folder name, with the format
                "folders/{folderId}".

        Returns:
            dict: The folder API response.

        Raises:
            ApiExecutionError: An error has occurred when executing the API.
        """
        name = self.repository.folders.get_name(folder_name)
        try:
            return self.repository.folders.get(name)
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(folder_name, e)
cloud_resource_manager.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_folder_iam_policies(self, resource_name, folder_id):
        """Get all the iam policies of an folder.

        Args:
            resource_name (str): The resource name (type).
            folder_id (int): The folder id.

        Returns:
            dict: Folder IAM policy for given folder_id.

        Raises:
            ApiExecutionError: An error has occurred when executing the API.
        """
        resource_id = 'folders/%s' % folder_id
        try:
            iam_policy = self.repository.folders.get_iam_policy(resource_id)
            return {'folder_id': folder_id,
                    'iam_policy': iam_policy}
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(resource_name, e)
storage.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_text_file(self, full_bucket_path):
        """Gets a text file object as a string.

        Args:
            full_bucket_path (str): The full path of the bucket object.

        Returns:
            str: The object's content as a string.

        Raises:
            HttpError: HttpError is raised if the call to the GCP storage API
                fails
        """
        bucket, object_name = get_bucket_and_path_from(full_bucket_path)
        try:
            return self.repository.objects.download(bucket, object_name)
        except errors.HttpError as e:
            LOGGER.error('Unable to download file: %s', e)
            raise
storage.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_buckets(self, project_id):
        """Gets all GCS buckets for a project.

        Args:
            project_id (int): The project id for a GCP project.

        Returns:
            list: a list of bucket resource dicts.
            https://cloud.google.com/storage/docs/json_api/v1/buckets

        Raises:
            ApiExecutionError: ApiExecutionError is raised if the call to the
                GCP ClodSQL API fails
        """
        try:
            paged_results = self.repository.buckets.list(project_id,
                                                         projection='full')
            return api_helpers.flatten_list_results(paged_results, 'items')
        except (errors.HttpError, HttpLib2Error) as e:
            LOGGER.warn(api_errors.ApiExecutionError(project_id, e))
            raise api_errors.ApiExecutionError('buckets', e)
iam.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_service_accounts(self, project_id):
        """Get Service Accounts associated with a project.

        Args:
            project_id (str): The project ID to get Service Accounts for.

        Returns:
            list: List of service accounts associated with the project.
        """
        name = self.repository.projects_serviceaccounts.get_name(project_id)

        try:
            paged_results = self.repository.projects_serviceaccounts.list(name)
            return api_helpers.flatten_list_results(paged_results, 'accounts')
        except (errors.HttpError, HttpLib2Error) as e:
            LOGGER.warn(api_errors.ApiExecutionError(name, e))
            raise api_errors.ApiExecutionError('serviceAccounts', e)


问题


面经


文章

微信
公众号

扫码关注公众号