def test_get_config_policy():
    """Check the config policy served by a running MockStreamCollector.

    The test redirects ``sys.stdout`` to a ``ThreadPrinter`` so it can read
    the preamble the collector prints on start, parses the first captured
    line as JSON, connects to the ``ListenAddress`` found there and calls
    ``GetConfigPolicy``. The reply must carry no error and must expose the
    default ``"pass"`` for the ``password`` rule under
    ``intel.streaming.random``.

    The collector is always stopped, and ``sys.stdout`` / ``sys.argv`` are
    always restored, even when an assertion or the RPC fails, so a failure
    here cannot leak state into other tests.
    """
    saved_stdout, saved_argv = sys.stdout, sys.argv
    printer = ThreadPrinter()
    sys.stdout = printer
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    try:
        col = MockStreamCollector("MyStreamCollector", 99)
        col.start()
        try:
            # Wait (at most 5 seconds) for our collector to print its preamble.
            deadline = time.time() + 5
            while not printer.lines and time.time() < deadline:
                time.sleep(.1)
            # Fail with a readable message instead of an IndexError below.
            assert printer.lines, \
                "collector did not print its preamble within 5 seconds"

            resp = json.loads(printer.lines[0])
            client = StreamCollectorStub(
                grpc.insecure_channel(resp["ListenAddress"]))
            reply = client.GetConfigPolicy(Empty())

            assert reply.error == ""
            policy = reply.string_policy["intel.streaming.random"]
            assert policy.rules["password"].default == "pass"
        finally:
            col.stop()
    finally:
        # Undo the global patches made at the top of the test.
        sys.stdout = saved_stdout
        sys.argv = saved_argv
test_stream_collector.py 文件源码
python
阅读 24
收藏 0
点赞 0
评论 0
评论列表
文章目录