def get_host_and_realm(self):
    """Return the hostname and IPA realm name as a ``(host, realm)`` tuple.

    Since IPA 4.4 the schema has to be fetched when finalize() is called
    (really only needed by the ipa command-line tool, but baked in for
    now).  That requires a TGT, which in turn needs the hostname and the
    realm, so both values are read directly from the ``[global]`` section
    of the INI-formatted IPA configuration file.
    """
    ipa_conf = SafeConfigParser()
    ipa_conf.read('/etc/ipa/default.conf')
    # 'host' is looked up before 'realm', matching the returned order.
    return tuple(
        ipa_conf.get('global', option) for option in ('host', 'realm')
    )
python类SafeConfigParser()的实例源码
def set_environment(self):
    """Build the environment used for MCR subprocesses.

    Reads ``self.mcr_filepath`` and queries ``self._get_mcr_config`` for
    the ``custom`` section, falling back to ``linux_default`` and finally
    to ``macosx_default`` whenever the previous section raises IOError or
    ValueError.  An error raised for the last section propagates.

    Returns:
        tuple: ``(subprocess_env, op_sys)`` where ``subprocess_env`` is a
        copy of ``os.environ`` with ``MCR_CACHE_ROOT``, ``LANG`` and the
        configured variable set, and ``op_sys`` is the first value
        returned by ``self._get_mcr_config`` for the selected section.
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    config.read(self.mcr_filepath)

    for section in ('custom', 'linux_default'):
        try:
            # Unpack inside the try block: a result of the wrong length
            # raises ValueError and must trigger the fallback as well.
            op_sys, env_var, _mcr_path, set_paths = \
                self._get_mcr_config(config, section)
        except (IOError, ValueError):
            continue
        break
    else:
        # Last resort; errors are intentionally not caught here.
        op_sys, env_var, _mcr_path, set_paths = \
            self._get_mcr_config(config, 'macosx_default')

    subprocess_env = os.environ.copy()
    subprocess_env["MCR_CACHE_ROOT"] = "/tmp/emptydir"
    subprocess_env["LANG"] = "en_US.utf8"
    subprocess_env[env_var] = set_paths
    return subprocess_env, op_sys
def get_lilypond_bin_path(self):
    """Return the LilyPond binary path taken from ``config/lilypond.cfg``.

    The ``custom`` option of the ``[custom]`` section is used when it is
    non-empty (a literal ``$HOME`` is replaced by the user's home
    directory).  Otherwise the entry of the parser defaults keyed by
    ``self.sys_os`` is used; that value is accepted if it exists on disk
    or if the first element returned by ``self.call('"which" ...')`` is
    truthy.

    Raises:
        AssertionError: if the selected path cannot be found.
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    lily_cfgfile = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                'config', 'lilypond.cfg')
    config.read(lily_cfgfile)

    not_found = ('The lilypond path is not found. Please correct the custom '
                 'section in "tomato/config/lilypond.cfg".')

    # check custom
    lilypath = config.get('custom', 'custom')
    # linux path might be given with $HOME; convert it to the real path
    lilypath = lilypath.replace('$HOME', os.path.expanduser('~'))
    if lilypath:
        found = os.path.exists(lilypath)
    else:  # defaults
        lilypath = config.defaults()[self.sys_os]
        found = (os.path.exists(lilypath) or
                 self.call('"which" "{0:s}"'.format(lilypath))[0])

    # Raise explicitly instead of using ``assert`` so the validation is
    # not stripped when Python runs with -O; the exception type is kept.
    if not found:
        raise AssertionError(not_found)
    return lilypath
def parse_config_file(filename):
    """
    Parses a configuration file and returns a settings dictionary.

    Args:
        filename (str): File to read configuration settings from.

    Returns:
        dict: A dictionary mapping each section name to a dictionary of
            its options, with every raw value passed through
            ``_parse_config_val``.
    """
    parser = SafeConfigParser()
    # readfp() was deprecated in Python 3.2 and removed in 3.12; use
    # read_file() when the parser provides it and fall back otherwise.
    load = getattr(parser, 'read_file', None) or parser.readfp
    with open(filename) as fp:
        load(fp)

    return {
        section: {
            option: _parse_config_val(raw_value)
            for option, raw_value in parser.items(section)
        }
        for section in parser.sections()
    }
def test_passing_config_log(self):
    """
    Test passing a configuration that sets log_file
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        new_config = configparser.ConfigParser()
    else:
        new_config = configparser.SafeConfigParser()
    new_config.add_section("scitokens")
    new_config.set("scitokens", "log_level", "WARNING")

    # The context manager closes (and thereby deletes) the temporary file
    # even when one of the assertions below fails.
    with tempfile.NamedTemporaryFile() as tmp_file:
        new_config.set("scitokens", "log_file", tmp_file.name)
        scitokens.set_config(new_config)

        self.assertEqual(scitokens.utils.config.get("log_level"), "WARNING")
        self.assertEqual(scitokens.utils.config.get("log_file"), tmp_file.name)

        # Log a line
        logger = logging.getLogger("scitokens")
        logger.error("This is an error")
        tmp_file.flush()
        print(os.path.getsize(tmp_file.name))
        self.assertTrue(os.path.getsize(tmp_file.name) > 0)
def write(cfg_obj, output_file_path):
    """
    Write a conflagration object to ``output_file_path`` in INI format.

    Only supports writing out a conflagration object with namespaces that
    follow the section.key=value pattern that ConfigFile.parse generates:
    every attribute of ``cfg_obj`` becomes a section and every attribute
    of that section object becomes an option.

    Raises:
        Exception: if a section's attributes cannot be stored as options
            (for example because the value is not itself a namespace).
    """
    parser = SafeConfigParser()
    for section, namespace in cfg_obj.__dict__.items():
        parser.add_section(section)
        try:
            for option, value in namespace.__dict__.items():
                parser.set(section, option, value)
        except Exception:
            # The two literals are concatenated, so the first one needs a
            # trailing space to keep the words apart in the message.
            raise Exception(
                "Output to config file not supported for conflagrations "
                "nested beyond a one dot namespace.")
    with open(output_file_path, 'w') as f:
        parser.write(f)
support_matrix.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _load_support_matrix(self):
    """Reads the support-matrix.ini file and populates an instance
    of the SupportMatrix class with all the data.

    The file is the one named by the first directive argument, resolved
    through the Sphinx environment.

    :returns: SupportMatrix instance
    """
    import sys

    # SafeConfigParser and readfp() were deprecated in Python 3.2 and
    # removed in Python 3.12; use their replacements when available.
    if sys.version_info >= (3, 2):
        cfg = configparser.ConfigParser()
        load = cfg.read_file
    else:
        cfg = configparser.SafeConfigParser()
        load = cfg.readfp

    env = self.state.document.settings.env
    fname = self.arguments[0]
    rel_fpath, fpath = env.relfn2path(fname)
    with open(fpath) as fp:
        load(fp)

    # This ensures that the docs are rebuilt whenever the
    # .ini file changes
    env.note_dependency(rel_fpath)

    matrix = SupportMatrix()
    matrix.targets = self._get_targets(cfg)
    matrix.features = self._get_features(cfg, matrix.targets)

    return matrix
ec2_mod.py 文件源码
项目:ansible-tower-automated-deployment
作者: OliverCable
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def assume_role(self, region, profile):
    """Assume the role configured for ``profile`` and store its credentials.

    ``source_profile`` and ``role_arn`` are looked up for ``profile`` in
    ``~/.aws/config``; the access and secret keys of the source profile
    are looked up in ``~/.aws/credentials``.  The temporary credentials
    returned by STS are written into the module-level ``connect_args``.
    ``region`` is not used by this method.
    """
    global connect_args

    # Same parser class for both files, chosen by interpreter generation.
    parser_cls = (configparser.ConfigParser if six.PY3
                  else configparser.SafeConfigParser)
    aws_creds = parser_cls()
    aws_config = parser_cls()
    aws_creds.read(os.path.expanduser("~/.aws/credentials"))
    aws_config.read(os.path.expanduser("~/.aws/config"))

    source_profile = self.get_option(aws_config, profile, 'source_profile')
    arn = self.get_option(aws_config, profile, 'role_arn')
    aws_access_key = self.get_option(
        aws_creds, source_profile, 'aws_access_key_id')
    aws_secret_key = self.get_option(
        aws_creds, source_profile, 'aws_secret_access_key')
    session_name = "role_session_name_" + self.boto_profile

    sts_conn = sts.STSConnection(aws_access_key, aws_secret_key)
    assumed = sts_conn.assume_role(
        role_arn=arn, role_session_name=session_name)

    connect_args['aws_access_key_id'] = assumed.credentials.access_key
    connect_args['aws_secret_access_key'] = assumed.credentials.secret_key
    connect_args['security_token'] = assumed.credentials.session_token
def config_from_ini(self, ini):
    """Parse an INI-formatted string into a nested dictionary.

    ``ini`` is dedented before parsing, so indented multi-line literals
    are accepted.

    Returns:
        dict: ``{section: {option: value}}`` for every section found.
    """
    import sys

    # SafeConfigParser and readfp() were deprecated in Python 3.2 and
    # removed in Python 3.12; use their replacements when available.
    if sys.version_info >= (3, 2):
        parser = configparser.ConfigParser()
        load = parser.read_file
    else:
        parser = configparser.SafeConfigParser()
        load = parser.readfp

    ini = textwrap.dedent(six.u(ini))
    load(io.StringIO(ini))
    return {
        section: dict(parser.items(section))
        for section in parser.sections()
    }
def config_from_ini(self, ini):
    """Parse an INI-formatted string into a nested dictionary.

    ``ini`` is dedented before parsing, so indented multi-line literals
    are accepted.

    Returns:
        dict: ``{section: {option: value}}`` for every section found.
    """
    # SafeConfigParser and readfp() were deprecated in Python 3.2 and
    # removed in Python 3.12, so pick the parser class and its loading
    # method together.
    if sys.version_info >= (3, 2):
        parser = configparser.ConfigParser()
        load = parser.read_file
    else:
        parser = configparser.SafeConfigParser()
        load = parser.readfp

    ini = textwrap.dedent(six.u(ini))
    load(io.StringIO(ini))
    return {
        section: dict(parser.items(section))
        for section in parser.sections()
    }
def __init__(self, top_config=None):
    """Load the layered configuration and fill in missing defaults.

    Existing files are read in order of increasing priority: the system
    file ``/etc/opp/opp.cfg``, the user file ``~/.opp/opp.cfg`` and
    finally ``top_config``.  When ``top_config`` is not given, the
    ``OPP_TOP_CONFIG`` environment variable is consulted instead (mostly
    intended for testing).
    """
    if not top_config:
        # Keeps the caller's value when the variable is not set.
        top_config = environ.get('OPP_TOP_CONFIG', top_config)

    # Lowest priority first; later files override earlier ones.
    candidates = ('/etc/opp/opp.cfg', path.expanduser('~/.opp/opp.cfg'))
    cfglist = [name for name in candidates if path.isfile(name)]
    if top_config and path.isfile(top_config):
        cfglist.append(top_config)

    if sys.version_info >= (3, 2):
        parser_cls = configparser.ConfigParser
    else:
        parser_cls = configparser.SafeConfigParser
    self.cfg = parser_cls()
    if cfglist:
        self.cfg.read(cfglist)

    # Set default values for options no file provided
    self.def_sec = "DEFAULT"
    for option, fallback in (('secret_key', "default-insecure"),
                             ('exp_delta', "300")):
        if not self.cfg.has_option(self.def_sec, option):
            self.cfg.set(self.def_sec, option, fallback)
def _get_env():
    """
    Get the current environment using the ENV_FILE.

    If ENV_FILE does not exist yet, the package default PKG_ENV_FILE is
    copied into place first.

    Returns a ConfigParser.
    """
    import sys

    # SafeConfigParser and readfp() were deprecated in Python 3.2 and
    # removed in Python 3.12; use their replacements when available.
    if sys.version_info >= (3, 2):
        parser = configparser.ConfigParser()
        load = parser.read_file
    else:
        parser = configparser.SafeConfigParser()
        load = parser.readfp

    # if env file doesn't exist, copy over the package default
    if not os.path.exists(ENV_FILE):
        shutil.copyfile(PKG_ENV_FILE, ENV_FILE)
    with open(ENV_FILE) as fp:
        load(fp)
    return parser
def test_passing_config(self):
    """
    Test the passing of a configuration parser object
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        new_config = configparser.ConfigParser()
    else:
        new_config = configparser.SafeConfigParser()
    new_config.add_section("scitokens")
    new_config.set("scitokens", "log_level", "WARNING")

    scitokens.set_config(new_config)

    self.assertEqual(scitokens.utils.config.get("log_level"), "WARNING")
def read(cfgfile):
    """Parse ``cfgfile`` and return the populated parser.

    Raises:
        IOError (Python 2) / FileNotFoundError (Python 3): if ``cfgfile``
            does not exist.
    """
    if os.path.exists(cfgfile):
        parser = SafeConfigParser()
        parser.read(cfgfile)
        return parser

    # FileNotFoundError does not exist on Python 2.
    error_cls = IOError if six.PY2 else FileNotFoundError
    raise error_cls('File {name} does not exist.'.format(name=cfgfile))
def __init__(self):
    """Create the logger and load ``CONFIG_FILE`` into ``self.conf``.

    A missing or unreadable file is not an error; it is only reported
    with a warning.
    """
    self.logger = logging.getLogger(__name__)
    self.conf = SafeConfigParser()

    # read() returns the list of files it managed to parse.
    loaded = self.conf.read(CONFIG_FILE)
    if not loaded:
        self.logger.warning(
            "Config file {0} not found".format(CONFIG_FILE))
        return
    self.logger.debug("Using config file at {0}".format(CONFIG_FILE))
def test_load_settings(self):
    """
    Test that the right calls are made and the right errors generated when
    loading configuration settings from a configuration file specified by
    a path string.
    """
    conf_path = "/test/path/server.conf"

    c = config.KmipServerConfig()
    c._logger = mock.MagicMock()
    c._parse_settings = mock.MagicMock()

    # Existing path: the file is announced, read and handed to the
    # settings parser.
    with mock.patch('os.path.exists', return_value=True), \
            mock.patch(
                'six.moves.configparser.SafeConfigParser.read'
            ) as parser_mock:
        c.load_settings(conf_path)
        c._logger.info.assert_any_call(
            "Loading server configuration settings from: "
            "/test/path/server.conf"
        )
        parser_mock.assert_called_with(conf_path)
        self.assertTrue(c._parse_settings.called)

    # Missing path: a ConfigurationError is generated.
    c._logger.reset_mock()
    with mock.patch('os.path.exists', return_value=False):
        with self.assertRaises(exceptions.ConfigurationError):
            c.load_settings(conf_path)
def load_settings(self, path):
    """
    Load configuration settings from the file pointed to by path.

    This will overwrite all current setting values.

    Args:
        path (string): The path to the configuration file containing
            the settings to load. Required.

    Raises:
        ConfigurationError: Raised if the path does not point to an
            existing file or if a setting value is invalid.
    """
    if os.path.exists(path):
        self._logger.info(
            "Loading server configuration settings from: {0}".format(path)
        )
        settings_parser = configparser.SafeConfigParser()
        settings_parser.read(path)
        self._parse_settings(settings_parser)
    else:
        raise exceptions.ConfigurationError(
            "The server configuration file ('{0}') could not be "
            "located.".format(path)
        )
def __init__(self, wrapped):
    """Set up the configuration object wrapping the module ``wrapped``.

    The configuration directory is taken from the ``CHAINSAW_CFG_DIR``
    environment variable if set, otherwise ``DEFAULT_CONFIG_DIR`` when it
    is an existing, writable directory, otherwise it is left empty.  The
    configuration is then loaded and logging is configured; a failure in
    either step only emits a warning.
    """
    # will hold a SafeConfigParser instance (presumably set by load())
    self._conf_values = None

    # note we do not invoke the cfg_dir setter here, because we do not want anything to be created/copied yet.
    # first check if there is a config dir set via environment
    if 'CHAINSAW_CFG_DIR' in os.environ:
        # TODO: probe?
        self._cfg_dir = os.environ['CHAINSAW_CFG_DIR']
    # try to read default cfg dir
    elif os.path.isdir(self.DEFAULT_CONFIG_DIR) and os.access(self.DEFAULT_CONFIG_DIR, os.W_OK):
        self._cfg_dir = self.DEFAULT_CONFIG_DIR
    # use defaults, have no cfg_dir set.
    else:
        self._cfg_dir = ''

    try:
        self.load()
    except RuntimeError as err:  # not named ``re``: that shadows the regex module name
        warnings.warn("unable to read default configuration file. Logging and "
                      "progress bar handling could behave bad! Error: %s" % err)

    # imported here rather than at module level, as in the original code
    from chainsaw.util.log import setup_logging, LoggingConfigurationError
    try:
        setup_logging(self)
    except LoggingConfigurationError as e:
        warnings.warn("Error during logging configuration. Logging might not be functional! "
                      "Error: %s" % e)

    # wrap this module
    self.wrapped = wrapped
    self.__wrapped__ = wrapped
def __read_cfg(self, filenames):
    """Parse ``filenames`` and return the resulting parser.

    The list of files that were actually read is stored in
    ``self._used_filenames``.

    Raises:
        ReadConfigException: if reading fails with an EnvironmentError.
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()

    try:
        self._used_filenames = config.read(filenames)
    except EnvironmentError as e:
        # note: this file is mission crucial, so fail badly if this is not readable.
        raise ReadConfigException("FATAL ERROR: could not read default configuration"
                                  " file %s\n%s" % (self.default_config_file, e))
    return config
def __init__(self):
    """Create the logger and load ``CONFIG_FILE`` into ``self.conf``.

    A missing or unreadable file is not an error; it is only reported
    with a warning.
    """
    self.logger = logging.getLogger(__name__)
    self.conf = SafeConfigParser()

    # read() returns the list of files it managed to parse.
    loaded = self.conf.read(CONFIG_FILE)
    if not loaded:
        self.logger.warning(
            "Config file {0} not found".format(CONFIG_FILE))
        return
    self.logger.debug("Using config file at {0}".format(CONFIG_FILE))
def test_load_settings(self):
    """
    Test that the right calls are made and the right errors generated when
    loading configuration settings from a configuration file specified by
    a path string.
    """
    conf_path = "/test/path/server.conf"

    c = config.KmipServerConfig()
    c._logger = mock.MagicMock()
    c._parse_settings = mock.MagicMock()

    # Existing path: the file is announced, read and handed to the
    # settings parser.
    with mock.patch('os.path.exists', return_value=True), \
            mock.patch(
                'six.moves.configparser.SafeConfigParser.read'
            ) as parser_mock:
        c.load_settings(conf_path)
        c._logger.info.assert_any_call(
            "Loading server configuration settings from: "
            "/test/path/server.conf"
        )
        parser_mock.assert_called_with(conf_path)
        self.assertTrue(c._parse_settings.called)

    # Missing path: a ConfigurationError is generated.
    c._logger.reset_mock()
    with mock.patch('os.path.exists', return_value=False):
        with self.assertRaises(exceptions.ConfigurationError):
            c.load_settings(conf_path)
def load_settings(self, path):
    """
    Load configuration settings from the file pointed to by path.

    This will overwrite all current setting values.

    Args:
        path (string): The path to the configuration file containing
            the settings to load. Required.

    Raises:
        ConfigurationError: Raised if the path does not point to an
            existing file or if a setting value is invalid.
    """
    if os.path.exists(path):
        self._logger.info(
            "Loading server configuration settings from: {0}".format(path)
        )
        settings_parser = configparser.SafeConfigParser()
        settings_parser.read(path)
        self._parse_settings(settings_parser)
    else:
        raise exceptions.ConfigurationError(
            "The server configuration file ('{0}') could not be "
            "located.".format(path)
        )
def get_config_parser():
    """Return a new parser instance appropriate for the running Python.

    ``ConfigParser`` is used from Python 3.2 on, where it took over the
    behaviour of ``SafeConfigParser`` (which was deprecated there and
    removed in 3.12); older interpreters get ``SafeConfigParser``.
    """
    import sys

    # Compare against (3, 2) rather than ``major == 3`` so that 3.0/3.1
    # still get the safe class and any later major version keeps working.
    if sys.version_info >= (3, 2):
        return configparser.ConfigParser()
    return configparser.SafeConfigParser()
def read(benchmark_result_file):
    """
    Read benchmark

    A section lacking the device option or one of the metric options is
    logged as an error and skipped; metrics read before the missing
    option stay in the result.

    :param benchmark_result_file: benchmark result file
    :return: {device: {metric: value, }, }
    """
    import sys

    # SafeConfigParser and readfp() were deprecated in Python 3.2 and
    # removed in Python 3.12; use their replacements when available.
    if sys.version_info >= (3, 2):
        config = configparser.ConfigParser()
        load = config.read_file
    else:
        config = configparser.SafeConfigParser()
        load = config.readfp  # pylint: disable=deprecated-method

    result = {}
    with io.open(benchmark_result_file) as fp:
        load(fp)

    for section in config.sections():
        try:
            device = config.get(section, _DEVICE)
            result[device] = {}
            for metric in Metrics:
                result[device][metric.value] = config.get(
                    section,
                    metric.value
                )
        except configparser.NoOptionError:
            _LOGGER.error(
                'Incorrect section in %s',
                benchmark_result_file
            )

    return result
def write(benchmark_result_file, result):
    """Write benchmark result.

    Sample output file format:
        [device0]
        device = 589d88bd-8098-4041-900e-7fcac18abab3
        write_bps = 314572800
        read_bps = 314572800
        write_iops = 64000
        read_iops = 4000

    :param benchmark_result_file:
        benchmark result file
    :param result:
        {device: {metric: value, }, }
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()

    # items() exists on both Python 2 and 3 dictionaries, whereas
    # iteritems() is Python 2 only.
    for device_count, (device, metrics) in enumerate(result.items()):
        section = _DEVICE + str(device_count)
        config.add_section(section)
        config.set(section, _DEVICE, device)
        for metric, value in metrics.items():
            # parser values must be strings
            config.set(section, metric, str(value))

    fs.write_safe(
        benchmark_result_file,
        config.write,
        permission=0o644
    )
def _load_support_matrix(self):
    """Reads the support-matrix.ini file and populates an instance
    of the SupportMatrix class with all the data.

    The file is the one named by the first directive argument, resolved
    through the Sphinx environment.

    :returns: SupportMatrix instance
    """
    # SafeConfigParser and readfp() were both deprecated in Python 3.2
    # (and removed in 3.12), so pick the class and its loader together.
    if sys.version_info >= (3, 2):
        cfg = configparser.ConfigParser()
        load = cfg.read_file
    else:
        cfg = configparser.SafeConfigParser()
        load = cfg.readfp

    env = self.state.document.settings.env
    fname = self.arguments[0]
    rel_fpath, fpath = env.relfn2path(fname)
    with open(fpath) as fp:
        load(fp)

    # This ensures that the docs are rebuilt whenever the
    # .ini file changes
    env.note_dependency(rel_fpath)

    matrix = SupportMatrix()
    matrix.targets = self._get_targets(cfg)
    matrix.features = self._get_features(cfg, matrix.targets)

    return matrix
def cfg(program, key):
    """Return option ``key`` of section ``program``, or None if absent.

    The value is looked up in ``weak-local.cfg`` and ``weak.cfg``, read
    in that order relative to the current working directory; files that
    do not exist are skipped.
    """
    import sys

    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        parser = configparser.ConfigParser()
    else:
        parser = configparser.SafeConfigParser()

    parser.read(['weak-local.cfg', 'weak.cfg'])

    if parser.has_option(program, key):
        return parser.get(program, key)
    return None
# make a butterworth IIR bandpass filter
def __init__(self, filename, defaults=None):
    """Create the parser and read ``filename`` without raising.

    Args:
        filename: configuration file handed to ``self.read``.
        defaults: optional mapping stored as ``self.defaults``; an empty
            dictionary is used when omitted or empty.
    """
    import sys

    self.filename = filename
    # SafeConfigParser has been a deprecated alias of ConfigParser since
    # Python 3.2 and no longer exists in Python 3.12+.
    if sys.version_info >= (3, 2):
        self._parser = configparser.ConfigParser()
    else:
        self._parser = configparser.SafeConfigParser()
    self.defaults = defaults or {}
    self.read(filename, doraise=False)
def __init__(self, fileName):
    """Parse ``fileName`` and keep both the parser and the file name."""
    parser = SafeConfigParser()
    parser.read(fileName)
    # Assigned only after reading, so a parse error leaves no attributes set.
    self.__parser, self.fileName = parser, fileName
def get_config_parser():
    """Return a new parser instance appropriate for the running Python.

    ``ConfigParser`` is used from Python 3.2 on, where it took over the
    behaviour of ``SafeConfigParser`` (which was deprecated there and
    removed in 3.12); older interpreters get ``SafeConfigParser``.
    """
    # Compare against (3, 2) rather than ``major == 3`` so that 3.0/3.1
    # still get the safe class and any later major version keeps working.
    if sys.version_info >= (3, 2):
        return configparser.ConfigParser()
    return configparser.SafeConfigParser()