def create_logger():
    """Configure the root logger for plain-message console output.

    Returns:
        logging.Logger: the root logger, set to INFO, formatting records
        as the bare message with no level prefix.
    """
    logging.basicConfig(format='%(message)s')
    root = logging.getLogger()
    # Use the level constant directly instead of round-tripping the
    # name through getLevelName('INFO'); removed long-dead commented-out
    # handler-wiring code.
    root.setLevel(logging.INFO)
    return root
# Examples of Python logging.getLevelName() usage
def _cli_log_message(msg, logger_name=None, level="INFO"):
    """Log a single message to Flightlog (CLI helper).

    Calling this more than once within one process stacks duplicate
    handlers on the logger and therefore duplicates messages.
    """
    logger = logging.getLogger(logger_name)
    level_value = logging.getLevelName(level.upper())
    # getLevelName returns an int for known names and a "Level X"
    # string otherwise; int() on the latter raises ValueError.
    try:
        int(level_value)
    except ValueError:
        raise ValueError("level must be one of DEBUG, INFO, WARNING, ERROR, CRITICAL")
    logger.addHandler(FlightlogHandler(background=False))
    logger.setLevel(level_value)
    text = sys.stdin.read() if msg == "-" else msg
    for entry in text.splitlines():
        if entry:
            logger.log(level_value, entry)
    return None, 0
def test_log_msg():
    # Depends on the default message format defined around line 139 of
    # linchpin/cli/context.py.
    level = logging.DEBUG
    message = 'Test Msg'
    pattern = '^{0}.*{1}'.format(logging.getLevelName(level), message)
    ctx = LinchpinCliContext()
    ctx.load_config(config_path)
    ctx.setup_logging()
    ctx.log(message, level=level)
    with open(logfile) as log_fh:
        first_line = log_fh.readline()
        assert_regexp_matches(first_line, pattern)
def test_log_debug():
    # Exercises the log_debug() shortcut; the log file's first line must
    # start with the DEBUG level name and contain the message.
    level = logging.DEBUG
    message = 'Debug Msg'
    pattern = '^{0}.*{1}'.format(logging.getLevelName(level), message)
    ctx = LinchpinCliContext()
    ctx.load_config(config_path)
    ctx.setup_logging()
    ctx.log_debug(message)
    with open(logfile) as log_fh:
        first_line = log_fh.readline()
        assert_regexp_matches(first_line, pattern)
def get_log_config(level):
    """Build a dictConfig-style logging configuration.

    The 'hephaestus' logger is pinned to *level* (translated through
    getLevelName) and writes to stdout via a single console handler.
    """
    basic_formatter = {
        'format': '[%(asctime)s %(levelname)s %(threadName)s] %(name)s: %(message)s'
    }
    console_handler = {
        'class': 'logging.StreamHandler',
        'formatter': 'basicFormatter',
        'stream': 'ext://sys.stdout'
    }
    hephaestus_logger = {
        'level': logging.getLevelName(level),
        'propagate': False,
        'handlers': ['console']
    }
    return {
        'version': 1,
        'formatters': {'basicFormatter': basic_formatter},
        'handlers': {'console': console_handler},
        'loggers': {'hephaestus': hephaestus_logger}
    }
def main(argv):
    """MySQL binlog to Google Pub/Sub entry point.

    Args:
        argv (list): list of command line arguments
    """
    args = _setup_arg_parser(argv)
    if args.conf:
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = args.conf
    if args.logconf:
        logging.config.fileConfig(args.logconf, disable_existing_loggers=False)
    else:
        logging.basicConfig()
    if args.loglevel:
        logging.root.setLevel(logging.getLevelName(args.loglevel.upper()))
    # Imported late so the configuration above is in place first.
    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
def test_invalid_value(self):
    """
    Sends an invalid value to the handler and checks the WARNING record.
    """
    message = 'Needs more cowbell.'
    context = {
        'key': 'test',
        'value': "(Don't Fear) The Reaper",
    }
    self.handler.handle_invalid_value(message, False, context)
    # Exactly one record, carrying the message and its context payload.
    self.assertEqual(len(self.logs.records), 1)
    record = self.logs[0]
    self.assertEqual(record.msg, message)
    self.assertEqual(getattr(record, 'context'), context)
    # The log message level is set in the handler's initializer.
    self.assertEqual(record.levelname, getLevelName(WARNING))
    # No exception info for invalid values (by default).
    self.assertIsNone(record.exc_text)
def _level_write(self, level, str_format, *args):
    """Render and write one formatted log line when *level* passes the
    configured threshold.

    NOTE(review): inspect.stack()[2] assumes this is always invoked two
    frames below the originating log call — confirm against the level
    wrapper methods that call it.
    """
    # Drop messages below the logger's configured level.
    if level < self._level:
        return
    levelname = logging.getLevelName(level)
    # Apply %-formatting lazily, only when args were actually supplied.
    message = str_format % args if args else str_format
    message = strutils.decode(message)
    # Pull the original call site's location from the stack.
    frame, filename, line_number, function_name, lines, index = inspect.stack()[2]
    props = dict(
        # Millisecond-precision timestamp (microseconds trimmed to 3 digits).
        asctime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        name=self._name,
        filename=os.path.basename(filename),
        lineno=line_number,
        message=message,
    )
    # Substitute any configured short alias for the standard level name.
    props['levelname'] = Logger.__alias.get(levelname, levelname)
    output = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**props)
    self._write(output)
def init(cls):
    """Initialize the class logger from the JUBAKIT_LOG_LEVEL env var.

    Without the variable, output is suppressed entirely; with an
    unrecognized value, logging falls back to INFO and emits a warning.
    """
    cls.logger = logging.getLogger(_LOGGER_NAME)
    log = cls.logger
    wanted = os.environ.get('JUBAKIT_LOG_LEVEL', None)
    if not wanted:
        # Suppress printing logs by default.
        log.addHandler(cls._NullHandler())
        log.setLevel(CRITICAL)
        return
    # Map the environment value back onto a known level constant.
    for candidate in (DEBUG, INFO, WARNING, ERROR, CRITICAL):
        if logging.getLevelName(candidate) == wanted:
            setup_logger(candidate)
            break
    else:
        setup_logger(INFO)
        log.warning('invalid JUBAKIT_LOG_LEVEL (%s) specified; continue with INFO', wanted)
def _level_write(self, level, str_format, *args):
    """Render one formatted log line and write it as UTF-8 encoded bytes
    when *level* passes the configured threshold.

    NOTE(review): inspect.stack()[2] assumes this is always invoked two
    frames below the originating log call — confirm against the level
    wrapper methods that call it.
    """
    # Drop messages below the logger's configured level.
    if level < self._level:
        return
    levelname = logging.getLevelName(level)
    # Apply %-formatting lazily, only when args were actually supplied.
    message = str_format % args if args else str_format
    message = strutils.decode(message)
    # Pull the original call site's location from the stack.
    frame, filename, line_number, function_name, lines, index = inspect.stack()[2]
    props = dict(
        # Millisecond-precision timestamp (microseconds trimmed to 3 digits).
        asctime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        name=self._name,
        filename=os.path.basename(filename),
        lineno=line_number,
        message=message,
    )
    # Substitute any configured short alias for the standard level name.
    props['levelname'] = Logger.__alias.get(levelname, levelname)
    output = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**props)
    # Unlike the sibling variant, this one encodes before writing.
    self._write(strutils.encode(output, 'utf-8'))
def setup_logging(args_obj):
    """Attach a detailed stream handler to the root logger.

    Uses DEBUG when args_obj.verbose is truthy, otherwise the level
    named by args_obj.log_level (case-insensitive).
    """
    if args_obj.verbose:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.getLevelName(args_obj.log_level.upper())
    detailed = logging.Formatter(
        fmt='%(asctime)-15s - %(levelname)s - %(name)s'
        '[line:%(lineno)d thread:%(threadName)s(%(thread)d) '
        'process:%(processName)s(%(process)d)]'
        ' - %(message)s'
    )
    root = logging.getLogger()
    root.setLevel(chosen_level)
    stream = logging.StreamHandler()
    stream.setFormatter(detailed)
    root.addHandler(stream)
def test_get_level_name(self):
"""Test getLevelName returns level constant."""
# NOTE(flaper87): Bug #1517
self.assertEqual(logging.getLevelName('NOTSET'), 0)
self.assertEqual(logging.getLevelName('DEBUG'), 10)
self.assertEqual(logging.getLevelName('INFO'), 20)
self.assertEqual(logging.getLevelName('WARN'), 30)
self.assertEqual(logging.getLevelName('WARNING'), 30)
self.assertEqual(logging.getLevelName('ERROR'), 40)
self.assertEqual(logging.getLevelName('CRITICAL'), 50)
self.assertEqual(logging.getLevelName(0), 'NOTSET')
self.assertEqual(logging.getLevelName(10), 'DEBUG')
self.assertEqual(logging.getLevelName(20), 'INFO')
self.assertEqual(logging.getLevelName(30), 'WARNING')
self.assertEqual(logging.getLevelName(40), 'ERROR')
self.assertEqual(logging.getLevelName(50), 'CRITICAL')
def log_formatter(line):
    """Parse one ' / '-separated log line into a record dict.

    Supports the 4-field and 6-field layouts; any other field count
    raises RuntimeError.
    """
    fields = line.split(' / ')
    if len(fields) == 4:
        # Other parsing options: http://stackoverflow.com/questions/466345/converting-string-into-datetime
        # 4-field layout carries no device id; timestamp lacks a zone,
        # so "+00" (UTC) is appended before parsing.
        return {
            "timestamp": date_parse(fields[0] + "+00"),
            "level": logging.getLevelName(fields[1]),
            "message": fields[3],
            "process": fields[2],
            "device_id": None,
        }
    if len(fields) == 6:
        # Other parsing options: http://stackoverflow.com/questions/466345/converting-string-into-datetime
        return {
            "timestamp": date_parse(fields[0]),
            "level": logging.getLevelName(fields[1]),
            "message": fields[5],
            "process": fields[3],
            "device_id": fields[4],
        }
    raise RuntimeError("The logs in the log file are of an unknown format, cannot continue. "
                       "Line: {}".format(line))
def _has_streamhandler(logger, level=None, fmt=LOG_FORMAT,
                       stream=DEFAULT_STREAM):
    """Check the named logger for an appropriate existing StreamHandler.

    This only returns True if a StreamHandler that exactly matches
    our specification is found. If other StreamHandlers are seen,
    we assume they were added for a different purpose.
    """
    # Ensure we are talking the same type of logging levels:
    # if a string was passed in, convert it to a number.
    # BUG FIX: `basestring` is Python 2-only and raises NameError on
    # Python 3; `str` covers the same cases there.
    if isinstance(level, str):
        level = logging.getLevelName(level)
    for handler in logger.handlers:
        if not isinstance(handler, logging.StreamHandler):
            continue
        if handler.stream is not stream:
            continue
        if handler.level != level:
            continue
        # NOTE: _fmt is a private Formatter attribute; works across
        # CPython versions but is not a public API.
        if not handler.formatter or handler.formatter._fmt != fmt:
            continue
        return True
    return False
def main():
    """Start the agent log service: configure logging, connect to the
    database, and begin serving on the configured address."""
    config = options()
    logging.basicConfig(level=logging.getLevelName(config.log_level),
                        format=config.log_format)
    connection = get_connection(config.log_service.db.servers,
                                config.log_service.db.replica_name)
    log_collection = get_collection(config.log_service.db.name,
                                    config.log_service.db.collection,
                                    connection)
    service = AgentLogService(config.log_service.bind_address,
                              log_collection)
    LOG.info('Starting logging service on {}'.format(
        config.log_service.bind_address))
    service.start()
def main():
    """Run the inventory server on a ZMQ asyncio event loop until
    interrupted, then drain pending tasks and close the loop."""
    config = get_inventory_configuration()
    logging.basicConfig(level=logging.getLevelName(config.log_level),
                        format=config.log_format)
    loop = zmq.asyncio.ZMQEventLoop()
    loop.set_debug(config.asyncio_debug)
    asyncio.set_event_loop(loop)
    server = InventoryServer(bind_address=config.inventory.bind_address,
                             config=config)
    try:
        loop.run_until_complete(server.start())
    except KeyboardInterrupt:
        log.info('Stopping service')
        server.kill()
    finally:
        # Let any still-pending tasks finish before closing the loop.
        leftover = asyncio.Task.all_tasks(loop=loop)
        loop.run_until_complete(asyncio.gather(*leftover))
        loop.close()
def _init_logger(verbosity):
# set up the logger
global logger
logger = logging.getLogger('conda_mirror')
logmap = {0: logging.ERROR,
1: logging.WARNING,
2: logging.INFO,
3: logging.DEBUG}
loglevel = logmap.get(verbosity, '3')
# clear all handlers
for handler in logger.handlers:
logger.removeHandler(handler)
logger.setLevel(loglevel)
format_string = '%(levelname)s: %(message)s'
formatter = logging.Formatter(fmt=format_string)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(loglevel)
stream_handler.setFormatter(fmt=formatter)
logger.addHandler(stream_handler)
print("Log level set to %s" % logging.getLevelName(logmap[verbosity]),
file=sys.stdout)
def get_configuration():
    """
    get a configuration (snapshot) that can be used to call configure
    snapshot = get_configuration()
    configure(**snapshot)
    """
    root = getLogger()
    pairs = [('', logging.getLevelName(root.level))]
    for logger_name, logger_obj in root.manager.loggerDict.items():
        # Skip placeholder entries, which have no level attribute.
        if hasattr(logger_obj, 'level'):
            pairs.append((logger_name, logging.getLevelName(logger_obj.level)))
    config_string = ','.join('%s:%s' % pair for pair in pairs)
    return dict(config_string=config_string, log_json=SLogger.manager.log_json)
def initialize_logger(name, log_level):
    """initializes the logger to a level with a name
    logger = initialize_logger(name, log_level)
    Parameters
    ----------
    name : str
        name of the logger
    log_level :
        level name or number understood by logging.getLevelName
    Returns
    -------
    logging.Logger
        a logger set with the name and level specified
    """
    logging.basicConfig()
    configured = logging.getLogger(name)
    configured.setLevel(level=logging.getLevelName(log_level))
    return configured
def main():
    """PowerGSLB entry point: parse configuration, then start the
    monitor and server threads under the system service wrapper."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config')
    cli_args = parser.parse_args()
    powergslb.system.parse_config(cli_args.config)
    config = powergslb.system.get_config()
    logging.basicConfig(
        format=config.get('logging', 'format'),
        level=logging.getLevelName(config.get('logging', 'level'))
    )
    threads = [
        powergslb.monitor.MonitorThread(name='Monitor'),
        powergslb.server.ServerThread(name='Server'),
    ]
    powergslb.system.SystemService(threads).start()
def test_get_level_name(self):
"""Test getLevelName returns level constant."""
# NOTE(flaper87): Bug #1517
self.assertEqual(logging.getLevelName('NOTSET'), 0)
self.assertEqual(logging.getLevelName('DEBUG'), 10)
self.assertEqual(logging.getLevelName('INFO'), 20)
self.assertEqual(logging.getLevelName('WARN'), 30)
self.assertEqual(logging.getLevelName('WARNING'), 30)
self.assertEqual(logging.getLevelName('ERROR'), 40)
self.assertEqual(logging.getLevelName('CRITICAL'), 50)
self.assertEqual(logging.getLevelName(0), 'NOTSET')
self.assertEqual(logging.getLevelName(10), 'DEBUG')
self.assertEqual(logging.getLevelName(20), 'INFO')
self.assertEqual(logging.getLevelName(30), 'WARNING')
self.assertEqual(logging.getLevelName(40), 'ERROR')
self.assertEqual(logging.getLevelName(50), 'CRITICAL')
def configure_logging(verbosity: int, logPath: str, isDaemon=False):
    """Set up root logging to the console and a log file.

    The file defaults to a name under the temp directory; the level is
    derived from *verbosity* unless the module-level DEBUG flag forces
    DEBUG everywhere.
    """
    rootLogger = logging.getLogger()
    if logPath:
        target = Path(logPath).expanduser()
    else:
        default_name = 'i3configger-daemon.log' if isDaemon else 'i3configger.log'
        target = Path(tempfile.gettempdir()) / default_name
    if DEBUG:
        print('logging to %s' % target)
        level = logging.getLevelName('DEBUG')
    else:
        level_name = {0: 'ERROR', 1: 'WARNING', 2: 'INFO'}.get(verbosity, 'DEBUG')
        level = logging.getLevelName(level_name)
    fmt = ('%(asctime)s %(name)s:%(funcName)s:%(lineno)s '
           '%(levelname)s: %(message)s')
    # Only install the console handler once.
    if not rootLogger.handlers:
        logging.basicConfig(format=fmt, level=level)
    fileHandler = logging.FileHandler(target)
    fileHandler.setFormatter(logging.Formatter(fmt))
    fileHandler.setLevel(level)
    rootLogger.addHandler(fileHandler)
def get_logger(name, level=INFO, fac=SysLogHandler.LOG_LOCAL1):
    """Build a syslog-backed logger with translated level names and a
    .success() convenience method attached."""
    global LOG_TRANS
    # Install the translated level names once, remembering the originals.
    for lvl in LOG_TRANS:
        if not LOG_TRANS[lvl]['old']:
            LOG_TRANS[lvl]['old'] = logging.getLevelName(lvl)
            logging.addLevelName(lvl, LOG_TRANS[lvl]['new'])
    fmt = F('[%(name)s.%(funcName)s]: %(message)s')
    log = logging.getLogger('%s' % name.split('.')[-1])
    syslog_handler = SysLogHandler(address='/dev/log', facility=parse_fac(fac))
    syslog_handler.setFormatter(fmt)
    log.addHandler(syslog_handler)
    # h = StreamHandler(stream=LOGBUF)
    # h.setFormatter(fmt)
    # log.addHandler(h)
    log.setLevel(level)
    log.success = lambda msg: log.log(LOG_SUCCES, msg)
    return log
def get_configuration():
    """
    get a configuration (snapshot) that can be used to call configure
    snapshot = get_configuration()
    configure(**snapshot)
    """
    root = getLogger()
    snapshot = [('', logging.getLevelName(root.level))]
    snapshot += [
        (logger_name, logging.getLevelName(logger_obj.level))
        for logger_name, logger_obj in root.manager.loggerDict.items()
        # Placeholder entries have no level attribute; skip them.
        if hasattr(logger_obj, 'level')
    ]
    config_string = ','.join('%s:%s' % entry for entry in snapshot)
    return dict(config_string=config_string, log_json=SLogger.manager.log_json)
def _level_write(self, level, str_format, *args):
    """Render and write one formatted log line when *level* passes the
    configured threshold (text-output variant).

    NOTE(review): inspect.stack()[2] assumes this is always invoked two
    frames below the originating log call — confirm against the level
    wrapper methods that call it.
    """
    # Drop messages below the logger's configured level.
    if level < self._level:
        return
    levelname = logging.getLevelName(level)
    # Apply %-formatting lazily, only when args were actually supplied.
    message = str_format % args if args else str_format
    message = strutils.decode(message)
    # Pull the original call site's location from the stack.
    frame, filename, line_number, function_name, lines, index = inspect.stack()[2]
    props = dict(
        # Millisecond-precision timestamp (microseconds trimmed to 3 digits).
        asctime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
        name=self._name,
        filename=os.path.basename(filename),
        lineno=line_number,
        message=message,
    )
    # Substitute any configured short alias for the standard level name.
    props['levelname'] = Logger.__alias.get(levelname, levelname)
    output = u'{asctime} {levelname:<5s} [{name}:{lineno:>4}] {message}'.format(**props)
    self._write(output)
def create_log(self, log_path=None, log_level='DEBUG'):
    """Create a log file for debug output.

    Args:
        log_path: Path to the log file. If None or empty, defaults to
            the command-line invocation name (argv[0]) with a '.log'
            suffix in the user's home directory.
        log_level: Log level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR',
            or 'CRITICAL' (case-insensitive). Default is 'DEBUG'.
    """
    if not log_path:
        home = os.path.expanduser('~')
        log_path = os.path.join(home, sys.argv[0] + '.log')
    log_path = os.path.expanduser(log_path)
    logging.basicConfig(filename=os.path.abspath(log_path),
                        filemode='w', level=log_level.upper())
    module_logger = logging.getLogger(__name__)
    module_logger.info('Log started %s, level=%s', datetime.datetime.now(),
                       logging.getLevelName(module_logger.getEffectiveLevel()))
def log_to_stderr(level, formatter=_LOG_FORMATTER,
                  handler=logging.StreamHandler):
    """Setup logging or set logging level to STDERR.
    Args:
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        handler: logging.StreamHandler (this argument is for testing)
    """
    global _STDERR_HANDLER
    resolved = get_loglevel(level)
    if type(_STDERR_HANDLER) is handler:
        # A matching handler already exists; only adjust its level.
        _STDERR_HANDLER.setLevel(resolved)
    else:
        stderr_handler = handler(stream=sys.stderr)
        stderr_handler.setLevel(resolved)
        stderr_handler.setFormatter(formatter)
        logging.getLogger('').addHandler(stderr_handler)
        _STDERR_HANDLER = stderr_handler
    logging.debug('Setting logging at level=%s',
                  logging.getLevelName(resolved))
def log_to_file(filename, level=INFO, formatter=_LOG_FORMATTER,
                filemode=APPEND, handler=logging.FileHandler):
    """Setup logging or set logging level to file.
    Args:
        filename: string of path/file to write logs
        level: a logging level, like logging.INFO
        formatter: a logging.Formatter object
        filemode: a mode of writing, like app.APPEND or app.CLOBBER
        handler: logging.FileHandler (this argument is for testing)
    """
    global _FILE_HANDLER
    resolved = get_loglevel(level)
    if type(_FILE_HANDLER) is handler:
        # A matching handler already exists; only adjust its level.
        _FILE_HANDLER.setLevel(resolved)
    else:
        file_handler = handler(filename=filename, mode=filemode)
        file_handler.setLevel(resolved)
        file_handler.setFormatter(formatter)
        logging.getLogger('').addHandler(file_handler)
        _FILE_HANDLER = file_handler
    logging.info('Logging to file %s [mode=\'%s\', level=%s]',
                 os.path.abspath(filename), filemode,
                 logging.getLevelName(resolved))
def setup_logger(args):
    """Configure root logging per command-line verbosity and silence
    tech-preview / seen warnings."""
    warnings.filterwarnings("ignore", category=TechPreviewWarning)
    warnings.filterwarnings("ignore", category=SeenWarning)
    # Set up logging according to command-line verbosity:
    # each -v step lowers the threshold by 10 from WARNING (30).
    root = logging.getLogger()
    root.setLevel(int(30 - (args.loglevel * 10)))
    stream = logging.StreamHandler(sys.stderr)
    stream.setFormatter(
        logging.Formatter(u'%(asctime)s %(name)s %(levelname)s: %(message)s'))
    root.addHandler(stream)
    root.info("Set logging level to {0}".format(logging.getLevelName(root.getEffectiveLevel())))
    # If the log level is reduced below WARNING, also disable emissions
    # from the `warnings` module.
    if not root.isEnabledFor(logging.WARNING):
        warnings.simplefilter("ignore")
    return root
def checkConfig(self):
    """Validate required settings and apply the configured log level.

    Raises ConfigError when a required field is missing or the log
    level name is not recognized.
    """
    if not self.interface:
        raise ConfigError("You must configure an interface")
    if not self.domain:
        raise ConfigError("You must configure a domain")
    if not self.realm:
        raise ConfigError("You must configure a realm")
    if not self.honey_username:
        raise ConfigError("You must configure a honeytoken username")
    wanted = self.log_level.upper()
    if wanted not in {"CRITICAL","ERROR","WARNING","INFO","DEBUG","NOTSET"}:
        raise ConfigError("Invalid setting for log level")
    logging.getLogger().setLevel(logging.getLevelName(wanted))