def load_params(fname=config_fname, param=""):
    """
    Load parameters from the TOML configuration file.

    When ``fname`` is not an existing file, a blank line is printed and
    ``my_setup()`` is called to obtain the parameters. If the parameters
    are still empty afterwards, ``fname`` is opened (UTF-8) and parsed
    with ``toml.load``.

    :param fname: path of the configuration file.
    :param param: optional key; when non-empty only that entry is returned.
    :return: ``params[param]`` if ``param`` is given, else the whole mapping.
    """
    params = {}
    config_missing = not os.path.isfile(fname)
    if config_missing:
        print()
        params = my_setup()
    if not params:
        with open(fname, 'r', encoding='utf-8') as handle:
            params.update(toml.load(handle))
    return params[param] if param else params
# Example source code for Python's load() (original heading: python类load()的实例源码)
def main():
    """
    Entry point: load the TOML config, prepare the SQLite database and
    run the tracker on a uvloop event loop until it is stopped.

    The config path is taken from the first command-line argument, or
    defaults to ``~/.config/whereisit.toml``.
    """
    if len(sys.argv) > 1:
        config_path = Path(sys.argv[1])
    else:
        config_path = Path.home() / '.config' / 'whereisit.toml'
    with open(config_path) as config_file:
        config = toml.load(config_file)

    import logging
    # Only errors from the aiohttp client logger are let through.
    logging.getLogger('aiohttp.client').setLevel(logging.ERROR)

    db_settings = config['database']
    db_path = Path.home() / '.local' / 'share' / db_settings['path']
    orm.sql_debug(db_settings.get('debug', False))
    database.bind("sqlite", str(db_path), create_db=True)
    database.generate_mapping(create_tables=True)

    # Bulk-delete stored trackings whose id is not a key of config['trackings'].
    # NOTE(review): the generator expression is kept exactly as written,
    # presumably because the ORM inspects it - confirm before refactoring.
    with orm.db_session():
        orm.select(t for t in database.Tracking
                   if t.id not in list(config['trackings'].keys())).delete(bulk=True)

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    with contextlib.closing(asyncio.get_event_loop()) as loop:
        tracker = Tracker(loop=loop, db=database, config=config)
        loop.create_task(tracker.run())
        loop.run_forever()
def load(path: pathlib.Path) -> Config:
    """Load configuration from given path.

    The path must exist, be a regular file and match ``*.config.toml``;
    each failed check is reported through ``error()``. The parsed file
    is merged on top of a deep copy of ``DEFAULT`` so the defaults are
    never mutated.

    :param path: location of the ``*.config.toml`` file.
    :return: a ``Config`` holding the defaults updated with the file content.
    """
    if not path.exists():
        error('File do not exists.')
    if not path.is_file():
        error('Given path is not file.')
    if not path.match('*.config.toml'):
        error('File suffix must be *.config.toml')
    config = Config(copy.deepcopy(DEFAULT))
    # Open through a context manager so the handle is closed right after
    # parsing instead of being left for the garbage collector.
    with path.open() as config_file:
        config.update(toml.load(config_file))
    return config
def cli(ctx, config_file, verbose):
    """Toolbox for Trac to GitLab migrations."""
    # The 'tracboat' table of the config file (if any) is registered as the
    # defaults for each of the listed sub-command names; all of them share
    # the same mapping object.
    if config_file:
        tracboat_conf = toml.load(config_file).get('tracboat', {})
        ctx.default_map.update(
            dict.fromkeys(['users', 'migrate', 'export'], tracboat_conf))
    # Verbosity count -> logging level: 1 is INFO, 2 or more is DEBUG,
    # anything else is ERROR.
    if verbose >= 2:
        level = logging.DEBUG
    elif verbose == 1:
        level = logging.INFO
    else:
        level = logging.ERROR
    logging.basicConfig(level=level, format='%(levelname)-5s %(name)s: %(message)s')
    # Expose the options on the context object for later use.
    ctx.obj['verbose'] = verbose
    ctx.obj['config-file'] = config_file
################################################################################
# subcommands
################################################################################
def from_file(filename):
    """
    Build a task from a TOML file.

    Returns a copy of the module-level ``default`` updated with the
    values parsed from ``filename``; ``default`` itself is not modified
    at the top level.
    """
    overrides = toml.load(filename)
    merged = default.copy()
    merged.update(overrides)
    return merged
def load_config(config_path=None):
    """
    Parse and return the TOML configuration.

    When ``config_path`` is None, the path comes from the
    ``ISTHISLEGIT_CONFIG`` environment variable, falling back to
    ``isthislegit.toml`` next to this module.
    """
    if config_path is None:
        bundled_default = path.join(path.dirname(__file__), 'isthislegit.toml')
        config_path = environ.get('ISTHISLEGIT_CONFIG', bundled_default)
    with open(config_path) as config_file:
        parsed = toml.load(config_file)
    return parsed
def __init__(self, filename='pyproject.toml'):
    """
    Locate the project files and load the project definition.

    Paths for the project file, its ``.lock`` companion and ``setup.py``
    are derived from the project root. If ``setup.py`` is managed, its
    user section is parsed; a parse failure is recorded as a string in
    ``setup_user_section_error`` instead of being raised.

    The project file is then read as TOML into ``self.raw``. If it cannot
    be opened or parsed, the package/metadata/command attributes fall back
    to empty containers.
    """
    root = find_project_root()
    self.project_file = os.path.join(root, filename)
    self.lock_file = os.path.join(root, '{}.lock'.format(filename))
    self.setup_file = os.path.join(root, 'setup.py')

    self.setup_is_managed = is_setup_managed(self.setup_file)
    self.setup_user_section_error = None
    self.setup_user_section = ''
    if self.setup_is_managed:
        try:
            self.setup_user_section = parse_setup(self.setup_file)
        except Exception as parse_error:
            self.setup_user_section_error = str(parse_error)

    self.raw = OrderedDict()
    try:
        with open(self.project_file) as project_fp:
            self.raw = toml.load(project_fp, OrderedDict)
        raw = self.raw
        self.packages = SortedDict(raw.get('packages'))
        self.dev_packages = SortedDict(raw.get('dev-packages'))
        self.metadata = raw.get('metadata')
        hatch_table = raw.get('tool', {}).get('hatch', {})
        self.commands = hatch_table.get('commands', OrderedDict())
    except (FileNotFoundError, IOError, ValueError):
        self.packages = SortedDict()
        self.dev_packages = SortedDict()
        self.metadata = OrderedDict()
        self.commands = OrderedDict()
def __init__(self):
    """
    Parse the command line, load the TOML configuration and expose its
    sections as attributes.

    The configuration file defaults to ``~/.config/cc-server/config.toml``
    and can be replaced with ``-f``. The parsed config is checked with
    ``validate`` against ``cc_server_config_schema``. ``-m`` / ``-p``
    override the host / port entries of the ``mongo`` section.
    """
    parser = ArgumentParser(
        description='CC-Server Configuration'
    )
    # (flags, dest, metavar, help) for every supported option.
    option_specs = (
        (('-f', '--config-file'), 'config_file', 'FILE',
         'path to a configuration FILE in TOML format'),
        (('-m', '--mongo-host'), 'mongo_host', 'HOST',
         'override the HOST name of the MongoDB configuration'),
        (('-p', '--mongo-port'), 'mongo_port', 'PORT',
         'override the PORT number of the MongoDB configuration'),
    )
    for flags, dest, metavar, help_text in option_specs:
        parser.add_argument(*flags, dest=dest, metavar=metavar, help=help_text)
    args = parser.parse_args()

    default_path = os.path.join(os.path.expanduser('~'), '.config', 'cc-server', 'config.toml')
    self.config_file_path = args.config_file if args.config_file else default_path

    with open(self.config_file_path) as config_fp:
        config = toml.load(config_fp)
    validate(config, cc_server_config_schema)

    self.server_web = config['server_web']
    self.server_master = config['server_master']
    self.server_log = config['server_log']
    # 'server_files' is read with .get(), so it is None when absent.
    self.server_files = config.get('server_files')
    self.mongo = config['mongo']
    self.docker = config['docker']
    self.defaults = config['defaults']

    if args.mongo_host:
        self.mongo['host'] = args.mongo_host
    if args.mongo_port:
        self.mongo['port'] = int(args.mongo_port)
def read(self, stream):
    """
    Parse JSON from the given stream.

    :param stream: The stream to read the configuration from.
    :return IgnoreCaseDict: The configuration read from the stream; every
        JSON object is built through ``IgnoreCaseDict``.
    :raises ValueError: if ``stream`` is None.
    """
    if stream is not None:
        return json.load(stream, object_pairs_hook=IgnoreCaseDict)
    raise ValueError('stream cannot be None')
def read(self, stream):
    """
    Parse TOML from the given stream.

    :param stream: The stream to read the configuration from.
    :return IgnoreCaseDict: The configuration read from the stream, with
        ``IgnoreCaseDict`` passed to the parser as its dict type.
    :raises ValueError: if ``stream`` is None.
    """
    if stream is not None:
        return toml.load(stream, _dict=IgnoreCaseDict)
    raise ValueError('stream cannot be None')
def read(self, stream):
    """
    Parse YAML from the given stream.

    :param stream: The stream to read the configuration from.
    :return: The configuration read from the stream, as produced by the
        loader class returned from ``self._get_loader()``.
    :raises ValueError: if ``stream`` is None.
    """
    if stream is not None:
        loader_cls = self._get_loader()
        return yaml.load(stream, Loader=loader_cls)
    raise ValueError('stream cannot be None')
def load_settings_file(self, settings_file=None):
    """
    Load our settings file.

    The parser is chosen from the file extension: ``.yml``/``.yaml`` is
    read as YAML, ``.toml`` as TOML and anything else as JSON. The result
    is stored in ``self.zappa_settings``.

    :param settings_file: path of the settings file; when falsy, the path
        is obtained from ``self.get_json_or_yaml_settings()``.
    :raises ClickException: if the settings file does not exist.
    :raises ValueError: if the parser reports a ValueError (malformed file).
    """
    if not settings_file:
        settings_file = self.get_json_or_yaml_settings()
    if not os.path.isfile(settings_file):
        raise ClickException("Please configure your zappa_settings file or call `zappa init`.")

    ext = os.path.splitext(settings_file)[1]
    if ext in ('.yml', '.yaml'):
        with open(settings_file) as yaml_file:
            try:
                # safe_load: plain yaml.load() without a Loader can build
                # arbitrary Python objects and is rejected by PyYAML >= 6.
                self.zappa_settings = yaml.safe_load(yaml_file)
            except ValueError:  # pragma: no cover
                raise ValueError("Unable to load the Zappa settings YAML. It may be malformed.")
    elif ext == '.toml':
        with open(settings_file) as toml_file:
            try:
                self.zappa_settings = toml.load(toml_file)
            except ValueError:  # pragma: no cover
                raise ValueError("Unable to load the Zappa settings TOML. It may be malformed.")
    else:
        with open(settings_file) as json_file:
            try:
                self.zappa_settings = json.load(json_file)
            except ValueError:  # pragma: no cover
                raise ValueError("Unable to load the Zappa settings JSON. It may be malformed.")
def load_args_from_config_file(path=None):
    """
    Read command-line style options from a config file.

    The file is parsed as TOML first; if that fails with a TOML decode
    error it is re-read as JSON, where the keys ``auth_role_name`` and
    ``auth_role_secret_key`` are renamed to ``role_name`` and
    ``role_secret``. Every entry is echoed to stderr and returned under
    the key ``'--' + name``.

    :param path: config file path; when None the default location under
        ``XDG_CONFIG_HOME`` is used and a read failure is not reported.
    :return: dict mapping ``--option`` names to their values (empty if the
        file could not be read or parsed).
    """
    quiet = path is None
    if quiet:
        path = os.path.join(XDG_CONFIG_HOME, 'satori', 'rtm-cli.config')

    # Key names used by the JSON credentials layout -> option names.
    json_key_aliases = {
        'auth_role_name': 'role_name',
        'auth_role_secret_key': 'role_secret',
    }

    result = {}
    try:
        try:
            with open(path) as toml_file:
                toml_config = toml.load(toml_file)
                for key, value in toml_config.items():
                    print(
                        "From config file: {0} = {1}".format(key, value),
                        file=sys.stderr)
                    result[u'--' + key] = value
        except toml.TomlDecodeError:
            try:
                # Just in case the config file has the format credentials.json
                with open(path) as json_file:
                    json_config = json.load(json_file)
                    for key, value in json_config.items():
                        key = json_key_aliases.get(key, key)
                        print(
                            "From config file: {0} = {1}".format(key, value),
                            file=sys.stderr)
                        result[u'--' + key] = value
            except ValueError:
                print(
                    "Invalid config file at {0}".format(path),
                    file=sys.stderr)
    except (IOError, OSError):
        if not quiet:
            print(
                "Couldn't read the config file at {0}".format(path),
                file=sys.stderr)
    return result
def read_config_options(configuration_file, gvanno_dir, logger):
    """
    Build the gvanno option dictionary.

    The defaults are read from
    ``<gvanno_dir>/data/gvanno_configuration_default.toml`` and then
    selected options of the ``other`` section are overridden with the
    values found in the user's ``configuration_file`` after a type check.
    Problems are reported through ``gvanno_error_message``.

    :param configuration_file: path of the user-supplied TOML file.
    :param gvanno_dir: gvanno base directory holding ``data/``.
    :param logger: logger handed to ``gvanno_error_message``.
    :return: dict of options (defaults plus validated user overrides).
    """
    ## read default options
    gvanno_config_options = {}
    gvanno_configuration_file_default = os.path.join(gvanno_dir,'data','gvanno_configuration_default.toml')
    if not os.path.exists(gvanno_configuration_file_default):
        err_msg = "Default gvanno configuration file " + str(gvanno_configuration_file_default) + " does not exist - exiting"
        gvanno_error_message(err_msg,logger)
    try:
        gvanno_config_options = toml.load(gvanno_configuration_file_default)
    # A tuple is required here: "except IndexError,TypeError" is Python 2
    # syntax that catches only IndexError and binds it to the name TypeError.
    except (IndexError, TypeError):
        # Report the file that actually failed to parse (the default one).
        err_msg = 'Configuration file ' + str(gvanno_configuration_file_default) + ' is not formatted correctly'
        gvanno_error_message(err_msg, logger)

    ## override with options set by the users
    try:
        toml_options = toml.load(configuration_file)
    except (IndexError, TypeError):
        err_msg = 'Configuration file ' + str(configuration_file) + ' is not formatted correctly'
        gvanno_error_message(err_msg, logger)

    #float_tags = ['maf_onekg_eur','maf_onekg_amr','maf_onekg_afr','maf_onekg_sas','maf_onekg_eas','maf_onekg_global','maf_gnomad_nfe','maf_gnomad_amr','maf_gnomad_fin','maf_gnomad_oth','maf_gnomad_afr','maf_gnomad_sas','maf_gnomad_eas','maf_gnomad_global']
    boolean_tags = ['vep_skip_intergenic']
    integer_tags = ['n_vcfanno_proc','n_vep_forks']
    for section in ['other']:
        # "in" replaces dict.has_key(), which no longer exists in Python 3.
        if section in toml_options:
            # for t in float_tags:
            #     if toml_options[section].has_key(t):
            #         if not isinstance(toml_options[section][t],float) and not isinstance(toml_options[section][t],int):
            #             err_msg = 'Configuration value ' + str(toml_options[section][t]) + ' for ' + str(t) + ' cannot be parsed properly (expecting float)'
            #             gvanno_error_message(err_msg, logger)
            #         gvanno_config_options[section][t] = toml_options[section][t]
            for t in boolean_tags:
                if t in toml_options[section]:
                    if not isinstance(toml_options[section][t],bool):
                        err_msg = 'Configuration value ' + str(toml_options[section][t]) + ' for ' + str(t) + ' cannot be parsed properly (expecting true/false)'
                        gvanno_error_message(err_msg, logger)
                    # Booleans are stored as 0/1.
                    gvanno_config_options[section][t] = int(toml_options[section][t])
            for t in integer_tags:
                if t in toml_options[section]:
                    if not isinstance(toml_options[section][t],int):
                        err_msg = 'Configuration value ' + str(toml_options[section][t]) + ' for ' + str(t) + ' cannot be parsed properly (expecting integer)'
                        gvanno_error_message(err_msg, logger)
                    gvanno_config_options[section][t] = toml_options[section][t]
    # for t in float_tags:
    #     if t.startswith('maf_'):
    #         if gvanno_config_options['variant_filter'][t] < 0 or gvanno_config_options['variant_filter'][t] > 1:
    #             err_msg = "MAF value: " + str(t) + " must be within the [0,1] range, current value is " + str(gvanno_config_options['variant_filter'][t]) + ")"
    #             gvanno_error_message(err_msg,logger)
    return gvanno_config_options
def main():
    """
    Start the notifier: parse the command line, read the TOML config and
    run the worker components in threads until all of them finish.

    ``-c/--conf`` selects the configuration file (default ``./conf.toml``)
    and ``-p/--port`` the local listening port (default 8080). The bot
    token and the chat ids are read from the ``global`` table.
    """
    # set default parameters:
    default_port = 8080
    default_conf_file = './conf.toml'

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c", "--conf",
        help='Configuration file [%(default)s]',
        default=default_conf_file)
    parser.add_argument(
        "-p", "--port",
        help='Local port to listen to [%(default)s]',
        default=default_port,
        type=int)
    args = parser.parse_args()
    conf_file, port = args.conf, args.port

    conf = toml.load(conf_file)
    global_conf = conf['global']
    token = global_conf['token']
    chat_ids = global_conf['chat_ids']
    bot = telegram.Bot(token)

    # Queues connecting the components: episodes_q is shared by the CRUD
    # listener and the enhancer, tg_q by the enhancer and the sender.
    episodes_q = Queue()
    tg_q = Queue()

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    crud = CRUDListener(episodes_q)
    eei = EnhanceEpisodeInfo(episodes_q, tg_q, logger)
    cons = SendTelegrams(tg_q, bot, chat_ids, logger)
    cherry = CherrypyWrapper(crud, logger, port)
    SignalHandler([eei, cons, cherry], logger)

    workers = [threading.Thread(target=component.run)
               for component in (cons, eei, cherry)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logger.info('finished')