def parse_front_matter(lines):
    """
    Parse lines of front matter.

    The format is detected from the first line: ``{`` selects JSON, ``+++``
    selects TOML and ``---`` selects YAML. For TOML and YAML the first and
    last lines are treated as fences and dropped before parsing; for JSON
    every line is part of the document.

    Returns a ``(format_name, data)`` tuple. Empty input and input whose
    first line is not a recognised marker both yield ``("toml", {})``.
    """
    if not lines:
        return "toml", {}
    marker = lines[0]
    if marker == "{":
        # JSON
        import json
        return "json", json.loads("\n".join(lines))
    if marker == "+++":
        # TOML
        import toml
        return "toml", toml.loads("\n".join(lines[1:-1]))
    if marker == "---":
        # YAML
        import yaml
        # CLoader only exists when PyYAML was built against libyaml; fall
        # back to the pure-Python loader instead of raising AttributeError.
        loader = getattr(yaml, "CLoader", yaml.Loader)
        # NOTE(review): the full (C)Loader can construct arbitrary Python
        # objects; switch to a safe loader if front matter can be untrusted.
        return "yaml", yaml.load("\n".join(lines[1:-1]), Loader=loader)
    # Keep the return shape consistent with every other branch so callers
    # can always unpack two values (this used to return a bare dict).
    return "toml", {}
# Example source code using Python's CLoader() class
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the config file override the defaults produced by
    ``Setting.generate_defaults_dict()``. The result is an
    ``ApplicationUnsynced`` when the ``unsynced`` setting is truthy and an
    ``ApplicationSynced`` otherwise.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))
    # The context manager closes the file even when parsing fails.
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)
    # An empty YAML document loads as None; keep the plain defaults then.
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value
    args = ap.Namespace()
    vars(args).update(defaults)
    if args.unsynced:
        return ApplicationUnsynced(args)
    return ApplicationSynced(args)
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the config file override the defaults produced by
    ``Setting.generate_defaults_dict()``; the merged settings are handed to
    ``StereoMatcherApp``.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))
    # The context manager closes the file even when parsing fails.
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)
    # An empty YAML document loads as None; keep the plain defaults then.
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value
    args = ap.Namespace()
    vars(args).update(defaults)
    return StereoMatcherApp(args)
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the config file override the defaults produced by
    ``Setting.generate_defaults_dict()``. The result is an
    ``ApplicationUnsynced`` when the ``unsynced`` setting is truthy and an
    ``ApplicationSynced`` otherwise.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))
    # The context manager closes the file even when parsing fails.
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)
    # An empty YAML document loads as None; keep the plain defaults then.
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value
    args = ap.Namespace()
    vars(args).update(defaults)
    if args.unsynced:
        return ApplicationUnsynced(args)
    return ApplicationSynced(args)
def get_module(filename_with_path):
    """
    Given a filename with an absolute path, load the contents, instantiate a Module, and set the Module's path
    attribute.

    Parameters:
        filename_with_path (str): the YAML filename with an absolute path

    Returns:
        The object produced by loading the YAML file.

    Raises:
        ModulePathError: if the file cannot be opened or read.
        ModuleConstraintParseError: if the YAML scanner rejects the file.
    """
    try:
        with open(filename_with_path) as config_file:
            # Presumably read by the YAML Module constructor while the
            # document is being loaded -- TODO confirm against Module.
            Module.temp_path = filename_with_path
            try:
                this_module = yaml.load(config_file, Loader=Loader)
            finally:
                # Always clear the class-level path, even when parsing
                # fails, so a stale value cannot leak into the next load.
                Module.temp_path = ""
        return this_module
    except IOError:
        raise ModulePathError(filename_with_path)
    except yaml.scanner.ScannerError:
        raise ModuleConstraintParseError("Parsing of module {} failed. This is likely caused by a typo in the file."
                                         "".format(filename_with_path))
# Add the YAML Module constructor so that YAML knows to use it in situations where the tag matches.
def read_configuration(cligraph, custom_suffix=''):
    """Build the configuration for the given tool from its layers.

    Three layers are applied in order: ``auto`` (the result of calling
    ``automatic_configuration``), ``shared`` (``conf/<shortname>.yaml`` under
    the tool path) and ``custom`` (``~/.<shortname>/<shortname>.yaml`` plus
    ``custom_suffix``). File layers that do not exist are skipped.

    Returns a tuple ``(AttrDict(**cfg), layers)`` where ``layers`` maps each
    layer name to ``[source, loaded_content]``.
    """
    shared_path = os.path.join(cligraph.tool_path, 'conf/%s.yaml' % cligraph.tool_shortname)
    custom_dir = os.path.abspath(os.path.expanduser('~/.' + cligraph.tool_shortname))
    custom_path = os.path.join(custom_dir, '%s.yaml%s' % (cligraph.tool_shortname, custom_suffix))

    layers = collections.OrderedDict()
    layers['auto'] = [automatic_configuration, None]
    layers['shared'] = [shared_path, None]
    layers['custom'] = [custom_path, None]

    cfg = {}
    for layer_name, entry in layers.items():
        source = entry[0]
        if callable(source):
            content = source(cligraph, layer_name)
        else:
            if not os.path.exists(source):
                # Missing file layers are simply skipped.
                continue
            with open(source, 'r') as filep:
                content = yaml.load(filep, Loader=Loader)
        # Remember what each layer contributed alongside its source.
        entry[1] = content
        if content:
            update_recursive(cfg, content)

    resolve_config(cfg)
    return AttrDict(**cfg), layers
def get_dataset(filename):
    '''
    Iterator for dataset's items

    :param filename: Path to dataset's file
    :type filename: str
    :return: Dataset's items
    :raises OSError: if has problem with file
    :raises yaml.YAMLError: if has problem with format
    :raises ValueError: if has problem with content
    '''
    with open(filename, 'rt', encoding='utf-8') as stream:
        package = load(stream, Loader=Loader)
    # An empty file loads as None and a scalar/list document has no
    # ``get``; report both as the documented ValueError rather than letting
    # an AttributeError escape.
    if not isinstance(package, dict):
        raise ValueError('wrong format')
    dataset = package.get('dataset')
    if not isinstance(dataset, list):
        raise ValueError('wrong format')
    yield from dataset
def process_measurements(self):
    """Load the measurements stream and send every measurement it yields.

    A ``Loader`` is created over ``self.measurements_stream`` and given the
    collector, system and config of this object before its data is read.
    """
    loader = Loader(self.measurements_stream)
    loader.collector = self.collector
    loader.system = self.system
    loader.config = self.config

    measurements = loader.get_data()
    for measurement_name in measurements:
        batch = measurements[measurement_name]
        logging.debug('Process "{}" measurements: {}'.format(
            measurement_name, batch))
        for measurement in batch:
            self.send_data(measurement)
def process_notify(self, notification):
    """Load the events stream for ``notification`` and send each result.

    A ``Loader`` is created over ``self.events_stream`` and given the
    notification and this object's system before its data is read. Entries
    whose value is ``None`` are logged but not sent.
    """
    loader = Loader(self.events_stream)
    loader.notification = notification
    loader.system = self.system

    notifications = loader.get_data()
    for notify_name in notifications:
        logging.debug('Process "{}" notification'.format(notify_name))
        payload = notifications[notify_name]
        if payload is None:
            continue
        self.send_data(payload)
def parse_file(cls, path):
    """Parse a YAML specification from a path or a file-like object.

    ``path`` may be anything with a callable ``read`` attribute, in which
    case it is read directly; otherwise it is opened with ``io.open``.
    The loaded document is passed to ``cls.parse``.

    Raises InvalidSpecification when YAML reports a marked error.
    """
    try:
        reader = getattr(path, 'read', None) if hasattr(path, 'read') else None
        if reader is not None and callable(reader):
            # File-like obj
            text = reader()
        else:
            with io.open(path) as handle:
                text = handle.read()
        conf = yaml.load(text, Loader=_Loader)
    except yaml.error.MarkedYAMLError as err:
        raise InvalidSpecification(str(err))
    return cls.parse(conf)
def load_yaml(document: Path, required=False):
    """Load a YAML document and return its content.

    An empty dict is returned when the document does not exist, cannot be
    parsed, or is empty. A missing document is only reported (in red) when
    ``required`` is true; parse errors are always reported.
    """
    document = Path(document)
    if not document.exists():
        if required:
            click.secho('yaml document does not exists: {0}'.format(document), fg='red')
        return {}
    try:
        data = yaml.load(document.text(), Loader=Loader)
    except yaml.YAMLError as exc:
        click.secho(str(exc), fg='red')
        return {}
    # An empty document loads as None; keep the "always a dict on nothing
    # useful" contract of the other early returns.
    return {} if data is None else data
def parse_annotations(self, ctx, symbol):
    """Copy the comment and the YAML tags from ``ctx`` onto ``symbol``.

    The comment text is stored in ``symbol.comment``. Tag lines have their
    first character dropped, are joined with newlines, loaded as YAML and
    stored in ``symbol._tags``. YAML errors are printed in red and leave
    ``symbol._tags`` untouched.
    """
    assert ctx and symbol
    if ctx.comment:
        symbol.comment = ctx.comment.text
    tags = ctx.tagSymbol()
    if tags:
        lines = [tag.line.text[1:] for tag in tags]
        try:
            symbol._tags = yaml.load('\n'.join(lines), Loader=Loader)
        except yaml.YAMLError as exc:
            # secho expects text; convert explicitly as load_yaml does.
            click.secho(str(exc), fg='red')
def read_yaml(file: str):
    """Load the UTF-8 encoded YAML file ``file`` and return its content."""
    # Text mode already uses universal newlines in Python 3; the old 'U'
    # flag is deprecated and rejected with ValueError from Python 3.11 on.
    with open(file, 'r', encoding="utf-8") as stream:
        return yaml.load(stream, Loader=Loader)
def load_config(path):
    """Load the configuration file at ``path`` as YAML, falling back to JSON.

    When ``useYAML`` is set the file is first parsed as YAML (a single
    document is assumed). If that fails, or YAML is not in use, the file is
    parsed as JSON. When JSON parsing fails too, the YAML error is printed
    if there was one, otherwise the JSON error position is printed.

    Returns the parsed data passed through ``_convert_old_config``, or an
    empty dict when the file does not exist.
    """
    # TODO: support old-style setting names? i.e. pass them through ARGS_TO_SETTINGS ?
    data = {}
    yaml_err = ""
    if useYAML:
        try:
            # this assumes only one document
            with open(path, encoding='utf8') as infp:
                data = yaml_load(infp, Loader=YAMLLoader)
        except FileNotFoundError:
            return {}
        except YAMLError as e:
            yaml_err = 'YAML parsing error in file {}'.format(path)
            if hasattr(e, 'problem_mark'):
                mark = e.problem_mark
                # Append the position; the original expression computed
                # this string and threw it away.
                yaml_err += '\nError on Line:{} Column:{}'.format(mark.line + 1, mark.column + 1)
        else:
            return _convert_old_config(data)
    try:
        with open(path, encoding='utf8') as infp:
            data = json.load(infp)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        if useYAML and yaml_err:
            print(yaml_err)
        else:
            print('JSON parsing error in file {}'.format(path),
                  'Error on Line: {} Column: {}'.format(e.lineno, e.colno), sep='\n')
    data = _convert_old_config(data)
    # if 'directory' in data:
    #    for k, v in data['directory'].items():
    #        data['directory'][k] = os.path.expanduser(v)
    return data
def load(*args, **kwargs):
    """Delegate to ``yaml.load``, always using the module's ``Loader``.

    Any ``Loader`` keyword supplied by the caller is overridden.
    """
    # ``**kwargs`` is always a dict, so no None check is needed here.
    kwargs['Loader'] = Loader
    return yaml.load(*args, **kwargs)
def load_all(*args, **kwargs):
    """Delegate to ``yaml.load_all``, always using the module's ``Loader``.

    Any ``Loader`` keyword supplied by the caller is overridden.
    """
    # ``**kwargs`` is always a dict, so no None check is needed here.
    kwargs['Loader'] = Loader
    return yaml.load_all(*args, **kwargs)
def main(argv):
    """Convert a file of product records into ``products.csv``.

    ``argv[1]`` names the input file. Each line that mentions a title, a
    brand and categories is loaded as YAML and written out as one CSV row of
    title, brand, description and the flattened categories. Lines that fail
    to convert are printed and counted as bad; running totals are printed
    every 100000 lines and once at the end.

    NOTE(review): the substring tests on lines read in binary mode only
    work on Python 2 byte strings; the prints below are written so that
    they produce the same output under either print syntax.
    """
    if len(argv) < 2:
        usage()
    # Use the argument list that was passed in, not the global sys.argv.
    filename = argv[1]
    count, good, bad = 0, 0, 0
    # Both files are closed (and the CSV flushed) when the block exits.
    with open(filename, 'rb') as f, open("products.csv", "w") as out_file:
        out = csv.writer(out_file)
        for line in f:
            count += 1
            if not (count % 100000):
                print("count: %s good: %s , bad: %s" % (count, good, bad))
            if ("'title':" in line) and ("'brand':" in line) and ("'categories':" in line):
                try:
                    line = line.rstrip().replace("\\'", "''")
                    product = yaml.load(line, Loader=Loader)
                    title, brand, categories = product['title'], product['brand'], product['categories']
                    description = product['description'] if 'description' in product else ''
                    categories = ' / '.join([item for sublist in categories for item in sublist])
                    out.writerow([title, brand, description, categories])
                    good += 1
                except Exception as e:
                    # Best effort: report the offending line and carry on.
                    print(line)
                    print(e)
                    bad += 1
    print("good: %s , bad: %s" % (good, bad))
def collect(self):
    """Yield the tests composed from the YAML content of ``self.fspath``.

    ``yaml.CLoader`` is used when the yaml module provides it, otherwise
    ``yaml.Loader``.
    """
    loader = getattr(yaml, 'CLoader', yaml.Loader)
    with self.fspath.open() as handle:
        data = yaml.load(handle.read(), Loader=loader)
    for item in self._compose_tests(data):
        yield item
def parse_native_yaml(path):
    """Load the YAML file at ``path`` with the module's ``Loader`` and return the result."""
    with open(path, 'r') as stream:
        parsed = yaml.load(stream, Loader=Loader)
    return parsed
def parse_nn_yaml(filename):
    """Load the YAML file ``filename`` with the module's ``Loader`` and return the result."""
    with open(filename, 'r') as stream:
        parsed = yaml.load(stream, Loader=Loader)
    return parsed
def load_deprecated_signatures(declarations_by_signature):
    """Build declarations for the deprecated entries listed in ``deprecated_path``.

    For every deprecated entry a signature is derived from its ``name``
    prototype and the call arguments of its ``aten`` field. Each declaration
    registered under that signature in ``declarations_by_signature`` is
    deep-copied, flagged as deprecated, given the prototype and call
    arguments, and has its arguments reordered to follow the prototype.

    Returns the list of adjusted copies; the input declarations are not
    modified.
    """
    with open(deprecated_path, 'r') as f:
        deprecated_defs = yaml.load(f, Loader=Loader)

    def get_signature(name, params, call_args):
        # map each parameter name to its type ("type name" -> name: type)
        types = {}
        for param in params:
            param_type, param_name = param.split(' ')
            types[param_name] = param_type
        # a call argument that is not a parameter name is assumed to be a
        # literal Scalar
        arg_types = ', '.join(types.get(arg, 'Scalar') for arg in call_args)
        return '{}({})'.format(name, arg_types)

    declarations = []
    for deprecated in deprecated_defs:
        prototype = deprecated['name']
        call_args = split_name_params(deprecated['aten'])[1]
        name, params = split_name_params(prototype)
        signature = get_signature(name, params, call_args)

        for original in declarations_by_signature[signature]:
            declaration = copy.deepcopy(original)
            declaration['deprecated'] = True
            declaration['call_args'] = call_args
            # in-place variants carry a trailing underscore in their name
            declaration['prototype'] = (
                prototype.replace(name, name + '_') if declaration['inplace'] else prototype
            )

            # reorder the arguments so they follow the prototype's parameters
            args_by_name = {arg['name']: arg for arg in declaration['arguments']}
            declaration['arguments'] = []
            for param in params:
                _, arg_name = param.split(' ')
                declaration['arguments'].append(args_by_name[arg_name])
            declarations.append(declaration)
    return declarations
def load_yaml_file(filepath):
    """Load the YAML file at ``filepath`` and pass its content to ``parse_data``."""
    with open(filepath) as stream:
        document = load(stream, Loader=Loader)
        parse_data(document)
def read(self):
    """Update this mapping from the YAML file ``self.file``.

    After loading, ``playlists`` and ``players`` are guaranteed to exist
    (empty dicts when absent) and ``players`` always has a ``chromecast``
    entry.
    """
    # Close the file as soon as it has been parsed instead of leaking it.
    with open(self.file) as stream:
        self.update(load(stream, Loader))
    self['playlists'] = self.get('playlists', {})
    self['players'] = self.get('players', {})
    if 'chromecast' not in self['players']:
        self['players']['chromecast'] = {'type': 'chromecast'}
def load_data_yaml(self, *args):
    ''' Load one or several YAML stats files and return their documents.

    Every file name in ``args`` is opened in turn and stored in
    ``self.stream``; all non-None documents found are collected, in order,
    into the returned list.

    NOTE(review): the opened files are not closed here.
    '''
    superstats = []
    for filename in args:
        self.stream = open(filename, 'r')
        for document in yaml.load_all(self.stream, Loader=Loader):
            if document is not None:
                superstats.append(document)
    return superstats
def parse_yaml(filename):
    """
    Parse a YAML file and return its contents.

    Standard input is read when ``filename`` is ``'-'``. Parse errors and a
    missing file are logged and reported by returning ``None``.

    :param str filename: Name of YAML file to parse
    :return: Parsed file contents
    :rtype: dict or None
    """
    try:
        # '-' selects stdin instead of a file on disk
        with (sys.stdin if filename == '-' else open(filename)) as stream:
            try:
                parsed = load(stream, Loader=Loader)
            except YAMLError as err:
                logging.critical("Could not parse YAML file %s", filename)
                if not hasattr(err, 'problem_mark'):
                    logging.error("Error: %s", err)
                else:
                    # Point the user at the exact position of the error
                    position = err.problem_mark
                    logging.error("Error position: (%s:%s)",
                                  position.line + 1, position.column + 1)
                return None
            else:
                return parsed
    except FileNotFoundError:
        logging.critical("Could not find YAML file for parsing: %s", filename)
        return None
def load(cls, filename):
    '''Read ``filename`` as YAML with the module-level ``load`` and return the result.'''
    with open(filename, 'r') as stream:
        model = load(stream, Loader=Loader)
    return model
def main():
    """Run the holosocket server until interrupted.

    Command line: ``-c/--config`` names the YAML config file, ``-4/--ipv4``
    listens on the configured address only, ``--debug`` enables debug
    logging. The config must provide ``server``, ``server_port`` and
    ``password``; ``dns`` is optional.
    """
    parser = argparse.ArgumentParser(description='holosocket server')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('-4', '--ipv4', action='store_true', help='ipv4 only')
    parser.add_argument('--debug', action='store_true', help='debug mode')
    args = parser.parse_args()

    # Without a config file there is nothing to run with; exit with a usage
    # message instead of failing later on an unbound ``config``.
    if not args.config:
        parser.error('a config file is required (-c/--config)')
    with open(args.config, 'r') as f:
        config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    if args.ipv4:
        SERVER = config['server']
    else:
        # Also listen on the IPv6 wildcard address.
        SERVER = (config['server'], '::')

    SERVER_PORT = config['server_port']
    KEY = config['password']
    try:
        DNS = config['dns']
    except KeyError:
        DNS = None

    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    loop = asyncio.get_event_loop()
    server = Server(KEY, nameservers=DNS)
    # No ``loop=`` argument: it was removed in Python 3.10, and the
    # coroutine picks up the loop that runs it.
    coro = asyncio.start_server(server.handle, SERVER, SERVER_PORT)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
def main():
    """Run the holosocket local endpoint until interrupted.

    Command line: ``-c/--config`` names the YAML config file and ``--debug``
    enables debug logging. The config must provide ``server``,
    ``server_port``, ``local``, ``local_port`` and ``password``;
    ``v6_server`` is optional.
    """
    parser = argparse.ArgumentParser(description='holosocket local')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('--debug', action='store_true', help='debug mode')
    args = parser.parse_args()

    # Without a config file there is nothing to run with; exit with a usage
    # message instead of failing later on an unbound ``config``.
    if not args.config:
        parser.error('a config file is required (-c/--config)')
    with open(args.config, 'r') as f:
        config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    SERVER = config['server']
    try:
        V6_SERVER = config['v6_server']
    except KeyError:
        V6_SERVER = None
    SERVER_PORT = config['server_port']
    LOCAL = config['local']
    PORT = config['local_port']
    KEY = config['password']

    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    loop = asyncio.get_event_loop()
    server = Server(SERVER, V6_SERVER, SERVER_PORT, KEY)
    # No ``loop=`` argument: it was removed in Python 3.10, and the
    # coroutine picks up the loop that runs it.
    coro = asyncio.start_server(server.handle, LOCAL, PORT)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
def main():
    """Entry point of the calibration tool.

    Settings are resolved in three steps: built-in defaults, then values
    from the settings file (when one is given), then the command line. When
    saving is requested the resolved settings are written back to the
    settings file, and finally a ``MultiStereoApplication`` is created.

    Raises ValueError if a settings file is named but does not exist.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
                                "Use one or more .mp4 video files to perform calibration: " +
                                "find the cameras' intrinsics and/or extrinsics.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if not osp.isfile(args.settings_file):
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))
        # The context manager closes the file even when parsing fails.
        with open(args.settings_file, "r", encoding="utf-8") as file_stream:
            config_defaults = load(file_stream, Loader=Loader)
        # An empty settings file loads as None; keep the defaults then.
        if config_defaults:
            for key, value in config_defaults.items():
                defaults[key] = value

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # The two bookkeeping settings are not persisted. Dump a filtered
        # copy so ``args`` keeps its real values; the previous
        # delete-and-restore overwrote ``settings_file`` with True.
        excluded = (Setting.save_settings.name, Setting.settings_file.name)
        setting_dict = {key: value for key, value in vars(args).items()
                        if key not in excluded}
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            dump(setting_dict, file_stream, Dumper=Dumper)

    app = MultiStereoApplication(args)
def main():
    """Entry point of the stereo matcher test tool.

    Settings are resolved in three steps: built-in defaults, then values
    from the settings file (when one is given), then the command line. When
    saving is requested the resolved settings are written back to the
    settings file. Finally a ``StereoMatcherApp`` is created and its
    ``disparity2`` method is run.

    Raises ValueError if a settings file is named but does not exist.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
                                "Test stereo algorithms on two image files.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if not osp.isfile(args.settings_file):
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))
        # The context manager closes the file even when parsing fails.
        with open(args.settings_file, "r", encoding="utf-8") as file_stream:
            config_defaults = load(file_stream, Loader=Loader)
        # An empty settings file loads as None; keep the defaults then.
        if config_defaults:
            for key, value in config_defaults.items():
                defaults[key] = value

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # The two bookkeeping settings are not persisted. Dump a filtered
        # copy so ``args`` keeps its real values; the previous
        # delete-and-restore overwrote ``settings_file`` with True.
        excluded = (Setting.save_settings.name, Setting.settings_file.name)
        setting_dict = {key: value for key, value in vars(args).items()
                        if key not in excluded}
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            dump(setting_dict, file_stream, Dumper=Dumper)

    app = StereoMatcherApp(args)
    app.disparity2()