test_collector.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码
def collector_client():
    """Returns a client (grpc) fixture that is passed into collector tests"""
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    col = MockCollector("MyCollector", 99)
    col.start()
    t_end = time.time() + 5
    # wait for our collector to print its preamble
    while len(sys.stdout.lines) == 0 and time.time() < t_end:
        time.sleep(.1)
    resp = json.loads(sys.stdout.lines[0])
    client = CollectorStub(
        grpc.insecure_channel(resp["ListenAddress"]))
    yield client
    col.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号