python类new_event_loop()的实例源码

run.py 文件源码 项目:falsy 作者: pingf 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(coro, loop=None):
    async def main_task():
        pycurl_task = aio.ensure_future(curl_loop())
        try:
            r = await coro
        finally:
            pycurl_task.cancel()
            with suppress(aio.CancelledError):
                await pycurl_task
        return r, pycurl_task

    if loop is None:
        loop = uvloop.new_event_loop()
        # loop = aio.get_event_loop()
    aio.set_event_loop(loop)
    loop.set_exception_handler(exception_handler)
    r, _ = loop.run_until_complete(main_task())
    return r
plugin.py 文件源码 项目:pytest-sanic 作者: yunstanford 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pytest_configure(config):
    global LOOP_INIT
    loop_name = config.getoption('--loop')
    factory = {
        "aioloop": asyncio.new_event_loop,
    }
    if uvloop is not None:
        factory["uvloop"] = uvloop.new_event_loop

    if loop_name:
        if loop_name not in factory:
            raise ValueError(
                "{name} is not valid option".format(name=loop_name)
            )
        LOOP_INIT = factory[loop_name]
    else:
        LOOP_INIT = factory["aioloop"]
conftest.py 文件源码 项目:aiomonitor 作者: aio-libs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def loop(request, loop_type):
    old_loop = asyncio.get_event_loop()
    asyncio.set_event_loop(None)
    if loop_type == 'uvloop':
        loop = uvloop.new_event_loop()
    elif loop_type == 'tokio':
        import tokio
        policy = tokio.TokioLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        loop = tokio.new_event_loop()
    else:
        loop = asyncio.new_event_loop()

    yield loop

    loop.close()
    asyncio.set_event_loop(old_loop)
    gc.collect()
loop.py 文件源码 项目:windflow 作者: hartym 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_event_loop(debug=False):
    if not debug:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        asyncio.set_event_loop(uvloop.new_event_loop())
    return asyncio.get_event_loop()
mixins.py 文件源码 项目:windflow 作者: hartym 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_event_loop(self, debug=False):
        if not debug:
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            asyncio.set_event_loop(uvloop.new_event_loop())
        AsyncIOMainLoop().install()
        return asyncio.get_event_loop()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
retrieval.py 文件源码 项目:hdx-data-freshness 作者: OCHA-DAP 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def retrieve(urls):
    start_time = time.time()
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    future = asyncio.ensure_future(check_urls(urls, loop))
    results = loop.run_until_complete(future)
    logger.info('Execution time: %s seconds' % (time.time() - start_time))
    return results
conftest.py 文件源码 项目:swaggerit 作者: dutradda 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def loop():
    loop = uvloop.new_event_loop()
    # loop = asyncio.new_event_loop() # just for debugging
    asyncio.set_event_loop(loop)
    return loop
conftest.py 文件源码 项目:aiotask-context 作者: Skyscanner 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def asyncio_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
conftest.py 文件源码 项目:aiotask-context 作者: Skyscanner 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def uvloop_loop():
    loop = uvloop.new_event_loop()
    yield loop
    loop.close()
bottle.py 文件源码 项目:bottle_beginner 作者: denzow 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:bottle_beginner 作者: denzow 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
bottle.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
restfm_test_case.py 文件源码 项目:py-restfmclient 作者: pcdummy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        self.maxDiff = None
        self.loop = uvloop.new_event_loop()
        asyncio.set_event_loop(None)

        self.app = self.get_app(self.loop)
        self.client = RESTfmTestClient(self.app)
        self.loop.run_until_complete(self.client.start_server())
conftest.py 文件源码 项目:myreco 作者: dutradda 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def loop():
    loop = uvloop.new_event_loop()
    # loop = asyncio.new_event_loop() # just for debugging
    asyncio.set_event_loop(loop)
    return loop
server.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def aiohttp_tarantool_multi(port=None, uvloop_enable=False):
    """TBD."""
    if uvloop_enable:
        logging.info("start aiohttp_tarantool_multi_uvloop")
        if sys.platform == 'win32':
            logging.error("D'oh! UVLoop is not support Windows!")
            sys.exit()
        else:
            import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        logging.info("start aiohttp_tarantool_multi")
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

    from data_interface.tarantool_driver import TarantoolDriver
    from aiohttp_server.aiohttpmulticonnect import AiohttpTarantoolMulti
    db = TarantoolDriver
    web_server = AiohttpTarantoolMulti(loop=loop, port=port, db=db)
    loop.create_task(web_server.start())
    loop.create_task(web_server.listner_for_statistics())
    return loop
server.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def aiohttp_tarantool_one(port=None, uvloop_enable=False):
    """TBD."""
    if uvloop_enable:
        logging.info("start aiohttp_tarantool_one_uvloop")
        if sys.platform == 'win32':
            logging.error("D'oh! UVLoop is not support Windows!")
            sys.exit()
        else:
            import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        logging.info("start aiohttp_tarantool_one")
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

    from data_interface.tarantool_driver import TarantoolDriver
    from aiohttp_server.aiohttponeconnect import AiohttpUniversalOne
    db = TarantoolDriver()
    web_server = AiohttpUniversalOne(loop=loop, port=port, db=db)
    loop.create_task(web_server.start())
    loop.create_task(web_server.listner_for_statistics())
    return loop
server.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def aiohttp_postgres_pool(port=None, uvloop_enable=False):
    """TBD."""
    if uvloop_enable:
        logging.info("start aiohttp_postgres_pool_uvloop")
        if sys.platform == 'win32':
            logging.error("D'oh! UVLoop is not support Windows!")
            sys.exit()
        else:
            import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        logging.info("start aiohttp_postgres_pool")
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)
    from data_interface.postgres_driver import PostgresDriver
    from aiohttp_server.aiohttponeconnect import AiohttpUniversalOne
    pd = PostgresDriver(loop)
    db = loop.run_until_complete(pd.open())
    web_server = AiohttpUniversalOne(loop=loop, port=port, db=db)
    loop.create_task(web_server.start())
    loop.create_task(web_server.listner_for_statistics())
    return loop
server.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def sanic_tarantool_one(port=None, uvloop_enable=False):
    """TBD."""
    if uvloop_enable:
        logging.info("start sanic_tarantool_one_uvloop")
        if sys.platform == 'win32':
            logging.error("D'oh! UVLoop is not support Windows!")
            sys.exit()
        else:
            import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        logging.info("start sanic_tarantool_one")
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

    from sanic_server.saniconeconnect import SanicTarantoolOne
    from data_interface.tarantool_driver import TarantoolDriver
    db = TarantoolDriver()
    web_server = SanicTarantoolOne(loop=loop, port=port, db=db)
    loop.create_task(web_server.start())
    loop.create_task(web_server.listner_for_statistics())
    return loop
server.py 文件源码 项目:python-tarantool-benchmark-and-bootstrap 作者: valentinmk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def japronto_tarantool_one(port=None, uvloop_enable=False):
    """TBD."""
    if uvloop_enable:
        logging.info("start japronto_tarantool_one_uvloop")
        if sys.platform == 'win32':
            logging.error("D'oh! UVLoop is not support Windows!")
            sys.exit()
        else:
            import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        logging.info("start japronto_tarantool_one")
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

    from japronto_server import janprontooneconnect
    from data_interface.tarantool_driver import TarantoolDriver
    db = TarantoolDriver()
    janprontooneconnect.init(loop_param=loop, port_param=port, db_driver=db)
    loop.create_task(janprontooneconnect.listner_for_statistics())
    janprontooneconnect.start()
    # Return false for compatibility japronto implementation with other
    # servers
    # Japronto hard coded to create own loop and run_forever it :(
    return False  # It will raise Exeptions
base.py 文件源码 项目:aiorchestra 作者: aiorchestra 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)
        self.event_loop = asyncio.get_event_loop()
        super(BaseAIOrchestraTestCase, self).setUp()
amazon.py 文件源码 项目:QProb 作者: quant-trade 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        parse_by_categories(loop=loop)
        #TODO keywords should be fixed, as it has hard coded kws
        parse_by_keywords(loop=loop)

        loop.close()

        self.stdout.write(self.style.SUCCESS('Successfully done jobs'))
twitter.py 文件源码 项目:QProb 作者: quant-trade 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        tweets_to_db(loop=loop)
        tweets_by_tag_to_db(loop=loop)
        clean_tweet_hashtags(loop=loop)

        loop.close()

        self.stdout.write(self.style.SUCCESS('Successfully done parsing jobs'))
recheck_content.py 文件源码 项目:QProb 作者: quant-trade 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        content_if_empty_all(loop=loop)
        update_db_with_cleaned_content(loop=loop)
        empty_sources(loop=loop)

        loop.close()

        self.stdout.write(self.style.SUCCESS('Successfully done parsing jobs'))
youtube.py 文件源码 项目:QProb 作者: quant-trade 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        do_youtube(loop=loop)
        clean_youtube_text(loop=loop)

        loop.close()

        self.stdout.write(self.style.SUCCESS('Successfully done Youtube jobs'))
manager.py 文件源码 项目:Metis 作者: gusibi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run():
    asyncio.set_event_loop(uvloop.new_event_loop())
    server = app.create_server(host="0.0.0.0", port=7777)
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(server)
    signal(SIGINT, lambda s, f: loop.stop())
    try:
        loop.run_forever()
    except:
        loop.stop()
bottle.py 文件源码 项目:Knjiznica 作者: TilenNoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:Knjiznica 作者: TilenNoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()


问题


面经


文章

微信
公众号

扫码关注公众号