# Map a Python/log4-style logging level to the corresponding CF.LogLevels constant (falls back to INFO).
def ConvertLog4ToCFLevel( log4level ):
if log4level == logging.FATAL+1 :
return CF.LogLevels.OFF
if log4level == logging.FATAL :
return CF.LogLevels.FATAL
if log4level == logging.ERROR :
return CF.LogLevels.ERROR
if log4level == logging.WARN :
return CF.LogLevels.WARN
if log4level == logging.INFO :
return CF.LogLevels.INFO
if log4level == logging.DEBUG :
return CF.LogLevels.DEBUG
if log4level == logging.TRACE :
return CF.LogLevels.TRACE
if log4level == logging.NOTSET:
return CF.LogLevels.ALL
return CF.LogLevels.INFO
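# The chain of ifs above is a plain table lookup; a minimal, self-contained sketch of the same
# mapping written as a dictionary. CF.LogLevels is framework-specific, so placeholder strings
# stand in for its constants, and the framework-defined logging.TRACE entry is omitted because
# the stdlib has no TRACE level.
import logging

_CF_LEVEL_MAP = {
    logging.FATAL + 1: "OFF",
    logging.FATAL: "FATAL",
    logging.ERROR: "ERROR",
    logging.WARN: "WARN",
    logging.INFO: "INFO",
    logging.DEBUG: "DEBUG",
    logging.NOTSET: "ALL",
}

def convert_log4_to_cf_level(log4level):
    # Unknown levels fall back to INFO, mirroring the function above.
    return _CF_LEVEL_MAP.get(log4level, "INFO")

assert convert_log4_to_cf_level(logging.ERROR) == "ERROR"
assert convert_log4_to_cf_level(12345) == "INFO"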
def log(self, level, msg, *args, **kwargs):
"""Delegate a log call to the underlying logger.
The level here is determined by the echo
flag as well as that of the underlying logger, and
logger._log() is called directly.
"""
# inline the logic from isEnabledFor(),
# getEffectiveLevel(), to avoid overhead.
if self.logger.manager.disable >= level:
return
selected_level = self._echo_map[self.echo]
if selected_level == logging.NOTSET:
selected_level = self.logger.getEffectiveLevel()
if level >= selected_level:
self.logger._log(level, msg, args, **kwargs)
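# The method above depends on an `echo` attribute and an `_echo_map` that are not shown in the
# snippet. A self-contained sketch of that pattern follows; the class name and the exact map
# values are assumptions (echo=False defers to the wrapped logger, echo=True forces INFO,
# echo='debug' forces DEBUG), chosen to show why NOTSET triggers the effective-level fallback.
import logging


class EchoLogger(object):
    _echo_map = {
        None: logging.NOTSET,
        False: logging.NOTSET,
        True: logging.INFO,
        "debug": logging.DEBUG,
    }

    def __init__(self, logger, echo=False):
        self.logger = logger
        self.echo = echo

    def log(self, level, msg, *args, **kwargs):
        if self.logger.manager.disable >= level:
            return
        selected_level = self._echo_map[self.echo]
        if selected_level == logging.NOTSET:          # no echo override: honour the logger's own level
            selected_level = self.logger.getEffectiveLevel()
        if level >= selected_level:
            self.logger._log(level, msg, args, **kwargs)


logging.basicConfig(level=logging.WARNING)
EchoLogger(logging.getLogger("demo"), echo=True).log(
    logging.INFO, "emitted despite the WARNING logger level, because echo=True selects INFO")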
def run(self, argv = None, data = None, logger = None):
    """
    Runs the application function
    """
    if logger is not None:
        assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
        self.logger = logger
    if data is not None:
        assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
        self.data = data
    self.options, self.arguments = self._argparser.parseArguments(argv)
    if self._argparser.loglevel == 1:
        self._configureConsoleLoggers(logging.NOTSET, True)
    elif self._argparser.loglevel == -1:
        self._configureConsoleLoggers(logging.CRITICAL, False)
    try:
        self._argparser.validateRequiredArguments()
        return self._app(ApplicationData(self))
    except Exception as e:
        self.logger.exception(e)
        return 1
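# _configureConsoleLoggers() is defined elsewhere in that project; a hypothetical sketch of what
# such a helper might look like, to show why the two calls above use NOTSET (verbose: the console
# handler passes everything) and CRITICAL (quiet: the console handler drops almost everything):
import logging
import sys


def configure_console_loggers(level, verbose):
    fmt = "%(asctime)s %(name)s %(levelname)s: %(message)s" if verbose else "%(message)s"
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter(fmt))
    handler.setLevel(level)               # NOTSET (0) lets every record through the handler
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(logging.NOTSET)         # let the handler level decide what is shown


configure_console_loggers(logging.NOTSET, True)      # --verbose
configure_console_loggers(logging.CRITICAL, False)   # --quiet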
def test_custom_handler(self, mocker):
handler = DummyHandler()
mock = mocker.MagicMock()
handler.emit = mock
logger = make_logger()
logger.handlers = [handler]
disable(NOTSET)
logger.debug('test')
assert mock.call_count == 1
emit_call = mock.mock_calls[0]
name, args, kwargs = emit_call
assert name == ''
log_record = args[0]
assert isinstance(log_record, LogRecord)
assert log_record.msg == 'test'
assert log_record.levelname == 'DEBUG'
del logger
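# DummyHandler and make_logger are test fixtures defined elsewhere in that suite; a plausible
# minimal sketch of both (assumptions, not the project's actual code):
import logging


class DummyHandler(logging.Handler):
    """No-op handler whose emit() the test replaces with a MagicMock."""
    def emit(self, record):
        pass


def make_logger(name="dummy_test_logger"):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)     # low enough for logger.debug('test') to reach the handler
    logger.propagate = False
    return logger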
def level_to_int(level: Union[str, int]) -> int:
    if isinstance(level, int):
        if logging.NOTSET <= level <= logging.FATAL:
            return level
        else:
            raise ValueError('Log level must be 0 <= level <= 50, '
                             'but got: {}'.format(level))
    elif isinstance(level, str):
        try:
            return getattr(logging, level.upper())
        except AttributeError:
            raise ValueError('Invalid log level: {}'.format(level))
    else:
        raise TypeError(
            'Log level must be int (0 ~ 50) or string, '
            'but got type: {}'.format(type(level)))
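# Example calls, assuming level_to_int() above is in scope:
import logging

assert level_to_int("debug") == logging.DEBUG        # names are resolved via getattr(logging, ...)
assert level_to_int("WARNING") == logging.WARNING
assert level_to_int(25) == 25                        # ints inside 0..50 pass through unchanged
try:
    level_to_int(99)                                 # out-of-range ints are rejected
except ValueError as exc:
    print(exc)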
def __init__(self, pathname, **settings):
"""initial config for singleton baka framework
:param import_name: the name of the application package
:param settings: *optional dict settings for pyramid configuration
"""
self.import_name = pathname
self.settings = settings
self.__include = {}
self.__trafaret = trafaret_yaml
# Only set up a default log handler if the
# end-user application didn't set anything up.
if not (logging.root.handlers and log.level == logging.NOTSET and settings.get('LOGGING')):
formatter = logging.Formatter(logging_format)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)
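# A self-contained sketch of the "library adds a fallback handler only if the application has not
# configured logging" pattern used above; `logging_format` is an assumed stand-in for the module
# constant of the same name, and the settings check is dropped for brevity:
import logging

logging_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"
log = logging.getLogger("baka_demo")

if not (logging.root.handlers and log.level == logging.NOTSET):
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(logging_format))
    log.addHandler(handler)
    log.setLevel(logging.INFO)

log.info("visible through the library's fallback handler")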
def get(self, key, *sources, default=None, log_level=None, log_value=True):
    """
    Get the config value for key using the default sources or the provided sources. A successful get
    (not returning None) caches the value and its source; subsequent gets for that key return the
    cached value regardless of other parameters.

    key -- the key for the value
    sources -- custom source order for this key; if no sources are given, the sources set by the
               constructor or the source property will be used
    default -- return this value if all sources fail; the default value will be cached and logged as specified
    log_level -- override log_level from the constructor, making get log the key, value and source on
                 first use; set to logging.NOTSET to turn off logging
    log_value -- set to False to prevent logging of the value but still log the source for this key
    """
    value, source = self.get_with_source(key, *sources, default=default, log_level=log_level, log_value=log_value)
    return value
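# A toy, self-contained illustration of the contract described in the docstring above (source
# order, default fallback, per-key caching, and logging.NOTSET turning logging off); the class
# below is an assumption for demonstration, not the project's implementation:
import logging


class DemoConfig(object):
    def __init__(self, *sources, log_level=logging.INFO):
        self._sources = sources      # mappings queried in order
        self._cache = {}             # key -> value after the first successful lookup
        self._log_level = log_level

    def get(self, key, *sources, default=None, log_level=None, log_value=True):
        if key in self._cache:
            return self._cache[key]                          # cached value wins from now on
        level = self._log_level if log_level is None else log_level
        value = next((s[key] for s in (sources or self._sources) if key in s), default)
        self._cache[key] = value
        if level != logging.NOTSET:                          # NOTSET silences logging for this key
            logging.getLogger("config").log(level, "%s = %s", key, value if log_value else "<hidden>")
        return value


cfg = DemoConfig({"db.host": "example.org"}, {"db.port": 5432})
assert cfg.get("db.host") == "example.org"
assert cfg.get("timeout", default=30) == 30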
# logging_handlers.py, from the integration-prototype project (SKA-ScienceDataProcessor)
def to(cls, channel, host='127.0.0.1',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
level=logging.NOTSET):
"""Convenience class method to create a ZmqLoghandler and
connect to a ZMQ subscriber.
Args:
channel (string): Logging channel name. This is used to build a
ZMQ topic.
host (string): Hostname / ip address of the subscriber to publish
to.
port (int, string): Port on which to publish messages.
level (int): Logging level
"""
context = zmq.Context()
publisher = context.socket(zmq.PUB)
address = 'tcp://{}:{}'.format(host, port)
publisher.connect(address)
time.sleep(0.1) # This sleep hopefully fixes the silent joiner problem.
return cls(channel, publisher, level=level)
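# Hypothetical usage of the factory above (the enclosing handler class is assumed to be called
# ZmqLogHandler; this needs pyzmq plus a SUB socket listening on DEFAULT_TCP_LOGGING_PORT, so it
# is a sketch rather than a runnable standalone script):
import logging

handler = ZmqLogHandler.to('sdp.demo.channel', host='127.0.0.1', level=logging.DEBUG)
log = logging.getLogger('sdp.demo')
log.setLevel(logging.DEBUG)
log.addHandler(handler)
log.info('published to the ZMQ topic derived from the channel name')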
def getLevel( self ):
"""
A convenience wrapper around ``getEffectiveLevel()`` because the integer values for the
various logging levels are clunky and probably don't mean anything to you.
Returns:
str: the name of the effective log level for this logging object, in lowercase
(``"warning"``, ``"info"``, etc.)
"""
level = self.getEffectiveLevel()
if level == logging.CRITICAL:
return 'critical'
elif level == logging.ERROR:
return 'error'
elif level == logging.WARNING:
return 'warning'
elif level == logging.INFO:
return 'info'
elif level == logging.DEBUG:
return 'debug'
elif level == logging.NOTSET:
return 'notset'
else:
return 'unknown ({})'.format( level )
def __init__(self, *args, **kwargs):
client = kwargs.pop('client_cls', Client)
if len(args) == 1:
arg = args[0]
args = args[1:]
if isinstance(arg, Client):
self.client = arg
else:
raise ValueError(
'The first argument to %s must be a Client instance, '
'got %r instead.' % (
self.__class__.__name__,
arg,
))
elif 'client' in kwargs:
self.client = kwargs.pop('client')
else:
self.client = client(*args, **kwargs)
logging.Handler.__init__(self, level=kwargs.get('level', logging.NOTSET))
def test_get_level_name(self):
"""Test getLevelName returns level constant."""
# NOTE(flaper87): Bug #1517
self.assertEqual(logging.getLevelName('NOTSET'), 0)
self.assertEqual(logging.getLevelName('DEBUG'), 10)
self.assertEqual(logging.getLevelName('INFO'), 20)
self.assertEqual(logging.getLevelName('WARN'), 30)
self.assertEqual(logging.getLevelName('WARNING'), 30)
self.assertEqual(logging.getLevelName('ERROR'), 40)
self.assertEqual(logging.getLevelName('CRITICAL'), 50)
self.assertEqual(logging.getLevelName(0), 'NOTSET')
self.assertEqual(logging.getLevelName(10), 'DEBUG')
self.assertEqual(logging.getLevelName(20), 'INFO')
self.assertEqual(logging.getLevelName(30), 'WARNING')
self.assertEqual(logging.getLevelName(40), 'ERROR')
self.assertEqual(logging.getLevelName(50), 'CRITICAL')
def _handle_existing_loggers(existing, child_loggers, disable_existing):
"""
When (re)configuring logging, handle loggers which were in the previous
configuration but are not in the new configuration. There's no point
deleting them as other threads may continue to hold references to them;
and by disabling them, you stop them doing any logging.
However, don't disable children of named loggers, as that's probably not
what was intended by the user. Also, allow existing loggers to NOT be
disabled if disable_existing is false.
"""
root = logging.root
for log in existing:
logger = root.manager.loggerDict[log]
if log in child_loggers:
logger.level = logging.NOTSET
logger.handlers = []
logger.propagate = True
elif disable_existing:
logger.disabled = True
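# The behaviour above is what logging.config.dictConfig() exposes through its
# 'disable_existing_loggers' key; a minimal runnable example:
import logging
import logging.config

existing = logging.getLogger("app.db")            # created before (re)configuration

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,            # keep pre-existing loggers enabled
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "root": {"level": "INFO", "handlers": ["console"]},
})

existing.warning("still emitted because disable_existing_loggers is False")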
def get_console_logger():
    try:
        console_logger = logging.getLogger()
        console_logger.setLevel(logging.NOTSET)
        # console_logger.propagate = False
        # If a handler is already attached, drop it so messages are not duplicated.
        if console_logger.handlers:
            console_logger.handlers.pop()
        # Set up the logging config
        ch = logging.StreamHandler()
        # Use the standard formatter constant
        ch.setFormatter(FORMATTER)
        # Only send INFO level and above to stdout
        ch.setLevel(logging.INFO)
        # TODO: Delete LessThanFilter if not needed in future
        # ch.addFilter(LessThanFilter(logging.WARNING))
        # add the handler
        console_logger.addHandler(ch)
    except TypeError as e:
        sys.stdout.write(str("Console logger is having issues: {}\n".format(e)))
    return console_logger
def get_app_logger(name=None):
    try:
        logger_map = {"__webbreaker__": APP_LOG}
        app_logger = logging.getLogger("__webbreaker__")
        app_logger.setLevel(logging.NOTSET)
        # If a handler is already attached, drop it so messages are not duplicated.
        if app_logger.handlers:
            app_logger.handlers.pop()
        formatter = logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s')
        fh = logging.FileHandler(logger_map[name], mode='a')
        fh.setFormatter(formatter)
        fh.setLevel(logging.INFO)
        app_logger.addHandler(fh)
    except TypeError as e:
        sys.stdout.write(str("App logger error: {}!\n".format(e)))
    return app_logger
def get_debug_logger(name=None):
    try:
        debug_logger = logging.getLogger(name)
        debug_logger.setLevel(logging.NOTSET)
        # If a handler is already attached, drop it so messages are not duplicated.
        if debug_logger.handlers:
            debug_logger.handlers.pop()
        debug_formatter = logging.Formatter('%(asctime)s: %(name)s %(levelname)s(%(message)s')
        fh = logging.FileHandler(DEBUG_LOG, mode='a')
        fh.setFormatter(debug_formatter)
        fh.setLevel(logging.DEBUG)
        debug_logger.addHandler(fh)
    except TypeError as e:
        sys.stdout.write(str("Debug logger error: {}!\n".format(e)))
    return debug_logger
# Override existing hierarchical filter logic in logger
def tune(self, src_batch, trg_batch, epochs):
self._ensure_model_loaded()
if self._tuner is None:
self._tuner = NMTEngineTrainer(self._model, self._optim, self._src_dict, self._trg_dict,
model_params=self._model_params, gpu_ids=([0] if self._using_cuda else None))
self._tuner.min_perplexity_decrement = -1.
self._tuner.set_log_level(logging.NOTSET)
self._tuner.min_epochs = self._tuner.max_epochs = epochs
# Convert words to indexes [suggestions]
tuning_src_batch, tuning_trg_batch = [], []
for source, target in zip(src_batch, trg_batch):
tuning_src_batch.append(self._src_dict.convertToIdx(source, Constants.UNK_WORD))
tuning_trg_batch.append(self._trg_dict.convertToIdx(target, Constants.UNK_WORD,
Constants.BOS_WORD, Constants.EOS_WORD))
# Prepare data for training on the tuningBatch
tuning_dataset = Dataset(tuning_src_batch, tuning_trg_batch, 32, self._using_cuda)
self._tuner.train_model(tuning_dataset, save_epochs=0)
def setLogLevel(self, level):
    ''' This method allows changing the default logging level. '''
#if isinstance(level,basestring): level = level.upper()
if type(level)==type(logging.NOTSET):
self.log_obj.setLevel(level)
#self.debug('log.Logger: Logging level set to %s'%
#str(level).upper())
else:
l = self.getLogLevel(level)
if l is None:
self.warning('log.Logger: Logging level cannot be set to "%s"'
%level)
elif l!=self.log_obj.level:
self.log_obj.setLevel(l)
self.debug('log.Logger: Logging level set to "%s" = %s'
%(level,l))
return level
def add_loggers(self, loggers, stdout_level=logging.NOTSET, file_level=logging.NOTSET):
"""Adds loggers for stdout/filesystem handling.
Stdout: loggers will log to stdout only when mentioned in `log` option. If they're
mentioned without explicit level, `stdout_level` will be used.
Filesystem: loggers will log to files at `file_level`.
:arg loggers: List of logger names.
:arg stdout_level: Default level at which stdout handlers will pass logs.
By default: `logging.NOTSET`, which means: pass everything.
:arg file_level: Level at which filesystem handlers will pass logs.
By default: `logging.NOTSET`, which means: pass everything.
"""
self._enabled = True
self._loggers.append((loggers, _sanitize_level(stdout_level), _sanitize_level(file_level)))
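# _sanitize_level() is not shown in the snippet; a plausible sketch of such a helper (an
# assumption, not the project's code) that accepts either a numeric level or a level name:
import logging


def _sanitize_level(level):
    if isinstance(level, int):
        return level
    value = logging.getLevelName(str(level).upper())   # getLevelName maps names back to numbers
    if not isinstance(value, int):
        raise ValueError("Unknown log level: {!r}".format(level))
    return value


assert _sanitize_level("warning") == logging.WARNING
assert _sanitize_level(logging.NOTSET) == 0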
def _resetLogging(self):
    # ensure we don't attach the handlers multiple times.
if self.logging_initialized:
return
self.logging_initialized = True
with self.uid_manager:
util.mkdirIfAbsent(self.resultdir)
# attach logs to log files.
# This happens in addition to anything that
# is set up in the config file... ie. logs go everywhere
for (log, filename, fmt_str) in (
(self.state.state_log, "state.log", self.config['state_log_fmt_str']),
(self.build_log, "build.log", self.config['build_log_fmt_str']),
(self.root_log, "root.log", self.config['root_log_fmt_str'])):
fullPath = os.path.join(self.resultdir, filename)
fh = logging.FileHandler(fullPath, "a+")
formatter = logging.Formatter(fmt_str)
fh.setFormatter(formatter)
fh.setLevel(logging.NOTSET)
log.addHandler(fh)
log.info("Mock Version: %s", self.config['version'])
def emit(self, record):
if record.levelno < logging.WARNING and self._modules and not record.name in self._modules:
# Log INFO and DEBUG only with enabled modules
return
levels = {
logging.CRITICAL: xbmc.LOGFATAL,
logging.ERROR: xbmc.LOGERROR,
logging.WARNING: xbmc.LOGWARNING,
logging.INFO: xbmc.LOGNOTICE,
logging.DEBUG: xbmc.LOGSEVERE,
logging.NOTSET: xbmc.LOGNONE,
}
try:
xbmc.log(self.format(record), levels[record.levelno])
except:
try:
xbmc.log(self.format(record).encode('utf-8', 'ignore'), levels[record.levelno])
except:
xbmc.log(b"[%s] Unicode Error in message text" % self.pluginName, levels[record.levelno])
def DEFAULT_LOGGING_CONFIG(level=logging.WARN, format=LOG_FORMAT):
"""Returns a default logging config in dict format.
Compatible with logging.config.dictConfig(), this default set the root
logger to `level` with `sys.stdout` console handler using a formatter
initialized with `format`. A simple 'brief' formatter is defined that
shows only the message portion any log entries."""
return {
"version": 1,
"formatters": {"generic": {"format": format},
"brief": {"format": "%(message)s"},
},
"handlers": {"console": {"class": "logging.StreamHandler",
"level": "NOTSET",
"formatter": "generic",
"stream": "ext://sys.stdout",
},
},
"root": {"level": level,
"handlers": ["console"],
},
"loggers": {},
}
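# Typical usage, assuming DEFAULT_LOGGING_CONFIG and its LOG_FORMAT constant are importable from
# the module above; the root level is overridden for a more verbose run:
import logging
import logging.config

logging.config.dictConfig(DEFAULT_LOGGING_CONFIG(level=logging.DEBUG))
logging.getLogger(__name__).debug("goes to stdout via the 'console' handler")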
def __init__(self, login_ip, login_port, game_ip, game_port, magic=None, single_quotes=False, logger=None):
self._login_ip = login_ip
self._login_port = login_port
self._game_ip = game_ip
self._game_port = game_port
if logger is None:
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
self._logger = logger
self._magic = "Y(02.>'H}t\":E1" if magic is None else magic
self._single_quotes = single_quotes
self._connected = False
self._buffer = ""
self._handlers = {}
self._nexts = []
self._internal_room_id = -1
self._id = -1
self._coins = -1
self._room = -1
self._penguins = {}
self._follow = None