def parse_front_matter(lines):
    """
    Parse lines of front matter.

    The format is detected from the first line: ``{`` starts a JSON
    document (all lines are parsed), ``+++`` marks TOML and ``---`` marks
    YAML (for both, the first and last lines are dropped before parsing).

    :param lines: the front matter block as a list of lines
    :returns: a ``(format_name, data)`` tuple; ``("toml", {})`` when
        ``lines`` is empty or the first line is not a recognised opener
    """
    if not lines:
        return "toml", {}

    opener = lines[0]
    if opener == "{":
        # JSON: the braces belong to the document, so every line is kept.
        import json
        return "json", json.loads("\n".join(lines))
    if opener == "+++":
        # TOML
        import toml
        return "toml", toml.loads("\n".join(lines[1:-1]))
    if opener == "---":
        # YAML
        import yaml
        # NOTE(review): CLoader is not a safe loader; if front matter can
        # come from untrusted sources, switch to CSafeLoader/SafeLoader.
        return "yaml", yaml.load("\n".join(lines[1:-1]), Loader=yaml.CLoader)

    # Unrecognised opener: return the same tuple shape as every other path
    # (a bare ``{}`` was returned before, which breaks tuple unpacking).
    return "toml", {}
# Example source code using Python's loads()
def getConfigDict(configPath):
    """Load the TOML configuration file at ``configPath``.

    The file is read as bytes, a leading UTF-8 byte-order mark is removed,
    and the remainder is decoded as UTF-8 before being passed to
    ``toml.loads``. Every failure is reported on stderr and ends the
    process with exit status 1.

    :param configPath: path of the TOML configuration file
    :returns: the value returned by ``toml.loads``
    """
    configDict = None
    try:
        # Context manager so the handle is closed even when read() fails.
        with open(configPath, "rb") as configFile:
            configRaw = configFile.read()
    except IOError:
        print("ERROR: I/O fatal error.", file=sys.stderr)
        sys.exit(1)
    if configRaw.startswith(codecs.BOM_UTF8):
        configRaw = configRaw[len(codecs.BOM_UTF8):]
    try:
        # toml.loads() expects text, not bytes, so decode first. A decoding
        # failure falls through to the generic handler below, whose message
        # already asks for UTF-8.
        configDict = toml.loads(configRaw.decode("utf-8"))
    except toml.TomlDecodeError as tomlExp:
        for string in tomlExp.args:
            print("ERROR: Invalid TOML syntax. " + string, file=sys.stderr)
        sys.exit(1)
    except TypeError as typeExp:
        for string in typeExp.args:
            print("ERROR: Invalid config file. " + string, file=sys.stderr)
        sys.exit(1)
    except Exception:
        print("ERROR: Invalid config file. Please make sure it is UTF-8 encoded and complies TOML specification.", file=sys.stderr)
        print("Please review TOML specification at: https://github.com/toml-lang/toml", file=sys.stderr)
        sys.exit(1)
    return configDict
def load_from_path(path, mutable=False):
    """Read the TOML file at ``path`` and wrap the parsed result.

    The file is checked for existence and permissions before it is opened.

    :param path: Path to the TOML file
    :type path: str
    :param mutable: True if the returned Toml object should be mutable
    :type mutable: boolean
    :returns: Map for the configuration file
    :rtype: Toml | MutableToml
    :raises DCOSException: if reading or parsing the file fails
    """
    util.ensure_file_exists(path)
    util.enforce_file_permissions(path)

    with util.open_file(path, 'r') as handle:
        try:
            parsed = toml.loads(handle.read())
        except Exception as err:
            message = 'Error parsing config file at [{}]: {}'.format(path, err)
            raise DCOSException(message)

        if mutable:
            return MutableToml(parsed)
        return Toml(parsed)
def read_config(args):
    """ Fill in missing arguments from ~/.shakedown (if that file exists)

    Keys of the TOML file have ``-`` replaced with ``_``. A file value is
    applied only when the argument is absent from ``args`` or its current
    value is in ``[False, None]``. ``args`` is updated in place.

    :param args: a dict of arguments
    :type args: dict
    :return: a dict of arguments
    :rtype: dict
    """
    path = os.path.expanduser('~/.shakedown')
    if not os.path.isfile(path):
        return args

    with open(path, 'r') as handle:
        settings = toml.loads(handle.read())

    for option, value in settings.items():
        name = option.replace('-', '_')
        # Values that were explicitly provided take precedence over the file.
        if name in args and args[name] not in [False, None]:
            continue
        args[name] = value

    return args
def _get_log_config(filename=None):
    """Load a logging configuration file from the config directory.

    Files whose name ends in ".yaml" are parsed as YAML; every other file
    is parsed as TOML.

    Arguments:
        filename (str): The name of the logging config specific to the
            transaction processor that is being started.

    Returns:
        log_config (dict): The dictionary to pass to
            logging.config.dictConfig, or None when no filename is given
            or the file does not exist.
    """
    if filename is None:
        return None

    path = os.path.join(get_config_dir(), filename)
    if not os.path.exists(path):
        return None

    with open(path) as handle:
        text = handle.read()

    if filename.endswith(".yaml"):
        return yaml.safe_load(text)
    return toml.loads(text)
def _get_processor_config(filename=None):
    """Load a TOML processor configuration file from the config directory.

    Arguments:
        filename (str): The name of the processor config specific to the
            transaction processor that is being started.

    Returns:
        processor_config (dict): The parsed TOML content, or None when no
            filename is given or the file does not exist.
    """
    if filename is None:
        return None

    path = os.path.join(get_config_dir(), filename)
    if not os.path.exists(path):
        return None

    with open(path) as handle:
        text = handle.read()
    return toml.loads(text)
def _load_toml_cli_config(filename=None):
    """Load the CLI configuration from a TOML file.

    Arguments:
        filename (str): Path of the config file. When None, 'cli.toml'
            inside the directory returned by _get_config_dir() is used.

    Returns:
        dict: The parsed TOML content, or an empty dict when the file does
            not exist.

    Raises:
        CliConfigurationError: If the file exists but cannot be read.
    """
    path = filename
    if path is None:
        path = os.path.join(_get_config_dir(), 'cli.toml')

    if not os.path.exists(path):
        LOGGER.info(
            "Skipping CLI config loading from non-existent config file: %s",
            path)
        return {}

    LOGGER.info("Loading CLI information from config: %s", path)

    try:
        with open(path) as handle:
            text = handle.read()
    except IOError as err:
        detail = "Unable to load CLI configuration file: {}".format(str(err))
        raise CliConfigurationError(detail)

    return toml.loads(text)
def configure(args):
    """Build the effective configuration from a config file and options.

    Options are obtained from ``parse_args(args)``. When the ``config``
    option is set, that TOML file is loaded first; every option whose value
    is not None is then applied on top, so explicit options override the
    file. When the resulting ``Verbose`` entry is truthy the configuration
    is printed via ``PP.pprint``.

    :param args: arguments handed to ``parse_args``
    :returns: the merged configuration dict
    """
    opts = parse_args(args)
    config = {}

    config_path = opts["config"]
    # Load the file only when a path was actually given. The previous
    # ``is None`` test skipped every real path and called open(None).
    if config_path is not None:
        with open(config_path) as config_file:
            config.update(toml.loads(config_file.read()))

    # Unset options (None) must not overwrite values from the file.
    config.update({key: value for key, value in opts.items()
                   if value is not None})

    if config["Verbose"]:
        print("Configuration:")
        PP.pprint(config)
    return config
def _get_config():
    """Look for a log config in the config directory and return it.

    'log_config.toml' is tried first, then 'log_config.yaml'.

    Returns:
        log_config (dict): The dictionary to pass to
            logging.config.dictConfig, or None if neither file exists.
    """
    toml_path = os.path.join(_get_config_dir(), 'log_config.toml')
    if os.path.exists(toml_path):
        with open(toml_path) as handle:
            toml_text = handle.read()
        return toml.loads(toml_text)

    yaml_path = os.path.join(_get_config_dir(), 'log_config.yaml')
    if os.path.exists(yaml_path):
        with open(yaml_path) as handle:
            yaml_text = handle.read()
        return yaml.safe_load(yaml_text)

    return None
def load_from_path(path, mutable=False):
    """Read the TOML file at ``path`` and wrap the parsed result.

    The file is checked for existence before it is opened.

    :param path: Path to the TOML file
    :type path: str
    :param mutable: True if the returned Toml object should be mutable
    :type mutable: boolean
    :returns: Map for the configuration file
    :rtype: Toml | MutableToml
    :raises DCOSException: if reading or parsing the file fails
    """
    util.ensure_file_exists(path)

    with util.open_file(path, 'r') as handle:
        try:
            parsed = toml.loads(handle.read())
        except Exception as err:
            message = 'Error parsing config file at [{}]: {}'.format(path, err)
            raise DCOSException(message)

        if mutable:
            return MutableToml(parsed)
        return Toml(parsed)
def get_config_schema(command):
    """Return the configuration schema for a subcommand.

    :param command: the subcommand name
    :type command: str
    :returns: the subcommand's configuration schema
    :rtype: dict
    """
    if command != "core":
        executable = subcommand.command_executables(command)
        return subcommand.config_schema(executable, command)

    # core.* config variables are special. They're valid, but don't
    # correspond to any particular subcommand, so their schema is read
    # from the JSON resource bundled with the 'dcos' package.
    raw_schema = pkg_resources.resource_string(
        'dcos', 'data/config-schema/core.json')
    return json.loads(raw_schema.decode('utf-8'))
def loadConfig(self):
    """Parse the TOML config file and store the result on ``self.config``.

    ``sanityCheckConfigFile()`` runs first; with ``self.verbose`` set, the
    path of the loaded file is printed.
    """
    self.sanityCheckConfigFile()

    with open(self.configFilenameAbs) as handle:
        text = handle.read()
    parsed = toml.loads(text)

    if self.verbose:
        print("# Loaded config file {}".format(self.configFilenameAbs))

    self.config = parsed
def __init__(self, file):
    """Read the TOML file ``file`` and expose its sections as attributes.

    Raises KeyError if any of the expected sections is missing.
    """
    with open(file) as handle:
        sections = toml.loads(handle.read())

    # Sections of the config file, each stored under an attribute of the
    # same name.
    for name in ('general', 'katana', 'files', 'lcd', 'leds', 'buttons'):
        setattr(self, name, sections[name])
def read(self, config_file):
    """Return the parsed content of the TOML file at ``config_file``."""
    with open(config_file) as handle:
        text = handle.read()
    return toml.loads(text)
def read(self, config_file):
    """Return the parsed content of the JSON file at ``config_file``."""
    with open(config_file) as handle:
        text = handle.read()
    return json.loads(text)
def get_config():
    """Parse the TOML file named by the first command-line argument.

    Returns the parsed configuration.
    """
    # Read config parameters from a TOML file.
    path = sys.argv[1]
    with open(path) as handle:
        text = handle.read()
    return toml.loads(text)
def load_metadata_from_manifest(section):
    """Return ``package.metadata.<section>`` from ``Cargo.toml``.

    The manifest is read from the current working directory. Any missing
    level of the lookup yields an empty dict.
    """
    with open('Cargo.toml', 'rt') as handle:
        manifest = toml.loads(handle.read())

    package = manifest.get('package', {})
    metadata = package.get('metadata', {})
    return metadata.get(section, {})
def get_conf(conf_file_path):
    """Read a TOML conf file for later use.

    :param conf_file_path: absolute path of conf file.
    :return: a dict containing the configured information.
    """
    # Only Python 3 gets the ``encoding`` keyword; other versions call
    # open() with the path alone.
    open_options = {'encoding': 'utf-8'} if version_info[0] == 3 else {}
    with open(conf_file_path, **open_options) as conf_file:
        text = conf_file.read()
    return toml.loads(text)
def _load_toml(self, event):
    """Return the parsed ``event["toml"]`` if present, else ``event`` itself."""
    if "toml" in event:
        return toml.loads(event["toml"])
    return event
def config_file_as_dict(**kwargs):
    """Load the config file named by ``cfgfile`` and merge it with kwargs.

    The file content is obtained from ``templated_file_contents`` and
    parsed according to the file extension (``.json``, ``.toml`` or
    ``.yaml``). The result is combined with the keyword arguments through
    ``merge_two_dicts(kwargs, cfgdata)``.

    :param kwargs: keyword arguments; must include ``cfgfile``
    :returns: the value returned by ``merge_two_dicts``
    :raises KeyError: if ``cfgfile`` is not supplied
    :raises ValueError: if the file extension is not supported
    """
    cfgfile = kwargs["cfgfile"]
    cfgfile_contents = templated_file_contents(kwargs, cfgfile)

    if cfgfile.endswith(".json"):
        cfgdata = json.loads(cfgfile_contents)
    elif cfgfile.endswith(".toml"):
        cfgdata = toml.loads(cfgfile_contents)
    elif cfgfile.endswith(".yaml"):
        yaml.add_constructor('!join', join) # http://stackoverflow.com/questions/5484016/how-can-i-do-string-concatenation-or-string-replacement-in-yaml
        # PyYAML >= 6 requires an explicit Loader. yaml.Loader is one of
        # the loaders add_constructor() registers '!join' on by default.
        # NOTE(review): yaml.Loader is not a safe loader; only use this on
        # trusted config files.
        cfgdata = yaml.load(cfgfile_contents, Loader=yaml.Loader)
    else:
        raise ValueError("Invalid config file format")

    return merge_two_dicts(kwargs, cfgdata)
def test_should_generate_toml_frontmatter(self, pypandocMock, blogObjClass):
    """Front matter generated for the first post must parse as TOML with
    the expected title, id and first alias."""
    first_post = MainTests.posts["items"][0]
    parsed = toml.loads(getFrontMatter(first_post))

    assert parsed["title"] == 'title'
    assert parsed["id"] == '100'
    assert parsed["aliases"][0] == 'url'
def get_config_schema(command):
    """Return the configuration schema for a subcommand.

    :param command: the subcommand name
    :type command: str
    :returns: the subcommand's configuration schema
    :rtype: dict
    """
    # import here to avoid circular import
    from dcos.subcommand import (
        command_executables, config_schema, default_subcommands)

    # core.* config variables are special. They're valid, but don't
    # correspond to any particular subcommand, so we must handle them
    # separately. Default subcommands likewise read their schema from a
    # JSON resource in the 'dcos' package.
    if command == "core":
        resource = 'data/config-schema/core.json'
    elif command in default_subcommands():
        resource = 'data/config-schema/{}.json'.format(command)
    else:
        executable = command_executables(command)
        return config_schema(executable, command)

    raw_schema = pkg_resources.resource_string('dcos', resource)
    return json.loads(raw_schema.decode('utf-8'))
def parse(self):
    """Parse the Pipfile at ``self.filename``.

    Top-level keys from the file replace the built-in defaults. The
    'packages' and 'dev-packages' tables are stored in ``self.groups``
    under 'default' and 'develop'. The returned dict holds a '_meta' entry
    (sources and requires) plus every entry of ``self.groups``.
    """
    # Open the Pipfile.
    with open(self.filename) as pipfile_handle:
        text = pipfile_handle.read()

    # Start from the default configuration...
    config = {
        u'source': [{u'url': u'https://pypi.python.org/simple', u'verify_ssl': True, 'name': "pypi"}],
        u'packages': {},
        u'requires': {},
        u'dev-packages': {}
    }
    # ...and overlay the Pipfile's configuration.
    config.update(toml.loads(text))

    # TODO: Validate given data here.
    self.groups['default'] = config['packages']
    self.groups['develop'] = config['dev-packages']

    # Structure the data for output, then add the group information.
    meta = {
        'sources': config['source'],
        'requires': config['requires']
    }
    data = {'_meta': meta}
    data.update(self.groups)
    return data
def parsed_pipfile(self):
    """Parse the Pipfile at ``self.pipfile_location``.

    If the text contains outline tables for individual packages, they are
    converted to inline tables before the document is re-serialised and
    parsed. ``contoml`` is tried first in both paths, with ``toml`` as the
    fallback.
    """
    # Open the pipfile, read it into memory.
    with open(self.pipfile_location) as handle:
        contents = handle.read()

    has_outline_tables = (
        ('[packages.' in contents) or ('[dev-packages.' in contents))

    if not has_outline_tables:
        # Fallback to toml parser, for large files.
        try:
            return contoml.loads(contents)
        except Exception:
            return toml.loads(contents)

    data = toml.loads(contents)

    # Convert all outline tables to inline tables.
    for section in ('packages', 'dev-packages'):
        for package in data.get(section, {}):
            entry = data[section][package]
            # Only table-like entries (anything with ``keys``) are converted.
            if hasattr(entry, 'keys'):
                inline = toml._get_empty_inline_table(dict)
                inline.update(entry)
                data[section][package] = inline

    # We lose comments here, but it's for the best.
    try:
        return contoml.loads(toml.dumps(data, preserve=True))
    except RuntimeError:
        return toml.loads(toml.dumps(data, preserve=True))
def _lockfile(self):
    """Build the lock data for the Pipfile with normalised package names.

    The Pipfile at ``self.pipfile_location`` is loaded, its ``lock()``
    output is parsed as JSON, and every key in the 'default' and 'develop'
    sections is replaced by ``pep423_name(key)``.
    """
    loaded = pipfile.load(self.pipfile_location)
    lockfile = json.loads(loaded.lock())

    for section in ('default', 'develop'):
        packages = lockfile.get(section, {})
        # Iterate over a snapshot of the keys because entries are re-keyed
        # inside the loop.
        for original_name in list(packages):
            normalized = pep423_name(original_name)
            packages[normalized] = packages.pop(original_name)

    return lockfile
def __init__(self, path, builder=Builder()):
    """Initialise empty metadata, read the TOML file at ``path``, then load.

    :param path: path of the TOML file; the real path of its directory is
        kept in ``_dir``
    :param builder: builder object stored on the instance
    """
    self._path = path
    self._dir = os.path.realpath(os.path.dirname(path))
    self._builder = builder

    # Single-valued fields start out unset.
    for attr in ('_git_config', '_name', '_version', '_description',
                 '_homepage', '_repository', '_license', '_readme'):
        setattr(self, attr, None)

    # Each collection field gets its own fresh container.
    for attr in ('_authors', '_keywords', '_python_versions',
                 '_dependencies', '_dev_dependencies',
                 '_pip_dependencies', '_pip_dev_dependencies',
                 '_include', '_exclude'):
        setattr(self, attr, [])
    for attr in ('_features', '_scripts', '_entry_points', '_extensions'):
        setattr(self, attr, {})

    with open(self._path) as handle:
        self._config = toml.loads(handle.read())

    self.load()
def load_file(self, file):
    """Read the TOML plan in ``file`` and populate the instance from it.

    'startage' is required; 'inflation', 'returns', 'endage', 'prep',
    'aftertax', 'IRA' and 'roth' are optional. The parsed document is
    passed to ``self.parse_expenses`` before the SEPP settings are set.
    """
    with open(file) as handle:
        cfg = toml.loads(handle.read())

    inflation_pct = cfg.get('inflation', 0)
    returns_pct = cfg.get('returns', 6)
    self.i_rate = 1 + inflation_pct / 100  # inflation rate: 2.5 -> 1.025
    self.r_rate = 1 + returns_pct / 100    # invest rate: 6 -> 1.06

    self.startage = cfg['startage']
    self.endage = cfg.get('endage', max(96, self.startage+5))

    if 'prep' in cfg:
        prep = cfg['prep']
        self.workyr = prep['workyears']
        self.maxsave = prep['maxsave']
        self.worktax = 1 + prep.get('tax_rate', 25)/100
    else:
        self.workyr = 0

    self.retireage = self.startage + self.workyr
    self.numyr = self.endage - self.retireage

    # Each account falls back to a zero balance and has its missing
    # optional field filled in with a default.
    aftertax = cfg.get('aftertax', {'bal': 0})
    aftertax.setdefault('basis', 0)
    self.aftertax = aftertax

    ira = cfg.get('IRA', {'bal': 0})
    ira.setdefault('maxcontrib', 18000 + 5500*2)
    self.IRA = ira

    roth = cfg.get('roth', {'bal': 0})
    roth.setdefault('maxcontrib', 5500*2)
    self.roth = roth

    self.parse_expenses(cfg)

    self.sepp_end = max(5, 59-self.retireage)  # first year you can spend IRA reserved for SEPP
    self.sepp_ratio = 25                       # money per-year from SEPP (bal/ratio)
def _exception_message(exc):
    """Return the exception's single argument, or "" otherwise.

    Mirrors the Python 2 ``BaseException.message`` attribute, which does
    not exist on Python 3.
    """
    return exc.args[0] if len(exc.args) == 1 else ""


def check_json(filename, contents):
    """Yield ``(line_number, message)`` for a problem found in a JSON file.

    Files whose name does not end in ``.json`` are skipped and yield
    nothing. ``contents`` is parsed with the hook returned by
    ``check_json_requirements(filename)``.

    :param filename: name of the file being checked
    :param contents: the file's text
    :returns: a generator yielding at most one pair. For a ValueError,
        ``line_number`` is the digits found after "line " in the message
        (as a string) or None if there is no match; for a KeyError it is
        always None.
    """
    if not filename.endswith(".json"):
        # A plain return ends the generator. Raising StopIteration here is
        # turned into a RuntimeError on Python 3.7+ (PEP 479).
        return

    try:
        json.loads(contents, object_pairs_hook=check_json_requirements(filename))
    except ValueError as e:
        message = _exception_message(e)
        match = re.search(r"line (\d+) ", message)
        line_no = match and match.group(1)
        yield (line_no, message)
    except KeyError as e:
        # The raw argument is used, not str(e), which would add quotes
        # around the key.
        yield (None, _exception_message(e))
def test_normal(self, capsys, table_name, header, value, expected):
    """The written table must parse to the same TOML data as ``expected``."""
    table_writer = table_writer_class()
    table_writer.table_name = table_name
    table_writer.header_list = header
    table_writer.value_matrix = value
    table_writer.write_table()

    captured_out, _captured_err = capsys.readouterr()

    # Echo both documents so a failure report shows them.
    print("[expected]\n{}".format(expected))
    print("[actual]\n{}".format(captured_out))

    actual_data = toml.loads(captured_out)
    expected_data = toml.loads(expected)
    assert actual_data == expected_data
def _loads(content, fmt=None):
    """Deserialise ``content`` according to ``fmt``.

    Supported formats are 'toml', 'json' (decoded with
    ``json_util.object_hook``), 'python' (a literal evaluated with
    ``ast.literal_eval``) and 'pickle'. For any other ``fmt``, including
    None, ``content`` is returned unchanged.
    """
    if fmt == 'toml':
        return toml.loads(content)
    if fmt == 'json':
        return json.loads(content, object_hook=json_util.object_hook)
    if fmt == 'python':
        return ast.literal_eval(content)
    if fmt == 'pickle':
        # NOTE(review): unpickling can execute arbitrary code; only use
        # this format on trusted content.
        return pickle.loads(content)
    return content