def parse_toml(tml_profile):
    """Read the file at ``tml_profile`` and parse its text as TOML.

    Returns whatever ``toml.loads`` produces for the file's contents.
    """
    with open(tml_profile) as profile:
        return toml.loads(profile.read())
# read_policy reads the resource policy (js,css,img etc.) from the conf dict
# Example source code for Python's loads()
def build_inheritance(current_fnm):
    """Load a TOML file and merge it over the file it inherits from.

    If the parsed file has an 'inherits' key, its value is joined onto the
    directory of ``current_fnm`` and that file is loaded recursively; the
    result is combined with this file's settings through
    ``update_recursively(parent, current)``. Without an 'inherits' key the
    parsed settings are returned unchanged.
    """
    with open(current_fnm) as handle:
        settings = toml.loads(handle.read())

    # Nothing to inherit: this file is the root of the chain.
    if 'inherits' not in settings:
        return settings

    # The parent path is resolved relative to the current file's directory.
    parent_path = os.path.join(
        os.path.dirname(current_fnm), settings['inherits'])
    inherited = build_inheritance(parent_path)
    return update_recursively(inherited, settings)
def _get_dir(toml_config_setting,
             sawtooth_home_dir,
             windows_dir,
             default_dir):
    """Resolve a directory path from configuration and environment.

    The first source that applies wins, in this order:
      1. the ``toml_config_setting`` entry of path.toml in the config dir,
      2. ``sawtooth_home_dir`` under the SAWTOOTH_HOME environment variable,
      3. on Windows (os.name == 'nt'), ``windows_dir`` under the directory
         two levels above the running script (sys.argv[0]),
      4. ``default_dir``.

    Arguments:
        toml_config_setting (str): Key to look up in path.toml.
        sawtooth_home_dir (str): Sub-path appended to $SAWTOOTH_HOME.
        windows_dir (str): Sub-path appended to the computed Windows base
            directory.
        default_dir (str): Fallback path returned when nothing else applies.

    Returns:
        directory (str): The resolved path.
    """
    path_toml = os.path.join(get_config_dir(), 'path.toml')
    if os.path.exists(path_toml):
        with open(path_toml) as handle:
            file_text = handle.read()
        settings = toml.loads(file_text)
        if toml_config_setting in settings:
            return settings[toml_config_setting]

    if 'SAWTOOTH_HOME' in os.environ:
        return os.path.join(os.environ['SAWTOOTH_HOME'], sawtooth_home_dir)

    if os.name != 'nt':
        return default_dir

    # Windows: the base directory is the grandparent of the script's path.
    script_path = os.path.abspath(sys.argv[0])
    install_root = os.path.dirname(os.path.dirname(script_path))
    return os.path.join(install_root, windows_dir)
def initialize(cls, config_dir, data_dir):
    """Compute and store the simulated anti-Sybil ID on ``cls``.

    The ID is the SHA-256 hex digest of a validator ID string. That string
    defaults to the current timestamp in ISO format; if
    'poet_enclave_simulator.toml' exists in ``config_dir`` and defines
    'validator_id', that value is used instead. Any other keys in the file
    are ignored with a warning.

    Arguments:
        config_dir (str): Directory searched for the simulator TOML file.
        data_dir: Not used by this method.
    """
    # Fallback used when no validator ID is configured.
    validator_id = datetime.datetime.now().isoformat()

    config_file = os.path.join(config_dir, 'poet_enclave_simulator.toml')
    if os.path.exists(config_file):
        LOGGER.info(
            'Loading PoET enclave simulator config from : %s',
            config_file)

        # Bind an empty configuration up front: if reading the file fails,
        # the code below must still run with defaults rather than hit an
        # unbound local variable.
        toml_config = {}
        try:
            with open(config_file) as fd:
                toml_config = toml.loads(fd.read())
        except IOError as e:
            LOGGER.info(
                'Error loading PoET enclave simulator configuration: %s',
                e)
            LOGGER.info('Continuing with default configuration')

        invalid_keys = set(toml_config).difference(['validator_id'])
        if invalid_keys:
            LOGGER.warning(
                'Ignoring invalid keys in PoET enclave simulator config: '
                '%s',
                ', '.join(sorted(invalid_keys)))

        validator_id = toml_config.get('validator_id', validator_id)

    LOGGER.debug(
        'PoET enclave simulator creating anti-Sybil ID from: %s',
        validator_id)

    # Create an anti-Sybil ID that is unique for this validator
    cls._anti_sybil_id = hashlib.sha256(validator_id.encode()).hexdigest()
def _get_dir(toml_config_setting, sawtooth_home_dir, windows_dir, default_dir):
    """Pick a directory path, preferring explicit configuration.

    Sources are consulted in order and the first match is returned:
    the ``toml_config_setting`` key of path.toml (located in the directory
    returned by ``_get_config_dir()``), then $SAWTOOTH_HOME joined with
    ``sawtooth_home_dir``, then - when os.name is 'nt' - ``windows_dir``
    joined onto the directory two levels above sys.argv[0], and finally
    ``default_dir``.

    Arguments:
        toml_config_setting (str): Key to look up in path.toml.
        sawtooth_home_dir (str): Sub-path appended to $SAWTOOTH_HOME.
        windows_dir (str): Sub-path appended to the computed Windows base
            directory.
        default_dir (str): Fallback path returned when nothing else applies.

    Returns:
        directory (str): The resolved path.
    """
    toml_path = os.path.join(_get_config_dir(), 'path.toml')
    if os.path.exists(toml_path):
        with open(toml_path) as toml_file:
            contents = toml_file.read()
        path_settings = toml.loads(contents)
        if toml_config_setting in path_settings:
            return path_settings[toml_config_setting]

    if 'SAWTOOTH_HOME' in os.environ:
        return os.path.join(os.environ['SAWTOOTH_HOME'], sawtooth_home_dir)

    if os.name != 'nt':
        return default_dir

    # Windows: walk up two directory levels from the absolute script path.
    executable = os.path.abspath(sys.argv[0])
    base_dir = os.path.dirname(os.path.dirname(executable))
    return os.path.join(base_dir, windows_dir)
def _read_json_request(self):
    """Read the request body and decode it as UTF-8 JSON.

    Reads exactly the number of bytes given by the 'Content-Length' entry
    of ``self.headers`` from ``self.rfile`` and returns the parsed JSON
    value.
    """
    body_length = int(self.headers['Content-Length'])
    raw_body = self.rfile.read(body_length)
    return json.loads(raw_body.decode('utf-8'))
def get_server():
    """Create an IasProxyServer from 'ias_proxy.toml' in the config dir.

    The parsed file must contain exactly the keys 'proxy_name',
    'proxy_port', 'ias_url' and 'spid_cert_file'.

    Raises:
        ValueError: If the file contains unknown keys or lacks any of the
            required ones. Errors from opening the file are not caught.
    """
    config_file = os.path.join(config.get_config_dir(), 'ias_proxy.toml')
    LOGGER.info('Loading IAS Proxy config from: %s', config_file)

    # A missing config file is fatal, so the exception from open() is
    # deliberately left to propagate to the caller.
    with open(config_file) as fd:
        proxy_config = toml.loads(fd.read())

    # Verify the integrity (as best we can) of the TOML configuration file
    expected = {'proxy_name', 'proxy_port', 'ias_url', 'spid_cert_file'}
    present = set(proxy_config.keys())

    unexpected = present - expected
    if unexpected:
        raise ValueError(
            'IAS Proxy config file contains the following invalid '
            'keys: {}'.format(', '.join(sorted(unexpected))))

    absent = expected - present
    if absent:
        raise ValueError(
            'IAS Proxy config file missing the following keys: '
            '{}'.format(', '.join(sorted(absent))))

    return IasProxyServer(proxy_config)
def load_toml_rest_api_config(filename):
    """Build a RestApiConfig from a TOML file on disk.

    A default-constructed RestApiConfig is returned when ``filename`` does
    not exist. Otherwise each recognised setting is passed to RestApiConfig
    by keyword, with None for settings absent from the file.

    Raises:
        RestApiConfigurationError: If the file cannot be read or contains
            a key that is not a recognised setting.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping rest api loading from non-existent config file: %s",
            filename)
        return RestApiConfig()

    LOGGER.info("Loading rest api information from config: %s", filename)

    try:
        with open(filename) as config_fd:
            text = config_fd.read()
    except IOError as e:
        raise RestApiConfigurationError(
            "Unable to load rest api configuration file: {}".format(str(e)))

    parsed = toml.loads(text)

    # Every key in the file must be one of the recognised settings.
    recognised = ('bind', 'connect', 'timeout', 'opentsdb_db',
                  'opentsdb_url', 'opentsdb_username', 'opentsdb_password')
    unknown = set(parsed.keys()) - set(recognised)
    if unknown:
        raise RestApiConfigurationError(
            "Invalid keys in rest api config: {}".format(
                ", ".join(sorted(unknown))))

    # Settings missing from the file are passed through as None.
    settings = {key: parsed.get(key) for key in recognised}
    return RestApiConfig(**settings)
def load_toml_path_config(filename):
    """Build a PathConfig from a TOML file on disk.

    A default-constructed PathConfig is returned when ``filename`` does not
    exist. Otherwise 'data_dir', 'key_dir', 'log_dir' and 'policy_dir' are
    taken from the file (None when absent) and ``config_dir`` is always
    passed as None.

    Raises:
        LocalConfigurationError: If the file cannot be read or contains a
            key other than the four directory settings.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping path loading from non-existent config file: %s",
            filename)
        return PathConfig()

    LOGGER.info("Loading path information from config: %s", filename)

    try:
        with open(filename) as config_fd:
            text = config_fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load path configuration file: {}".format(str(e)))

    parsed = toml.loads(text)

    # Only the four directory settings may appear in the file.
    directory_keys = ('data_dir', 'key_dir', 'log_dir', 'policy_dir')
    unknown = set(parsed.keys()) - set(directory_keys)
    if unknown:
        raise LocalConfigurationError("Invalid keys in path config: {}".format(
            ", ".join(sorted(unknown))))

    directories = {key: parsed.get(key) for key in directory_keys}
    return PathConfig(config_dir=None, **directories)
def parse(self):
    """Parse the Pipfile at ``self.filename`` into a plain dict.

    The file's TOML content is laid over a set of defaults (a single PyPI
    source and empty 'packages', 'requires' and 'dev-packages' tables).
    ``self.groups`` gets 'default' and 'develop' entries from the package
    tables, and the returned dict holds a '_meta' entry (sources and
    requires) plus every entry of ``self.groups``.
    """
    # Open the Pipfile.
    with open(self.filename) as pipfile:
        raw_text = pipfile.read()

    # Start from the default configuration ...
    config = {
        u'source': [{u'url': u'https://pypi.python.org/simple', u'verify_ssl': True, 'name': "pypi"}],
        u'packages': {},
        u'requires': {},
        u'dev-packages': {}
    }
    # ... and let the Pipfile's own top-level tables replace the defaults.
    config.update(toml.loads(raw_text))

    # TODO: Validate given data here.
    self.groups['default'] = config['packages']
    self.groups['develop'] = config['dev-packages']

    # Structure the data for output, then add the group information.
    meta = {'sources': config['source'], 'requires': config['requires']}
    data = {'_meta': meta}
    data.update(self.groups)
    return data
def check_lock(file_name, contents):
    """Yield lint errors for a Cargo lock file.

    This is a generator. It yields nothing unless ``file_name`` ends with
    ".lock". Otherwise ``contents`` is parsed as TOML and two checks run:

    * every package name that appears with more than one (version, source)
      pair, and is not listed in config["ignore"]["packages"], produces one
      message naming the packages that depend on each version;
    * every package depending on a name listed in
      config['blocked-packages'], without itself being in that entry's
      list, produces one message.

    Arguments:
        file_name (str): Name of the file being checked.
        contents (str): Text of the file, parsed with ``toml.loads``.

    Yields:
        tuple: ``(1, message)`` for each problem found.
    """
    def find_reverse_dependencies(name, content):
        # Yield (package name, dependency string) for every package, the
        # root included, that has a dependency starting with "<name> ".
        for package in itertools.chain([content["root"]], content["package"]):
            for dependency in package.get("dependencies", []):
                if dependency.startswith("{} ".format(name)):
                    yield package["name"], dependency

    if not file_name.endswith(".lock"):
        # A plain return ends the generator. Raising StopIteration here
        # would surface as RuntimeError on Python 3.7+ (PEP 479).
        return

    # Package names to be neglected (as named by cargo)
    exceptions = config["ignore"]["packages"]

    content = toml.loads(contents)

    # Group the (version, source) pairs of each package by package name.
    packages_by_name = {}
    for package in content.get("package", []):
        if "replace" in package:
            continue
        source = package.get("source", "")
        if source == r"registry+https://github.com/rust-lang/crates.io-index":
            source = "crates.io"
        packages_by_name.setdefault(package["name"], []).append((package["version"], source))

    # .items() exists on both Python 2 and 3, unlike .iteritems().
    for (name, packages) in packages_by_name.items():
        if name in exceptions or len(packages) <= 1:
            continue

        message = "duplicate versions for package `{}`".format(name)
        packages.sort()
        packages_dependencies = list(find_reverse_dependencies(name, content))
        for version, source in packages:
            short_source = source.split("#")[0].replace("git+", "")
            message += "\n\t\033[93mThe following packages depend on version {} from '{}':\033[0m" \
                       .format(version, short_source)
            # A distinct loop variable keeps the outer ``name`` intact.
            for dependent_name, dependency in packages_dependencies:
                if version in dependency and short_source in dependency:
                    message += "\n\t\t" + dependent_name
        yield (1, message)

    # Check to see if we are transitively using any blocked packages
    for package in content.get("package", []):
        package_name = package.get("name")
        package_version = package.get("version")
        for dependency in package.get("dependencies", []):
            # The first whitespace-separated token is the dependency's name.
            dependency_name = dependency.split()[0]
            whitelist = config['blocked-packages'].get(dependency_name)
            if whitelist is not None and package_name not in whitelist:
                fmt = "Package {} {} depends on blocked package {}."
                message = fmt.format(package_name, package_version, dependency_name)
                yield (1, message)
def load_toml_xo_config(filename):
    """Build an XOConfig from a TOML file on disk.

    Args:
        filename (string): Path of the TOML file to read.

    Returns:
        config (XOConfig): A default-constructed XOConfig when the file
            does not exist; otherwise one whose ``connect`` comes from the
            file (None when the key is absent).

    Raises:
        LocalConfigurationError: If the file cannot be read or contains
            any key other than 'connect'.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction proccesor config loading from non-existent"
            " config file: %s", filename)
        return XOConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as config_fd:
            text = config_fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    parsed = toml.loads(text)

    # 'connect' is the only setting this file may contain.
    unknown = set(parsed.keys()) - {'connect'}
    if unknown:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(unknown))))

    return XOConfig(connect=parsed.get("connect"))
def load_toml_settings_config(filename):
    """Build a SettingsConfig from a TOML file on disk.

    Args:
        filename (string): Path of the TOML file to read.

    Returns:
        config (SettingsConfig): A default-constructed SettingsConfig when
            the file does not exist; otherwise one whose ``connect`` comes
            from the file (None when the key is absent).

    Raises:
        LocalConfigurationError: If the file cannot be read or contains
            any key other than 'connect'.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction proccesor config loading from non-existent"
            " config file: %s", filename)
        return SettingsConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as config_fd:
            text = config_fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    parsed = toml.loads(text)

    # 'connect' is the only setting this file may contain.
    unknown = set(parsed.keys()) - {'connect'}
    if unknown:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(unknown))))

    return SettingsConfig(connect=parsed.get("connect"))
def load_toml_identity_config(filename):
    """Build an IdentityConfig from a TOML file on disk.

    Args:
        filename (string): Path of the TOML file to read.

    Returns:
        config (IdentityConfig): A default-constructed IdentityConfig when
            the file does not exist; otherwise one whose ``connect`` comes
            from the file (None when the key is absent).

    Raises:
        LocalConfigurationError: If the file cannot be read or contains
            any key other than 'connect'.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction proccesor config loading from non-existent"
            " config file: %s", filename)
        return IdentityConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as config_fd:
            text = config_fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    parsed = toml.loads(text)

    # 'connect' is the only setting this file may contain.
    unknown = set(parsed.keys()) - {'connect'}
    if unknown:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(unknown))))

    return IdentityConfig(connect=parsed.get("connect"))