def set_up_logging(debug):
if debug:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO
logging.basicConfig(format='%(asctime)s ~ %(levelname)-10s %(name)-25s %(message)s',
datefmt='%Y-%m-%d %H:%M',
level=logging_level)
logging.getLogger('telegram').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('JobQueue').setLevel(logging.WARNING)
logging.addLevelName(logging.DEBUG, '?? DEBUG')
logging.addLevelName(logging.INFO, '?? INFO')
logging.addLevelName(logging.WARNING, '?? WARNING')
logging.addLevelName(logging.ERROR, '?? ERROR')
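# A hedged usage sketch (not from the project above; the demo name is
# hypothetical): it shows the core effect set_up_logging() relies on,
# namely that addLevelName() changes what %(levelname)s renders.
def _demo_renamed_levels():
    import logging
    logging.basicConfig(format='%(levelname)s %(message)s')
    logging.addLevelName(logging.WARNING, 'WARN!')
    logging.warning('disk almost full')  # rendered as "WARN! disk almost full"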
def prepare_logging():
global umap2_logger
global stdio_handler
if umap2_logger is None:
def add_debug_level(num, name):
def fn(self, message, *args, **kwargs):
if self.isEnabledFor(num):
self._log(num, message, args, **kwargs)
logging.addLevelName(num, name)
setattr(logging, name, num)
return fn
logging.Logger.verbose = add_debug_level(5, 'VERBOSE')
logging.Logger.always = add_debug_level(100, 'ALWAYS')
FORMAT = '[%(levelname)-6s] %(message)s'
stdio_handler = logging.StreamHandler()
stdio_handler.setLevel(logging.INFO)
formatter = logging.Formatter(FORMAT)
stdio_handler.setFormatter(formatter)
umap2_logger = logging.getLogger('umap2')
umap2_logger.addHandler(stdio_handler)
umap2_logger.setLevel(logging.VERBOSE)
return umap2_logger
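# A minimal, self-contained sketch of the pattern above (names here are
# illustrative, not umap2's): register a numeric level with addLevelName()
# and attach a matching convenience method to logging.Logger.
def _demo_injected_verbose():
    import logging

    VERBOSE = 5
    logging.addLevelName(VERBOSE, 'VERBOSE')

    def verbose(self, message, *args, **kwargs):
        # Same guard the standard helpers (debug, info, ...) use internally.
        if self.isEnabledFor(VERBOSE):
            self._log(VERBOSE, message, args, **kwargs)

    logging.Logger.verbose = verbose

    log = logging.getLogger('demo')
    log.setLevel(VERBOSE)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('[%(levelname)-7s] %(message)s'))
    log.addHandler(handler)
    log.verbose('finer-grained than DEBUG')  # -> "[VERBOSE] finer-grained than DEBUG"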
def setup(debug=False, statsd_host=None):
level = 'DEBUG' if debug else 'INFO'
dictConfig(dict(
version=1,
disable_existing_loggers=True,
loggers={
'': {
'level': level,
'handlers': ['console']
},
},
handlers={
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'stream': sys.stdout
},
},
formatters={
'simple': {
'format': '[%(process)d] [%(levelname)s] %(message)s',
'class': 'logging.Formatter'
},
}
))
OdookuLogger._statsd_host = statsd_host
logging.setLoggerClass(OdookuLogger)
logging.addLevelName(25, 'INFO')
# Prevent odoo from overriding log config
import openerp.netsvc
openerp.netsvc._logger_init = True
def configure_logging(verbosity_loglevel):
llvl = LogLevel.getbyverbosity(verbosity_loglevel)
logging.TRACE = logging.DEBUG - 1
logging.addLevelName(logging.TRACE, "TRACE")
logging_loglevel = {
LogLevel.Silent: logging.WARNING,
LogLevel.Normal: logging.INFO,
LogLevel.Verbose: logging.DEBUG,
LogLevel.Debug: logging.TRACE,
}[llvl]
def __log_trace(self, message, *args, **kwargs):
if self.isEnabledFor(logging.TRACE):
self._log(logging.TRACE, message, args, **kwargs)
logging.Logger.trace = __log_trace
logging.basicConfig(format = " {name:>20s} [{levelname:.1s}]: {message}", style = "{", level = logging_loglevel)
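# Hedged illustration of the TRACE level wired up above (LogLevel and the
# verbosity mapping are project specific; only standard-library calls are
# shown here): with the root level at TRACE, trace() records pass through.
def _demo_trace_records():
    import logging
    logging.TRACE = logging.DEBUG - 1
    logging.addLevelName(logging.TRACE, 'TRACE')
    logging.basicConfig(format=' {name:>20s} [{levelname:.1s}]: {message}',
                        style='{', level=logging.TRACE)
    # '{levelname:.1s}' keeps only the first letter, so TRACE shows as "T".
    logging.getLogger('demo').log(logging.TRACE, 'entering hot loop')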
def get_logger(name, level=INFO, fac=SysLogHandler.LOG_LOCAL1):
global LOG_TRANS
for lt in LOG_TRANS:
if not LOG_TRANS[lt]['old']:
LOG_TRANS[lt]['old'] = logging.getLevelName(lt)
logging.addLevelName(lt, LOG_TRANS[lt]['new'])
fmt = F('[%(name)s.%(funcName)s]: %(message)s')
log = logging.getLogger('%s' % name.split('.')[-1])
h = SysLogHandler(address='/dev/log', facility=parse_fac(fac))
h.setFormatter(fmt)
log.addHandler(h)
# h = StreamHandler(stream=LOGBUF)
# h.setFormatter(fmt)
# log.addHandler(h)
log.setLevel(level)
log.success = lambda msg: log.log(LOG_SUCCES, msg)
return log
def _configure_logging(level=logging.INFO, use_color=True):
"""
Configure the global logging settings.
"""
# Add a 'SUCCESS' level to the logger
logging.addLevelName(log.SUCCESS, 'SUCCESS')
logging.Logger.success = log.success
# Configure colored logging
logger = logging.getLogger()
logger.setLevel(level)
# Overwrite any existing handlers
if logger.handlers:
logger.handlers = []
colored_handler = logging.StreamHandler()
colored_handler.setFormatter(ColoredFormatter(use_color=use_color))
logger.addHandler(colored_handler)
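# Sketch of the SUCCESS level registered above, with an assumed numeric
# value of 25 (the project's `log` module and ColoredFormatter are not
# reproduced here):
def _demo_success_level():
    import logging
    SUCCESS = 25  # assumption: between INFO (20) and WARNING (30)
    logging.addLevelName(SUCCESS, 'SUCCESS')
    logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO)
    logging.getLogger(__name__).log(SUCCESS, 'all checks passed')  # -> "SUCCESS all checks passed"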
def make_logging_level_names_consistent():
"""Rename the standard library's logging levels to match Twisted's.
Twisted's new logging system in `twisted.logger` that is.
"""
for level in list(logging._levelToName):
if level == logging.NOTSET:
# When the logging level is not known in Twisted it's rendered as
# a hyphen. This is not a common occurrence with `logging` but we
# cater for it anyway.
name = "-"
elif level == logging.WARNING:
# "Warning" is more consistent with the other level names than
# "warn", so there is a fault in Twisted here. However it's easier
# to change the `logging` module to match Twisted than vice-versa.
name = "warn"
else:
# Twisted's level names are all lower-case.
name = logging.getLevelName(level).lower()
# For a preexisting level this will _replace_ the name.
logging.addLevelName(level, name)
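# A small check of the renaming effect (assumes the function above is in
# scope; Twisted itself is not needed for this):
def _demo_twisted_style_names():
    import logging
    make_logging_level_names_consistent()
    assert logging.getLevelName(logging.WARNING) == 'warn'
    assert logging.getLevelName(logging.ERROR) == 'error'
    assert logging.getLevelName(logging.NOTSET) == '-'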
def setlogdir(logdir):
'''set the log directory'''
# set log color
logging.addLevelName(logging.INFO, print_style('%s', fore='green') % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, print_style('%s', fore='red') % logging.getLevelName(logging.WARNING))
ldir = os.path.dirname(logdir)
writelog = os.path.join(ldir, 'log.log')
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=writelog,
filemode='w')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
def get_logger():
'''
Returns logger used by multiprocessing
'''
global _logger
import logging, atexit
logging._acquireLock()
try:
if not _logger:
_logger = logging.getLogger(LOGGER_NAME)
_logger.propagate = 0
logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
logging.addLevelName(SUBWARNING, 'SUBWARNING')
# XXX multiprocessing should cleanup before logging
if hasattr(atexit, 'unregister'):
atexit.unregister(_exit_function)
atexit.register(_exit_function)
else:
atexit._exithandlers.remove((_exit_function, (), {}))
atexit._exithandlers.append((_exit_function, (), {}))
finally:
logging._releaseLock()
return _logger
def inject_verbose_info(self):
logging.VERBOSE = 15
logging.verbose = lambda x: logging.log(logging.VERBOSE, x)
logging.addLevelName(logging.VERBOSE, "VERBOSE")
def setup_logging(level=PLAIN_LOG_LEVEL, output=sys.stdout):
"""create the setup.py singleton logger."""
global logger
if logger:
return logger
logger = logging.getLogger('setup.py')
formatter = ScreenFormatter(plain_log_level=PLAIN_LOG_LEVEL)
stream_h = logging.StreamHandler(output)
stream_h.setFormatter(formatter)
# Logging timezone is UTC
stream_h.converter = time.gmtime
logger.setLevel(level)
logger.addHandler(stream_h)
logging.addLevelName(PLAIN_LOG_LEVEL, "PLAIN")
logging.Logger.plain = plain
logging.addLevelName(TO_FILE_LOG_LEVEL, 'TO_FILE')
logging.Logger.to_file = to_file
return logger
def __init__(self, capture_warnings=True, use_default_kvp=True, json=False):
self._config = deepcopy(DEFAULT_LOGGING_CONF)
if use_default_kvp:
self.update_default_formatter(DEFAULT_KVP_FORMAT)
if json:
self.enable_json_formatter()
# Custom level to suppress handlers
logging.addLevelName(LEVEL_MAP['DISABLED'], 'DISABLED')  # level value first, then name
logging.captureWarnings(capture_warnings)
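# Sketch of the DISABLED idea, assuming LEVEL_MAP maps it to a value above
# CRITICAL so that a logger or handler set to it emits nothing:
def _demo_disabled_level():
    import logging
    DISABLED = logging.CRITICAL + 10  # assumption about the numeric value
    logging.addLevelName(DISABLED, 'DISABLED')
    log = logging.getLogger('quiet')
    log.addHandler(logging.StreamHandler())
    log.setLevel(DISABLED)
    log.critical('suppressed')  # CRITICAL (50) < DISABLED (60): not emitted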
def init_logging(prefix="", suffix=""):
"""
Initializes logging, sets custom logging format and adds one
logging level with name and method to call.
prefix and suffix arguments can be used to modify log level prefixes.
"""
logging.basicConfig(format=LOG_FORMAT)
logger = logging.getLogger()
# Rename levels
logging.addLevelName(10, prefix + "D" + suffix) # Debug
logging.addLevelName(20, prefix + "I" + suffix) # Info
logging.addLevelName(30, prefix + "W" + suffix) # Warning
logging.addLevelName(40, prefix + "E" + suffix) # Error
# Create additional, "verbose" level
logging.addLevelName(15, prefix + "V" + suffix) # Verbose
# Add 'logging.verbose' method
def verbose(self, msg, *args, **kwargs):
return self.log(15, msg, *args, **kwargs)
logging.Logger.verbose = verbose
# Wrap Logger._log in something that can handle utf-8 exceptions
old_log = logging.Logger._log
def _log(self, level, msg, args, exc_info=None, extra=None):
args = tuple([
(str(c).decode("utf-8") if type(c) is str else c)
for c in args
])
msg = msg if type(msg) is unicode else str(msg).decode("utf-8")
old_log(self, level, msg, args, exc_info, extra)
logging.Logger._log = _log
def __setattr__(self, name, value):
if name == 'name' and 'name' in self.__dict__:
logging.addLevelName(int(self), value)
super(NamedLevel, self).__setattr__(name, value)
def _create_logger(self, logger=None):
logging.addLevelName(LOGGING_TRACE_LVL, 'TRACE')
logger = logger or logging.getLogger(inflection.underscore(type(self).__name__))
setattr(logger, 'trace', lambda *args: logger.log(LOGGING_TRACE_LVL, *args))
return logger
def logger_init(level = logging.INFO):
logging.basicConfig(format = '%(name)s: %(levelname)s: %(message)s', level = level)
logging.addLevelName(logging.VERBOSE, 'VERBOSE')
def __init__(self, name, level=logging.NOTSET):
super(Logger, self).__init__(name, level)
logging.addLevelName(OUTPUT, 'OUTPUT')
logging.addLevelName(SUCCESS, 'SUCCESS')
def main(project, env, action, verbose, format, template_path):
if verbose >= 2:
level = 5
elif verbose == 1:
level = logging.DEBUG
else:
logging.getLogger('googleapiclient').setLevel(logging.ERROR)
logging.getLogger('oauth2client').setLevel(logging.ERROR)
level = logging.INFO
logging.addLevelName(5, "TRACE")
logging.basicConfig(format='%(asctime)s %(levelname)s:%(name)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S', level=level)
logger.debug('Debug log enabled')
logger.info("Log level: {}".format(level))
if action in ['apply', 'template']:
if template_path is None:
logging.error('A path to a template file is required for {}'.format(action))
sys.exit(1)
template_class = load_template_module(template_path)
template = template_class(project, env)
if format == "json":
template.formatter = template.asJSON
if action == 'apply':
template.__repr__()
apply_deployment(project, template)
elif action == 'template':
t = template.__repr__()
logger.info('Template successfully rendered, printing to stdout...')
print(t)
sys.exit(0)
def configure_logging(log_file):
"""Configure root logger"""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Level name colored differently (both console and file)
logging.addLevelName(logging.WARNING, '\x1b[0;33m%s\x1b[0m' %
logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, '\x1b[0;31m%s\x1b[0m' %
logging.getLevelName(logging.ERROR))
# Configure console logging
console_log_handler = logging.StreamHandler()
console_log_handler.setLevel(logging.INFO)
# All console messages are the same color (except with colored level names)
console_formatter = logging.Formatter('\x1b[0;32m%(levelname)s'
'\t%(message)s\x1b[0m')
console_log_handler.setFormatter(console_formatter)
logger.addHandler(console_log_handler)
# Configure log file
if os.path.isfile(log_file):
os.remove(log_file)
file_log_handler = logging.FileHandler(log_file)
file_log_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(process)s %(asctime)s.%(msecs)03d'
' %(name)s %(levelname)s %(message)s',
datefmt="%H:%M:%S")
file_log_handler.setFormatter(file_formatter)
logger.addHandler(file_log_handler)
logger.debug("Root logger configured.")
# TODO(dbite): Remove CamelCasing.
# -----------------------------------------------------------------------------
# Custom data-types.
# -----------------------------------------------------------------------------
def setup_logging():
logging.addLevelName(logging.DEBUG, "\033[1;36m%s\033[1;0m" % logging.getLevelName(logging.DEBUG))
logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO))
logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[1;31m%s\033[1;0m" % logging.getLevelName(logging.ERROR))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s '
'[in %(pathname)s:%(lineno)d]'
))
app.logger.addHandler(handler)
app.logger.setLevel(logging.DEBUG)
def configure():
"""configure logging levels"""
logging.addLevelName(DEBUG1, 'DEBUG1')
logging.addLevelName(DEBUG2, 'DEBUG2')
logging.addLevelName(DEBUG3, 'DEBUG3')
logging.addLevelName(DEBUG4, 'DEBUG4')
logging.addLevelName(DEBUG5, 'DEBUG5')
def setUp(self):
BaseTest.setUp(self)
for k, v in my_logging_levels.items():
logging.addLevelName(k, v)
def instantiate( cls, streamType = "SCREEN", logLevel = "INFO" ):
try:
logging.VERBOSE = 5
logging.addLevelName(logging.VERBOSE, "VERBOSE")
logging.Logger.verbose = lambda inst, msg, *args, **kwargs: inst.log(logging.VERBOSE, msg, *args, **kwargs)
logging.verbose = lambda msg, *args, **kwargs: logging.log(logging.VERBOSE, msg, *args, **kwargs)
cls.logger = logging.getLogger()
if logLevel not in logging._levelNames:
raise Exception( 'Invalid file level' )
cls.logger.setLevel( logging._levelNames[logLevel] )
streamType = app.config['STREAMTYPE']
if streamType == "SCREEN":
stream = logging.StreamHandler()
else:
stream = logging.FileHandler( app.config['LOGFILE'] )
formatter = logging.Formatter( '[%(levelname)-7s - %(asctime)s] %(message)s' )
stream.setFormatter( formatter )
cls.logger.addHandler( stream )
except Exception, e:
print( 'Unable to get/set log configurations. Error: %s'%( e ) )
cls.logger = None
##
# Records a message in a file and/or displays it in the screen.
# @param level - String containing the name of the log message.
# @param message - String containing the message to be recorded.
#
def setup(level = 0):
"""Sets up the global logging environment"""
formatstr = "%(levelname)-8s: %(name)-20s: %(message)s"
logging.basicConfig(format = formatstr)
rootlogger = logging.getLogger('')
rootlogger.setLevel(logging.DEBUG + 1 - level)
for i in range(1, 9):
logging.addLevelName(logging.DEBUG - i, "DEBUG" + str(i))
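# Sketch of what the loop above registers: DEBUG-1 .. DEBUG-8 become named
# sub-debug levels, so setup(level=3), for example, lets DEBUG1 and DEBUG2
# records through while still filtering DEBUG3 and below.
def _demo_sub_debug_levels():
    import logging
    for i in range(1, 9):
        logging.addLevelName(logging.DEBUG - i, 'DEBUG' + str(i))
    logging.basicConfig(format='%(levelname)-8s: %(message)s')
    root = logging.getLogger('')
    root.setLevel(logging.DEBUG + 1 - 3)              # same arithmetic as setup(level=3)
    root.log(logging.DEBUG - 2, 'emitted as DEBUG2')
    root.log(logging.DEBUG - 5, 'filtered: below the effective level')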