def __init__(self, config):
    """Load the info.classification data file named in *config*.

    On success, ``self.classification_data`` holds a parsed ConfigParser;
    on any failure it stays None and ``self.bad_classification_data``
    carries a translated message explaining why, for later reporting.
    """
    self.classification_data = None
    # Path to the classification data file, from the pkglint config.
    self.classification_path = config.get(
        "pkglint", "info_classification_path")
    self.skip_classification_check = False

    # a default error message used if we've parsed the
    # data file, but haven't thrown any exceptions
    self.bad_classification_data = _("no sections found in data "
        "file {0}").format(self.classification_path)

    if os.path.exists(self.classification_path):
        try:
            # Use with-blocks so the data file handle is always closed
            # (the original leaked the object returned by open()).
            if six.PY2:
                self.classification_data = \
                    configparser.SafeConfigParser()
                with open(self.classification_path) as fh:
                    self.classification_data.readfp(fh)
            else:
                # SafeConfigParser has been renamed to
                # ConfigParser in Python 3.2.
                self.classification_data = \
                    configparser.ConfigParser()
                with open(self.classification_path) as fh:
                    self.classification_data.read_file(fh)
        except Exception as err:
            # any exception thrown here results in a null
            # classification_data object. We deal with that
            # later.  Reset explicitly: the parser was assigned
            # before parsing, so a parse error would otherwise
            # leave a partially-populated object behind.
            self.classification_data = None
            self.bad_classification_data = _(
                "unable to parse data file {path}: "
                "{err}").format(
                path=self.classification_path,
                err=err)
    else:
        self.bad_classification_data = _("missing file {0}").format(
            self.classification_path)
    super(ManifestChecker, self).__init__(config)
# Python SafeConfigParser() usage examples (collected snippets).
def __init__(self, config_file=None):
    """Build the pkglint configuration.

    Reads the system pkglintrc plus the user's ~/.pkglintrc when no
    *config_file* is given, otherwise reads only *config_file*.
    Raises PkglintConfigException when the file is unreadable, missing
    or corrupt.
    """
    if config_file:
        try:
            # ConfigParser doesn't do a good job of
            # error reporting, so we'll just try to open
            # the file
            open(config_file, "r").close()
        except (EnvironmentError) as err:
            raise PkglintConfigException(
                _("unable to read config file: {0} ").format(
                    err))
    try:
        if six.PY2:
            self.config = configparser.SafeConfigParser(
                defaults)
        else:
            # SafeConfigParser has been renamed to
            # ConfigParser in Python 3.2.
            self.config = configparser.ConfigParser(
                defaults)
        if not config_file:
            # Use a with-block so the rc file handle is closed
            # deterministically (the original leaked it).
            with open("/usr/share/lib/pkg/pkglintrc") as fh:
                if six.PY2:
                    self.config.readfp(fh)
                else:
                    self.config.read_file(fh)
            # User settings override the system defaults.
            self.config.read(
                [os.path.expanduser("~/.pkglintrc")])
        else:
            self.config.read(config_file)
        # sanity check our config by looking for a known key
        self.config.get("pkglint", "log_level")
    except configparser.Error as err:
        raise PkglintConfigException(
            _("missing or corrupt pkglintrc file "
              "{config_file}: {err}").format(**locals()))
def config_from_ini(self, ini):
    """Parse an INI-formatted string into a nested dict.

    Returns ``{section: {option: value}}`` for every section found in
    the (dedented) *ini* text.
    """
    parser = configparser.SafeConfigParser()
    parser.readfp(io.StringIO(textwrap.dedent(six.u(ini))))
    return {section: dict(parser.items(section))
            for section in parser.sections()}
def read_config_file(config_file_path):
    # type: (str) -> None
    """Populate the module-level ``bots_config`` from an INI file.

    Each section becomes an entry mapping its name to the section's
    email/key/site options.  Raises IOError when the expanded path does
    not name an existing file.
    """
    config_file_path = os.path.abspath(os.path.expanduser(config_file_path))
    if not os.path.isfile(config_file_path):
        raise IOError("Could not read config file {}: File not found.".format(config_file_path))
    parser = SafeConfigParser()
    parser.read(config_file_path)
    for section in parser.sections():
        bots_config[section] = {
            option: parser.get(section, option)
            for option in ("email", "key", "site")
        }
def do_docker_create(self, label, parameters, environment, name, image,
                     volumes, memory_limit, folders, command):
    """
    Create necessary directories in a working directory
    for the mounts in the containers.
    Write .ini file filled with given parameters in each folder.
    Create a new docker container from a given image and
    return the id of the container

    Returns a tuple ``(container_id, "")`` — the second element is an
    empty placeholder string.
    """
    # Create needed folders for mounts
    for folder in folders:
        try:
            # setgid bit (02775) so group ownership propagates into
            # the mount directory.
            os.makedirs(folder, 0o2775)
        # Path already exists, ignore
        except OSError:
            if not os.path.isdir(folder):
                raise
    # Create ini file for containers
    config = configparser.SafeConfigParser()
    for section in parameters:
        if not config.has_section(section):
            config.add_section(section)
        for key, value in parameters[section].items():
            # TODO: find more elegant solution for this! ugh!
            # 'units' entries are skipped — treated as metadata,
            # not container input.
            if not key == 'units':
                if not config.has_option(section, key):
                    # str() everything: ConfigParser.set only
                    # accepts string arguments.
                    config.set(*map(str, [section, key, value]))
    # The same rendered config is written into every mount folder.
    for folder in folders:
        with open(os.path.join(folder, 'input.ini'), 'w') as f:
            config.write(f)  # Yes, the ConfigParser writes to f
    # Create docker container
    client = Client(base_url=settings.DOCKER_URL)
    # We could also pass mem_reservation since docker-py 1.10
    config = client.create_host_config(binds=volumes, mem_limit=memory_limit)
    container = client.create_container(
        image,  # docker image
        name=name,
        host_config=config,  # mounts
        command=command,  # command to run
        environment=environment,  # {'uuid' = ""} for cloud fs sync
        labels=label  # type of container
    )
    container_id = container.get('Id')
    return container_id, ""
def test_do_docker_create(self, mockClient):
    """
    Assert that the docker_create task
    calls the docker client.create_container() function,
    creates the expected folders, and writes a valid input.ini
    (with 'units' keys filtered out) into each folder.
    """
    image = "IMAGENAME"
    volumes = ['/:/data/output:z',
               '/:/data/input:ro']
    memory_limit = '1g'
    command = "echo test"
    config = {}
    environment = {'a': 1, 'b': 2}
    label = {"type": "delft3d"}
    folder = ['input', 'output']
    name = 'test-8172318273'
    workingdir = os.path.join(os.getcwd(), 'test')
    folders = [os.path.join(workingdir, f) for f in folder]
    parameters = {u'test':
                  {u'1': u'a', u'2': u'b', 'units': 'ignoreme'}
                  }
    mockClient.return_value.create_host_config.return_value = config
    do_docker_create.delay(label, parameters, environment, name,
                           image, volumes, memory_limit, folders, command)
    # Assert that docker is called
    mockClient.return_value.create_container.assert_called_with(
        image,
        host_config=config,
        command=command,
        name=name,
        environment=environment,
        labels=label
    )
    # Assert that folders are created.  The original looped over
    # listdir itself ("for f in listdir: assertIn(f, listdir)"), which
    # is a tautology — check the expected folder names instead.
    listdir = os.listdir(workingdir)
    for f in folder:
        self.assertIn(f, listdir)
    for folder_path in folders:
        ini = os.path.join(folder_path, 'input.ini')
        self.assertTrue(os.path.isfile(ini))
        config = configparser.SafeConfigParser()
        # with-block closes the ini handle (the original leaked it).
        with open(ini) as fh:
            config.readfp(fh)
        for key in parameters.keys():
            self.assertTrue(config.has_section(key))
            for option, value in parameters[key].items():
                if option != 'units':
                    self.assertTrue(config.has_option(key, option))
                    self.assertEqual(config.get(key, option), value)
                else:  # units should be ignored
                    self.assertFalse(config.has_option(key, option))
def _get_config(self):
    """Read functional-test settings and return them as CLI flag dict.

    Skips the test when no config file is found; fails the test when
    auth_strategy is invalid or required settings are missing.
    """
    config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                 DEFAULT_CONFIG_FILE)
    # SafeConfigParser was deprecated in Python 3.2
    if six.PY3:
        config = config_parser.ConfigParser()
    else:
        config = config_parser.SafeConfigParser()
    if not config.read(config_file):
        self.skipTest('Skipping, no test config found @ %s' % config_file)
    try:
        auth_strategy = config.get('functional', 'auth_strategy')
    except config_parser.NoOptionError:
        auth_strategy = 'keystone'
    if auth_strategy not in ['keystone', 'noauth']:
        # self.fail() raises on its own; the original's
        # "raise self.fail(...)" was dead code and inconsistent with
        # the bare self.fail(...) call further down.
        self.fail(
            'Invalid auth type specified: %s in functional must be '
            'one of: [keystone, noauth]' % auth_strategy)
    conf_settings = []
    keystone_v3_conf_settings = []
    if auth_strategy == 'keystone':
        conf_settings += ['os_auth_url', 'os_username',
                          'os_password', 'os_project_name',
                          'os_identity_api_version']
        keystone_v3_conf_settings += ['os_user_domain_id',
                                      'os_project_domain_id']
    else:
        conf_settings += ['os_auth_token', 'zun_url']
    cli_flags = {}
    missing = []
    for c in conf_settings + keystone_v3_conf_settings:
        try:
            cli_flags[c] = config.get('functional', c)
        except config_parser.NoOptionError:
            # NOTE(vdrok): Here we ignore the absence of KS v3 options as
            # v2 may be used. Keystone client will do the actual check of
            # the parameters' correctness.
            if c not in keystone_v3_conf_settings:
                missing.append(c)
    if missing:
        self.fail('Missing required setting in test.conf (%(conf)s) for '
                  'auth_strategy=%(auth)s: %(missing)s' %
                  {'conf': config_file,
                   'auth': auth_strategy,
                   'missing': ','.join(missing)})
    return cli_flags
def set_config(config = None):
    """
    Set the configuration of SciTokens library
    :param config: config may be: A full path to a ini configuration file,
        A ConfigParser instance, or None, which will use all defaults.

    Mutates the module-level ``configuration`` global and attaches a
    file or stream handler to the "scitokens" logger.
    """
    global configuration  # pylint: disable=C0103
    if isinstance(config, six.string_types):
        # A string argument is treated as a path to an INI file.
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.read([config])
    elif isinstance(config, configparser.RawConfigParser):
        # Any ConfigParser-like instance is adopted as-is.
        configuration = config
    elif config is None:
        print("Using built-in defaults")
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.add_section("scitokens")
    else:
        # NOTE(review): unsupported argument types are silently ignored,
        # leaving whatever was previously in the global in place —
        # confirm this is intentional rather than raising TypeError.
        pass
    logger = logging.getLogger("scitokens")
    if configuration.has_option("scitokens", "log_file"):
        log_file = configuration.get("scitokens", "log_file")
        if log_file is not None:
            # Create loggers with 100MB files, rotated 5 times
            logger.addHandler(logging.handlers.RotatingFileHandler(log_file, maxBytes=100 * (1024*1000), backupCount=5))
    else:
        logger.addHandler(logging.StreamHandler())
    # Set the logging
    # NOTE(review): assumes a "scitokens" section (or CONFIG_DEFAULTS)
    # supplies log_level — a caller-provided RawConfigParser without it
    # would raise NoSectionError here; confirm against callers.
    log_level = configuration.get("scitokens", "log_level")
    if log_level == "DEBUG":
        logger.setLevel(logging.DEBUG)
    elif log_level == "INFO":
        logger.setLevel(logging.INFO)
    elif log_level == "WARNING":
        logger.setLevel(logging.WARNING)
    elif log_level == "ERROR":
        logger.setLevel(logging.ERROR)
    elif log_level == "CRITICAL":
        logger.setLevel(logging.CRITICAL)
    else:
        # Unknown level strings fall back to WARNING.
        logger.setLevel(logging.WARNING)
def benchmark(directory,
              volume=BENCHMARK_VOLUME,
              rw_type=BENCHMARK_RW_TYPE,
              job_number=BENCHMARK_JOB_NUMBER,
              thread_number=BENCHMARK_THREAD_NUMBER,
              block_size=BENCHMARK_IOPS_BLOCK_SIZE,
              max_seconds=BENCHMARK_MAX_SECONDS):
    """Use fio to do benchmark.

    Writes an fio job file into *directory*, runs fio in terse output
    mode, and returns a dict of read/write BPS and IOPS metrics
    (empty if fio exits non-zero).
    """
    result = {}
    config_file = os.path.join(directory, _BENCHMARK_CONFIG_FILE)
    result_file = os.path.join(directory, _BENCHMARK_RESULT_FILE)
    # prepare fio config
    config = configparser.SafeConfigParser()
    global_section = 'global'
    config.add_section(global_section)
    config.set(global_section, 'group_reporting', '1')
    config.set(global_section, 'unlink', '1')
    config.set(global_section, 'time_based', '1')
    config.set(global_section, 'direct', '1')
    # str() the caller-supplied values: ConfigParser.set rejects
    # non-string values, so numeric job/thread counts or runtimes
    # would raise TypeError.  str() is a no-op for strings, keeping
    # existing callers unaffected.
    config.set(global_section, 'size', str(volume))
    config.set(global_section, 'rw', str(rw_type))
    config.set(global_section, 'numjobs', str(job_number))
    config.set(global_section, 'iodepth', str(thread_number))
    config.set(global_section, 'bs', str(block_size))
    config.set(global_section, 'runtime', str(max_seconds))
    drive_section = 'drive'
    config.add_section(drive_section)
    config.set(drive_section, 'directory', str(directory))
    fs.write_safe(
        config_file,
        lambda f: config.write(EqualSpaceRemover(f))
    )
    # start fio
    ret = subproc.call(
        ['fio', config_file, '--norandommap',
         '--minimal', '--output', result_file]
    )
    # parse fio terse result
    # http://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
    if ret == 0:
        with io.open(result_file) as fp:
            metric_list = fp.read().split(';')
            # Terse fields 6/7 and 47/48 are read/write KB/s and IOPS.
            result[Metrics.READ_BPS.value] = int(
                float(metric_list[6]) * 1024
            )
            result[Metrics.READ_IOPS.value] = int(metric_list[7])
            result[Metrics.WRITE_BPS.value] = int(
                float(metric_list[47]) * 1024
            )
            result[Metrics.WRITE_IOPS.value] = int(metric_list[48])
    return result
def read_settings(self):
    ''' Reads the settings from the profitbricks_inventory.ini file '''
    parser_cls = (configparser.ConfigParser if six.PY3
                  else configparser.SafeConfigParser)
    config = parser_cls()
    config.read(os.path.dirname(os.path.realpath(__file__)) + '/profitbricks_inventory.ini')

    # Credentials, endpoint and cache path: copy each present string
    # option straight onto the matching attribute.
    for opt in ('subscription_user', 'subscription_password',
                'subscription_password_file', 'api_url', 'cache_path'):
        if config.has_option('profitbricks', opt):
            setattr(self, opt, config.get('profitbricks', opt))

    # Cache age is numeric.
    if config.has_option('profitbricks', 'cache_max_age'):
        self.cache_max_age = config.getint('profitbricks', 'cache_max_age')

    # Group variables: stored as a Python literal in the ini file.
    if config.has_option('profitbricks', 'vars'):
        self.vars = ast.literal_eval(config.get('profitbricks', 'vars'))

    # Grouping toggles default to True when absent.
    for group_opt in ('group_by_datacenter_id',
                      'group_by_location',
                      'group_by_availability_zone',
                      'group_by_image_name',
                      'group_by_licence_type'):
        enabled = (config.getboolean('profitbricks', group_opt)
                   if config.has_option('profitbricks', group_opt)
                   else True)
        setattr(self, group_opt, enabled)

    # Inventory hostname toggle defaults to False when absent.
    hostname_opt = 'server_name_as_inventory_hostname'
    use_server_name = (config.getboolean('profitbricks', hostname_opt)
                       if config.has_option('profitbricks', hostname_opt)
                       else False)
    setattr(self, hostname_opt, use_server_name)
def parse_config_file(self, config_file):
    """Parse an open blessclient config file into a nested config dict.

    :param config_file: an open file-like object containing the INI
        configuration (note: a file object, not a path).
    :returns: dict with CLIENT_CONFIG, BLESS_CONFIG, AWS_CONFIG,
        REGION_ALIAS sections plus one KMSAUTH_CONFIG_<REGION> entry
        per alias listed in MAIN/region_aliases.
    """
    config = SafeConfigParser(self.DEFAULT_CONFIG)
    config.readfp(config_file)
    blessconfig = {
        'CLIENT_CONFIG': {
            'domain_regex': config.get('CLIENT', 'domain_regex'),
            'cache_dir': config.get('CLIENT', 'cache_dir'),
            'cache_file': config.get('CLIENT', 'cache_file'),
            'mfa_cache_dir': config.get('CLIENT', 'mfa_cache_dir'),
            'mfa_cache_file': config.get('CLIENT', 'mfa_cache_file'),
            # ip_urls is a comma-separated list in the ini file.
            'ip_urls': [s.strip() for s in config.get('CLIENT', 'ip_urls').split(",")],
            'update_script': config.get('CLIENT', 'update_script'),
            'user_session_length': int(config.get('CLIENT', 'user_session_length')),
            'usebless_role_session_length': int(config.get('CLIENT', 'usebless_role_session_length')),
        },
        'BLESS_CONFIG': {
            'userrole': config.get('LAMBDA', 'user_role'),
            'accountid': config.get('LAMBDA', 'account_id'),
            'functionname': config.get('LAMBDA', 'functionname'),
            'functionversion': config.get('LAMBDA', 'functionversion'),
            'certlifetime': config.getint('LAMBDA', 'certlifetime'),
            'ipcachelifetime': config.getint('LAMBDA', 'ipcachelifetime'),
            'timeoutconfig': {
                'connect': config.getint('LAMBDA', 'timeout_connect'),
                'read': config.getint('LAMBDA', 'timeout_read')
            }
        },
        'AWS_CONFIG': {
            'bastion_ips': config.get('MAIN', 'bastion_ips'),
            'remote_user': config.get('MAIN', 'remote_user')
        },
        'REGION_ALIAS': {}
    }
    # Region aliases are comma-separated and normalized to upper case.
    regions = config.get('MAIN', 'region_aliases').split(",")
    regions = [region.strip() for region in regions]
    for region in regions:
        region = region.upper()
        # Per-region KMS auth config keyed as KMSAUTH_CONFIG_<ALIAS>;
        # REGION_ALIAS maps the alias to that entry's awsregion.
        kms_region_key = 'KMSAUTH_CONFIG_{}'.format(region)
        blessconfig.update({kms_region_key: self._get_region_kms_config(region, config)})
        blessconfig['REGION_ALIAS'].update({region: blessconfig[kms_region_key]['awsregion']})
    return blessconfig
def update_ini_file(self):
    ''' Update INI file with added number of nodes

    Adjusts app/infra/storage node counts from self.node_type,
    self.node_number and self.tag, then rewrites the matching
    lines of the ini file in place.
    '''
    # Local import keeps this fix self-contained; needed for writing
    # through fileinput's inplace stdout redirection below.
    import sys
    scriptbasename = "ocp-on-vmware"
    defaults = {'vmware': {
        'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
        'master_nodes': '3',
        'infra_nodes': '2',
        'storage_nodes': '0',
        'app_nodes': '3'}
    }
    # where is the config?
    if six.PY3:
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
    vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
    config.read(vmware_ini_path)
    # NOTE: the original used Python-2-only print statements, which are
    # a SyntaxError on Python 3 despite the six.PY3 branch above; this
    # version uses forms valid on both interpreters.
    if 'app' in self.node_type:
        self.app_nodes = int(self.app_nodes) + int(self.node_number)
        config.set('vmware', 'app_nodes', str(self.app_nodes))
        print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
    if 'infra' in self.node_type:
        self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
        config.set('vmware', 'infra_nodes', str(self.infra_nodes))
        print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
    if 'storage' in self.node_type:
        # A 'clean' tag removes nodes instead of adding them.
        if 'clean' in self.tag:
            self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
        else:
            self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
        config.set('vmware', 'storage_nodes', str(self.storage_nodes))
        print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))
    # fileinput inplace mode redirects sys.stdout into the file, so
    # writes here replace the matching lines.
    for line in fileinput.input(vmware_ini_path, inplace=True):
        if line.startswith("app_nodes"):
            sys.stdout.write("app_nodes=" + str(self.app_nodes) + "\n")
        elif line.startswith("infra_nodes"):
            sys.stdout.write("infra_nodes=" + str(self.infra_nodes) + "\n")
        elif line.startswith("storage_nodes"):
            sys.stdout.write("storage_nodes=" + str(self.storage_nodes) + "\n")
        else:
            sys.stdout.write(line)
def update_ini_file(self):
    ''' Update INI file with added number of nodes

    Adjusts app/infra/storage node counts from self.node_type,
    self.node_number and self.tag, then rewrites the matching
    lines of the ini file in place.
    '''
    # Local import keeps this fix self-contained; needed for writing
    # through fileinput's inplace stdout redirection below.
    import sys
    scriptbasename = "ocp-on-vmware"
    defaults = {'vmware': {
        'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
        'master_nodes': '3',
        'infra_nodes': '2',
        'storage_nodes': '0',
        'app_nodes': '3'}
    }
    # where is the config?
    if six.PY3:
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
    vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
    config.read(vmware_ini_path)
    # NOTE: the original used Python-2-only print statements, which are
    # a SyntaxError on Python 3 despite the six.PY3 branch above; this
    # version uses forms valid on both interpreters.
    if 'app' in self.node_type:
        self.app_nodes = int(self.app_nodes) + int(self.node_number)
        config.set('vmware', 'app_nodes', str(self.app_nodes))
        print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
    if 'infra' in self.node_type:
        self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
        config.set('vmware', 'infra_nodes', str(self.infra_nodes))
        print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
    if 'storage' in self.node_type:
        # A 'clean' tag removes nodes instead of adding them.
        if 'clean' in self.tag:
            self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
        else:
            self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
        config.set('vmware', 'storage_nodes', str(self.storage_nodes))
        print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))
    # fileinput inplace mode redirects sys.stdout into the file, so
    # writes here replace the matching lines.
    for line in fileinput.input(vmware_ini_path, inplace=True):
        if line.startswith("app_nodes"):
            sys.stdout.write("app_nodes=" + str(self.app_nodes) + "\n")
        elif line.startswith("infra_nodes"):
            sys.stdout.write("infra_nodes=" + str(self.infra_nodes) + "\n")
        elif line.startswith("storage_nodes"):
            sys.stdout.write("storage_nodes=" + str(self.storage_nodes) + "\n")
        else:
            sys.stdout.write(line)