azure_api.py 文件源码

python
阅读 16 收藏 0 点赞 0 评论 0

项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码
def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
        """List the scale sets of a resource group, annotated with recent failures.

        Activity-log events newer than ``TIMEOUT_PERIOD`` are scanned for
        failed or conflicting non-delete operations. A scale set with at
        least one such event gets a ``timeout_until`` / ``timeout_reason``
        pair derived from those events.

        Args:
            resource_group_name: Name of the resource group to inspect.

        Returns:
            One ``AzureScaleSet`` per scale set reported by the compute client.
            ``timeout_until`` and ``timeout_reason`` are ``None`` when no
            recent failure was recorded for that scale set.
        """
        window_start = datetime.now(pytz.utc) - TIMEOUT_PERIOD
        filter_clause = "eventTimestamp ge '{}' and resourceGroupName eq '{}'".format(window_start, resource_group_name)
        select_clause = "authorization,status,subStatus,properties,resourceId,eventTimestamp"

        # Group the relevant failure events by the resource id they refer to.
        # NOTE(review): the lookup below matches ids with exact string equality;
        # confirm the two APIs always return resource ids with identical casing.
        failures_by_scale_set: MutableMapping[str, List[EventData]] = {}
        for log in self._monitor_client.activity_logs.list(filter=filter_clause, select=select_clause):
            failed = bool(log.status and log.status.value == 'Failed')
            conflicted = bool(log.properties and log.properties.get('statusCode') == 'Conflict')
            if not (failed or conflicted):
                continue
            # Failures of delete operations do not put the scale set in timeout.
            if log.authorization and log.authorization.action and 'delete' in log.authorization.action:
                continue
            failures_by_scale_set.setdefault(log.resource_id, []).append(log)

        result = []
        for scale_set in self._compute_client.virtual_machine_scale_sets.list(resource_group_name):
            # Newest failure first.
            failures = sorted(failures_by_scale_set.get(scale_set.id, []), key=lambda x: x.event_timestamp, reverse=True)
            timeout_until = None
            timeout_reason = None
            for failure in failures:
                # statusMessage is expected to be a JSON object, but it may be
                # absent, plain text, or a non-object; treat all of those as
                # "no error details" instead of aborting the whole listing.
                raw_message = failure.properties.get('statusMessage', "{}") if failure.properties else "{}"
                try:
                    status_message = json.loads(raw_message)
                except (TypeError, ValueError):
                    status_message = {}
                error_details = status_message.get('error', {}) if isinstance(status_message, dict) else {}
                if isinstance(error_details, dict) and 'message' in error_details:
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    timeout_reason = error_details['message']
                    # Stop if we found a message with details
                    break
                if timeout_until is None:
                    # Fallback taken from the newest failure; it is kept unless
                    # an older event carrying a detailed message replaces it.
                    timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                    # sub_status may be missing, just like status/properties above.
                    timeout_reason = failure.sub_status.localized_value if failure.sub_status else None

            # A scale set without any tags reports tags as None rather than {}.
            tags = scale_set.tags or {}
            priority = int(tags[PRIORITY_TAG]) if PRIORITY_TAG in tags else None
            no_schedule_taints = json.loads(tags.get(NO_SCHEDULE_TAINTS_TAG, '{}'))

            result.append(AzureScaleSet(scale_set.location, resource_group_name, scale_set.name, scale_set.sku.name,
                                        scale_set.sku.capacity, scale_set.provisioning_state, timeout_until=timeout_until,
                                        timeout_reason=timeout_reason, priority=priority, no_schedule_taints=no_schedule_taints))
        return result
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号