def main(args=None):
    """Command entry point: refresh credentials and optionally rotate keys.

    Parses ``args``, loads the AWS credentials file named by
    ``args.aws_credentials`` and runs ``one_mfa``; when requested it also
    runs ``rotate`` and prints environment variables for the target profile.

    Returns ``OK`` on success, ``USER_RECOVERABLE_ERROR`` when the
    credentials file is missing, or the first non-``OK`` status returned by
    ``one_mfa`` / ``rotate``.
    """
    args = parse_args(args)
    credentials_path = args.aws_credentials
    if not os.path.exists(credentials_path):
        print("%s does not exist. Please run 'aws configure' or specify an "
              "alternate credentials file with --aws-credentials."
              % credentials_path, file=sys.stderr)
        return USER_RECOVERABLE_ERROR

    # Python 3 gets ``default_section=None``; the Python 2 parser is built
    # with no arguments.
    parser_kwargs = {} if PY2 else {'default_section': None}
    credentials = configparser.ConfigParser(**parser_kwargs)
    credentials.read(credentials_path)

    status = one_mfa(args, credentials)
    if status != OK:
        return status

    if args.rotate_identity_keys:
        status = rotate(args, credentials)
        if status != OK:
            return status

    if args.env:
        print_env_vars(credentials, args.target_profile)
    return OK
# Example source code for the Python ConfigParser() class
def _parse_config(self, domain_name):
    """Load the stored BMC settings for ``domain_name``.

    Reads ``<utils.CONFIG_PATH>/<domain_name>/config`` and returns a dict
    with each known string option (``None`` when the option is absent) plus
    ``port`` converted to an int.

    Raises ``exception.DomainNotFound`` when the config file does not exist.
    """
    config_path = os.path.join(utils.CONFIG_PATH, domain_name, 'config')
    if not os.path.exists(config_path):
        raise exception.DomainNotFound(domain=domain_name)

    parser = configparser.ConfigParser()
    parser.read(config_path)

    def _optional(option):
        # A missing option is reported as None instead of raising.
        try:
            return parser.get(DEFAULT_SECTION, option)
        except configparser.NoOptionError:
            return None

    option_names = ('username', 'password', 'address', 'domain_name',
                    'libvirt_uri', 'libvirt_sasl_username',
                    'libvirt_sasl_password')
    bmc = {name: _optional(name) for name in option_names}

    # Port needs to be int
    bmc['port'] = parser.getint(DEFAULT_SECTION, 'port')
    return bmc
def _get_config(self):
    """Load the functional-test configuration onto ``self``.

    The file path comes from the ``ECS_TEST_CONFIG_FILE`` environment
    variable, defaulting to ``tests/test.conf`` under the current working
    directory. When the file has a ``func_test`` section its options are
    copied to attributes and the license file is read into
    ``self.license_text``; otherwise ``self.skip_tests`` is set to True.
    """
    default_path = os.path.join(os.getcwd(), "tests/test.conf")
    parser = configparser.ConfigParser()
    parser.read(os.environ.get('ECS_TEST_CONFIG_FILE', default_path))
    self.config = parser

    section = 'func_test'
    if not parser.has_section(section):
        self.skip_tests = True
        return

    for option in ('token_endpoint', 'ecs_endpoint', 'username',
                   'password', 'api_version'):
        setattr(self, option, parser.get(section, option))

    with open(parser.get(section, 'license_file')) as license_handle:
        self.license_text = license_handle.read()
def __init__(self):
    """Set up the default configuration and sync it with the config file.

    Builds a parser with the default values, makes sure the config
    directory exists, then either writes the defaults to a new config file
    or loads the existing file and calls ``self.update()``.
    """
    self.config = configparser.ConfigParser({
        'firsttime': 'yes',
        'style': 'default'
    })
    self.config.add_section('Help Files')
    self.config.add_section('Layout')
    self.config.set('Help Files', 'command', 'help_dump.json')
    self.config.set('Help Files', 'history', 'history.txt')
    self.config.set('Layout', 'command_description', 'yes')
    self.config.set('Layout', 'param_description', 'yes')
    self.config.set('Layout', 'examples', 'yes')

    azure_folder = get_config_dir()
    if not os.path.exists(azure_folder):
        os.makedirs(azure_folder)

    config_path = os.path.join(azure_folder, CONFIG_FILE_NAME)
    if not os.path.exists(config_path):
        # First run: persist the defaults.
        with open(config_path, 'w') as config_file:
            self.config.write(config_file)
    else:
        # readfp() was removed in Python 3.12; prefer read_file() and only
        # fall back to readfp() on parsers that lack it (Python 2).
        read_file = getattr(self.config, 'read_file', None) or self.config.readfp
        with open(config_path, 'r') as config_file:
            read_file(config_file)
            self.update()
def _parse_config(self, domain_name):
    """Return the BMC settings stored for ``domain_name`` as a dict.

    The settings are read from ``<self.config_dir>/<domain_name>/config``.
    String options that are absent map to ``None``; ``port`` is returned as
    an int.

    Raises ``exception.DomainNotFound`` when the config file does not exist.
    """
    config_path = os.path.join(self.config_dir, domain_name, 'config')
    if not os.path.exists(config_path):
        raise exception.DomainNotFound(domain=domain_name)

    parser = configparser.ConfigParser()
    parser.read(config_path)

    string_options = ('username', 'password', 'address', 'domain_name',
                      'libvirt_uri', 'libvirt_sasl_username',
                      'libvirt_sasl_password')
    bmc = {}
    for option in string_options:
        try:
            bmc[option] = parser.get(DEFAULT_SECTION, option)
        except configparser.NoOptionError:
            # Absent options are recorded explicitly as None.
            bmc[option] = None

    # Port needs to be int
    bmc['port'] = parser.getint(DEFAULT_SECTION, 'port')
    return bmc
def read_config_file(self, file="./config.ini"):
    """Load the Roomba address, blid and password from an INI file.

    Each section name is treated as a Roomba address. When ``self.address``
    is not set, the first section in the file is used.

    :param file: path of the config file to read.
    :return: True when the settings were loaded; False when the file could
        not be used (read error without successful discovery, or no
        entries to pick an address from).
    """
    Config = configparser.ConfigParser()
    try:
        Config.read(file)
    except Exception as e:
        self.log.warn("Error reading config file %s" %e)
        self.log.info("No Roomba specified, and no config file found - "
                      "attempting discovery")
        if Password(self.address, file):
            return self.read_config_file(file)
        else:
            return False
    self.log.info("reading info from config file %s" % file)
    addresses = Config.sections()
    if self.address is None:
        if not addresses:
            # Nothing to choose from: report failure instead of IndexError.
            self.log.warn("config file %s has no Roomba entries" % file)
            return False
        if len(addresses) > 1:
            # The count was previously never substituted into the message.
            self.log.warn("config file has entries for %d Roombas, "
                          "only configuring the first!" % len(addresses))
        self.address = addresses[0]
    # No trailing comma here: blid must be a string, not a 1-tuple.
    self.blid = Config.get(self.address, "blid")
    self.password = Config.get(self.address, "password")
    return True
def read_config_file(self, file="./config.ini"):
    """Read Roomba connection settings (address, blid, password) from INI.

    Section names in the file are used as Roomba addresses; if
    ``self.address`` is None the first section is selected.

    :param file: path of the config file to read.
    :return: True on success, False when no usable configuration was found.
    """
    Config = configparser.ConfigParser()
    try:
        Config.read(file)
    except Exception as e:
        self.log.warn("Error reading config file %s" %e)
        self.log.info("No Roomba specified, and no config file found - "
                      "attempting discovery")
        if Password(self.address, file):
            return self.read_config_file(file)
        else:
            return False
    self.log.info("reading info from config file %s" % file)
    addresses = Config.sections()
    if self.address is None:
        if not addresses:
            # An empty or missing file used to crash on addresses[0].
            self.log.warn("config file %s has no Roomba entries" % file)
            return False
        if len(addresses) > 1:
            # Supply the value for %d, which was missing before.
            self.log.warn("config file has entries for %d Roombas, "
                          "only configuring the first!" % len(addresses))
        self.address = addresses[0]
    # A stray trailing comma previously turned blid into a 1-tuple.
    self.blid = Config.get(self.address, "blid")
    self.password = Config.get(self.address, "password")
    return True
def read_string(self, string, source='<string>'):
    """
    Read configuration from a string.

    A backwards-compatible version of the ConfigParser.read_string()
    method that was introduced in Python 3.

    :param string: configuration text to parse.
    :param source: name reported for the input; it is now forwarded on
        both branches instead of being dropped on Python 2.
    """
    # Python 3 added read_string() method
    if six.PY3:
        ConfigParser.read_string(self, string, source=source)
    # Python 2 requires StringIO buffer
    else:
        import StringIO
        self.readfp(StringIO.StringIO(string), source)
def main():
    """Read the config file and run ``_main`` in foreground or as a daemon.

    The PID file path comes from the ``pidfile`` option of the ``default``
    section (with ``~`` expanded), falling back to
    ``/var/run/germqtt.pid``.
    """
    args = get_options()
    config = ConfigParser.ConfigParser()
    config.read(args.conffile)

    pid_fn = '/var/run/germqtt.pid'
    if config.has_option('default', 'pidfile'):
        pid_fn = os.path.expanduser(config.get('default', 'pidfile'))

    if args.foreground:
        _main(args, config)
        return

    # Daemon mode: run under a PID lock file created with a timeout of 10.
    pid_lock = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
    with daemon.DaemonContext(pidfile=pid_lock):
        _main(args, config)
def main(argv):
    """Create the project's virtualenv and install its dependencies.

    The tools root and venv location can be overridden with the
    ``tools_path`` and ``venv`` environment variables. The project name is
    read from the ``metadata`` section of ``setup.cfg``.
    """
    default_root = os.path.dirname(
        os.path.dirname(os.path.realpath(__file__)))
    # An unset or empty variable falls back to the computed default.
    root = os.environ.get('tools_path') or default_root
    venv = os.environ.get('venv') or os.path.join(root, '.venv')

    pip_requires = os.path.join(root, 'requirements.txt')
    test_requires = os.path.join(root, 'test-requirements.txt')
    py_version = "python%s.%s" % tuple(sys.version_info[:2])

    setup_cfg = configparser.ConfigParser()
    setup_cfg.read('setup.cfg')
    project = setup_cfg.get('metadata', 'name')

    install = install_venv.InstallVenv(
        root, venv, pip_requires, test_requires, py_version, project)
    options = install.parse_args(argv)
    install.check_python_version()
    install.check_dependencies()
    install.create_virtualenv(no_site_packages=options.no_site_packages)
    install.install_dependencies()
    install.post_process()
    print_help(project, venv, root)
def _read_config_file(path=None):
    """
    Read the configuration file and return the parser.

    With no ``path``, search the current directory, the user's home
    directory and the directory containing this file, in that order. In
    each directory ``gpflowrc`` is tried before the hidden ``.gpflowrc``,
    and the search stops at the first file that is read.

    With an explicit ``path``, raise ``RuntimeError`` if it cannot be read.
    """
    cfg = configparser.ConfigParser()
    if path is not None:
        if not cfg.read(path):
            raise RuntimeError("Config at '{0}' cannot be read".format(path))
        return cfg

    # pragma: no cover
    search_dirs = (os.curdir, os.path.expanduser('~'),
                   os.path.dirname(os.path.realpath(__file__)))
    for directory in (os.path.abspath(d) for d in search_dirs):
        # any() short-circuits, so the hidden name is only tried when the
        # plain name was not read.
        if any(cfg.read(os.path.join(directory, name))
               for name in ('gpflowrc', '.gpflowrc')):
            break
    return cfg
def __init__(self, config_file, workspace_dir):
    """Load and validate every section of ``config_file``.

    Options of the ``RUNWAY_CONFIG_SECTION`` section are stored in
    ``self.runway_options``; every other section is appended to
    ``self.sections`` and its options stored in
    ``self.components_options``.

    Raises ``Exception`` when ``config_file`` is not an existing file.
    """
    if not os.path.isfile(config_file):
        raise Exception(
            "Error: file {} does not exist.".format(config_file))

    parser = configparser.ConfigParser()
    parser.read(config_file)

    for name in parser.sections():
        options = self.get_config_options_for_section(parser, name)
        self.validate_config_options_for_section(options, name)
        if section_is_runway(name):
            self.runway_options = options
            continue
        self.sections.append(name)
        self.components_options[name] = options

    self.workspace_dir = workspace_dir


def section_is_runway(name):
    """Return True when ``name`` is the runway section."""
    return name == RUNWAY_CONFIG_SECTION
def parse_config(cfg, ext_config=None):
    """Build the effective configuration dict.

    Values are layered in this order: options of the ``pypdns`` section of
    ``cfg``, then ``DEFAULT_CONFIG`` for keys still missing, then non-None
    ``endpoint`` / ``apikey`` values from ``ext_config``.

    :param cfg: parser object providing ``options()`` and ``get()``.
    :param ext_config: optional mapping of overrides; only ``endpoint`` and
        ``apikey`` are consulted.
    :return: dict of configuration values.
    :raises AssertionError: when ``endpoint`` or ``apikey`` is still unset.
    """
    # Avoid a shared mutable default argument.
    if ext_config is None:
        ext_config = {}

    config = {}
    try:
        options = cfg.options('pypdns')
    except ConfigParser.NoSectionError:
        log.debug('No config files provided and default not present')
        options = []
    for option in options:
        config[option] = cfg.get('pypdns', option)

    for ckey, cvalue in iteritems(DEFAULT_CONFIG):
        if ckey not in config:
            config[ckey] = cvalue

    for k in ('endpoint', 'apikey'):
        if ext_config.get(k) is not None:
            config[k] = ext_config[k]

    # Raised explicitly so the check still runs under ``python -O``; the
    # exception type is the one the original ``assert`` produced.
    if config.get('endpoint') is None or config.get('apikey') is None:
        raise AssertionError('Configuration not found')
    return config
def __init__(self):
    """Set up the default configuration and sync it with the config file.

    Builds a parser with the default values, makes sure the config
    directory exists, then either writes the defaults to a new config file
    or loads the existing file and calls ``self.update()``.
    """
    self.config = configparser.ConfigParser({
        'firsttime': 'yes',
        'style': 'default'
    })
    self.config.add_section('Help Files')
    self.config.add_section('Layout')
    self.config.set('Help Files', 'command', 'help_dump.json')
    self.config.set('Help Files', 'history', 'history.txt')
    self.config.set('Help Files', 'frequency', 'frequency.json')
    self.config.set('Layout', 'command_description', 'yes')
    self.config.set('Layout', 'param_description', 'yes')
    self.config.set('Layout', 'examples', 'yes')

    azure_folder = get_config_dir()
    if not os.path.exists(azure_folder):
        os.makedirs(azure_folder)

    config_path = os.path.join(azure_folder, CONFIG_FILE_NAME)
    if not os.path.exists(config_path):
        # First run: persist the defaults.
        with open(config_path, 'w') as config_file:
            self.config.write(config_file)
    else:
        # readfp() was removed in Python 3.12; prefer read_file() and only
        # fall back to readfp() on parsers that lack it (Python 2).
        read_file = getattr(self.config, 'read_file', None) or self.config.readfp
        with open(config_path, 'r') as config_file:
            read_file(config_file)
            self.update()
def __init__(self, args):
    """Store ``args`` and, if a color map file is given, load it.

    When ``args.color_map`` is set, the file is parsed into
    ``self.color_map`` and the per-dimension color and label lists are
    derived from it in sorted key order. Otherwise those attributes stay
    ``None``.
    """
    self.args = args
    self.image_width = 256
    self.image_height = 256
    self.color_map = None
    self.dimension_colors = None
    self.dimension_labels = None
    logger = get_logger()
    if self.args.color_map:
        color_map_config = ConfigParser()
        # readfp() was removed in Python 3.12; prefer read_file() and fall
        # back to readfp() only where read_file() does not exist.
        read_file = (
            getattr(color_map_config, 'read_file', None) or
            color_map_config.readfp
        )
        with FileIO(self.args.color_map, 'r') as config_f:
            read_file(config_f)
        self.color_map = parse_color_map_from_configparser(color_map_config)
        # Labels are keyed by a grey-level triple built from the integer
        # option name.
        color_label_map = {
            (int(k), int(k), int(k)): v
            for k, v in color_map_config.items('color_labels')
        }
        sorted_keys = sorted(six.iterkeys(self.color_map))
        self.dimension_colors = [self.color_map[k] for k in sorted_keys]
        self.dimension_labels = [color_label_map.get(k) for k in sorted_keys]
        logger.debug("dimension_colors: %s", self.dimension_colors)
        logger.debug("dimension_labels: %s", self.dimension_labels)
def parse_color_map(f):
    """Parse the ``color_map`` section of an INI file object into a dict.

    Each option name and value is a color written either as a single
    integer ``N`` (expanded to ``(N, N, N)``) or as ``(R,G,B)`` without
    spaces.

    :param f: open text file object holding the configuration.
    :return: dict mapping source color tuples to target color tuples.
    :raises Exception: when a key or value is not in a recognised format.
    """
    color_map_config = ConfigParser()
    # readfp() was removed in Python 3.12; prefer read_file() and fall back
    # to readfp() only where read_file() does not exist (Python 2).
    read_file = (
        getattr(color_map_config, 'read_file', None) or
        color_map_config.readfp
    )
    read_file(f)

    num_pattern = re.compile(r'(\d+)')
    rgb_pattern = re.compile(r'\((\d+),(\d+),(\d+)\)')

    def parse_color(s):
        m = num_pattern.match(s)
        if m:
            x = int(m.group(1))
            return (x, x, x)
        m = rgb_pattern.match(s)
        if m:
            return (int(m.group(1)), int(m.group(2)), int(m.group(3)))
        raise Exception('invalid color value: {}'.format(s))

    return {
        parse_color(k): parse_color(v)
        for k, v in color_map_config.items('color_map')
    }
def remove_all_auto_profiles(filePath):
    """
    remove all profiles from the credentials file that contain 'auto-refresh-'

    :param filePath: path of the credentials file, which is rewritten in
        place.
    """
    log.info('Removing all autoAwsume profiles')
    #remove the auto-awsume profiles from the credentials file
    autoAwsumeProfileParser = ConfigParser.ConfigParser()
    autoAwsumeProfileParser.read(filePath)
    #scan all profiles to find auto-refresh profiles
    # Iterate over a snapshot from the public sections() API: removing
    # sections while iterating the private ``_sections`` dict mutates it
    # during iteration.
    for profile in list(autoAwsumeProfileParser.sections()):
        if 'auto-refresh-' in profile:
            log.debug('Removing profile ' + profile + ' from credentials file')
            autoAwsumeProfileParser.remove_section(profile)
    #save changes
    # The context manager guarantees the file is flushed and closed.
    with open(filePath, 'w') as credentialsFile:
        autoAwsumeProfileParser.write(credentialsFile)
def _get_config_object(config_path, use_cashed_config=True):
    '''
    Return a ConfigParser for the config file at ``config_path``. If the
    file does not exist, an empty config file is created there.

    :param config_path: path of the config file.
    :param use_cashed_config: If True, return the parser previously cached
        for this path when there is one, and cache a newly created parser.
        If False, always re-read the file from disk; the cache is neither
        consulted nor modified.
    :return: the ConfigParser for ``config_path``.
    '''
    if config_path in _CONFIG_OBJECTS and use_cashed_config:
        return _CONFIG_OBJECTS[config_path]

    parser = ConfigParser()
    if os.path.exists(config_path):
        parser.read(config_path)
    else:
        # Create the (empty) file so later reads find it.
        with open(config_path, 'w') as new_file:
            parser.write(new_file)

    if use_cashed_config:
        _CONFIG_OBJECTS[config_path] = parser
    return parser
def init_bridge():
    """Parse the configuration file and set relevant variables.

    The file path is taken from the ``WAT_CONF`` environment variable; the
    process exits when it does not name an existing file. Values are stored
    in ``SETTINGS`` and the global ``DB`` is opened from the ``db`` section.
    """
    global DB

    conf_path = os.path.abspath(os.getenv('WAT_CONF', ''))
    if not (conf_path and os.path.isfile(conf_path)):
        sys.exit('Could not find configuration file')

    parser = configparser.ConfigParser()
    parser.read(conf_path)

    # (settings key, reader, section, option): Whatsapp first, then Telegram.
    layout = (
        ('wa_phone', parser.get, 'wa', 'phone'),
        ('wa_password', parser.get, 'wa', 'password'),
        ('owner', parser.getint, 'tg', 'owner'),
        ('tg_token', parser.get, 'tg', 'token'),
    )
    for key, reader, section, option in layout:
        SETTINGS[key] = reader(section, option)

    # TinyDB
    DB = TinyDB(parser.get('db', 'path'))
    DB.table_class = SmartCacheTable
def _get_config(self, unit, filename):
    """Get a ConfigParser object for parsing a unit's config file.

    :param unit: object whose ``file_contents(filename)`` returns the file
        text.
    :param filename: name of the config file on the unit.
    :returns: ConfigParser loaded with the file's contents.
    """
    file_contents = unit.file_contents(filename)

    # NOTE(beisner): by default, ConfigParser does not handle options
    # with no value, such as the flags used in the mysql my.cnf file.
    # https://bugs.python.org/issue7005
    config = configparser.ConfigParser(allow_no_value=True)
    # readfp() was removed in Python 3.12; prefer read_file() and fall back
    # to readfp() only where read_file() does not exist (Python 2).
    read_file = getattr(config, 'read_file', None) or config.readfp
    read_file(io.StringIO(file_contents))
    return config
def _get_config(self, unit, filename):
    """Get a ConfigParser object for parsing a unit's config file.

    :param unit: object whose ``file_contents(filename)`` returns the file
        text.
    :param filename: name of the config file on the unit.
    :returns: ConfigParser loaded with the file's contents.
    """
    file_contents = unit.file_contents(filename)

    # NOTE(beisner): by default, ConfigParser does not handle options
    # with no value, such as the flags used in the mysql my.cnf file.
    # https://bugs.python.org/issue7005
    config = configparser.ConfigParser(allow_no_value=True)
    # readfp() was removed in Python 3.12; prefer read_file() and fall back
    # to readfp() only where read_file() does not exist (Python 2).
    read_file = getattr(config, 'read_file', None) or config.readfp
    read_file(io.StringIO(file_contents))
    return config
def getConfig():
    '''
    Load the skdaccess configuration from ``~/.skdaccess.conf``

    @return configParser.ConfigParser object of configuration
    '''
    parser = configparser.ConfigParser()
    parser.read(os.path.join(os.path.expanduser('~'), '.skdaccess.conf'))
    return parser
def writeConfig(conf):
    '''
    Write config to disk at ``~/.skdaccess.conf``

    @param conf: configparser.ConfigParser object
    '''
    config_location = os.path.join(os.path.expanduser('~'), '.skdaccess.conf')
    # The context manager closes the handle even if conf.write() raises.
    with open(config_location, "w") as config_handle:
        conf.write(config_handle)
def update_credentials_file(filename, target_profile, source_profile,
                            credentials, new_access_key):
    """Store a new access key in ``target_profile`` and rewrite the file.

    When the target differs from the source profile, the target section is
    recreated and populated with a copy of the source profile's options.
    The key id and secret are then set; the session token and expiration
    are set when ``new_access_key`` has a ``SessionToken`` and removed
    otherwise. The result is written to ``filename + ".tmp"`` and renamed
    over ``filename``.

    :param filename: path of the credentials file to replace.
    :param target_profile: section that receives the new key.
    :param source_profile: section whose options are copied to the target.
    :param credentials: parser holding the credentials file contents.
    :param new_access_key: mapping with ``AccessKeyId``,
        ``SecretAccessKey`` and optionally ``SessionToken`` /
        ``Expiration``.
    :raises OSError: when the final rename fails on a non-Windows platform.
    """
    if target_profile != source_profile:
        credentials.remove_section(target_profile)
        # Hack: Python 2's implementation of ConfigParser rejects new sections
        # named 'default'.
        if PY2 and target_profile == 'default':
            # noinspection PyProtectedMember
            credentials._sections[
                target_profile] = configparser._default_dict()
        else:
            credentials.add_section(target_profile)

        for k, v in credentials.items(source_profile):
            credentials.set(target_profile, k, v)

    credentials.set(target_profile, 'aws_access_key_id',
                    new_access_key['AccessKeyId'])
    credentials.set(target_profile, 'aws_secret_access_key',
                    new_access_key['SecretAccessKey'])
    if 'SessionToken' in new_access_key:
        credentials.set(target_profile, 'aws_session_token',
                        new_access_key['SessionToken'])
        credentials.set(target_profile, 'awsmfa_expiration',
                        new_access_key['Expiration'].isoformat())
    else:
        credentials.remove_option(target_profile, 'aws_session_token')
        credentials.remove_option(target_profile, 'awsmfa_expiration')

    temp_credentials_file = filename + ".tmp"
    with open(temp_credentials_file, "w") as out:
        credentials.write(out)
    try:
        os.rename(temp_credentials_file, filename)
    except OSError:
        # ``WindowsError`` is only defined on Windows, so naming it in the
        # except clause raised NameError elsewhere and hid the real error.
        # The remove-then-rename fallback is still applied on Windows only.
        if os.name != 'nt':
            raise
        os.remove(filename)
        os.rename(temp_credentials_file, filename)
def add(self, username, password, port, address, domain_name, libvirt_uri,
        libvirt_sasl_username, libvirt_sasl_password):
    """Create the config directory and config file for a new domain.

    The libvirt connection and domain are checked first. The settings are
    then written to ``<utils.CONFIG_PATH>/<domain_name>/config``; the SASL
    credentials are stored only when both are provided.

    Exits the process when the domain directory already exists; any other
    ``OSError`` from creating the directory is propagated.
    """
    # check libvirt's connection and if domain exist prior to adding it
    utils.check_libvirt_connection_and_domain(
        libvirt_uri, domain_name,
        sasl_username=libvirt_sasl_username,
        sasl_password=libvirt_sasl_password)

    domain_path = os.path.join(utils.CONFIG_PATH, domain_name)
    try:
        os.makedirs(domain_path)
    except OSError as e:
        if e.errno == errno.EEXIST:
            sys.exit('Domain %s already exist' % domain_name)
        # Any other failure was previously swallowed here.
        raise

    config_path = os.path.join(domain_path, 'config')
    with open(config_path, 'w') as f:
        config = configparser.ConfigParser()
        config.add_section(DEFAULT_SECTION)
        config.set(DEFAULT_SECTION, 'username', username)
        config.set(DEFAULT_SECTION, 'password', password)
        # Python 3's ConfigParser.set() only accepts string values, so a
        # numeric port is converted explicitly.
        config.set(DEFAULT_SECTION, 'port', str(port))
        config.set(DEFAULT_SECTION, 'address', address)
        config.set(DEFAULT_SECTION, 'domain_name', domain_name)
        config.set(DEFAULT_SECTION, 'libvirt_uri', libvirt_uri)

        if libvirt_sasl_username and libvirt_sasl_password:
            config.set(DEFAULT_SECTION, 'libvirt_sasl_username',
                       libvirt_sasl_username)
            config.set(DEFAULT_SECTION, 'libvirt_sasl_password',
                       libvirt_sasl_password)

        config.write(f)
def __init__(self):
    """Read ``CONFIG_FILE``, convert it with ``_as_dict`` and validate it."""
    parser = configparser.ConfigParser()
    parser.read(CONFIG_FILE)
    self._conf_dict = self._as_dict(parser)
    self._validate()
def setUpModule():
    """Move any local config aside and reload the worker configuration."""
    # Backup local config file if it exists
    if os.path.exists(local_cfg):
        os.rename(local_cfg, local_cfg + '.bak')

    # Re-read the configs
    girder_worker.config = ConfigParser()
    cfg_paths = [
        os.path.join(girder_worker.PACKAGE_DIR, cfg_name)
        for cfg_name in ('worker.dist.cfg', 'worker.local.cfg')
    ]
    girder_worker.config.read(cfg_paths)
def _get_config(self, unit, filename):
    """Get a ConfigParser object for parsing a unit's config file.

    :param unit: object whose ``file_contents(filename)`` returns the file
        text.
    :param filename: name of the config file on the unit.
    :returns: ConfigParser loaded with the file's contents.
    """
    file_contents = unit.file_contents(filename)

    # NOTE(beisner): by default, ConfigParser does not handle options
    # with no value, such as the flags used in the mysql my.cnf file.
    # https://bugs.python.org/issue7005
    config = configparser.ConfigParser(allow_no_value=True)
    # readfp() was removed in Python 3.12; prefer read_file() and fall back
    # to readfp() only where read_file() does not exist (Python 2).
    read_file = getattr(config, 'read_file', None) or config.readfp
    read_file(io.StringIO(file_contents))
    return config
def _get_config(self, unit, filename):
    """Get a ConfigParser object for parsing a unit's config file.

    :param unit: object whose ``file_contents(filename)`` returns the file
        text.
    :param filename: name of the config file on the unit.
    :returns: ConfigParser loaded with the file's contents.
    """
    file_contents = unit.file_contents(filename)

    # NOTE(beisner): by default, ConfigParser does not handle options
    # with no value, such as the flags used in the mysql my.cnf file.
    # https://bugs.python.org/issue7005
    config = configparser.ConfigParser(allow_no_value=True)
    # readfp() was removed in Python 3.12; prefer read_file() and fall back
    # to readfp() only where read_file() does not exist (Python 2).
    read_file = getattr(config, 'read_file', None) or config.readfp
    read_file(io.StringIO(file_contents))
    return config
def _parse_options(self):
    """Load the ``testoptions`` section of ``self.option_file``.

    Each value is passed through the converter registered for its name in
    ``self._CONVERTERS`` (values without a converter are kept as read) and
    stored in ``self.options``.
    """
    parser = configparser.ConfigParser()
    # Pre-create the section so items() works even if the file lacks it.
    parser.add_section('testoptions')
    try:
        parser.read(self.option_file)
    except NoFileError:
        pass

    def _unchanged(raw):
        return raw

    for option, raw_value in parser.items('testoptions'):
        convert = self._CONVERTERS.get(option, _unchanged)
        self.options[option] = convert(raw_value)