def main():
    """
    Parse the command line, configure logging and dispatch to the handler.

    The handler is the function registered through set_defaults(); when the
    parsed namespace carries no ``handler`` attribute, the parser's usage
    string is printed instead.

    :return: The result of the function from set_defaults().
    """
    parser = get_parser()
    args = parser.parse_args()
    # Replace the level name with its numeric value before configuring logging.
    args.log_level = logging._nameToLevel[args.log_level]
    setup_logging(args.log_level)

    def print_usage(_):
        parser.print_usage()

    # Fall back to printing usage when no handler attribute is present.
    handler = getattr(args, "handler", print_usage)
    return handler(args)
# Example source code using Python's logging._nameToLevel mapping
def __call__(self, parser, namespace, values, option_string=None):
    """
    Apply a ``LEVEL`` or ``LEVEL:LOGGER`` option value to a logger.

    When no logger name is given, ``self.main_logger`` is used. The level
    name is matched case-insensitively against the logging module's names.

    :param parser: The parser invoking this action; its ``log_levels``
        attribute is quoted in the error message.
    :param namespace: Passed through to the parent action.
    :param values: The raw option string, e.g. ``"debug"`` or
        ``"debug:some.logger"``.
    :param option_string: Passed through to the parent action.
    :raises argparse.ArgumentError: If the level name is unknown.
    """
    # Split on the first colon only: a value containing several colons used
    # to break the two-way unpacking below with an uncaught ValueError.
    values = values.split(':', 1)
    level, logger = values if len(values) > 1 else (values[0],
                                                    self.main_logger)
    logger = logging.getLogger(logger)
    try:
        logger.setLevel(logging._nameToLevel[level.upper()])
    except KeyError:
        msg = "invalid level choice: %s (choose from %s)" % \
            (level, parser.log_levels)
        raise argparse.ArgumentError(self, msg)
    super(LoggingAction, self).__call__(parser, namespace, values,
                                        option_string)
def main():
    """
    Command-line entry point: parse options, configure, run the sub-command.

    Exits the process with the value returned by the selected command, or
    reports a usage error when no known command was given.
    """
    parser = argparse.ArgumentParser(
        description="asyncio-based source compiler and test runner")
    parser.add_argument(
        '-c', '--conf',
        type=argparse.FileType('r'),
        help="custom yaml configuration file to use")
    parser.add_argument(
        '-l', '--logging',
        choices=[name.lower() for name in logging._nameToLevel],
        help="logging level (overrides root logger level from file conf)")
    cmd = parser.add_subparsers(dest='command')
    # Every module's build() is handed the sub-parser factory; the values it
    # returns are fed to dict() to form the command table.
    commands = dict(module.build(cmd)
                    for module in (languages, test, serve, benchmark))

    args = parser.parse_args()
    if args.conf:
        # merge user defined conf
        # NOTE(review): yaml.load() is called without an explicit Loader;
        # depending on the PyYAML version this is unsafe for untrusted files
        # or rejected outright -- consider yaml.safe_load().
        conf.merge(yaml.load(args.conf))
    # default logging config from conf
    logging.config.dictConfig(conf['logging'])
    if args.logging:
        # override root logger level
        logging.root.setLevel(args.logging.upper())

    # import built-in languages
    load_builtins()
    # import user languages from environ
    load_from_environ()

    if args.command not in commands:
        parser.error("missing command")
    sys.exit(commands[args.command](args))
def setup_logging(level):
    """
    Configure root logging and make stdout/stderr write UTF-8.

    sys.stdout and sys.stderr are re-wrapped with UTF-8 writers when they
    expose a binary ``buffer``; the root logger is then set to ``level`` and,
    if stdin is open and stdout is a terminal, a ColorFormatter is installed
    on the first root handler.

    :param level: The logging level, can be either an int or a string.
    :return: None
    """
    if isinstance(level, int):
        numeric_level = level
    else:
        numeric_level = logging._nameToLevel[level]

    def ensure_utf8_stream(stream):
        # In-memory text streams and streams without a binary buffer are
        # returned untouched.
        if isinstance(stream, io.StringIO) or not hasattr(stream, "buffer"):
            return stream
        writer = codecs.getwriter("utf-8")(stream.buffer)
        writer.encoding = "utf-8"
        return writer

    wrapped_out = ensure_utf8_stream(sys.stdout)
    wrapped_err = ensure_utf8_stream(sys.stderr)
    sys.stdout, sys.stderr = wrapped_out, wrapped_err

    logging.basicConfig(level=numeric_level)
    root = logging.getLogger()
    # In some cases root has handlers and basicConfig() is a no-op
    root.setLevel(numeric_level)

    if sys.stdin.closed or not sys.stdout.isatty():
        return
    root.handlers[0].setFormatter(ColorFormatter())
def handle_logger(arglar):
    """
    Reconfigure the ``logger`` object from the enclosing scope.

    Its level is set from ``arglar.log_level`` and its handler list is
    replaced: a rotating file handler writing to ``arglar.log_file.name``
    when ``arglar.debug`` is truthy, otherwise a NullHandler.

    :param arglar: Parsed arguments providing ``log_level``, ``debug`` and
        ``log_file``.
    """
    level = logging._nameToLevel[arglar.log_level]
    logger.setLevel(level)
    # Drop whatever handlers were attached before.
    logger.handlers = []

    if not arglar.debug:
        logger.addHandler(logging.NullHandler())
        return

    rotating = logging.handlers.RotatingFileHandler(
        filename=arglar.log_file.name,
        maxBytes=1024 * 1024,
        backupCount=3)
    rotating.setFormatter(logging.Formatter(
        '[%(asctime)s][%(name)s][%(levelname)s] %(message)s'))
    logger.addHandler(rotating)
def _process_level(value):
    """
    Convert a logging level given as a name, numeric string or int to an int.

    :param value: A level such as ``"INFO"``, ``"debug"``, ``"10"`` or
        ``10``. Strings are passed through unquote_string() first.
    :return: The numeric logging level.
    :raises ValueError: If a string is neither an integer nor a known name.
    :raises TypeError: If the value is neither a string nor an int.
    """
    # allow settings.ini to use names like level=debug,
    # logging._nameToLevel[] will throw exception
    name_to_level = {
        'CRITICAL': logging.CRITICAL,
        'ERROR': logging.ERROR,
        'WARN': logging.WARNING,
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'NOTSET': logging.NOTSET
    }

    if isinstance(value, str):
        # handle the LOGGING level, such as INFO or DEBUG or 10
        value = unquote_string(value)
        try:
            # start assuming is int, but it is probably str of int
            value = int(value)
        except ValueError:
            # if here, then try if string like INFO, force to UPPER()
            try:
                value = name_to_level[value.upper()]
            except KeyError:
                # The message used to be built from two literals with no
                # space between them ("...LOGGINGmodule...").
                raise ValueError(
                    "Logging level must be as expected by Python LOGGING "
                    "module - not {}".format(value))

    if not isinstance(value, int):
        raise TypeError("Logging value must be string or int")

    return value
def set_msglevel(self, event):
    """
    Store the numeric logging level for the name held by ``cmb_msglevel``.

    :param event: Not used by this method.
    """
    selected_name = self.cmb_msglevel.get()
    self.msg_level = logging._nameToLevel[selected_name.upper()]
def __getattr__(self, item):
    """
    Resolve logging-level names to wrapped methods of ``self.logger``.

    If ``item`` upper-cased is a level name known to the logging module, the
    attribute of the same name on ``self.logger`` is fetched and passed
    through ``extras_wrapper(self, item)``.

    :param item: Name of the attribute being looked up.
    :return: The wrapped logger attribute.
    :raises AttributeError: For any name that is not a logging level name.
    """
    if item.upper() in logging._nameToLevel:
        return extras_wrapper(self, item)(getattr(self.logger, item))
    # Name the missing attribute so failed lookups are diagnosable; the
    # exception used to be raised without any message.
    raise AttributeError(
        "%r object has no attribute %r" % (type(self).__name__, item))
def main():
    """
    Build the command-line interface, set up logging and run the command.

    The ``dump``, ``publish``, ``list`` and ``init`` sub-commands each
    register their handler through set_defaults(); when the parsed namespace
    carries no handler, the usage string is printed instead.

    :return: The result of the function from set_defaults().
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--log-level", default="INFO", choices=logging._nameToLevel,
        help="Logging verbosity.")
    subparsers = parser.add_subparsers(help="Commands", dest="command")

    def add_backend_args(p):
        # Options shared by every sub-command.
        for flag, description in (("--backend", "Backend to use."),
                                  ("--args", "Backend's arguments.")):
            p.add_argument(flag, default=None, help=description)

    dump = subparsers.add_parser(
        "dump", help="Print a brief information about the model to stdout.")
    dump.set_defaults(handler=dump_model)
    dump.add_argument("input", help="Path to the model file, URL or UUID.")
    add_backend_args(dump)

    publish = subparsers.add_parser(
        "publish",
        help="Upload the model to the cloud and update the registry.")
    publish.set_defaults(handler=publish_model)
    publish.add_argument("model", help="The path to the model to publish.")
    add_backend_args(publish)
    publish.add_argument(
        "-d", "--update-default", action="store_true",
        help="Set this model as the default one.")
    publish.add_argument(
        "--force", action="store_true", help="Overwrite existing models.")

    listing = subparsers.add_parser(
        "list", help="Lists all the models in the registry.")
    listing.set_defaults(handler=list_models)
    add_backend_args(listing)

    init = subparsers.add_parser("init", help="Initialize the registry.")
    init.set_defaults(handler=initialize_registry)
    add_backend_args(init)

    args = parser.parse_args()
    # Replace the level name with its numeric value before configuring logging.
    args.log_level = logging._nameToLevel[args.log_level]
    setup_logging(args.log_level)

    def print_usage(_):
        parser.print_usage()

    # Fall back to printing usage when no sub-command set a handler.
    handler = getattr(args, "handler", print_usage)
    return handler(args)
def logger():
    """
    Yield the root logger at DEBUG level with a set of patches active.

    The process id, the clock, the local timezone lookup, the hostname and
    the socket connect/sendall calls are patched, and the logging module's
    level-name tables are wrapped in ``patch.dict``. After the
    yield every patch is stopped and
    ``Rfc5424SysLogAdapter._extra_levels_enabled`` is reset to False.

    NOTE(review): presumably used as a test fixture -- no decorator is
    visible here, confirm against the original file.
    """
    def started(patcher):
        # Start the patcher right away and hand it back for later stopping.
        patcher.start()
        return patcher

    patchers = [
        started(patch('logging.os.getpid', return_value=111)),
        started(patch('logging.time.time', return_value=946725071.111111)),
        started(patch('rfc5424logging.handler.get_localzone',
                      return_value=timezone)),
        started(patch('rfc5424logging.handler.socket.gethostname',
                      return_value="testhostname")),
        started(patch('logging.handlers.socket.socket.connect',
                      side_effect=connect_mock)),
        started(patch('logging.handlers.socket.socket.sendall',
                      side_effect=connect_mock)),
    ]

    if '_levelNames' in logging.__dict__:
        # Python 2.7
        level_tables = (logging._levelNames,)
    else:
        # Python 3.x
        level_tables = (logging._levelToName, logging._nameToLevel)
    for table in level_tables:
        patchers.append(started(patch.dict(table)))

    logging.raiseExceptions = True
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    yield root

    # Stop in the same order the patches were started.
    for patcher in patchers:
        patcher.stop()
    Rfc5424SysLogAdapter._extra_levels_enabled = False
def log(self):
"""
Report this object's result (``self.res``) through ``self.host.log``.

Three things are logged: the outcome itself, one line per entry in
``self.meta['remote_calls']``, and the lines of ``self.res['log']``.

NOTE(review): the indentation of this block is missing in this view, so the
exact nesting of some branches below cannot be confirmed from here.
"""
log = self.host.log
# Checked first: a result carrying 'exc' is logged as an error together
# with the joined exception text.
if 'exc' in self.res:
exc = '\n' + ''.join(self.res['exc'])
log.error('{0}\n{1}'.format(self, exc))
# Result with a non-empty 'stderr': error when 'rc' is not 0, "changed"
# when the 'changed' flag is set.
elif self.res.get('stderr'):
if self.res.get('rc') != 0:
log.error('{0}\n{1}'.format(self, self.res['stderr']))
elif self.res['changed']:
log.changed('{0}\n{1}'.format(self, self.res['stderr']))
# NOTE(review): without indentation it is unclear whether this else pairs
# with the outer if/elif chain or with the inner 'rc' check -- confirm.
else:
data = self.res.get('diff', '')
# A non-blank 'diff' is reported through log.changed.
if data.strip():
data = data.strip()
log.changed('{0} diff=\n{1}\n'.format(self, data))
elif self.cancelled():
log.error(self)
elif self.res['changed']:
log.changed(self)
else:
log.info(self)
# One summary line per remote call: the command and its time rounded to
# three decimals.
for cmd_ in self.meta.get('remote_calls', []):
rtime = round(cmd_['time'], 3)
inf = '^ sh({cmd}) time({rtime})'.format(
rtime=rtime, **cmd_)
if cmd_['exc']:
log.error(inf + '\n' + ''.join(cmd_['exc']))
# Otherwise the summary is logged only when nuka.cli.args.verbose exceeds 1.
elif nuka.cli.args.verbose > 1:
log.info(inf)
# Keep only the non-empty 'stderr' / 'stdout' entries of the call.
stds = {k: v for k, v in cmd_.items()
if k in ('stderr', 'stdout') and v}
if stds:
log.debug3('^ ' + str(stds))
# Replay the 'log' text of the result line by line.
data = self.res.get('log')
if data:
# level holds the most recently recognised level name's value, so lines
# without a recognised "LEVEL:" prefix reuse it; such lines are skipped
# while it is still None.
level = None
for line in data.rstrip().split('\n'):
line = line.rstrip()
try:
line_level, message = line.split(':', 1)
except ValueError:
# No ':' in the line at all.
if level:
log.log(level, '^ ' + line)
else:
# The prefix only counts when it is a level name known to logging.
if line_level in logging._nameToLevel:
level = getattr(logging, line_level)
log.log(level, '^ ' + message)
else:
if level:
log.log(level, '^ ' + line)