test_events.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:vAdvisor 作者: kubevirt 项目源码 文件源码
def _app():
    class Broker:

        def subscribe(self, subscriber):
            for idx, _ in enumerate(LIFECYCLE_EVENTS):
                subscriber.put(event(idx))
            subscriber.put(StopIteration)

        def unsubscribe(self, queue):
            queue.put(StopIteration)

    app = vadvisor.app.rest.app
    broker = Broker()
    app.eventBroker = broker
    app.eventStore = InMemoryStore()

    q = queue.Queue()
    broker.subscribe(q)
    for element in q:
        app.eventStore.put(element)

    return app
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号