def ConvertLog4ToCFLevel( log4level ):
    """Translate a Python ``logging`` level into a ``CF.LogLevels`` value.

    ``logging.FATAL + 1`` maps to OFF and ``logging.NOTSET`` maps to ALL;
    FATAL, ERROR, WARN, INFO, DEBUG and TRACE map to the level of the same
    name.  Any other value falls back to ``CF.LogLevels.INFO``.

    ``logging.TRACE`` is not a standard-library attribute; it is only read
    when none of the earlier comparisons matched.
    """
    if log4level == logging.FATAL + 1:
        cf_level = CF.LogLevels.OFF
    elif log4level == logging.FATAL:
        cf_level = CF.LogLevels.FATAL
    elif log4level == logging.ERROR:
        cf_level = CF.LogLevels.ERROR
    elif log4level == logging.WARN:
        cf_level = CF.LogLevels.WARN
    elif log4level == logging.INFO:
        cf_level = CF.LogLevels.INFO
    elif log4level == logging.DEBUG:
        cf_level = CF.LogLevels.DEBUG
    elif log4level == logging.TRACE:
        cf_level = CF.LogLevels.TRACE
    elif log4level == logging.NOTSET:
        cf_level = CF.LogLevels.ALL
    else:
        # Unrecognised levels are reported as INFO.
        cf_level = CF.LogLevels.INFO
    return cf_level
# Example source code for Python "TRACE" (heading of the snippet collection).
def ConvertToLog4Level( newLevel ):
    """Translate a ``CF.LogLevels`` value into a Python ``logging`` level.

    OFF becomes ``logging.FATAL + 1``; both TRACE and ALL become
    ``logging.TRACE``; FATAL, ERROR, WARN, INFO and DEBUG map to the
    ``logging`` level of the same name.  Any other value yields
    ``logging.INFO``.
    """
    # TRACE and ALL share one target, so they are tested together.
    if newLevel == CF.LogLevels.ALL or newLevel == CF.LogLevels.TRACE:
        return logging.TRACE
    if newLevel == CF.LogLevels.DEBUG:
        return logging.DEBUG
    if newLevel == CF.LogLevels.INFO:
        return logging.INFO
    if newLevel == CF.LogLevels.WARN:
        return logging.WARN
    if newLevel == CF.LogLevels.ERROR:
        return logging.ERROR
    if newLevel == CF.LogLevels.FATAL:
        return logging.FATAL
    if newLevel == CF.LogLevels.OFF:
        return logging.FATAL + 1
    # Default for values that match none of the known CF levels.
    return logging.INFO
def vm_logger():
    """Return the 'evm' logger, set up to write INFO and above to stdout.

    The logger is a process-wide object, so calling this function more than
    once used to stack one extra ``StreamHandler`` per call and every record
    was then printed several times.  A handler already bound to the current
    ``sys.stdout`` is now reused instead of adding another one.

    Returns:
        The ``logging.Logger`` named ``'evm'``.
    """
    # Swap in logging.TRACE or logging.DEBUG here for more verbose output.
    level = logging.INFO

    logger = logging.getLogger('evm')
    logger.setLevel(level)

    for existing in logger.handlers:
        is_stream = isinstance(existing, logging.StreamHandler)
        if is_stream and getattr(existing, 'stream', None) is sys.stdout:
            # Already wired to stdout by an earlier call; just refresh level.
            existing.setLevel(level)
            break
    else:
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(level)
        logger.addHandler(handler)
    return logger
# Uncomment this to have logs from tests written to a file. This is useful for
# debugging when you need to dump the VM output from test runs.
def configure_logging(verbosity_loglevel):
    """Set up root logging for the given verbosity and install a TRACE level.

    The verbosity is resolved through ``LogLevel.getbyverbosity``.  A
    ``TRACE`` level one step below ``DEBUG`` is registered on the ``logging``
    module, a ``trace`` method is attached to ``logging.Logger``, and
    ``logging.basicConfig`` is called with a ``{``-style format.

    Silent maps to WARNING, Normal to INFO, Verbose to DEBUG and Debug to
    TRACE.
    """
    def __log_trace(self, message, *args, **kwargs):
        # Mirrors the shape of the built-in Logger.debug()/info() helpers.
        if self.isEnabledFor(logging.TRACE):
            self._log(logging.TRACE, message, args, **kwargs)

    selected = LogLevel.getbyverbosity(verbosity_loglevel)

    # Register the extra level before it is referenced in the table below.
    logging.TRACE = logging.DEBUG - 1
    logging.addLevelName(logging.TRACE, "TRACE")

    level_by_verbosity = {
        LogLevel.Silent: logging.WARNING,
        LogLevel.Normal: logging.INFO,
        LogLevel.Verbose: logging.DEBUG,
        LogLevel.Debug: logging.TRACE,
    }
    root_level = level_by_verbosity[selected]

    logging.Logger.trace = __log_trace
    logging.basicConfig(
        level=root_level,
        style="{",
        format=" {name:>20s} [{levelname:.1s}]: {message}",
    )
def test_get_configuration():
    """Check that ``slogging.get_configuration`` round-trips ``configure``."""
    slogging.getLogger().manager.loggerDict = {}  # clear old loggers

    config_string = ':INFO,a:TRACE,a.b:DEBUG'
    expected_entries = set(config_string.split(','))

    # The reported configuration must match for both output formats.
    for log_json in (False, True):
        slogging.configure(config_string=config_string, log_json=log_json)
        config = slogging.get_configuration()
        assert config['log_json'] == log_json
        assert set(config['config_string'].split(',')) == expected_entries

    # Set the configuration differently ...
    slogging.configure(config_string=':TRACE', log_json=False)
    slogging.get_configuration()

    # ... then feed the saved configuration back in and expect the original.
    slogging.configure(**config)
    restored = slogging.get_configuration()
    assert restored['log_json'] == log_json
    assert set(restored['config_string'].split(',')) == expected_entries
def test_recorder(caplog):
    """Check that ``LogRecorder`` captures info and trace events.

    A recorder is expected to appear in ``slogging.log_listeners`` while
    active and to be gone again after ``pop_records``.
    """
    slogging.configure(log_json=True)
    logger = slogging.get_logger()

    # info-level event
    info_recorder = slogging.LogRecorder()
    assert len(slogging.log_listeners) == 1
    logger.info('a', v=1)
    assert "a" in caplog.text
    info_records = info_recorder.pop_records()
    assert info_records[0] == {'event': 'a', 'v': 1}
    assert len(slogging.log_listeners) == 0

    # trace-level event
    logger.setLevel(logging.TRACE)
    trace_recorder = slogging.LogRecorder()
    assert len(slogging.log_listeners) == 1
    logger.trace('a', v=2)
    assert '"v": 2' in caplog.text
    trace_records = trace_recorder.pop_records()
    assert trace_records[0] == {'event': 'a', 'v': 2}
    assert len(slogging.log_listeners) == 0
def test_bound_logger(caplog):
    """Check that key/value pairs added with ``bind`` show up in the output.

    A logger bound a second time must emit the pairs from both bindings.
    """
    slogging.configure(config_string=':trace')
    base_logger = slogging.getLogger()

    first_bound = base_logger.bind(key1="value1")
    with caplog.at_level(slogging.TRACE):
        first_bound.info("test1")
        captured = caplog.text
        assert "test1" in captured
        assert 'key1=value1' in captured

    second_bound = first_bound.bind(key2="value2")
    with caplog.at_level(slogging.TRACE):
        second_bound.info("test2")
        captured = caplog.text
        assert "test2" in captured
        assert 'key1=value1' in captured
        assert 'key2=value2' in captured
def test_get_configuration():
    """Check that ``slogging.get_configuration`` round-trips ``configure``."""
    slogging.getLogger().manager.loggerDict = {}  # clear old loggers

    config_string = ':INFO,a:TRACE,a.b:DEBUG'
    expected_entries = set(config_string.split(','))

    # The reported configuration must match for both output formats.
    for log_json in (False, True):
        slogging.configure(config_string=config_string, log_json=log_json)
        config = slogging.get_configuration()
        assert config['log_json'] == log_json
        assert set(config['config_string'].split(',')) == expected_entries

    # Set the configuration differently ...
    slogging.configure(config_string=':TRACE', log_json=False)
    slogging.get_configuration()

    # ... then feed the saved configuration back in and expect the original.
    slogging.configure(**config)
    restored = slogging.get_configuration()
    assert restored['log_json'] == log_json
    assert set(restored['config_string'].split(',')) == expected_entries
def test_recorder(caplog):
    """Check that ``LogRecorder`` captures info and trace events.

    A recorder is expected to appear in ``slogging.log_listeners`` while
    active and to be gone again after ``pop_records``.
    """
    slogging.configure(log_json=True)
    logger = slogging.get_logger()

    # info-level event
    info_recorder = slogging.LogRecorder()
    assert len(slogging.log_listeners) == 1
    logger.info('a', v=1)
    assert "a" in caplog.text
    info_records = info_recorder.pop_records()
    assert info_records[0] == {'event': 'a', 'v': 1}
    assert len(slogging.log_listeners) == 0

    # trace-level event
    logger.setLevel(logging.TRACE)
    trace_recorder = slogging.LogRecorder()
    assert len(slogging.log_listeners) == 1
    logger.trace('a', v=2)
    assert '"v": 2' in caplog.text
    trace_records = trace_recorder.pop_records()
    assert trace_records[0] == {'event': 'a', 'v': 2}
    assert len(slogging.log_listeners) == 0
def test_bound_logger(caplog):
    """Check that key/value pairs added with ``bind`` show up in the output.

    A logger bound a second time must emit the pairs from both bindings.
    """
    slogging.configure(config_string=':trace')
    base_logger = slogging.getLogger()

    first_bound = base_logger.bind(key1="value1")
    with caplog.at_level(slogging.TRACE):
        first_bound.info("test1")
        captured = caplog.text
        assert "test1" in captured
        assert 'key1=value1' in captured

    second_bound = first_bound.bind(key2="value2")
    with caplog.at_level(slogging.TRACE):
        second_bound.info("test2")
        captured = caplog.text
        assert "test2" in captured
        assert 'key1=value1' in captured
        assert 'key2=value2' in captured
def _trace(msg, *args, **kw):
    """Log ``msg`` at the custom TRACE level through ``logging.log``.

    ``logging.TRACE`` is not part of the standard library and has to be
    defined on the ``logging`` module before this is called.
    """
    trace_level = logging.TRACE
    logging.log(trace_level, msg, *args, **kw)
def trace(self, msg, *args, **kw):
    """Log ``msg`` on this logger at the custom TRACE level.

    ``logging.TRACE`` is not part of the standard library and has to be
    defined on the ``logging`` module before this is called.
    """
    trace_level = logging.TRACE
    self.log(trace_level, msg, *args, **kw)
def ConvertLevelNameToDebugLevel( level_name ):
    """Map a log level name to a numeric debug level between 0 and 5.

    "OFF" and "FATAL" give 0, "ERROR" 1, "WARN" 2, "INFO" 3, "DEBUG" 4, and
    both "TRACE" and "ALL" give 5.  Any other value gives 3.
    """
    # Ordered pairs compared with ==, so any input type is accepted.
    name_to_debug_level = (
        ("OFF", 0),
        ("FATAL", 0),
        ("ERROR", 1),
        ("WARN", 2),
        ("INFO", 3),
        ("DEBUG", 4),
        ("TRACE", 5),
        ("ALL", 5),
    )
    for known_name, debug_level in name_to_debug_level:
        if level_name == known_name:
            return debug_level
    return 3
def SupportedCFLevel( newLevel ):
    """Return True when ``newLevel`` equals one of the known CF log levels.

    The known levels are OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL
    from ``CF.LogLevels``; anything else yields False.
    """
    return bool(
        newLevel == CF.LogLevels.OFF
        or newLevel == CF.LogLevels.FATAL
        or newLevel == CF.LogLevels.ERROR
        or newLevel == CF.LogLevels.WARN
        or newLevel == CF.LogLevels.INFO
        or newLevel == CF.LogLevels.DEBUG
        or newLevel == CF.LogLevels.TRACE
        or newLevel == CF.LogLevels.ALL
    )
def trace(self, msg, *args, **kwargs):
    """Log ``msg`` on this logger at the custom TRACE level.

    ``logging.TRACE`` is not part of the standard library and has to be
    defined on the ``logging`` module before this is called.
    """
    trace_level = logging.TRACE
    self.log(trace_level, msg, *args, **kwargs)
def trace(self, message, *args, **kws):
    """Log ``message`` on this logger at the ``TRACE`` level.

    ``TRACE`` is a name taken from the enclosing scope; it is not defined in
    this function.
    """
    trace_level = TRACE
    self.log(trace_level, message, *args, **kws)
def test_basic(caplog, level_name):
    """Check that the logger method named ``level_name`` emits one record.

    The record's level name must be the upper-cased ``level_name`` and its
    message must contain ``level_name``.
    """
    slogging.configure(":trace")
    logger = slogging.get_logger()

    with caplog.at_level('TRACE'):
        log_method = getattr(logger, level_name)
        log_method(level_name)

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == level_name.upper()
    assert level_name in record.msg
def test_howto_use_in_tests():
    """Show how a test selects per-logger levels with a config string."""
    # Pick what you want to see, e.g. TRACE from the vm except for
    # pre_state, and DEBUG for everything else.
    slogging.configure(':DEBUG,eth.vm:TRACE,vm.pre_state:INFO')
    test_logger = slogging.get_logger('tests.logging')
    test_logger.info('test starts')
def test_basic(caplog, level_name):
    """Check that the logger method named ``level_name`` emits one record.

    The record's level name must be the upper-cased ``level_name`` and its
    message must contain ``level_name``.
    """
    slogging.configure(":trace")
    logger = slogging.get_logger()

    with caplog.at_level('TRACE'):
        log_method = getattr(logger, level_name)
        log_method(level_name)

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == level_name.upper()
    assert level_name in record.msg
def test_howto_use_in_tests():
    """Show how a test selects per-logger levels with a config string."""
    # Pick what you want to see, e.g. TRACE from the vm except for
    # pre_state, and DEBUG for everything else.
    slogging.configure(':DEBUG,eth.vm:TRACE,vm.pre_state:INFO')
    test_logger = slogging.get_logger('tests.logging')
    test_logger.info('test starts')
def __add_options(parser):
    """Register the ``--log-level`` option on ``parser``.

    The value is stored as ``loglevel``, defaults to ``'INFO'`` and must be
    one of TRACE, DEBUG, INFO, WARN, ERROR or CRITICAL.
    """
    levels = ('TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL')
    # "%(default)s" is left in the text for argparse to expand itself.
    help_text = (
        'Amount of detail in build-time console messages. '
        'LEVEL may be one of {} (default: %(default)s).'
    ).format(', '.join(levels))
    parser.add_argument(
        '--log-level',
        dest='loglevel',
        default='INFO',
        choices=levels,
        metavar="LEVEL",
        help=help_text,
    )
def __init__(self, msg):
    """Initialise the formatter with format string ``msg``.

    Also stores ``LEVEL_COLOR``, a mapping from level name to colour name.
    INFO has no entry, so the terminal's current colour is used for it.
    """
    logging.Formatter.__init__(self, msg)
    self.LEVEL_COLOR = dict(
        TRACE='DARK_CYAN',
        DEBUG='BLUE',
        WARNING='PURPLE',
        ERROR='RED',
        CRITICAL='RED_BG',
    )
def _set_logger(self, unused=None):
    """(Re)configure the root logger from ``self._verbosity`` and ``self._logfile``.

    Registers a ``TRACE`` level (5) on the ``logging`` module if it is not
    there yet, replaces all handlers of the root logger with a coloured
    console handler and, when ``self._logfile`` is set, a file handler that
    appends to that file.  The logger is stored in ``self._logger`` and gets
    a ``trace(msg, *args)`` helper.

    Args:
        unused: Ignored by this method.

    Raises:
        KeyError: if ``self._verbosity`` is not one of 0-4 or None.
    """
    if not hasattr(logging, 'TRACE'):
        logging.TRACE = 5
        logging.addLevelName(logging.TRACE, "TRACE")
    # We currently use the regular root logger, but we should switch to
    # multiprocessing.get_logger if we notice trouble in, for example,
    # logging from multiple processes.
    self._logger = logging.getLogger()
    # Clear previous handlers.  They are closed as well as removed: a
    # FileHandler that is only removed keeps its log file open, so every
    # reconfiguration would otherwise leak one file descriptor.
    for old_handler in list(self._logger.handlers):
        self._logger.removeHandler(old_handler)
        old_handler.close()
    self._logger.setLevel(logging.DEBUG)
    # output to standard output
    cout = logging.StreamHandler()
    levels = {
        0: logging.ERROR,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
        4: logging.TRACE,
        None: logging.INFO
    }
    #
    cout.setLevel(levels[self._verbosity])
    cout.setFormatter(ColoredFormatter('%(color_levelname)s: %(color_msg)s'))
    self._logger.addHandler(cout)
    # _log() is called directly, so the logger's own DEBUG threshold is not
    # applied to TRACE records; only the handler levels filter them.
    self._logger.trace = lambda msg, *args: self._logger._log(logging.TRACE, msg, args)
    # output to a log file
    if self._logfile is not None:
        ch = logging.FileHandler(self._logfile, mode = 'a')
        # debug information and time are always written to the log file
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(logging.Formatter('%(asctime)s: %(levelname)s: %(message)s'))
        self._logger.addHandler(ch)
#
# attribute exec_dir
def build(self):
    """ Builds the model.

        Parses the model without context first if it has not been parsed,
        then enumerates the containers, assembles the dependency graph and
        connects it.  The results are stored in `inputs`, `outputs`,
        `network`, `input_aliases` and `output_aliases`; `key_cache` is
        reset to an empty dict and `compiled` to None.
    """
    if not self._parsed:
        logger.warning('The model has not been parsed yet. We will try to '
            'parse it without context, but this may easily fail. Make '
            'sure Model.parse() is called before Model.build().')
        self.parse(None)

    logger.debug('Enumerating the model containers.')

    # Construct the high-level network nodes.
    nodes = self.enumerate_nodes(self.root)

    logger.debug('Assembling the model dependency graph.')
    input_nodes, output_nodes, network = self.assemble_graph(nodes)

    if logger.isEnabledFor(logging.TRACE):
        # Walk outward from the input nodes and describe each node reached.
        pending = deque(input_nodes.values())
        while pending:
            current = pending.popleft()
            logger.trace('Assembled Node: %s', current.container.name)
            uses = ', '.join(src.container.name for src in current.inputs)
            logger.trace(' Uses: %s', uses)
            used_by = ', '.join(dst.container.name for dst in current.outputs)
            logger.trace(' Used by: %s', used_by)
            logger.trace(' Aliases: %s', ', '.join(current.names))
            pending.extend(current.outputs)

    logger.debug('Connecting the model graph.')
    graph = self.build_graph(input_nodes, output_nodes, network)
    inputs, input_aliases, outputs, output_aliases = graph

    logger.debug('Model inputs: %s', ', '.join(inputs))
    logger.debug('Model outputs: %s', ', '.join(outputs))

    self.inputs = inputs
    self.outputs = outputs
    self.network = network
    self.input_aliases = input_aliases
    self.output_aliases = output_aliases

    self.key_cache = {}
    self.compiled = None
###########################################################################
def main():
    """ Entry point for the Kur command-line script.

        Loads plugins, parses the command line, configures logging from the
        verbosity and colour flags, then runs the selected sub-command and
        exits with its return value (0 when it returns nothing).
    """
    # The plugin directory is pulled out of sys.argv by hand because plugins
    # are loaded before the argument parser is built.
    cli_args = sys.argv[1:]
    plugin_dir = None
    if '--plugin' in cli_args:
        value_index = cli_args.index('--plugin') + 1
        if value_index < len(cli_args):
            plugin_dir = cli_args[value_index]
    load_plugins(plugin_dir)

    parser, _ = build_parser()
    args = parse_args(parser)

    # Verbosity counts above 3 fall back to TRACE as well.
    loglevel = {
        0 : logging.WARNING,
        1 : logging.INFO,
        2 : logging.DEBUG,
        3 : logging.TRACE
    }
    if args.no_color:
        config = logging.basicConfig
        color, reset = '', ''
    else:
        config = logcolor.basicConfig
        color, reset = '$COLOR', '$RESET'
    log_format = (
        '{color}[%(levelname)s %(asctime)s %(name)s %(funcName)s:%(lineno)s]{reset} '
        '%(message)s'
    ).format(color=color, reset=reset)
    config(
        level=loglevel.get(args.verbose, logging.TRACE),
        format=log_format
    )
    logging.captureWarnings(True)

    do_monitor(args)

    if args.version:
        args.func = version
    elif not hasattr(args, 'func'):
        print('Nothing to do!', file=sys.stderr)
        print('For usage information, try: kur --help', file=sys.stderr)
        print('Or visit our homepage: {}'.format(__homepage__))
        sys.exit(1)

    args.engine = JinjaEngine()

    exit_code = args.func(args) or 0
    sys.exit(exit_code)
###############################################################################
def build(self):
    """ Builds the model.

        Parses the model without context first if it has not been parsed,
        then enumerates the containers, assembles the dependency graph and
        connects it.  The results are stored in `inputs`, `outputs`,
        `network`, `input_aliases` and `output_aliases`; `key_cache` is
        reset to an empty dict and `compiled` to None.
    """
    if not self._parsed:
        logger.warning('The model has not been parsed yet. We will try to '
            'parse it without context, but this may easily fail. Make '
            'sure Model.parse() is called before Model.build().')
        self.parse(None)

    logger.debug('Enumerating the model containers.')

    # Construct the high-level network nodes.
    nodes = self.enumerate_nodes(self.root)

    logger.debug('Assembling the model dependency graph.')
    input_nodes, output_nodes, network = self.assemble_graph(nodes)

    if logger.isEnabledFor(logging.TRACE):
        # Walk outward from the input nodes and describe each node reached.
        pending = deque(input_nodes.values())
        while pending:
            current = pending.popleft()
            logger.trace('Assembled Node: %s', current.container.name)
            uses = ', '.join(src.container.name for src in current.inputs)
            logger.trace(' Uses: %s', uses)
            used_by = ', '.join(dst.container.name for dst in current.outputs)
            logger.trace(' Used by: %s', used_by)
            logger.trace(' Aliases: %s', ', '.join(current.names))
            pending.extend(current.outputs)

    logger.debug('Connecting the model graph.')
    graph = self.build_graph(input_nodes, output_nodes, network)
    inputs, input_aliases, outputs, output_aliases = graph

    logger.debug('Model inputs: %s', ', '.join(inputs))
    logger.debug('Model outputs: %s', ', '.join(outputs))

    self.inputs = inputs
    self.outputs = outputs
    self.network = network
    self.input_aliases = input_aliases
    self.output_aliases = output_aliases

    self.key_cache = {}
    self.compiled = None
###########################################################################
def main():
    """ Entry point for the Kur command-line script.

        Loads plugins, parses the command line, configures logging from the
        verbosity and colour flags, then runs the selected sub-command and
        exits with its return value (0 when it returns nothing).
    """
    # The plugin directory is pulled out of sys.argv by hand because plugins
    # are loaded before the argument parser is built.
    cli_args = sys.argv[1:]
    plugin_dir = None
    if '--plugin' in cli_args:
        value_index = cli_args.index('--plugin') + 1
        if value_index < len(cli_args):
            plugin_dir = cli_args[value_index]
    # Fall back to the KUR_PLUGIN environment variable.
    plugin_dir = plugin_dir or os.environ.get('KUR_PLUGIN')
    load_plugins(plugin_dir)

    parser, _ = build_parser()
    args = parse_args(parser)

    # Verbosity counts above 3 fall back to TRACE as well.
    loglevel = {
        0 : logging.WARNING,
        1 : logging.INFO,
        2 : logging.DEBUG,
        3 : logging.TRACE
    }
    if args.no_color:
        config = logging.basicConfig
        color, reset = '', ''
    else:
        config = logcolor.basicConfig
        color, reset = '$COLOR', '$RESET'
    log_format = (
        '{color}[%(levelname)s %(asctime)s %(name)s:%(lineno)s]{reset} '
        '%(message)s'
    ).format(color=color, reset=reset)
    config(
        level=loglevel.get(args.verbose, logging.TRACE),
        format=log_format
    )
    logging.captureWarnings(True)

    do_monitor(args)

    if args.version:
        args.func = version
    elif not hasattr(args, 'func'):
        print('Nothing to do!', file=sys.stderr)
        print('For usage information, try: kur --help', file=sys.stderr)
        print('Or visit our homepage: {}'.format(__homepage__))
        sys.exit(1)

    args.engine = JinjaEngine()

    exit_code = args.func(args) or 0
    sys.exit(exit_code)
###############################################################################