python类new_event_loop()的实例源码

manage.py 文件源码 项目:momo 作者: gusibi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run():
    asyncio.set_event_loop(uvloop.new_event_loop())
    server = app.create_server(host="0.0.0.0", port=8888)
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(server)
    signal(SIGINT, lambda s, f: loop.stop())
    try:
        loop.run_forever()
    except:
        loop.stop()
bottle.py 文件源码 项目:base1k 作者: gumblex 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:base1k 作者: gumblex 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
config.py 文件源码 项目:hammertime 作者: delvelabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def custom_event_loop():
    try:
        import uvloop
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        logger = logging.getLogger(__name__)
        logger.debug('Using uvloop')
    except ImportError:
        pass

    return asyncio.get_event_loop()
_testbase.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def new_loop(self):
        return uvloop.new_event_loop()
_testbase.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 111 收藏 0 点赞 0 评论 0
def new_loop(self):
        return asyncio.new_event_loop()


###############################################################################
# Socket Testing Utilities
###############################################################################
main.py 文件源码 项目:talkbot 作者: nimnull 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def init(config):
    log.debug("Loglevel set to %s", logging.getLevelName(log.getEffectiveLevel()))
    asyncio.set_event_loop(None)
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    app = create_app(loop, config)
    ssl_context = create_ssl_context(app['config'])
    run_app(app, loop, ssl_context=ssl_context)
l018_tasks.py 文件源码 项目:coroutine-presentation 作者: joshmarshall 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_uvloop(self):
        loop = uvloop.new_event_loop()
        asyncio.set_event_loop(loop)

        async def uvwait(x):
            assert isinstance(asyncio.get_event_loop(), uvloop.Loop)
            result = await wait(x)
            return result

        result = loop.run_until_complete(uvwait(1))
        self.assertEqual(1, result)
test_rpc.py 文件源码 项目:aiorpc 作者: choleraehyq 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def set_up_server():
    global loop, server
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    register('echo', echo)
    register('echo_delayed', echo_delayed)
    register('raise_error', raise_error)
    coro = asyncio.start_server(serve, HOST, PORT)
    server = loop.run_until_complete(coro)
benchmark_aiorpc.py 文件源码 项目:aiorpc 作者: choleraehyq 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_sum_server():
    def sum(x, y):
        return x + y

    aiorpc.register('sum', sum)
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    coro = asyncio.start_server(aiorpc.serve, 'localhost', 6000, loop=loop)
    loop.run_until_complete(coro)
    loop.run_forever()
benchmark_aiorpc.py 文件源码 项目:aiorpc 作者: choleraehyq 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def call():
    async def do(cli):
        for i in range(NUM_CALLS):
            await cli.call('sum', 1, 2)
            # print('{} call'.format(i))
    client = aiorpc.RPCClient('localhost', 6000)
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    start = time.time()

    loop.run_until_complete(do(client))

    print('call: %d qps' % (NUM_CALLS / (time.time() - start)))
bottle.py 文件源码 项目:pickymap 作者: tcmaps 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:pickymap 作者: tcmaps 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
bottle.py 文件源码 项目:mfa_slipstream_poc 作者: decidedlygray 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:mfa_slipstream_poc 作者: decidedlygray 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
bottle.py 文件源码 项目:silvia-pi 作者: brycesub 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import asyncio
        return asyncio.new_event_loop()
bottle.py 文件源码 项目:silvia-pi 作者: brycesub 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_event_loop(self):
        import uvloop
        return uvloop.new_event_loop()
manager.py 文件源码 项目:instawow 作者: layday 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _init_loop():
    try:
        import uvloop
    except ImportError:
        return asyncio.get_event_loop()
    else:
        return uvloop.new_event_loop()
context.py 文件源码 项目:aiorchestra 作者: aiorchestra 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, name, path=None,
                 template_inputs=None,
                 logger=None,
                 event_loop=None,
                 enable_rollback=False):
        """
        Represents AIOrchestra deployment context designed to
        manage deployment through its lifecycle

        :param name: deployment context name
        :type name: str
        :param path: path to TOSCA template
        :type path: str
        :param template_inputs: TOSCA template input parameters
        :type template_inputs: dict
        :param logger: python logger instance
        :param event_loop: asyncio or any compatible event loop
        :type event_loop: asyncio.Loop
        :param enable_rollback: weather to enable rollback on failure or not
        """
        self.__name = name
        self._tmplt = tosca_template.ToscaTemplate(
            path=path, a_file=True,
            parsed_params=template_inputs)
        self.__path = path
        self.origin_nodes = self._tmplt.graph.nodetemplates
        self.vertices = self._tmplt.graph.vertices
        self.inputs_definitions = self._tmplt.inputs
        self.__outputs = self._tmplt.outputs
        self.template_inputs = template_inputs if template_inputs else {}
        self.__status = self.PENDING
        if not logger:
            self.logger = log.UnifiedLogger(
                log_to_console=True,
                level="DEBUG").setup_logger(__name__)
        else:
            self.logger = logger
        if not event_loop:
            if uvloop:
                uv_loop = uvloop.new_event_loop()
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
                asyncio.set_event_loop(uv_loop)
            self.event_loop = asyncio.get_event_loop()
        else:
            self.event_loop = event_loop
        self.__orchestra_nodes = [node.OrchestraNode(self, origin_node)
                                  for origin_node in self.origin_nodes]
        self.__deployment_plan = None
        self.rollback_enabled = enable_rollback
main.py 文件源码 项目:fake-useragent-cache-server 作者: pcinkh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    asyncio.set_event_loop(None)

    loop = uvloop.new_event_loop()

    handler = Handler(loop=loop)
    _root = os.path.abspath(os.path.dirname(__file__))
    handler.lookup_files(os.path.join(_root, 'data'))

    app = web.Application(loop=loop)

    setup_routes(app, handler)

    handler = app.make_handler(access_log=None)

    server = loop.create_server(
        handler,
        os.environ.get('HOST', '0.0.0.0'),
        int(os.environ.get('PORT', 5000)),
    )

    url = URL('https://fake-useragent.herokuapp.com/')

    _heartbeat = loop.create_task(heartbeat(url, 10, 60, loop=loop))

    srv = loop.run_until_complete(server)

    signal.signal(signal.SIGTERM, _sigint)

    try:
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass

        _heartbeat.cancel()

        try:
            loop.run_until_complete(_heartbeat)
        except asyncio.CancelledError:
            pass

        srv.close()

        loop.run_until_complete(srv.wait_closed())

        loop.run_until_complete(app.shutdown())

        loop.run_until_complete(handler.finish_connections(5.0))

        loop.run_until_complete(app.cleanup())

    finally:
        loop.call_soon(loop.stop)
        loop.run_forever()
        loop.close()


问题


面经


文章

微信
公众号

扫码关注公众号