run.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:cozmo-python-sdk 作者: anki 项目源码 文件源码
def _connect_viewer(f, conn_factory, connector, viewer):
    # Run the viewer in the main thread, with the SDK running on a new background thread.
    loop = asyncio.new_event_loop()
    abort_future = concurrent.futures.Future()

    async def view_connector(coz_conn):
        try:
            await viewer.connect(coz_conn)

            if inspect.iscoroutinefunction(f):
                await f(coz_conn)
            else:
                await coz_conn._loop.run_in_executor(None, f, base._SyncProxy(coz_conn))
        finally:
            viewer.disconnect()

    try:
        if not inspect.iscoroutinefunction(f):
            conn_factory = functools.partial(conn_factory, _sync_abort_future=abort_future)
        lt = _LoopThread(loop, f=view_connector, conn_factory=conn_factory, connector=connector)
        lt.start()
        viewer.mainloop()
    except BaseException as e:
        abort_future.set_exception(exceptions.SDKShutdown(repr(e)))
        raise
    finally:
        lt.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号