def test_jinja_render_strict(self):
context = {
"base_distro": "debian",
"base_tag": "jessie",
"maintainer": "some maintainer",
"duck": {"egg": "needle"}
}
content = jinja_utils.jinja_render(self.filename, context,
functions=[utils.address])
self.assertEqual(
"debian\njessie\nsome maintainer\nneedle\nneedle\n"
"keystone.ccp.svc.cluster.local\n"
"keystone.ccp.svc.cluster.local",
content)
context = {
"base_distro": "debian"
}
self.assertRaises(exceptions.UndefinedError, jinja_utils.jinja_render,
self.filename, context)
python类UndefinedError()的实例源码
def render_template (tmpl_str, render_vals={}):
# XXX" ih, why is there a try / except here?
env = Environment (undefined=AlertUndefined)
env.filters.update (jext.FILTER_DICT)
template = env.from_string (tmpl_str)
render_vals.update (jext.EXT_DICT)
try:
rendered_tmpl = template.render (**render_vals)
return rendered_tmpl
except jexcept.UndefinedError as err:
print ('variable used in schema is undefined')
raise
### END ###
def is_template(self, data):
''' lets us know if data has a template'''
if isinstance(data, string_types):
try:
new = self.do_template(data, fail_on_undefined=True)
except (AnsibleUndefinedVariable, UndefinedError):
return True
except:
return False
return (new != data)
elif isinstance(data, (list, tuple)):
for v in data:
if self.is_template(v):
return True
elif isinstance(data, dict):
for k in data:
if self.is_template(k) or self.is_template(data[k]):
return True
return False
def template(ctx: Context, path: "string"):
"""
Execute the template in path in the current context. This function will
generate a new statement that has dependencies on the used variables.
"""
jinja_env = _get_template_engine(ctx)
if path in tcache:
template = tcache[path]
else:
template = jinja_env.get_template(path)
tcache[path] = template
resolver = ctx.get_resolver()
try:
out = template.render({"{{resolver": resolver})
return out
except UndefinedError as e:
raise NotFoundException(ctx.owner, None, e.message)
def invoke(self, context):
given_args = context.protected_args + context.args
interactive = True
args = []
for arg in given_args:
if arg == '--interactive':
interactive = True
elif arg == '--not-interactive':
interactive = False
else:
args.append(arg)
name = args[0]
application = self.application
if not application:
raise click.ClickException('Could not locate application')
blueprint = application.blueprints.get(name)
if not blueprint:
raise click.ClickException('Could not locate blueprint')
command = blueprint.load_context()
args = args[1:]
ctx = command.main(args, standalone_mode=False)
try:
return application.generate(
blueprint,
ctx,
interactive=interactive
)
except UndefinedError as e:
raise click.ClickException(
'%s.\n'
"The blueprint's context may be invalid.\n"
'Blueprint: %s\n'
'Context: %s' % (str(e), str(blueprint), str(ctx))
)
def test_get_deploy_components_info_with_not_enough_context(self):
default_params = {
"configs": {
"service_name": "keystone",
"db_root_password": "db_root_password_default",
"keystone_db_name": "keystone_db_name_default",
"keystone_db_username": "keystone_db_username_default",
"keystone_db_password": "keystone_db_password_default",
"openstack_user_password": "os_user_password_default",
"openstack_user_name": "os_user_name_default",
"openstack_project_name": "os_project_name_default",
"openstack_role_name": "os_role_name_default",
}
}
conf = config._yaml.AttrDict()
conf._merge(default_params)
conf._merge(config._REAL_CONF)
config._REAL_CONF = conf
base_dir = os.path.dirname(__file__)
self.conf.repositories.path = os.path.join(base_dir, "test_repo_dir")
self.conf.repositories.repos = [{"name": "component"}]
config.load_component_defaults()
self.assertRaises(jinja_exceptions.UndefinedError,
utils.get_deploy_components_info)
def set_xml(self):
"""Set document xml just rendered already
validated against xsd to be signed.
:params boolean debug_mode: Either if you want
the rendered template to be saved either it
is valid or not with the given schema.
:returns boolean: Either was valid or not the generated document.
"""
cached = StringIO()
document = u''
try:
document = self.template.render(inv=self)
except UndefinedError as ups:
self.ups = ups
# TODO: Here should be called the cleanup 'Just before the validation'.
valid = self.validate(self.schema, document)
self.document = document
if valid:
document = etree.XML(document)
self.document = etree.tostring(document,
pretty_print=True,
xml_declaration=True,
encoding='utf-8')
# TODO: When Document Generated, this this should not fail either.
# Caching just when valid then.
cached.write(self.document is not None and self.document or u'')
cached.seek(0)
self.document_path = cached
def _lookup(self, name, *args, **kwargs):
instance = self._lookup_loader.get(name.lower(), loader=self._loader, templar=self)
if instance is not None:
wantlist = kwargs.pop('wantlist', False)
from ansible.utils.listify import listify_lookup_plugin_terms
loop_terms = listify_lookup_plugin_terms(terms=args, templar=self, loader=self._loader, fail_on_undefined=True, convert_bare=False)
# safely catch run failures per #5059
try:
ran = instance.run(loop_terms, variables=self._available_variables, **kwargs)
except (AnsibleUndefinedVariable, UndefinedError) as e:
raise AnsibleUndefinedVariable(e)
except Exception as e:
if self._fail_on_lookup_errors:
raise
ran = None
if ran:
from ansible.vars.unsafe_proxy import UnsafeProxy, wrap_var
if wantlist:
ran = wrap_var(ran)
else:
try:
ran = UnsafeProxy(",".join(ran))
except TypeError:
if isinstance(ran, list) and len(ran) == 1:
ran = wrap_var(ran[0])
else:
ran = wrap_var(ran)
return ran
else:
raise AnsibleError("lookup plugin (%s) not found" % name)
def _lookup_variables(self, terms, variables):
results = []
for x in terms:
try:
intermediate = listify_lookup_plugin_terms(x, templar=self._templar, loader=self._loader, fail_on_undefined=True)
except UndefinedError as e:
raise AnsibleUndefinedVariable("One of the nested variables was undefined. The error was: %s" % e)
results.append(intermediate)
return results
def render_template(self, path, inventory):
try:
return self.jinja_environ.get_template(path).render(inventory.as_dict)
except exceptions.UndefinedError as e:
log.error("while rendering: {} ({})".format(path, e.message))
sys.exit(1)
def _lookup(self, name, *args, **kwargs):
instance = self._lookup_loader.get(name.lower(), loader=self._loader, templar=self)
if instance is not None:
wantlist = kwargs.pop('wantlist', False)
allow_unsafe = kwargs.pop('allow_unsafe', C.DEFAULT_ALLOW_UNSAFE_LOOKUPS)
from ansible.utils.listify import listify_lookup_plugin_terms
loop_terms = listify_lookup_plugin_terms(terms=args, templar=self, loader=self._loader, fail_on_undefined=True, convert_bare=False)
# safely catch run failures per #5059
try:
ran = instance.run(loop_terms, variables=self._available_variables, **kwargs)
except (AnsibleUndefinedVariable, UndefinedError) as e:
raise AnsibleUndefinedVariable(e)
except Exception as e:
if self._fail_on_lookup_errors:
raise AnsibleError("An unhandled exception occurred while running the lookup plugin '%s'. Error was a %s, "
"original message: %s" % (name, type(e), e))
ran = None
if ran and not allow_unsafe:
if wantlist:
ran = wrap_var(ran)
else:
try:
ran = UnsafeProxy(",".join(ran))
except TypeError:
if isinstance(ran, list) and len(ran) == 1:
ran = wrap_var(ran[0])
else:
ran = wrap_var(ran)
if self.cur_context:
self.cur_context.unsafe = True
return ran
else:
raise AnsibleError("lookup plugin (%s) not found" % name)
def _lookup_variables(self, terms, variables):
results = []
for x in terms:
try:
intermediate = listify_lookup_plugin_terms(x, templar=self._templar, loader=self._loader, fail_on_undefined=True)
except UndefinedError as e:
raise AnsibleUndefinedVariable("One of the nested variables was undefined. The error was: %s" % e)
results.append(intermediate)
return results
def _check_conditional(self, conditional, templar, all_vars):
'''
This method does the low-level evaluation of each conditional
set on this object, using jinja2 to wrap the conditionals for
evaluation.
'''
original = conditional
if conditional is None or conditional == '':
return True
if conditional in all_vars and '-' not in text_type(all_vars[conditional]):
conditional = all_vars[conditional]
# make sure the templar is using the variables specified with this method
templar.set_available_variables(variables=all_vars)
try:
conditional = templar.template(conditional)
if not isinstance(conditional, text_type) or conditional == "":
return conditional
# a Jinja2 evaluation that results in something Python can eval!
presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional
conditional = templar.template(presented)
val = conditional.strip()
if val == "True":
return True
elif val == "False":
return False
else:
raise AnsibleError("unable to evaluate conditional: %s" % original)
except (AnsibleUndefinedVariable, UndefinedError) as e:
# the templating failed, meaning most likely a
# variable was undefined. If we happened to be
# looking for an undefined variable, return True,
# otherwise fail
if "is undefined" in original:
return True
elif "is defined" in original:
return False
else:
raise AnsibleError("error while evaluating conditional (%s): %s" % (original, e))
def run(self, terms, variables, **kwargs):
anydict = False
skip = False
for term in terms:
if isinstance(term, dict):
anydict = True
total_search = []
if anydict:
for term in terms:
if isinstance(term, dict):
files = term.get('files', [])
paths = term.get('paths', [])
skip = boolean(term.get('skip', False))
filelist = files
if isinstance(files, string_types):
files = files.replace(',', ' ')
files = files.replace(';', ' ')
filelist = files.split(' ')
pathlist = paths
if paths:
if isinstance(paths, string_types):
paths = paths.replace(',', ' ')
paths = paths.replace(':', ' ')
paths = paths.replace(';', ' ')
pathlist = paths.split(' ')
if not pathlist:
total_search = filelist
else:
for path in pathlist:
for fn in filelist:
f = os.path.join(path, fn)
total_search.append(f)
else:
total_search.append(term)
else:
total_search = self._flatten(terms)
for fn in total_search:
try:
fn = self._templar.template(fn)
except (AnsibleUndefinedVariable, UndefinedError):
continue
# get subdir if set by task executor, default to files otherwise
subdir = getattr(self, '_subdir', 'files')
path = None
path = self.find_file_in_search_path(variables, subdir, fn, ignore_missing=True)
if path is not None:
return [path]
else:
if skip:
return []
else:
raise AnsibleLookupError("No file was found when using with_first_found. Use the 'skip: true' option to allow this task to be skipped if no files are found")
def run(self, terms, variables, **kwargs):
anydict = False
skip = False
for term in terms:
if isinstance(term, dict):
anydict = True
total_search = []
if anydict:
for term in terms:
if isinstance(term, dict):
files = term.get('files', [])
paths = term.get('paths', [])
skip = boolean(term.get('skip', False), strict=False)
filelist = files
if isinstance(files, string_types):
files = files.replace(',', ' ')
files = files.replace(';', ' ')
filelist = files.split(' ')
pathlist = paths
if paths:
if isinstance(paths, string_types):
paths = paths.replace(',', ' ')
paths = paths.replace(':', ' ')
paths = paths.replace(';', ' ')
pathlist = paths.split(' ')
if not pathlist:
total_search = filelist
else:
for path in pathlist:
for fn in filelist:
f = os.path.join(path, fn)
total_search.append(f)
else:
total_search.append(term)
else:
total_search = self._flatten(terms)
for fn in total_search:
try:
fn = self._templar.template(fn)
except (AnsibleUndefinedVariable, UndefinedError):
continue
# get subdir if set by task executor, default to files otherwise
subdir = getattr(self, '_subdir', 'files')
path = None
path = self.find_file_in_search_path(variables, subdir, fn, ignore_missing=True)
if path is not None:
return [path]
else:
if skip:
return []
else:
raise AnsibleLookupError("No file was found when using with_first_found. Use the 'skip: true' option to allow this task to be skipped if no "
"files are found")