test_stream_collector.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:snap-plugin-lib-py 作者: intelsdi-x 项目源码 文件源码
def test_get_config_policy():
    sys.stdout = ThreadPrinter()
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    col = MockStreamCollector("MyStreamCollector", 99)
    col.start()
    t_end = time.time() + 5
    # wait for our collector to print its preamble
    while len(sys.stdout.lines) == 0 and time.time() < t_end:
        time.sleep(.1)
    resp = json.loads(sys.stdout.lines[0])
    client = StreamCollectorStub(
        grpc.insecure_channel(resp["ListenAddress"]))
    reply = client.GetConfigPolicy(Empty())
    assert reply.error == ""
    assert reply.string_policy["intel.streaming.random"].rules["password"].default == "pass"
    col.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号