def replace_object(self, name, namespace, k8s_obj=None, body=None):
    """ Replace an existing object. Pass in a model object or request dict().

    Will first lookup the existing object to get the resource version and
    update the request.

    :param name: name of the object to replace
    :param namespace: namespace of the object; when None the replace call is
        made without a namespace argument
    :param k8s_obj: model object holding the new definition; takes precedence
        over ``body`` when both are supplied
    :param body: request dict, used when no model object is supplied
    :return: ``fix_serialization()`` applied to the object obtained from the
        watch stream or, failing that, from ``_wait_for_response()``
    :raises: the class returned by ``get_exception_class()`` when the object
        cannot be found, when the API rejects the request, or when the
        connection retries are exhausted
    """
    self.logger.debug('Starting replace object')
    existing_obj = self.get_object(name, namespace)
    if not existing_obj:
        msg = "Error: Replacing object. Unable to find {}".format(name)
        msg += " in namespace {}".format(namespace) if namespace else ""
        raise self.get_exception_class()(msg)

    # Stamp the request with the resource version of the object being replaced.
    current_version = existing_obj.metadata.resource_version
    if k8s_obj:
        # The model's status is reset to a fresh instance and its creation
        # timestamps are removed before it is sent.
        k8s_obj.status = self.properties['status']['class']()
        self.__remove_creation_timestamps(k8s_obj)
        k8s_obj.metadata.resource_version = current_version
    elif body:
        body['metadata']['resourceVersion'] = current_version

    w, stream = self._create_stream(namespace)
    return_obj = None
    try:
        replace_method = self.lookup_method('replace', namespace)
        # The namespace is only passed positionally when one was given.
        call_args = (name,) if namespace is None else (name, namespace)
        if k8s_obj:
            replace_method(*(call_args + (k8s_obj,)))
        else:
            replace_method(*call_args, body=body)
    except ApiException as exc:
        raw_body = exc.body
        if raw_body is None:
            # No response body to inspect; report the reason instead of
            # failing with AttributeError on None.
            msg = exc.reason
        elif raw_body.startswith('{'):
            try:
                msg = json.loads(raw_body).get('message', exc.reason)
            except ValueError:
                # Looked like JSON but did not parse; surface it verbatim
                # rather than masking the API error with a decode error.
                msg = raw_body
        else:
            msg = raw_body
        raise self.get_exception_class()(msg, status=exc.status)
    except MaxRetryError as ex:
        raise self.get_exception_class()(str(ex.reason))

    if stream is not None:
        return_obj = self._read_stream(w, stream, name)
    # NOTE(review): for 'project' and 'namespace' kinds the stream result is
    # always replaced by _wait_for_response(); the reason is not visible here.
    if not return_obj or self.kind in ('project', 'namespace'):
        return_obj = self._wait_for_response(name, namespace, 'replace')
    return self.fix_serialization(return_obj)
评论列表
文章目录