def __init__(self,
             task_list: List[Task]=None,
             config: Config=Config,
             stats: Stats=Stats,
             executor: Executor=None,
             debug: bool=False) -> None:
    '''Initialize Tasky with a new event loop and an initial list of tasks.

    One of the following methods must be called on the resulting object
    to start the event loop: `run_forever()`, `run_until_complete()`, or
    `run_for_time()`.

    Args:
        task_list: Tasks recorded as the initial tasks. The iterable is
            copied into a new list; `None` (the default) is treated as
            an empty list.
        config: Stored as `self.configuration`.
        stats: Stored as `self.stats`.
        executor: Stored as `self.executor`.
        debug: When true, asyncio debug mode is enabled on the new loop.
    '''
    # The policy has to be installed before the loop is created so that
    # `asyncio.new_event_loop()` hands back a uvloop-backed loop.
    # NOTE(review): `uvloop` is presumably falsy when the package is not
    # installed -- confirm against the module-level import.
    if uvloop:
        Log.debug('using uvloop event loop')
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    self.loop = asyncio.new_event_loop()

    # Route SIGINT/SIGTERM and unhandled loop exceptions to this
    # instance's own handlers.
    self.loop.add_signal_handler(signal.SIGINT, self.sigint)
    self.loop.add_signal_handler(signal.SIGTERM, self.sigterm)
    self.loop.set_exception_handler(self.exception)
    asyncio.set_event_loop(self.loop)

    if debug:
        Log.debug('enabling asyncio debug mode')
        self.loop.set_debug(True)

    self.all_tasks = {}
    self.running_tasks = set()
    # `task_list` defaults to None; calling `list(None)` would raise
    # TypeError, so fall back to an empty list in that case.
    self.initial_tasks = list(task_list) if task_list is not None else []

    self.configuration = config
    self.stats = stats
    self.executor = executor

    self.monitor = False
    self.terminate_on_finish = False
    self.stop_attempts = 0
评论列表
文章目录