def enable_pretty_logging(logger, level='info', queue=None):
    """Turn on formatted logging output as configured.

    Args:
        logger: the ``logging.Logger`` instance to configure.
        level: log level name (e.g. ``'info'``); upper-cased and resolved
            against the ``logging`` module, so an unknown name raises
            ``AttributeError``.
        queue: optional queue; when truthy, a ``QueueHandler`` forwarding
            records to it is attached in addition to the stream handler.
    """
    logger.setLevel(getattr(logging, level.upper()))
    # BUGFIX: `color` was previously assigned only inside the
    # `if not logger.handlers:` branch but read in the `if queue:` branch,
    # raising NameError when the logger already had handlers.
    color = False
    if not logger.handlers:
        # Enable color only when stderr is a tty and curses (which may be
        # None on platforms where the import failed) reports color support.
        if curses and sys.stderr.isatty():
            try:
                curses.setupterm()
                if curses.tigetnum("colors") > 0:
                    color = True
            except Exception:
                # Terminal capability probing is best-effort; fall back to
                # plain output.  (Was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                pass
        channel = logging.StreamHandler()
        channel.setFormatter(_LogFormatter(color=color))
        logger.addHandler(channel)
    if queue:
        queue_handler = QueueHandler(queue)
        queue_handler.setFormatter(_LogFormatter(color=color))
        logger.addHandler(queue_handler)
# --- Example usage of the QueueHandler class (scraped listing separator) ---
def _setup_task_process(mp_log_q):
    """Initialize config and logging inside a freshly spawned worker process.

    Args:
        mp_log_q: multiprocessing queue that ships log records back to the
            parent process, where they are actually emitted.
    """
    # Re-parse CLI args and re-init logging: a new process does not inherit
    # the parent's configured state.
    cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
    utils.setup_logging()
    # NOTE(review): the `.logger` attribute suggests `logging` here is an
    # oslo.log-style adapter wrapping the stdlib root logger -- confirm
    # against this module's imports.
    log_root = logging.getLogger(None).logger
    # BUGFIX: iterate over a snapshot -- removing handlers while iterating
    # `log_root.handlers` directly skips every other handler.
    for handler in list(log_root.handlers):
        log_root.removeHandler(handler)
    # Replace all local handlers with a single queue handler so log events
    # are handled in the parent process.
    log_root.addHandler(handlers.QueueHandler(mp_log_q))
def start(self, ):
    """Start the multiprocessing-aware logger.

    Installs a QueueHandler on the root logger of this hierarchy and a
    queue listener that drains records from a manager-backed queue into
    the real (console/file) handlers.  Safe to call more than once: the
    second and later calls are no-ops.
    """
    # Idempotency guard: only the first call does any work.
    if self.logger_initialized:
        return
    self.logger_initialized = True

    root_logger = logging.getLogger(name=self.logging_root)
    root_logger.setLevel(self.logging_level)

    # Producers write into a manager-backed queue; a single listener in
    # this process drains it into the concrete handlers.
    log_queue = mp.Manager().Queue()
    root_logger.addHandler(QueueHandler(log_queue))

    self.queue_listener = MpQueueListener(
        log_queue,
        name=self.name,
        logging_level=self.logging_level,
        logdir=self.logdir,
        formatter=self.record_formatter,
        process_key=self.process_key,
        force_global=self.force_global,
    )

    if self.handlers:
        # Caller supplied explicit handlers: attach them as-is.
        for supplied in self.handlers:
            self.queue_listener.addHandler(supplied)
    else:
        # No explicit handlers: build defaults from the configuration.
        if self.console:
            for console_handler in create_stream_handler(
                    logging_level=self.logging_level,
                    level_formats=self.level_formats,
                    datefmt=self.datefmt):
                self.queue_listener.addConsoleHandler(console_handler)
        if self.logdir and self.force_global:
            self.add_file_handlers(process_key=self.name)

    self.queue_listener.start()
def worker_init(logging_queue):
    """Configure a child process's root logger to forward records.

    Attaches a QueueHandler so log messages produced in this worker are
    shipped over *logging_queue* to the queue listener running in the
    parent process.
    """
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(QueueHandler(logging_queue))
def setup_logging_queues():
    """Route every handler-bearing logger through a queue listener.

    For each existing logger that has its own handlers, replace those
    handlers with a QueueHandler and hand the originals to a dedicated
    QueueListener.  All listeners are started and registered for shutdown
    at interpreter exit.

    Loggers created after this call are unaffected; as long as they do not
    attach their own handlers they will still propagate into queued ones.
    """
    if sys.version_info.major < 3:
        raise RuntimeError("This feature requires Python 3.")

    started_listeners = []
    for logger_name in get_all_logger_names(include_root=True):
        target = logging.getLogger(logger_name)
        if not target.handlers:
            continue
        log_queue = queue.Queue(-1)  # unbounded
        listener = QueueListener(log_queue, respect_handler_level=True)
        queuify_logger(target, QueueHandler(log_queue), listener)
        started_listeners.append(listener)

    for listener in started_listeners:
        listener.start()

    # Drain and stop every listener when the interpreter shuts down.
    atexit.register(stop_queue_listeners, *started_listeners)
def queuify_logger(logger, queue_handler, queue_listener):
    """Swap a logger's handlers for a queue handler, keeping the originals.

    The logger's current handlers are moved onto *queue_listener* (which
    will emit records pulled off the queue) and the logger itself is left
    with *queue_handler* as its only handler.  Useful for retrofitting a
    default logging config with queued logging at runtime.

    Args:
        logger (mixed): Logger instance or string name of logger to queue-ify
            handlers.
        queue_handler (QueueHandler): Instance of a ``QueueHandler``.
        queue_listener (QueueListener): Instance of a ``QueueListener``.
    """
    if isinstance(logger, str):
        logger = logging.getLogger(logger)

    # Hand over only the handlers the listener is not already emitting to.
    not_yet_listened = [
        h for h in logger.handlers if h not in queue_listener.handlers
    ]
    if not_yet_listened:
        # QueueListener keeps its handlers in a tuple; rebuild it.
        queue_listener.handlers = (
            tuple(queue_listener.handlers) + tuple(not_yet_listened)
        )

    # Strip the logger in place and leave it with the single queue handler.
    logger.handlers.clear()
    logger.addHandler(queue_handler)
def _loop_wrapper_func(func, args, shared_mem_run, shared_mem_pause, interval, sigint, sigterm, name,
        logging_level, conn_send, func_running, log_queue):
    """
    to be executed as a separate process (that's why this functions is declared static)

    Repeatedly calls ``func(*args)`` every ``interval`` seconds until either
    ``shared_mem_run`` is cleared, ``func`` returns True, ``func`` raises, or
    a LoopInterruptError is raised (e.g. by the installed signal handlers).
    While ``shared_mem_pause`` is set, the loop just sleeps without calling
    ``func``.  Log records are forwarded to the parent via ``log_queue``;
    stdout is redirected over ``conn_send``.
    """
    prefix = get_identifier(name) + ' '
    # Module-level `log` is rebound so helpers in this module log through the
    # per-process, queue-backed logger.
    global log
    log = logging.getLogger(__name__+".log_{}".format(get_identifier(name, bold=False)))
    log.setLevel(logging_level)
    # Ship records to the parent process rather than emitting locally.
    log.addHandler(QueueHandler(log_queue))
    # Redirect this process's stdout through the pipe back to the parent.
    sys.stdout = StdoutPipe(conn_send)
    log.debug("enter wrapper_func")
    # Install SIGINT/SIGTERM handlers (presumably raising LoopInterruptError
    # so the loop below can exit cleanly -- behavior defined elsewhere).
    SIG_handler_Loop(sigint, sigterm, log, prefix)
    # Signal the parent that the loop body is now live.
    func_running.value = True
    error = False
    while shared_mem_run.value:
        try:
            # in pause mode, simply sleep
            if shared_mem_pause.value:
                quit_loop = False
            else:
                # if not pause mode -> call func and see what happens
                try:
                    quit_loop = func(*args)
                except LoopInterruptError:
                    # Deliberate interrupt: let the outer handler log and stop.
                    raise
                except Exception as e:
                    # Any other failure of func ends the loop with an error exit.
                    log.error("error %s occurred in loop calling 'func(*args)'", type(e))
                    log.info("show traceback.print_exc()\n%s", traceback.format_exc())
                    error = True
                    break
            # func returning exactly True requests a graceful stop.
            if quit_loop is True:
                log.debug("loop stooped because func returned True")
                break
            time.sleep(interval)
        except LoopInterruptError:
            log.debug("quit wrapper_func due to InterruptedError")
            break
    # Tell the parent the loop body is no longer executing.
    func_running.value = False
    if error:
        # Non-zero exit code lets the parent distinguish failure from a
        # graceful stop.
        sys.exit(-1)
    else:
        log.debug("wrapper_func terminates gracefully")
    # gets rid of the following warnings
    # Exception ignored in: <_io.FileIO name='/dev/null' mode='rb'>
    # ResourceWarning: unclosed file <_io.TextIOWrapper name='/dev/null' mode='r' encoding='UTF-8'>
    try:
        if mp.get_start_method() == "spawn":
            sys.stdin.close()
    except AttributeError:
        # mp.get_start_method is unavailable on older Pythons; best-effort.
        pass