def __init__(self, config=None):
if config is None:
config = {}
self.config = config
self.must_stop = threading.Event()
self._consumers_queues = []
if self.config.get("concurrency", 1) > 1:
self._thread_pool = ThreadPoolExecutor(
max_workers=self.config.get("concurrency")
)
else:
self._thread_pool = None
self.import_submodules(__name__ + '.plugins.ext')
self.import_submodules(__name__ + '.consumers.ext')
for extra_plugin_path in self.config.get('extra_plugins', []):
self.import_directory_modules(extra_plugin_path)
self._current_checks = []
self._current_checks_lock = threading.Lock()
评论列表
文章目录