def _parse_config(self, domain_name):
    """Load the BMC settings stored on disk for ``domain_name``.

    The settings are read from ``<utils.CONFIG_PATH>/<domain_name>/config``.

    :param domain_name: name of the domain whose config should be read
    :returns: dict holding every known string option (``None`` when the
        option is absent from the file) plus ``port`` as an int
    :raises exception.DomainNotFound: if the config file does not exist
    """
    config_path = os.path.join(utils.CONFIG_PATH, domain_name, 'config')
    if not os.path.exists(config_path):
        raise exception.DomainNotFound(domain=domain_name)

    parser = configparser.ConfigParser()
    parser.read(config_path)

    def _optional(option):
        # An absent option is not an error; it is reported as None.
        try:
            return parser.get(DEFAULT_SECTION, option)
        except configparser.NoOptionError:
            return None

    string_options = ('username', 'password', 'address', 'domain_name',
                      'libvirt_uri', 'libvirt_sasl_username',
                      'libvirt_sasl_password')
    bmc = {option: _optional(option) for option in string_options}

    # Port needs to be int; unlike the options above, this lookup is not
    # guarded, so a missing port propagates the parser's exception.
    bmc['port'] = parser.getint(DEFAULT_SECTION, 'port')
    return bmc
python类NoOptionError()的实例源码
def _parse_config(self, domain_name):
    """Load the BMC settings stored on disk for ``domain_name``.

    The settings are read from ``<self.config_dir>/<domain_name>/config``.

    :param domain_name: name of the domain whose config should be read
    :returns: dict holding every known string option (``None`` when the
        option is absent from the file) plus ``port`` as an int
    :raises exception.DomainNotFound: if the config file does not exist
    """
    config_path = os.path.join(self.config_dir, domain_name, 'config')
    if not os.path.exists(config_path):
        raise exception.DomainNotFound(domain=domain_name)

    parser = configparser.ConfigParser()
    parser.read(config_path)

    def _optional(option):
        # An absent option is not an error; it is reported as None.
        try:
            return parser.get(DEFAULT_SECTION, option)
        except configparser.NoOptionError:
            return None

    string_options = ('username', 'password', 'address', 'domain_name',
                      'libvirt_uri', 'libvirt_sasl_username',
                      'libvirt_sasl_password')
    bmc = {option: _optional(option) for option in string_options}

    # Port needs to be int; unlike the options above, this lookup is not
    # guarded, so a missing port propagates the parser's exception.
    bmc['port'] = parser.getint(DEFAULT_SECTION, 'port')
    return bmc
def _get_pwd(service=None):
    """
    Look up present working directories in the 'env' section of the
    parser returned by ``_get_env()``.

    With ``service`` given, returns that service's directory (with a
    trailing slash); an unknown service raises ValueError. Without it,
    returns a list with the directory of every entry in the section.
    """
    def to_text(data):
        # On Python 2 the stored value is bytes; decode with the locale's
        # preferred encoding.
        if six.PY2:  # pragma: no cover
            data = data.decode(locale.getpreferredencoding(False))
        return data

    parser = _get_env()

    if not service:
        return [to_text(utils.with_trailing_slash(path))
                for _, path in parser.items('env')]

    try:
        stored = parser.get('env', service)
        return to_text(utils.with_trailing_slash(stored))
    except configparser.NoOptionError as e:
        six.raise_from(ValueError('%s is an invalid service' % service), e)
def get(key, default=None):
    """
    Get the configuration value for key

    Lookup order: the "scitokens" section of the module-level
    ``configuration``, then ``CONFIG_DEFAULTS``, then ``default``.

    :param str key: The key in the configuration to retrieve
    :param default: Value returned when the key is found neither in the
        configuration nor in ``CONFIG_DEFAULTS``. ``None`` means "no
        fallback supplied", in which case the lookup error is re-raised.
    :returns: The value in the configuration, or the default
    :raises configparser.NoOptionError: if the key is unknown everywhere
        and no non-None ``default`` was given
    """
    try:
        return configuration.get("scitokens", key)
    except configparser.NoOptionError:
        # Check the defaults
        if key in CONFIG_DEFAULTS:
            return CONFIG_DEFAULTS[key]
        # Previously the caller's default was discarded (``del default``)
        # even though the docstring promised it as the fallback.
        if default is not None:
            return default
        # Bare raise keeps the original traceback intact.
        raise
def handle(self, config, email, password, target, name, **options):
    """Register this server and store the resulting credentials.

    Refuses to continue when a ``server_id`` is already present in the
    'serverapi' section. Any of ``email``, ``password`` or ``target`` that
    is None is asked for interactively. On success the new server id and
    auth token are written into ``config``.

    :raises CommandError: if the server is already registered
    """
    try:
        config.get('serverapi', 'server_id')
    except NoOptionError:
        already_registered = False
    else:
        already_registered = True

    if already_registered:
        raise CommandError("This server is already registered. If you've unregistered it from your team dashboard, you can delete {}".format(options['config_path']))

    # Prompt only for the credentials that were not supplied.
    if email is None:
        email = input("Enter your Cloak email: ")
    if password is None:
        password = getpass("Enter your Cloak password: ")
    if target is None:
        target = input("Enter the target identifier (from the team dashboard): ")

    registered = Server.register(email, password, target, name)

    config.set('serverapi', 'server_id', registered.server_id)
    config.set('serverapi', 'auth_token', registered.auth_token)

    print("This server has been registered. The next step is to request a certificate.", file=self.stdout)
def _check_file(self):
    """
    Tell whether the keyring file exists and carries the expected
    password reference.

    Returns False when the file is absent or the reference entry is
    missing, True when the reference is present but no scheme is
    recorded, and otherwise the result of the version check. Exceptions
    other than AttributeError from the scheme check propagate.
    """
    if not os.path.exists(self.file_path):
        return False
    self._migrate()

    parser = configparser.RawConfigParser()
    parser.read(self.file_path)

    section = escape_for_ini('keyring-setting')
    option = escape_for_ini('password reference')
    try:
        parser.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        return False

    try:
        self._check_scheme(parser)
    except AttributeError:
        # accept a missing scheme
        return True

    return self._check_version(parser)
def _check_scheme(self, config):
    """
    Validate the encryption scheme recorded in ``config``.

    raise AttributeError if no scheme is recorded
    raise ValueError if the recorded scheme differs from ``self.scheme``
    """
    section = escape_for_ini('keyring-setting')
    option = escape_for_ini('scheme')
    try:
        found = config.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        raise AttributeError("Encryption scheme missing")

    # remove pointless crypto module name
    legacy_prefix = 'PyCrypto '
    if found.startswith(legacy_prefix):
        found = found[len(legacy_prefix):]

    if found != self.scheme:
        raise ValueError("Encryption scheme mismatch "
                         "(exp.: %s, found: %s)" % (self.scheme, found))
def get_password(self, service, username):
    """Read the password from the file.

    The stored value is looked up in ``self.config`` under the escaped
    service/username, base64-decoded, decrypted with ``self.decrypt`` and
    decoded as UTF-8.

    :returns: the decrypted password, or None when the service section or
        the username option is not present
    """
    service = escape_for_ini(service)
    username = escape_for_ini(username)

    # base64.decodestring was removed in Python 3.9; decodebytes is its
    # replacement. Fall back to the old name only on interpreters that
    # lack the new one.
    b64_decode = getattr(base64, 'decodebytes', None) or base64.decodestring

    # fetch the password
    try:
        password_base64 = self.config.get(service, username).encode()
        # decode with base64
        password_encrypted = b64_decode(password_base64)
        # decrypt the password
        password = self.decrypt(password_encrypted).decode('utf-8')
    except (configparser.NoOptionError, configparser.NoSectionError):
        password = None
    return password
def getDataLocation(data_name):
    '''
    Look up where a data set is stored.

    The lower-cased data set name is used as the config section and its
    'data_location' option is returned.

    @param data_name: Name of data set
    @return string of data location, None if not found
    '''
    section = str.lower(data_name)
    conf = DataFetcherLocal.getConfig()
    try:
        location = conf.get(section, 'data_location')
    except (NoOptionError, NoSectionError):
        location = None
    return location
def testConfigCommands(self):
    """Drive the config script through help, list, get, set and rm."""
    self.assertFalse(os.path.exists(local_cfg))
    run = self._runConfigScript

    # --help succeeds quietly and prints the description.
    help_info = run(['--help'])
    self.assertEqual(help_info['rc'], 0)
    self.assertEqual(help_info['stderr'], '')
    self.assertIn('Get and set configuration values for the worker',
                  help_info['stdout'])

    # list shows the girder_worker section.
    list_info = run(['list'])
    self.assertEqual(list_info['rc'], 0)
    self.assertIn('[girder_worker]', list_info['stdout'])

    # get returns the initial value.
    get_info = run(['get', 'celery', 'app_main'])
    self.assertEqual(get_info['rc'], 0)
    self.assertEqual(get_info['stdout'].strip(), 'girder_worker')

    # set followed by get returns the new value.
    set_info = run(['set', 'celery', 'app_main', 'foo'])
    self.assertEqual(set_info['rc'], 0)
    get_info = run(['get', 'celery', 'app_main'])
    self.assertEqual(get_info['rc'], 0)
    self.assertEqual(get_info['stdout'].strip(), 'foo')

    # rm removes the option, so a later get raises.
    rm_info = run(['rm', 'celery', 'app_main'])
    self.assertEqual(rm_info['rc'], 0)
    with self.assertRaises(NoOptionError):
        run(['get', 'celery', 'app_main'])
ec2_mod.py 文件源码
项目:ansible-tower-automated-deployment
作者: OliverCable
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_option(self, config, section, option):
    """Return ``option`` from ``section`` of ``config``.

    When the section or the option is missing, a message is printed and
    the process exits with status 1.
    """
    try:
        return config.get('%s' % section, option)
    except (NoOptionError, NoSectionError):
        print("Couldn't find %s in profile %s" % (option, section))
        sys.exit(1)
def get_confstr(self, section, name, default=NODEFAULT):
    """Look up a theme configuration setting, walking up the chain of
    base themes when this theme does not define it.

    Returns ``default`` when no theme in the chain has the setting and a
    default was supplied; raises ThemeError otherwise.
    """
    try:
        return self.themeconf.get(section, name)
    except (configparser.NoOptionError, configparser.NoSectionError):
        # Not defined here: defer to the base theme if there is one.
        if self.base is not None:
            return self.base.get_confstr(section, name, default)
        if default is not NODEFAULT:
            return default
        raise ThemeError('setting %s.%s occurs in none of the '
                         'searched theme configs' % (section, name))
def log_level(self):
    """Return the ``logging`` level named by the "log_level" option of
    the "default" section, or ``logging.WARNING`` when the option is
    absent or does not name an attribute of ``logging``.
    """
    try:
        level_name = self.config.get("default", "log_level")
        return getattr(logging, level_name)
    except (AttributeError, configparser.NoOptionError):
        return logging.WARNING
def get_active_cloud_name():
    """Return the 'name' option of the 'cloud' section in the global
    config.

    When the section or option is missing, the public cloud's name is
    handed to ``_set_active_cloud`` and returned instead.
    """
    parser = get_config_parser()
    parser.read(GLOBAL_CONFIG_PATH)
    try:
        active = parser.get('cloud', 'name')
    except (configparser.NoOptionError, configparser.NoSectionError):
        _set_active_cloud(AZURE_PUBLIC_CLOUD.name)
        active = AZURE_PUBLIC_CLOUD.name
    return active
def get_cloud_subscription(cloud_name):
    """Return the 'subscription' option stored under ``cloud_name`` in
    the cloud config file, or None when the section or option is missing.
    """
    parser = get_config_parser()
    parser.read(CLOUD_CONFIG_FILE)
    try:
        subscription = parser.get(cloud_name, 'subscription')
    except (configparser.NoOptionError, configparser.NoSectionError):
        subscription = None
    return subscription
def test_get_not_found_option(self):
    """Getting an option that is absent from an existing section raises
    NoOptionError."""
    section, option = 'MySection', 'myoption'
    self.az_config.config_parser.add_section(section)
    self.assertRaises(configparser.NoOptionError,
                      self.az_config.get, section, option)
def get(self, section, option, fallback=_UNSET):
    """Return a setting, preferring its environment variable over the
    config file.

    When the setting is in neither place, ``fallback`` is returned if one
    was supplied; otherwise the parser's lookup error propagates.
    """
    try:
        env = AzConfig.env_var_name(section, option)
        if env in os.environ:
            return os.environ[env]
        return self.config_parser.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        if fallback is not _UNSET:
            return fallback
        raise
def read(benchmark_result_file):
    """
    Read benchmark

    Each section of the file is expected to hold a device option plus one
    option per member of ``Metrics``. A section missing any of them is
    logged as incorrect; metrics read from it before the failure remain
    in the result.

    :param benchmark_result_file: benchmark result file
    :return: {device: {metric: value, }, }
    """
    result = {}

    # SafeConfigParser and readfp() were removed in Python 3.12. Where
    # read_file() exists, ConfigParser already has the "safe" behaviour,
    # so use the modern pair there and the legacy pair elsewhere.
    if hasattr(configparser.ConfigParser, 'read_file'):
        config = configparser.ConfigParser()
        load = config.read_file
    else:
        config = configparser.SafeConfigParser()
        load = config.readfp  # pylint: disable=deprecated-method

    with io.open(benchmark_result_file) as fp:
        load(fp)

    for section in config.sections():
        try:
            device = config.get(section, _DEVICE)
            result[device] = {}
            for metric in Metrics:
                result[device][metric.value] = config.get(
                    section,
                    metric.value
                )
        except configparser.NoOptionError:
            _LOGGER.error(
                'Incorrect section in %s',
                benchmark_result_file
            )
    return result
def _default_pretty_print(self):
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.dodo_commands/config"))
try:
return config.get("DodoCommands", "pretty_print")
except configparser.NoOptionError:
return "true"
def test_get_not_found_option(self):
    """Getting an option that is absent from an existing section raises
    NoOptionError."""
    section, option = 'MySection', 'myoption'
    self.cli_config.config_parser.add_section(section)
    self.assertRaises(configparser.NoOptionError,
                      self.cli_config.get, section, option)
def get(self, section, option, fallback=_UNSET):
    """Return a setting, preferring its environment variable over the
    config file.

    When the setting is in neither place, ``fallback`` is returned if one
    was supplied; otherwise the parser's lookup error propagates.
    """
    try:
        env = self.env_var_name(section, option)
        if env in os.environ:
            return os.environ[env]
        return self.config_parser.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        if fallback is not _UNSET:
            return fallback
        raise
def load_state_value(self, section, option):
    """Load value from state.

    :raises ValueError: if the section or the option is missing from
        ``self.state``
    """
    try:
        logger.debug("Loading state [{}] {}".format(section, option))
        value = self.state.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        raise ValueError("State not saved or invalid.")
    return value
def _check_version(self, config):
    """
    Read the file version recorded in ``config`` into
    ``self.file_version``.

    an existing scheme implies an existing version as well
    return True if a version entry was found, and False otherwise
    """
    section = escape_for_ini('keyring-setting')
    option = escape_for_ini('version')
    try:
        self.file_version = config.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        return False
    else:
        return True
def get_password(self, service, username):
    """
    Read the password from the file.

    The stored value is base64-decoded and decrypted, first with the
    associated data generated for service/username and, if that raises
    ValueError, again without it. Returns None when the service section
    or the username option is missing.
    """
    # The associated data is derived from the unescaped names.
    assoc = self._generate_assoc(service, username)
    service = escape_for_ini(service)
    username = escape_for_ini(username)

    # load the passwords from the file
    parser = configparser.RawConfigParser()
    if os.path.exists(self.file_path):
        parser.read(self.file_path)

    # fetch the password
    try:
        encoded = parser.get(service, username).encode()
        # decode with base64
        encrypted = decodebytes(encoded)
        try:
            # decrypt the password with associated data
            plain = self.decrypt(encrypted, assoc)
        except ValueError:
            # decrypt the password without associated data
            return self.decrypt(encrypted).decode('utf-8')
        return plain.decode('utf-8')
    except (configparser.NoOptionError, configparser.NoSectionError):
        return None
def get_param(self, key, action=None, manifest=None):
    """Returns a string value of a given pkglint parameter,
    intended for use by pkglint Checker objects to provide hints as
    to how particular checks should be run.

    The key is looked up, prefixed with "pkg.lint." so that it stays in
    its own namespace, in the action's attributes and in the manifest;
    when both carry it, the manifest value is the one used. If neither
    yields a non-empty value, the unprefixed key is read from the
    "pkglint" section of the config file.

    The return value is a space-separated string of parameters, or None
    when nothing is found.
    """
    param_key = "pkg.lint.{0}".format(key)

    override = None
    if action and param_key in action.attrs:
        override = action.attrs[param_key]
    if manifest and param_key in manifest:
        override = manifest[param_key]

    if override:
        # A non-string value is treated as an iterable of strings.
        if not isinstance(override, six.string_types):
            return " ".join(override)
        return override

    try:
        configured = self.conf.get("pkglint", key)
    except configparser.NoOptionError:
        return None
    if configured:
        return configured.replace("\n", " ")
    return None
def _get_config(self):
    """Build the CLI flags for functional tests from the test config.

    The config file path comes from the ZUNCLIENT_TEST_CONFIG environment
    variable, falling back to DEFAULT_CONFIG_FILE. The test is skipped
    when the file cannot be read and failed when the auth strategy is
    invalid or a required setting is missing.

    :returns: dict mapping setting name to its value in the 'functional'
        section
    """
    config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                 DEFAULT_CONFIG_FILE)
    # SafeConfigParser was deprecated in Python 3.2
    parser_cls = (config_parser.ConfigParser if six.PY3
                  else config_parser.SafeConfigParser)
    config = parser_cls()
    if not config.read(config_file):
        self.skipTest('Skipping, no test config found @ %s' % config_file)

    try:
        auth_strategy = config.get('functional', 'auth_strategy')
    except config_parser.NoOptionError:
        auth_strategy = 'keystone'
    if auth_strategy not in ['keystone', 'noauth']:
        raise self.fail(
            'Invalid auth type specified: %s in functional must be '
            'one of: [keystone, noauth]' % auth_strategy)

    if auth_strategy == 'keystone':
        required = ['os_auth_url', 'os_username',
                    'os_password', 'os_project_name',
                    'os_identity_api_version']
        optional = ['os_user_domain_id', 'os_project_domain_id']
    else:
        required = ['os_auth_token', 'zun_url']
        optional = []

    cli_flags = {}
    missing = []
    for setting in required + optional:
        try:
            cli_flags[setting] = config.get('functional', setting)
        except config_parser.NoOptionError:
            # NOTE(vdrok): Here we ignore the absence of KS v3 options as
            # v2 may be used. Keystone client will do the actual check of
            # the parameters' correctness.
            if setting not in optional:
                missing.append(setting)

    if missing:
        self.fail('Missing required setting in test.conf (%(conf)s) for '
                  'auth_strategy=%(auth)s: %(missing)s' %
                  {'conf': config_file,
                   'auth': auth_strategy,
                   'missing': ','.join(missing)})
    return cli_flags
def __init__(self, name, warn=None):
    """Load the theme called ``name`` together with its base themes.

    A theme registered as a directory is used in place; one registered
    with an archive is extracted into a fresh temporary directory. The
    theme's config file is then read and the theme named by its "inherit"
    setting is loaded recursively as ``self.base`` (None for 'none').

    :param name: name of the theme to load
    :param warn: passed through unchanged to the base theme
    :raises ThemeError: if the theme or its inherited theme is unknown,
        or the theme has no "inherit" setting
    """
    if name not in self.themes:
        self.load_extra_theme(name)
        if name not in self.themes:
            raise ThemeError('no theme named %r found '
                             '(missing theme.conf?)' % name)
    self.name = name

    # Do not warn yet -- to be compatible with old Sphinxes, people *have*
    # to use "default".
    # if name == 'default' and warn:
    #     warn("'default' html theme has been renamed to 'classic'. "
    #          "Please change your html_theme setting either to "
    #          "the new 'alabaster' default theme, or to 'classic' "
    #          "to keep using the old default.")

    tdir, tinfo = self.themes[name]
    if tinfo is None:
        # already a directory, do nothing
        self.themedir = tdir
        self.themedir_created = False
    else:
        # extract the theme to a temp directory
        self.themedir = tempfile.mkdtemp('sxt')
        self.themedir_created = True
        # The loop variable must not be called ``name``: that would
        # overwrite the theme name used in the error messages below.
        for entry in tinfo.namelist():
            if entry.endswith('/'):
                continue
            target_dir = path.join(self.themedir, path.dirname(entry))
            if not path.isdir(target_dir):
                os.makedirs(target_dir)
            # Context manager closes the file even if the write fails.
            with open(path.join(self.themedir, entry), 'wb') as fp:
                fp.write(tinfo.read(entry))

    self.themeconf = configparser.RawConfigParser()
    self.themeconf.read(path.join(self.themedir, THEMECONF))

    try:
        inherit = self.themeconf.get('theme', 'inherit')
    except configparser.NoOptionError:
        raise ThemeError('theme %r doesn\'t have "inherit" setting' % name)

    # load inherited theme automatically #1794, #1884, #1885
    self.load_extra_theme(inherit)

    if inherit == 'none':
        self.base = None
    elif inherit not in self.themes:
        raise ThemeError('no theme named %r found, inherited by %r' %
                         (inherit, name))
    else:
        self.base = Theme(inherit, warn=warn)
def test_get_config_value():
    """Exercise get_config_value / set_non_persistent_config_value
    against a scratch config file that is removed again at the end."""
    rc_name = '.testconfigrc'

    def lookup(**kwargs):
        # Every lookup below targets the same scratch file.
        return get_config_value(config_filename=rc_name, **kwargs)

    config_path = get_config_path(rc_name)
    if os.path.exists(config_path):
        os.remove(config_path)

    # Defaults are generated and written on first access only.
    assert lookup(section='opts', option='setting1', default_generator=lambda: 'somevalue', write_default=True) == 'somevalue'
    assert lookup(section='opts', option='setting1', default_generator=lambda: 'someothervalue', write_default=True) == 'somevalue'
    assert lookup(section='opts', option='setting2', default_generator=lambda: 'blah', write_default=True) == 'blah'

    # Written defaults can be read back without a generator.
    assert lookup(section='opts', option='setting1') == 'somevalue'
    assert lookup(section='opts', option='setting2') == 'blah'

    # Unknown section, option or file each raise their own error.
    with raises(NoSectionError):
        _ = lookup(section='schmopts', option='setting3')
    with raises(NoOptionError):
        _ = lookup(section='opts', option='setting3')
    with raises(AssertionError):
        _ = get_config_value(config_filename='.testconfigXXXrc', section='opts', option='setting3')

    # A non-persistent override shows up in cached reads only.
    set_non_persistent_config_value(config_filename=rc_name, section='opts', option='setting2', value="bob")
    assert lookup(section='opts', option='setting2') == 'bob'
    assert lookup(section='opts', option='setting2', use_cashed_config=False) == 'blah'
    assert lookup(section='opts', option='setting2') == 'bob'

    # The same holds for a section that exists only non-persistently.
    set_non_persistent_config_value(config_filename=rc_name, section='schmapts', option='setting2', value="bob")
    with raises(NoOptionError):
        _ = lookup(section='schmapts', option='setting3')
    with raises(NoSectionError):
        _ = lookup(section='schmapts', option='setting2', use_cashed_config=False)
    assert lookup(section='schmapts', option='setting2') == 'bob'

    os.remove(config_path)