def get_options(self, overrides):
    """Collect the theme options, honouring inheritance and overrides.

    The ``options`` section of every theme config in the inheritance chain
    is merged so that a derived theme wins over its bases.  ``overrides``
    is then applied on top; every key in it must already be a known
    option, otherwise ``ThemeError`` is raised.
    """
    # Gather the configs from this theme up to the root of the chain.
    configs = []
    theme = self
    while theme is not None:
        configs.append(theme.themeconf)
        theme = theme.base

    # Apply root-most first so that more derived themes overwrite it.
    merged = {}
    for themeconf in configs[::-1]:
        try:
            merged.update(themeconf.items('options'))
        except configparser.NoSectionError:
            # A theme without an [options] section contributes nothing.
            continue

    for key, val in iteritems(overrides):
        if key in merged:
            merged[key] = val
        else:
            raise ThemeError('unsupported theme option %r given' % key)
    return merged
# Example source code using the Python NoSectionError() class
def parse_config(cfg, ext_config=None):
    """Build the effective pypdns configuration dictionary.

    Values are layered in this order (later layers win):

    1. every option found in the ``pypdns`` section of ``cfg``;
    2. entries of ``DEFAULT_CONFIG`` for keys still missing;
    3. non-``None`` ``endpoint`` / ``apikey`` entries of ``ext_config``.

    :param cfg: parser object exposing ``options(section)`` and
        ``get(section, option)``.
    :param ext_config: optional mapping with externally supplied
        ``endpoint`` / ``apikey`` values; ``None`` means no overrides.
    :returns: the merged configuration as a ``dict``.
    :raises AssertionError: if ``endpoint`` or ``apikey`` is still
        ``None`` after merging.
    """
    # Use None as the default so no mutable dict is shared between calls.
    if ext_config is None:
        ext_config = {}
    config = {}
    try:
        options = cfg.options('pypdns')
    except ConfigParser.NoSectionError:
        log.debug('No config files provided and default not present')
        options = []
    for option in options:
        config[option] = cfg.get('pypdns', option)
    for ckey, cvalue in iteritems(DEFAULT_CONFIG):
        config.setdefault(ckey, cvalue)
    for k in ('endpoint', 'apikey'):
        if ext_config.get(k) is not None:
            config[k] = ext_config[k]
    # Raised explicitly instead of via `assert` so the check is not
    # stripped under `python -O`; the exception type is unchanged for
    # callers that already catch AssertionError.
    if config.get('endpoint') is None or config.get('apikey') is None:
        raise AssertionError('Configuration not found')
    return config
def set_cloud_subscription(cloud_name, subscription):
    """Store or clear the subscription recorded for a registered cloud.

    A truthy ``subscription`` is written under the cloud's section of
    ``CLOUD_CONFIG_FILE`` (the section is created when absent); a falsy
    one removes the stored option.  The file is rewritten afterwards.

    Raises ``CloudNotRegisteredException`` when ``cloud_name`` is unknown.
    """
    if not _get_cloud(cloud_name):
        raise CloudNotRegisteredException(cloud_name)

    parser = get_config_parser()
    parser.read(CLOUD_CONFIG_FILE)

    if not subscription:
        try:
            parser.remove_option(cloud_name, 'subscription')
        except configparser.NoSectionError:
            # Nothing stored for this cloud, so nothing to clear.
            pass
    else:
        try:
            parser.add_section(cloud_name)
        except configparser.DuplicateSectionError:
            # Section already present; just update the option below.
            pass
        parser.set(cloud_name, 'subscription', subscription)

    if not os.path.isdir(GLOBAL_CONFIG_DIR):
        os.makedirs(GLOBAL_CONFIG_DIR)
    with open(CLOUD_CONFIG_FILE, 'w') as out:
        parser.write(out)
def _check_file(self):
"""
Check if the file exists and has the expected password reference.
"""
if not os.path.exists(self.file_path):
return False
self._migrate()
config = configparser.RawConfigParser()
config.read(self.file_path)
try:
config.get(
escape_for_ini('keyring-setting'),
escape_for_ini('password reference'),
)
except (configparser.NoSectionError, configparser.NoOptionError):
return False
try:
self._check_scheme(config)
except AttributeError:
# accept a missing scheme
return True
return self._check_version(config)
def _check_scheme(self, config):
    """
    Validate the encryption scheme recorded in ``config``.

    Raises AttributeError when no scheme entry exists and ValueError
    when the recorded scheme differs from ``self.scheme``.
    """
    try:
        found = config.get(
            escape_for_ini('keyring-setting'),
            escape_for_ini('scheme'),
        )
    except (configparser.NoSectionError, configparser.NoOptionError):
        raise AttributeError("Encryption scheme missing")

    # Drop the crypto module name prefix before comparing.
    module_prefix = 'PyCrypto '
    if found.startswith(module_prefix):
        found = found[len(module_prefix):]

    if found == self.scheme:
        return
    raise ValueError("Encryption scheme mismatch "
                     "(exp.: %s, found: %s)" % (self.scheme, found))
def get_password(self, service, username):
    """Read the password for ``username`` of ``service`` from the file.

    The stored value is base64 text; it is decoded, passed through
    ``self.decrypt`` and finally decoded as UTF-8.

    :returns: the password string, or ``None`` when the service section
        or the username option is not present in ``self.config``.
    """
    service = escape_for_ini(service)
    username = escape_for_ini(username)
    # fetch the password
    try:
        password_base64 = self.config.get(service, username).encode()
        # decode with base64; b64decode replaces base64.decodestring,
        # which was removed in Python 3.9, and likewise tolerates the
        # embedded newlines of wrapped base64 text
        password_encrypted = base64.b64decode(password_base64)
        # decrypt the password
        password = self.decrypt(password_encrypted).decode('utf-8')
    except (configparser.NoOptionError, configparser.NoSectionError):
        password = None
    return password
def delete_password(self, service, username):
    """Remove the stored password for ``username`` of ``service``.

    Raises PasswordDeleteError when no such entry exists; otherwise the
    keyring file is rewritten without it.
    """
    section = escape_for_ini(service)
    key = escape_for_ini(username)

    parser = configparser.RawConfigParser()
    if os.path.exists(self.file_path):
        parser.read(self.file_path)

    try:
        removed = parser.remove_option(section, key)
    except configparser.NoSectionError:
        raise PasswordDeleteError("Password not found")
    if not removed:
        raise PasswordDeleteError("Password not found")

    # update the file
    with open(self.file_path, 'w') as out:
        parser.write(out)
def get_configured_defaults():
    """Return the non-empty entries of the defaults section as a dict.

    The configuration is reloaded first.  An empty dict is returned when
    the defaults section does not exist.
    """
    _reload_config()
    try:
        names = az_config.config_parser.options(DEFAULTS_SECTION)
        pairs = ((name, az_config.get(DEFAULTS_SECTION, name)) for name in names)
        # Entries with a falsy value are left out.
        return {name: val for name, val in pairs if val}
    except configparser.NoSectionError:
        return {}
def _find_configured_default(argument):
if not (hasattr(argument.type, 'default_name_tooling') and argument.type.default_name_tooling):
return None
try:
return az_config.get(DEFAULTS_SECTION, argument.type.default_name_tooling, None)
except configparser.NoSectionError:
return None
def getDataLocation(data_name):
    '''
    Look up where a data set is stored.

    The name is lower-cased and used as the configuration section.

    @param data_name: Name of data set
    @return string of data location, None if not found
    '''
    section = str.lower(data_name)
    settings = DataFetcherLocal.getConfig()
    try:
        location = settings.get(section, 'data_location')
    except (NoOptionError, NoSectionError):
        location = None
    return location
def _update_default_info(self):
    """Rebuild ``self.config_default`` from the defaults section.

    The result is a run of ``"<option>: <value> "`` fragments, or the
    empty string when the defaults section does not exist.
    """
    try:
        names = az_config.config_parser.options(DEFAULTS_SECTION)
        self.config_default = ""
        for name in names:
            fragment = name + ": " + az_config.get(DEFAULTS_SECTION, name) + " "
            self.config_default += fragment
    except configparser.NoSectionError:
        self.config_default = ""
def get_confstr(self, section, name, default=NODEFAULT):
    """Look up a theme configuration setting along the base theme chain.

    This theme's config is consulted first, then each base theme in
    turn.  If no theme defines the setting, ``default`` is returned, or
    ``ThemeError`` is raised when no default was supplied.
    """
    try:
        value = self.themeconf.get(section, name)
    except (configparser.NoOptionError, configparser.NoSectionError):
        parent = self.base
        if parent is not None:
            return parent.get_confstr(section, name, default)
        if default is not NODEFAULT:
            return default
        raise ThemeError('setting %s.%s occurs in none of the '
                         'searched theme configs' % (section, name))
    return value
def _update_default_info(self):
    """Refresh ``self.config_default`` with the configured defaults.

    Each option of the defaults section contributes
    ``"<option>: <value> "``; a missing section leaves an empty string.
    """
    try:
        option_names = az_config.config_parser.options(DEFAULTS_SECTION)
        self.config_default = ""
        for option_name in option_names:
            option_value = az_config.get(DEFAULTS_SECTION, option_name)
            self.config_default += option_name + ": " + option_value + " "
    except configparser.NoSectionError:
        self.config_default = ""
def get_active_cloud_name():
    """Return the name of the active cloud from the global config.

    When no cloud name is recorded, the public Azure cloud is made the
    active one and its name is returned.
    """
    parser = get_config_parser()
    parser.read(GLOBAL_CONFIG_PATH)
    try:
        active_name = parser.get('cloud', 'name')
    except (configparser.NoOptionError, configparser.NoSectionError):
        # Nothing recorded yet: fall back to (and persist) the public cloud.
        _set_active_cloud(AZURE_PUBLIC_CLOUD.name)
        return AZURE_PUBLIC_CLOUD.name
    return active_name
def get_cloud_subscription(cloud_name):
    """Return the subscription stored for ``cloud_name``, or None.

    None is returned when the cloud has no section in
    ``CLOUD_CONFIG_FILE`` or the section has no ``subscription`` option.
    """
    parser = get_config_parser()
    parser.read(CLOUD_CONFIG_FILE)
    try:
        stored = parser.get(cloud_name, 'subscription')
    except (configparser.NoOptionError, configparser.NoSectionError):
        stored = None
    return stored
def test_get_not_found_section(self):
    """Getting an option from an unknown section raises NoSectionError."""
    missing_section, any_option = 'MySection', 'myoption'
    self.assertRaises(configparser.NoSectionError,
                      self.az_config.get, missing_section, any_option)
def get_area_file():
    """Resolve the path of the area definition file named in satpy.cfg.

    The path is built from the ``area_directory`` and ``area_file``
    options of the ``projector`` section.  None is returned (after a
    warning) when the config file or that section cannot be found.
    """
    conf, successes = get_config("satpy.cfg")
    config_missing = conf is None or not successes
    if config_missing:
        LOG.warning(
            "Couldn't find the satpy.cfg file. Do you have one ? is it in $PPP_CONFIG_DIR ?")
        return None

    try:
        # An empty/unset directory falls back to "" so join() still works.
        area_dir = conf.get("projector", "area_directory") or ""
        area_name = conf.get("projector", "area_file")
        return get_config_path(os.path.join(area_dir, area_name))
    except configparser.NoSectionError:
        LOG.warning("Couldn't find 'projector' section of 'satpy.cfg'")
        return None
def test_get_not_found_section(self):
    """Getting an option from an unknown section raises NoSectionError."""
    missing_section, any_option = 'MySection', 'myoption'
    self.assertRaises(configparser.NoSectionError,
                      self.cli_config.get, missing_section, any_option)
def get(self, section, option, fallback=_UNSET):
    """Return a setting, preferring the environment over the config file.

    The environment variable named by ``env_var_name(section, option)``
    wins when it is set; otherwise the config parser is consulted.  If
    the section or option is missing there, ``fallback`` is returned
    when given, else the parser's exception propagates.
    """
    try:
        env = self.env_var_name(section, option)
        if env in os.environ:
            return os.environ[env]
        return self.config_parser.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        if fallback is not _UNSET:
            return fallback
        raise
def read_section(section, fname):
    """Return one section of an .ini file as a dict, or None if absent."""
    parser = configparser.ConfigParser()
    parser.read(fname)
    try:
        entries = parser.items(section)
    except configparser.NoSectionError:
        return None
    return dict(entries)
#############################################################################
def load_state_value(self, section, option):
    """Fetch ``option`` of ``section`` from the saved state.

    Raises ValueError when the section or option is not present.
    """
    logger.debug("Loading state [{}] {}".format(section, option))
    try:
        stored = self.state.get(section, option)
    except (configparser.NoSectionError, configparser.NoOptionError):
        raise ValueError("State not saved or invalid.")
    return stored
def _check_version(self, config):
    """
    Read the file format version from ``config``.

    On success the value is stored in ``self.file_version`` and True is
    returned; False is returned when the version entry is missing.
    """
    missing = (configparser.NoSectionError, configparser.NoOptionError)
    try:
        version = config.get(
            escape_for_ini('keyring-setting'),
            escape_for_ini('version'),
        )
    except missing:
        return False
    self.file_version = version
    return True
def delete_password(self, service, username):
    """Delete the stored password for ``username`` of ``service``.

    The entry is removed from ``self.config`` and the configuration is
    written back through ``self._open('w')``.

    :raises errors.PasswordDeleteError: if the service section or the
        username option does not exist.
    """
    service = escape_for_ini(service)
    username = escape_for_ini(username)
    try:
        # remove_option() returns False when the section exists but the
        # option does not; report that as "not found" too instead of
        # silently succeeding.
        removed = self.config.remove_option(service, username)
    except configparser.NoSectionError:
        raise errors.PasswordDeleteError('Password not found')
    if not removed:
        raise errors.PasswordDeleteError('Password not found')
    config_file = UnicodeWriterAdapter(self._open('w'))
    try:
        self.config.write(config_file)
    finally:
        # Close the handle even when writing fails.
        config_file.close()
def setup_logging(config_path, config_opts, options):
    """Configure logging from the ini file named in ``config_opts``.

    The file ``config_opts["log_config_file"]`` is looked up in
    ``config_path`` and, failing that, in ``/etc/mock``.  It is applied
    with ``logging.config.fileConfig`` and also parsed separately to copy
    three formatter format strings into ``config_opts``.  Handler levels
    and logger propagation are then adjusted from ``options.verbose`` and
    ``options.trace``.

    Any failure to find or read the file, or a missing formatter section,
    is logged and terminates the process with exit status 50.

    NOTE(review): the nesting below was reconstructed from a copy of the
    code that had lost its indentation -- confirm against the upstream
    source.
    """
    log_ini = os.path.join(config_path, config_opts["log_config_file"])
    try:
        if not os.path.exists(log_ini):
            # Fall back to /etc/mock unless that is where we already looked.
            if os.path.normpath('/etc/mock') != os.path.normpath(config_path):
                log.warning("Could not find required logging config file: %s. Using default...",
                            log_ini)
                log_ini = os.path.join("/etc/mock", config_opts["log_config_file"])
                if not os.path.exists(log_ini):
                    raise IOError("Could not find log config file %s" % log_ini)
            else:
                raise IOError("Could not find log config file %s" % log_ini)
    except IOError as exc:
        log.error(exc)
        sys.exit(50)
    try:
        # fileConfig() applies the configuration; log_cfg re-reads the same
        # file so the formatter strings can be looked up below.
        log_cfg = configparser.ConfigParser()
        logging.config.fileConfig(log_ini)
        log_cfg.read(log_ini)
    except (IOError, OSError, configparser.NoSectionError) as exc:
        log.error("Log config file(%s) not correctly configured: %s", log_ini, exc)
        sys.exit(50)
    try:
        # set up logging format strings
        # (raw=1 returns the strings without configparser interpolation)
        config_opts['build_log_fmt_str'] = log_cfg.get("formatter_%s" % config_opts['build_log_fmt_name'],
                                                       "format", raw=1)
        config_opts['root_log_fmt_str'] = log_cfg.get("formatter_%s" % config_opts['root_log_fmt_name'],
                                                      "format", raw=1)
        config_opts['state_log_fmt_str'] = log_cfg.get("formatter_%s" % config_opts['state_log_fmt_name'],
                                                       "format", raw=1)
    except configparser.NoSectionError as exc:
        log.error("Log config file (%s) missing required section: %s", log_ini, exc)
        sys.exit(50)
    # set logging verbosity
    # (values of options.verbose other than 0, 1 or 2 change nothing here)
    if options.verbose == 0:
        log.handlers[0].setLevel(logging.WARNING)
        tmplog = logging.getLogger("mockbuild.Root.state")
        if tmplog.handlers:
            tmplog.handlers[0].setLevel(logging.WARNING)
    elif options.verbose == 1:
        log.handlers[0].setLevel(logging.INFO)
    elif options.verbose == 2:
        log.handlers[0].setLevel(logging.DEBUG)
        logging.getLogger("mockbuild.Root.build").propagate = 1
        logging.getLogger("mockbuild").propagate = 1
    # enable tracing if requested
    logging.getLogger("trace").propagate = 0
    if options.trace:
        logging.getLogger("trace").propagate = 1
def test_get_config_value():
    """Exercise get_config_value against a scratch '.testconfigrc' file.

    Covers defaults written on first access, persistence of written
    values, the errors for unknown sections/options/files, and the
    interplay between non-persistent overrides and the cached config.
    """
    rc_name = '.testconfigrc'

    def lookup(section, option, **extra):
        # Every call below targets the same scratch config file.
        return get_config_value(config_filename=rc_name, section=section, option=option, **extra)

    config_path = get_config_path(rc_name)
    if os.path.exists(config_path):
        os.remove(config_path)

    # Defaults are generated and written once; later defaults are ignored.
    value = lookup('opts', 'setting1', default_generator=lambda: 'somevalue', write_default=True)
    assert value == 'somevalue'
    value = lookup('opts', 'setting1', default_generator=lambda: 'someothervalue', write_default=True)
    assert value == 'somevalue'
    value = lookup('opts', 'setting2', default_generator=lambda: 'blah', write_default=True)
    assert value == 'blah'

    # Written values can be read back without a default.
    value = lookup('opts', 'setting1')
    assert value == 'somevalue'
    value = lookup('opts', 'setting2')
    assert value == 'blah'

    # Unknown section, option and file each raise their own error.
    with raises(NoSectionError):
        _ = lookup('schmopts', 'setting3')
    with raises(NoOptionError):
        _ = lookup('opts', 'setting3')
    with raises(AssertionError):
        _ = get_config_value(config_filename='.testconfigXXXrc', section='opts', option='setting3')

    # A non-persistent override is seen through the cache only.
    set_non_persistent_config_value(config_filename=rc_name, section='opts', option='setting2', value="bob")
    value = lookup('opts', 'setting2')
    assert value == 'bob'
    value = lookup('opts', 'setting2', use_cashed_config=False)
    assert value == 'blah'
    value = lookup('opts', 'setting2')
    assert value == 'bob'

    # The same holds for a section that exists only non-persistently.
    set_non_persistent_config_value(config_filename=rc_name, section='schmapts', option='setting2', value="bob")
    with raises(NoOptionError):
        _ = lookup('schmapts', 'setting3')
    with raises(NoSectionError):
        _ = lookup('schmapts', 'setting2', use_cashed_config=False)
    value = lookup('schmapts', 'setting2')
    assert value == 'bob'

    os.remove(config_path)
def __call__(names, config, verbose=False, refresh=False):
    """Print a table of the resources recorded in the inventory.

    The inventory file path is read from the ``general`` section of the
    configuration obtained for ``config``.  One row (name, type, id,
    status) is emitted per inventory entry whose name does not start
    with an underscore.  When ``refresh`` is true each resource is
    connected to before its id and status are read.

    ``names`` and ``verbose`` are accepted but not referenced in this
    body.

    NOTE(review): the nesting below was reconstructed from a copy of the
    code that had lost its indentation; in particular, whether the
    id/status assignments sit inside ``if refresh:`` should be confirmed
    against the upstream source.
    """
    # TODO?: we might want to embed get_resource_inventory()
    # within ConfigManager (even though it would make it NICEMAN specific)
    # This would allow to make more sensible error messages etc
    cm = ResourceManager.get_config_manager(config)
    inventory_path = cm.getpath('general', 'inventory_file')
    inventory = ResourceManager.get_inventory(inventory_path)
    id_length = 19 # todo: make it possible to output them long
    # The ID column width is filled in from the local ``id_length``.
    template = '{:<20} {:<20} {:<%(id_length)s} {:<10}' % locals()
    ui.message(template.format('RESOURCE NAME', 'TYPE', 'ID', 'STATUS'))
    ui.message(template.format('-------------', '----', '--', '------'))
    for name in sorted(inventory):
        if name.startswith('_'):
            continue
        # if refresh:
        inventory_resource = inventory[name]
        try:
            # Section name is the resource type up to its first '-'.
            config = dict(cm.items(inventory_resource['type'].split('-')[0]))
        except NoSectionError:
            config = {}
        # Inventory entries take precedence over the config section.
        config.update(inventory_resource)
        env_resource = ResourceManager.factory(config)
        try:
            if refresh:
                env_resource.connect()
            # TODO: handle the actual refreshing in the inventory
            inventory_resource['id'] = env_resource.id
            inventory_resource['status'] = env_resource.status
            if not env_resource.id:
                # continue # A missing ID indicates a deleted resource.
                inventory_resource['id'] = 'DELETED'
        except ResourceError as exc:
            ui.error("%s resource query error: %s" % (name, exc_str(exc)))
            # Keep whatever was recorded, or show "?" for missing fields.
            for f in 'id', 'status':
                inventory_resource[f] = inventory_resource.get(f, "?")
        msgargs = (
            name,
            inventory_resource['type'],
            inventory_resource['id'][:id_length],
            inventory_resource['status']
        )
        ui.message(template.format(*msgargs))
        lgr.debug('list result: {}, {}, {}, {}'.format(*msgargs))
    # if not refresh:
    # ui.message('(Use --refresh option to view current status.)')
    #
    # if refresh:
    # niceman.interface.base.set_resource_inventory(inventory)