def __init__(self, pool_names, max_restarts=0, options=None):
    """
    Initialise the pool state; no worker process is started here.

    :param pool_names: Stored unchanged as ``self.names``.
    :param max_restarts: Stored unchanged as ``self.max_restarts``.
    :param options: Optional mapping stored as ``self.options``; any falsy
        value (including ``None``) is replaced by a new empty dict.
    """
    self.names = pool_names
    self.queue = multiprocessing.Queue()
    self.pool = {}
    self.max_restarts = max_restarts
    self.options = options if options else {}

    # The live-reload handler receives this (partially initialised) instance,
    # so the attributes above are deliberately assigned before it is built.
    self.dog_path = os.curdir
    self.dog_handler = LiveReload(self)

    # TODO: Find out how to get the watchdog + livereload working on a later moment.
    # Disabled until then:
    #   self.dog_observer = Observer()
    #   self.dog_observer.schedule(self.dog_handler, self.dog_path, recursive=True)
    #   self.dog_observer.start()

    # Only for non-'fork' start methods: build (but do not start) a listener
    # that hands records from ``self.queue`` to the root logger's handlers.
    # ``self.log_listener`` is left undefined when the start method is 'fork'.
    start_method = multiprocessing.get_start_method()
    if start_method != 'fork':  # pragma: no cover
        root_handlers = logging.getLogger().handlers
        self.log_listener = QueueListener(self.queue, *root_handlers)

    self._restarts = {}
Example source code for the Python class QueueListener()
def logger_init(dirpath=None):
    """Set up file logging fed by a multiprocessing queue.

    Adapted from http://stackoverflow.com/a/34964369/164864

    A timestamped ``pandarus-worker-*.log`` file handler is created, a
    ``QueueListener`` forwarding queued records to that handler is started,
    and the same handler is attached to the root logger, whose level is set
    to ``INFO``.

    Args:
        dirpath: Optional directory for the log file. When ``None`` the bare
            file name is used as given (i.e. relative to the current
            working directory).

    Returns:
        A ``(queue_listener, logging_queue)`` tuple; the listener has
        already been started.
    """
    record_queue = multiprocessing.Queue()

    # Build the log file name from the current local time.
    timestamp = datetime.datetime.now().strftime("%d-%B-%Y-%I-%M%p")
    log_name = "{}-{}.log".format('pandarus-worker', timestamp)
    log_path = log_name if dirpath is None else os.path.join(dirpath, log_name)

    # Single handler that ends up receiving every log record.
    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    line_format = logging.Formatter("%(asctime)s %(levelname)s %(lineno)d %(message)s")
    file_handler.setFormatter(line_format)

    # The listener pulls records off the queue and passes them to the handler.
    listener = QueueListener(record_queue, file_handler)
    listener.start()

    # The root logger writes to the same handler directly.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(file_handler)

    return listener, record_queue
def setup_logging_queues():
    """Route every logger that has handlers through its own queue.

    For each logger name reported by ``get_all_logger_names`` (root
    included) whose logger has at least one handler, an unbounded
    ``queue.Queue`` is created together with a ``QueueHandler`` and a
    ``QueueListener`` (``respect_handler_level=True``); ``queuify_logger``
    is then called to wire them to that logger. All listeners are started
    once every logger has been processed, and ``stop_queue_listeners`` is
    registered with ``atexit`` with the listeners as its arguments.

    Raises:
        RuntimeError: When running on a Python major version below 3.
    """
    if sys.version_info.major < 3:
        raise RuntimeError("This feature requires Python 3.")

    started_later = []

    # Q: What about loggers created after this is called?
    # A: if they don't attach their own handlers they should be fine
    for logger_name in get_all_logger_names(include_root=True):
        target = logging.getLogger(logger_name)
        if not target.handlers:
            # Nothing to move onto a queue for this logger.
            continue

        record_queue = queue.Queue(-1)  # No limit on size
        handler_for_queue = QueueHandler(record_queue)
        listener_for_queue = QueueListener(
            record_queue, respect_handler_level=True)

        queuify_logger(target, handler_for_queue, listener_for_queue)
        started_later.append(listener_for_queue)

    # Start the listeners only after all loggers have been rewired.
    for pending in started_later:
        pending.start()

    atexit.register(stop_queue_listeners, *started_later)
def queuify_logger(logger, queue_handler, queue_listener):
    """Replace logger's handlers with a queue handler while adding existing
    handlers to a queue listener.

    This is useful when you want to use a default logging config but then
    optionally add a logger's handlers to a queue during runtime.

    The call is idempotent: if ``queue_handler`` is already attached to the
    logger it is never forwarded to the listener, so repeating the call with
    the same arguments leaves both the logger and the listener unchanged.

    Args:
        logger (mixed): Logger instance or string name of logger to queue-ify
            handlers.
        queue_handler (QueueHandler): Instance of a ``QueueHandler``.
        queue_listener (QueueListener): Instance of a ``QueueListener``.
    """
    if isinstance(logger, str):
        logger = logging.getLogger(logger)

    # Get handlers that aren't being listened for. The queue handler itself
    # must be skipped: handing it to the listener would make the listener
    # re-enqueue every record it dequeues, looping forever.
    handlers = [handler for handler in logger.handlers
                if handler is not queue_handler
                and handler not in queue_listener.handlers]

    if handlers:
        # The default QueueListener stores handlers as a tuple.
        queue_listener.handlers = \
            tuple(queue_listener.handlers) + tuple(handlers)

    # Remove logger's handlers and replace with single queue handler.
    del logger.handlers[:]
    logger.addHandler(queue_handler)
def start(self, timeout=None):
    """
    Run ``_loop_wrapper_func`` in a subprocess and block until the shared
    ``_func_running`` flag becomes true.

    Before the subprocess is spawned a daemon thread running
    ``self._monitor_stdout_pipe`` is started, and a ``QueueListener`` is
    started that forwards records from a fresh multiprocessing queue to the
    handlers of the module logger ``log``.

    If ``is_alive()`` already reports a running process, only a warning is
    logged and nothing is started.

    :param timeout: Maximum number of seconds to wait for the loop function
        to come up; ``None`` waits without limit.
    :raises LoopExceptionError: the subprocess exited with a non-zero exit
        code before the loop function was reported as running.
    :raises LoopTimeoutError: ``timeout`` elapsed before the loop function
        was reported as running.
    """
    if self.is_alive():
        log.warning("a process with pid %s is already running", self._proc.pid)
        return

    # Reset the shared flags handed to the subprocess.
    self._run.value = True
    self._func_running.value = False

    # One-way pipe: the send end is passed to the subprocess, the receive
    # end stays on this object.
    self.conn_recv, self.conn_send = mp.Pipe(False)
    monitor = threading.Thread(target=self._monitor_stdout_pipe)
    self._monitor_thread = monitor
    monitor.daemon = True
    monitor.start()
    log.debug("started monitor thread")

    # Log records put on this queue are replayed through this process's
    # handlers by the listener.
    self._log_queue = mp.Queue()
    self._log_queue_listener = QueueListener(self._log_queue, *log.handlers)
    self._log_queue_listener.start()

    wrapper_args = (
        self.func, self.args, self._run, self._pause, self.interval,
        self._sigint, self._sigterm, self.__class__.__name__, log.level,
        self.conn_send, self._func_running, self._log_queue,
    )
    self._proc = mp.Process(target=_loop_wrapper_func, args=wrapper_args)
    self._proc.start()
    log.info("started a new process with pid %s", self._proc.pid)
    log.debug("wait for loop function to come up")

    wait_started = time.time()
    while not self._func_running.value:
        exit_code = self._proc.exitcode
        if exit_code is not None:
            # The subprocess ended before the loop function was running.
            self._proc = None
            if exit_code != 0:
                raise LoopExceptionError("the loop function return non zero exticode ({})!\n".format(exit_code)+
                                         "see log (INFO level) for traceback information")
            log.warning("wrapper function already terminated with exitcode 0\nloop is not running")
            return

        time.sleep(0.1)

        if timeout is None:
            continue
        if (time.time() - wait_started) > timeout:
            err_msg = "could not bring up function on time (timeout: {}s)".format(timeout)
            log.error(err_msg)
            log.info("either it takes too long to spawn the subprocess (increase the timeout)\n"+
                     "or an internal error occurred before reaching the function call")
            raise LoopTimeoutError(err_msg)

    log.debug("loop function is up ({})".format(humanize_time(time.time()-wait_started)))