def __init__(self, configfile, dryrun=False, debug=False):
    """Load the YAML configuration file and initialize backup settings.

    Args:
        configfile (str): Path to the YAML configuration file.
        dryrun (bool): When True, actions are only simulated.
        debug (bool): When True, DEBUG information is shown.

    Raises:
        ValueError: If ``src_root`` in the config is not an absolute path.
    """
    self.configfile = configfile
    # Bug fix: the original passed `open(configfile)` directly and never
    # closed the handle; a context manager closes it promptly.
    with open(configfile) as fh:
        self.config = yaml_load(fh)
    logger.info("Loaded configuration file: %s" % configfile)
    src_root = self.config.get("src_root", "/")
    if os.path.isabs(src_root):
        self.src_root = src_root
        logger.info("Source root directory: %s" % self.src_root)
    else:
        raise ValueError("Source root must be an absolute path")
    # Merge user-configured protected paths into the built-in set.
    self.syspath = syspath.union(self.config.get("syspath", []))
    logger.info("Protected system paths: {0}".format(self.syspath))
    dest_root = os.path.expanduser(self.config["dest_root"])
    logger.info("Check backup destination against protected paths ...")
    self.dest_root = self.check_dest(dest_root)
    logger.info("Backup destination: %s" % self.dest_root)
    self.dryrun = dryrun
    logger.info("Dry run mode: %s" % dryrun)
    self.debug = debug
    logger.info("Show DEBUG information: %s" % debug)
Python `safe_load()` usage examples (collected source snippets)
def load_schema(schema_file=None):
    """Load and validate a YAML schema file, caching results per absolute path.

    Args:
        schema_file (str | None): Path to the schema file; falls back to the
            configured default location when omitted.

    Returns:
        dict: The parsed schema.

    Raises:
        ValueError: If the schema lacks a 'tables' or 'types' section.
    """
    schema_file = schema_file or get_configuration().schema_loc
    schema_file = os.path.abspath(schema_file)
    # Bug fix: the cache dict must be stored back on the function attribute.
    # The original `getattr(load_schema, '_schemas', {})` returned a brand-new
    # dict on every call and never assigned it, so nothing was ever cached.
    schema_cache = getattr(load_schema, '_schemas', None)
    if schema_cache is None:
        schema_cache = {}
        load_schema._schemas = schema_cache
    if schema_file in schema_cache:
        return schema_cache[schema_file]
    with open(schema_file, 'r') as sf:
        schema = yaml.safe_load(sf)
    if 'tables' not in schema:
        raise ValueError('Tables list missing from schema.')
    if 'types' not in schema:
        raise ValueError('Types missing from schema.')
    schema_cache[schema_file] = schema
    return schema_cache[schema_file]
def get_parsed_context(context_arg):
    """Read the --context argument as a YAML file and return its mapping.

    Raises:
        AssertionError: When context_arg is empty or None.
        TypeError: When the parsed YAML is not a mapping at the top level.
    """
    assert context_arg, ("pipeline must be invoked with --context set. For "
                         "this yaml parser you're looking for something "
                         "like --context './myyamlfile.yaml'")
    logger.debug("starting")
    logger.debug(f"attempting to open file: {context_arg}")
    with open(context_arg) as fh:
        payload = yaml.safe_load(fh)
    logger.debug(f"yaml file parsed. Count: {len(payload)}")
    if not isinstance(payload, MutableMapping):
        raise TypeError("yaml input should describe a dictionary at the top "
                        "level. You should have something like "
                        "\n'key1: value1'\n key2: value2'\n"
                        "in the yaml top-level, not \n'- value1\n - value2'")
    logger.debug("done")
    return payload
def prepare_git_user(self):
    """
    Fetch the git author name and email for the current ssh key.

    Queries /api/user-git and stores the result on self.git_name /
    self.git_email; falls back to offline mode when the API is
    unreachable or returns nothing.
    """
    import aetros.api
    try:
        response = aetros.api.request('user-git')
        if not response:
            # Empty response: no user info available, work offline.
            self.go_offline()
        else:
            user = yaml.safe_load(response)
            self.git_name = user['name']
            self.git_email = user['email']
    except ApiConnectionError:
        # API unreachable: switch to offline mode.
        self.go_offline()
def load_server(self, server_id) -> bool:
    """Load a server's config, sections and notes from data/<server_id>/.

    Args:
        server_id: Identifier used as the directory name under data/.

    Returns:
        bool: False when no data directory exists for the server,
        True after the data has been loaded.
    """
    if not os.path.exists("data/{}".format(server_id)):
        return False
    log.debug("Loading server: {}".format(server_id))
    # Bug fix: use context managers so the three file handles are closed
    # promptly (the original opened them inline and never closed them).
    with open("data/{}/config.yml".format(server_id), "r") as f:
        config = yaml.safe_load(f)
    with open("data/{}/sections.yml".format(server_id), "r") as f:
        sections = yaml.safe_load(f)
    if os.path.exists("data/{}/notes.yml".format(server_id)):
        with open("data/{}/notes.yml".format(server_id), "r") as f:
            notes = yaml.safe_load(f)
    else:
        notes = DEFAULT_NOTES
    if "notes_channel" not in config:
        config["notes_channel"] = None
    self.data[server_id] = {
        "config": config,
        "sections": self.load_sections(sections)
    }
    self.notes[server_id] = notes
    return True
def _update_from_file(self, filename):
    """Merge values from a YAML configuration file into the current config.

    Every key present in the file overwrites the corresponding key in the
    existing configuration dictionary.

    Args:
        filename (str): The path and name to the configuration file.

    Raises:
        ConfigLoadError: If the file does not exist, or is a directory.
    """
    # Guard clause: fail fast on a missing file.
    if not os.path.exists(filename):
        raise ConfigLoadError('The config file {} does not exist'.format(filename))
    try:
        with open(filename, 'r') as config_file:
            yaml_dict = yaml.safe_load(config_file.read())
        # An empty file parses to None; nothing to merge in that case.
        if yaml_dict is not None:
            self._update_dict(self._config, yaml_dict)
    except IsADirectoryError:
        raise ConfigLoadError(
            'The specified configuration file is a directory not a file')
async def initialize_kinto(loop, kinto_client, bucket, collection):
    """
    Initialize the remote server with the initialization.yml file.

    Bug fix: declared ``async`` — the body awaits initialize_server(), and
    ``await`` inside a plain ``def`` is a SyntaxError.

    Raises:
        ValueError: If the target bucket or collection is not described in
            initialization.yml.
    """
    # Leverage kinto-wizard async client.
    thread_pool = ThreadPoolExecutor()
    async_client = AsyncKintoClient(kinto_client, loop, thread_pool)
    initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml')
    config = yaml.safe_load(initialization_manifest)
    # Check that we push the records at the right place.
    if bucket not in config:
        raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.")
    if collection not in config[bucket]['collections']:
        raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.")
    await initialize_server(async_client,
                            config,
                            bucket=bucket,
                            collection=collection,
                            force=False)
def setup_logging(
        default_path='./parameter/logger.yml',
        default_level=logging.INFO,
        env_key='LOG_CFG'
):
    """Configure logging from a YAML file, or fall back to basicConfig.

    The environment variable named by env_key overrides default_path.
    When the resolved path does not exist, basicConfig(default_level)
    is used instead.
    """
    # Environment variable wins over the default path when set and non-empty.
    path = os.getenv(env_key, None) or os.path.abspath(default_path)
    if os.path.exists(os.path.abspath(path)):
        with open(path, 'rt') as f:
            config = yaml.safe_load(f.read())
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)
def task_example():
    '''
    cp|strip config.yml -> config.yml.example
    '''
    # Placeholder credential written into the generated example config.
    apikey = '82_CHAR_APIKEY'
    # YAML template for the example file; fmt() presumably interpolates
    # {apikey} from the local scope. NOTE(review): the indentation inside
    # this literal was lost in transit — the 2-space nesting below is a
    # reconstruction; verify against the original.
    punch = fmt('''
authorities:
  digicert:
    apikey: {apikey}
destinations:
  zeus:
    apikey: {apikey}
''')
    # doit-style task dict: back up the current example, regenerate it from
    # the live config, then punch the placeholder credentials into it.
    return {
        'actions': [
            fmt('cp {CONFIG_YML}.example {CONFIG_YML}.bak'),
            fmt('cp {CONFIG_YML} {CONFIG_YML}.example'),
            lambda: _update_config(CONFIG_YML+'.example', yaml.safe_load(punch)),
        ],
    }
def unbundle(dirpath, cert_name):
    """Extract the parts of a certificate tarball.

    Returns a 5-tuple (key, csr, crt, yml, readme); the yml member is
    parsed with yaml.safe_load, the others are UTF-8 text. Any member
    missing from the archive is returned as None.
    """
    key = csr = crt = yml = readme = None
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    with tarfile.open(tarpath, 'r:gz') as tar:
        def read_text(member_name):
            # Decode one archive member's bytes as UTF-8 text.
            return tar.extractfile(member_name).read().decode('utf-8')
        for info in tar.getmembers():
            if info.name.endswith('.key'):
                key = read_text(info.name)
            elif info.name.endswith('.csr'):
                csr = read_text(info.name)
            elif info.name.endswith('.crt'):
                crt = read_text(info.name)
            elif info.name.endswith('.yml'):
                yml = yaml.safe_load(read_text(info.name))
            elif info.name == 'README':
                readme = read_text(info.name)
    return key, csr, crt, yml, readme
def unbundle(dirpath, cert_name):
    """Pull key/csr/crt/yml/README out of <dirpath>/<cert_name>.tar.gz.

    The .yml member is parsed with yaml.safe_load; the rest are returned
    as UTF-8 strings. Missing members come back as None.
    """
    parts = {'key': None, 'csr': None, 'crt': None, 'yml': None, 'readme': None}
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    with tarfile.open(tarpath, 'r:gz') as tar:
        for member in tar.getmembers():
            name = member.name
            if name.endswith('.key'):
                parts['key'] = tar.extractfile(name).read().decode('utf-8')
            elif name.endswith('.csr'):
                parts['csr'] = tar.extractfile(name).read().decode('utf-8')
            elif name.endswith('.crt'):
                parts['crt'] = tar.extractfile(name).read().decode('utf-8')
            elif name.endswith('.yml'):
                parts['yml'] = yaml.safe_load(
                    tar.extractfile(name).read().decode('utf-8'))
            elif name == 'README':
                parts['readme'] = tar.extractfile(name).read().decode('utf-8')
    return (parts['key'], parts['csr'], parts['crt'],
            parts['yml'], parts['readme'])
def unbundle(dirpath, cert_name):
    """Unpack a certificate bundle tarball into its five components.

    Returns (key, csr, crt, yml, readme). yml is the safe_load-parsed
    content of the .yml member; every other component is decoded UTF-8
    text. Components absent from the archive are None.
    """
    tarpath = fmt('{dirpath}/{cert_name}.tar.gz')
    key = None
    csr = None
    crt = None
    yml = None
    readme = None
    with tarfile.open(tarpath, 'r:gz') as tar:
        for member in tar.getmembers():
            # Decode only members we actually care about.
            interesting = (member.name.endswith(('.key', '.csr', '.crt', '.yml'))
                           or member.name == 'README')
            if not interesting:
                continue
            text = tar.extractfile(member.name).read().decode('utf-8')
            if member.name.endswith('.key'):
                key = text
            elif member.name.endswith('.csr'):
                csr = text
            elif member.name.endswith('.crt'):
                crt = text
            elif member.name.endswith('.yml'):
                yml = yaml.safe_load(text)
            else:
                readme = text
    return key, csr, crt, yml, readme
def _fixup(obj):
if isinstance(obj, dict):
d = deepcopy(obj)
for k,v in obj.items():
if isinstance(v, str):
if 'url' in k:
d[k] = URL(v)
elif 'path' in k:
d[k] = Path(v)
elif 'auth' == k:
with open(fmt('{CONFIG_DIR}/{v}'), 'r') as f:
d[k] = yaml.safe_load(f.read())
elif isinstance(v, dict):
d[k] = _fixup(v)
return d
return obj
def run_step(context):
    """Fetch a YAML payload from s3 and merge its values into the context.

    Args:
        context: pypyr.context.Context. Mandatory. Must contain an
            's3Fetch' dict with 'Bucket' (s3 bucket name) and 'Key'
            (s3 key name).

    The values parsed from the s3 object are merged into the context,
    overwriting any keys that already exist there — e.g. an existing
    context {'eggs': 'fried'} combined with s3 content {'eggs': 'boiled'}
    leaves context['eggs'] == 'boiled'.
    """
    logger.debug("started")
    s3_response = pypyraws.aws.s3.get_payload(context)
    parsed = yaml.safe_load(s3_response)
    logger.debug("successfully parsed yaml from s3 response bytes")
    context.update(parsed)
    logger.info("loaded s3 yaml into pypyr context")
    logger.debug("done")
def load_yaml(filename):
    """Load a YAML file, normalizing backslash path separators on Windows.

    Args:
        filename (str): Path of the YAML file to read.

    Returns:
        The object parsed from the file by yaml.safe_load.
    """
    with open(filename) as myfile:
        content = myfile.read()
    # Bug fix: the original test `"win" in sys.platform` also matched
    # "darwin" (macOS), rewriting backslashes there too. Only genuine
    # Windows platforms ("win32", ...) should be normalized.
    if sys.platform.startswith("win"):
        content = content.replace("\\", "/")
    return yaml.safe_load(content)
def load(self, path_cfg: str):
    """Read the YAML config at path_cfg, validate it, and connect to Zabbix."""
    with open(path_cfg, 'r') as stream:
        try:
            self.cfg_dict = yaml3ed.safe_load(stream)
        except yaml3ed.YAMLError as exc:
            # NOTE(review): a parse error is only printed, then execution
            # continues — self.check()/the lookups below will then fail on a
            # missing cfg_dict; confirm this best-effort behavior is intended.
            print(exc)
    self.check()
    zabbix_cfg = self.cfg_dict['zabbix']
    self.zbx = ZabbixAgent(zabbix_cfg['url'], zabbix_cfg['login'],
                           zabbix_cfg['password'])
    log.debug('Config loaded')
def test_package_metadata_config_gen_task(self):
    # End-to-end check of PackageMetadataConfigGenTask: run the task against
    # checked-in test data, then compare the generated package YAML with a
    # golden file.
    task = package_metadata_tasks.PackageMetadataConfigGenTask()
    repo_root = os.path.abspath('.')
    # Packaging inputs shipped with the repository's test data.
    package_dependencies_yaml = os.path.join(
        repo_root,
        'test/testdata/googleapis_test/gapic/packaging/dependencies.yaml')
    package_defaults_yaml = os.path.join(
        repo_root,
        'test/testdata/googleapis_test/gapic/packaging/api_defaults.yaml')
    # Execute the task with a fixed, fake API description; output lands in
    # self.output_dir (a per-test directory).
    task.execute(
        api_name='fake',
        api_version='v1',
        gapic_api_yaml=[],
        language='python',
        local_paths={
            'googleapis': '%s/googleapis' % repo_root,
            'reporoot': repo_root,
        },
        organization_name='google-cloud',
        output_dir=str(self.output_dir),
        package_dependencies_yaml=package_dependencies_yaml,
        package_defaults_yaml=package_defaults_yaml,
        proto_deps=['googleapis-common-protos'],
        package_type="grpc_client",
        src_proto_path=['path/to/protos'],
        generated_package_version={'lower': '0.17.29', 'upper': '0.18dev'},
        release_level='beta'
    )
    with open(os.path.join(str(self.output_dir),
                           'google-cloud-fake-v1_package.yaml')) as f:
        actual = yaml.safe_load(f)
    with open('test/testdata/google-cloud-fake-v1_package.yaml') as f:
        expected = yaml.safe_load(f)
    # Don't compare files directly because yaml doesn't preserve ordering
    self.assertDictEqual(actual, expected)
def load(self, data):
    # Parse the given YAML input with yload and keep the result on the
    # instance for later access.
    self.data = yload(data)
def read_yaml_cases(config, open):
    """Yield a DefaultCase for each entry under 'cases' in a YAML document.

    Each entry must provide 'time' and 'memory' strings (parsed against
    TIME_RE / MEMORY_RE and scaled by TIME_UNITS / MEMORY_UNITS), plus
    'input', 'output' and 'score'.

    Raises:
        FormatError: When a time or memory string does not match the
            expected pattern.
    """
    for entry in yaml.safe_load(config)['cases']:
        time_match = TIME_RE.fullmatch(entry['time'])
        if time_match is None:
            raise FormatError(entry['time'], 'error parsing time')
        memory_match = MEMORY_RE.fullmatch(entry['memory'])
        if memory_match is None:
            raise FormatError(entry['memory'], 'error parsing memory')
        # Scale the numeric part by its unit to get raw limits.
        time_limit = int(float(time_match.group(1)) * TIME_UNITS[time_match.group(2)])
        memory_limit = int(float(memory_match.group(1)) * MEMORY_UNITS[memory_match.group(2)])
        yield DefaultCase(
            partial(open, entry['input']),
            partial(open, entry['output']),
            time_limit,
            memory_limit,
            int(entry['score']))
def load_table(table_loc, table_type):
    """
    Loads a table of type ``table_type`` from the YAML file ``table_loc``.

    Returns:
        dict: Mapping of each constructed object's ``id`` to the object.

    Raises:
        AssertionError: If the file's db_version does not match DB_VERSION.
    """
    with open(table_loc, 'r') as yf:
        table_file = yaml.safe_load(yf)
    # Explicit raise instead of `assert`: the version check is data
    # validation and must survive `python -O`, which strips asserts.
    # AssertionError is kept so existing callers' except clauses still match.
    if table_file['db_version'] != DB_VERSION:
        raise AssertionError(
            'db_version mismatch: expected {}, got {}'.format(
                DB_VERSION, table_file['db_version']))
    table_list = table_file['data']
    raw_table = (table_type(**params) for params in table_list)
    table_by_id = {x.id: x for x in raw_table}
    return table_by_id
def from_file(cls, file_loc, **kwargs):
    """Build an instance from a YAML config file, with kwargs overriding.

    Raises:
        IOError: If file_loc does not exist.
    """
    if not os.path.exists(file_loc):
        raise IOError('File not found: {}'.format(file_loc))
    with open(file_loc, 'r') as handle:
        loaded = yaml.safe_load(handle)
    # Keyword arguments take precedence over values from the file.
    merged = {**loaded, **kwargs}
    return cls(config_loc_=file_loc, **merged)
def from_string(cls, src):
    """Build an instance from a YAML string via from_kwargs.

    Bug fix: ``safe_load`` does not take a Loader argument — its second
    parameter (in ruamel.yaml) is a YAML *version*, so passing
    ``yaml.RoundTripLoader`` there was incorrect. Safe loading is already
    implied by the function; the extra argument is simply dropped.
    """
    return cls.from_kwargs(None, **yaml.safe_load(src))
def load():
    """Return a helper that dedents a YAML string and safe-loads it."""
    # The returned callable lets tests write indented inline YAML literals.
    return lambda yaml_string: ryaml.safe_load(dedent(yaml_string))
def from_filename(cls, filename: str) -> 'ShanghaiConfiguration':
    """Construct a ShanghaiConfiguration from a UTF-8 YAML file."""
    with open(filename, 'r', encoding='utf-8') as f:
        parsed = ryaml.safe_load(f)
    return cls(parsed)
def __init__(self, *args, **kwargs):
    # NOTE(review): the sibling definition classes initialize _defs to an
    # empty dict; confirm whether the boolean True here is intentional.
    self._defs = True
    super(Definition, self).__init__(*args, **kwargs)
    # Raw bytes of the provisioning.yaml resource bundled with this package.
    self.data = pkg_resources.resource_string(__name__,
                                              "provisioning.yaml")
    # Parse eagerly; get_parsed_defs() serves this cached value.
    self._parsed_defs = yaml.safe_load(self.data)
    # Namespace path for these definitions (presumably a storage key —
    # verify against where self.value is consumed).
    self.value = '_NS/provisioning/definitions'
def get_parsed_defs(self):
    """Return the parsed definitions, re-parsing self.data only when empty."""
    if not self._parsed_defs:
        # Cache miss: parse the raw YAML bytes once and keep the result.
        self._parsed_defs = yaml.safe_load(self.data)
    return self._parsed_defs
def __init__(self, *args, **kwargs):
    # Per-instance definition store, empty until populated elsewhere.
    self._defs = {}
    super(Definition, self).__init__(*args, **kwargs)
    # Raw bytes of the node_agent.yaml resource bundled with this package.
    self.data = pkg_resources.resource_string(__name__, "node_agent.yaml")
    # Parse eagerly; get_parsed_defs() serves this cached value.
    self._parsed_defs = yaml.safe_load(self.data)
    # Namespace path for these definitions (presumably a storage key —
    # verify against where self.value is consumed).
    self.value = '_NS/node_agent/definitions'
def get_parsed_defs(self):
    """Return cached parsed definitions, parsing self.data on first demand."""
    parsed = self._parsed_defs
    if parsed:
        return parsed
    # No cached value yet: parse the raw YAML bytes and memoize.
    self._parsed_defs = yaml.safe_load(self.data)
    return self._parsed_defs
def __init__(self, *args, **kwargs):
    # Per-instance definition store, empty until populated elsewhere.
    self._defs = {}
    super(CompiledDefinitions, self).__init__(*args, **kwargs)
    # Reuse the raw YAML bytes already loaded by the definitions module.
    self.data = definitions.data
    # Parse eagerly so the compiled definitions are available immediately.
    self._parsed_defs = yaml.safe_load(self.data)
    # Namespace path for the compiled definitions (presumably a storage
    # key — verify against where self.value is consumed).
    self.value = '_NS/node_agent/compiled_definitions'
def __init__(self, *args, **kwargs):
    # Per-instance definition store, empty until populated elsewhere.
    self._defs = {}
    super(Definition, self).__init__(*args, **kwargs)
    # Raw bytes of the gluster.yaml resource bundled with this package.
    self.data = pkg_resources.resource_string(__name__, "gluster.yaml")
    # Parse eagerly; consumers read the cached parse result.
    self._parsed_defs = yaml.safe_load(self.data)
    # Namespace path for the gluster integration definitions (presumably a
    # storage key — verify against where self.value is consumed).
    self.value = '_NS/integrations/gluster/definitions'