def _get_in_use_ports(self):
kubernetes = clients.get_kubernetes_client()
in_use_ports = []
running_pods = kubernetes.get(constants.K8S_API_BASE + '/pods')
for pod in running_pods['items']:
try:
annotations = jsonutils.loads(pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF])
except KeyError:
LOG.debug("Skipping pod without kuryr VIF annotation: %s",
pod)
else:
in_use_ports.append(
annotations['versioned_object.data']['id'])
return in_use_ports
评论列表
文章目录