core.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:phat 作者: danielfranca 项目源码 文件源码
def run(self):
        """Run every configured test set and collect the results in ``self.run_log``.

        Walks ``self.settings`` in order:

        * an item with a ``'url'`` key is submitted to a thread pool as
          ``self.run_tests(url, item, self.global_options)``;
        * an item with a ``'wait'`` key acts as a barrier: all jobs submitted
          so far are allowed to finish, then the method sleeps for whatever is
          left of the requested wait (time spent draining the pool counts
          towards it);
        * items with neither key are ignored.

        The ``on_start`` plugin hooks are invoked before any work is scheduled
        and the ``on_end`` hooks after all results have been gathered. Results
        are appended to ``self.run_log`` in submission order; an exception
        raised inside a job propagates from ``future.result()``.
        """
        PluginHelpers.run_method_for_each('on_start', self.global_options, self.run_log)

        # One worker forces sequential execution; None lets the executor pick
        # its own default pool size.
        sequential = self.global_options.get('tests_sequential', False)
        max_workers = 1 if sequential else None
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        futures = []

        for set_item in self.settings:
            if 'url' in set_item:
                url = set_item['url']
                futures.append(executor.submit(self.run_tests, url, set_item, self.global_options))
            elif 'wait' in set_item:
                # Monotonic clock: the elapsed time must not be skewed by
                # wall-clock adjustments while the jobs are draining.
                started = time.monotonic()
                executor.shutdown(True)  # wait for all current jobs to stop
                elapsed = time.monotonic() - started
                wait_more = set_item['wait'] - elapsed
                if wait_more > 0:
                    time.sleep(wait_more)
                # A shut-down executor cannot accept new work, so replace it.
                # Keep the same worker limit so 'tests_sequential' is still
                # honoured after a wait.
                executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        executor.shutdown(True)  # wait for all current jobs to stop

        for future in futures:
            self.run_log.append(future.result())

        PluginHelpers.run_method_for_each('on_end', self.global_options, self.run_log)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号