loop.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:tasky 作者: jreese 项目源码 文件源码
def __init__(self,
                 task_list: List[Task]=None,
                 config: Config=Config,
                 stats: Stats=Stats,
                 executor: Executor=None,
                 debug: bool=False) -> None:
        '''Initialize Tasky with a fresh event loop and an initial task list.

        The constructor only prepares state; one of `run_forever()`,
        `run_until_complete()`, or `run_for_time()` must be called on the
        resulting object to start the event loop.

        Args:
            task_list: Tasks to remember as the initial tasks. ``None`` (the
                default) is treated as an empty list.
            config: Stored as ``self.configuration``; defaults to the
                ``Config`` name itself.
            stats: Stored as ``self.stats``; defaults to the ``Stats`` name
                itself.
            executor: Stored as ``self.executor``; may be ``None``.
            debug: When true, asyncio debug mode is enabled on the loop.
        '''

        # uvloop is optional: when the module-level name is truthy, install
        # its policy before the loop is created so new_event_loop() uses it.
        if uvloop:
            Log.debug('using uvloop event loop')
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

        # Create a dedicated loop, wire up signal and exception handling,
        # then make it the current event loop.
        self.loop = asyncio.new_event_loop()
        self.loop.add_signal_handler(signal.SIGINT, self.sigint)
        self.loop.add_signal_handler(signal.SIGTERM, self.sigterm)
        self.loop.set_exception_handler(self.exception)
        asyncio.set_event_loop(self.loop)

        if debug:
            Log.debug('enabling asyncio debug mode')
            self.loop.set_debug(True)

        self.all_tasks = {}
        self.running_tasks = set()
        # Copy the caller's sequence so later mutation of it does not affect
        # us; the default of None must not be passed to list(), which would
        # raise TypeError.
        self.initial_tasks = list(task_list) if task_list is not None else []

        self.configuration = config
        self.stats = stats
        self.executor = executor

        self.monitor = False
        self.terminate_on_finish = False
        self.stop_attempts = 0
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号