def test_stream_max_metrics_buffer_with_max_collect_duration():
    """Check stream reply timing with both buffer and duration limits set.

    Starts a ``MockStreamCollector``, reads the first line it prints as a
    JSON preamble to find its ``ListenAddress``, and opens a metric stream
    configured with ``stream_delay`` 3, ``max-collect-duration`` 4 and
    ``max-metrics-buffer`` 3.  The first two replies must each arrive about
    4 seconds (after rounding) after the wait began and must each carry
    exactly one metric.

    NOTE(review): presumably the 3 s stream delay keeps the 3-metric buffer
    from filling, so the 4 s max-collect-duration is what triggers each
    reply -- confirm against MockStreamCollector.
    """
    # Remember the process-wide state we are about to replace so that a
    # failing assertion cannot leak it into the tests that run afterwards.
    saved_stdout, saved_argv = sys.stdout, sys.argv
    printer = ThreadPrinter()
    sys.stdout = printer
    sys.argv = ["", '{"LogLevel": 1, "PingTimeoutDuration": 5000}']
    try:
        col = MockStreamCollector("MyStreamCollector", 99)
        col.start()
        try:
            # Wait (at most 5 seconds) for the collector to print its preamble.
            deadline = time.time() + 5
            while not printer.lines and time.time() < deadline:
                time.sleep(.1)
            assert printer.lines, (
                "collector did not print its preamble within 5 seconds")
            preamble = json.loads(printer.lines[0])

            client = StreamCollectorStub(
                grpc.insecure_channel(preamble["ListenAddress"]))
            metric = snap.Metric(
                namespace=[snap.NamespaceElement(value="intel"),
                           snap.NamespaceElement(value="streaming"),
                           snap.NamespaceElement(value="random"),
                           snap.NamespaceElement(value="int")],
                version=1,
                config={
                    "stream_delay": 3,
                    "max-collect-duration": 4,
                    "max-metrics-buffer": 3
                },
                unit="some unit",
                description="some description")
            requests = iter([CollectArg(metric).pb])
            replies = client.StreamMetrics(requests)

            # The same timing/size expectation applies to the first two
            # replies read from the stream.
            for attempt in (1, 2):
                wait_started = time.time()
                reply = next(replies)
                elapsed = time.time() - wait_started
                assert round(elapsed) == 4, (
                    "reply %d arrived after %.2fs" % (attempt, elapsed))
                assert len(reply.Metrics_Reply.metrics) == 1, (
                    "reply %d carried an unexpected number of metrics"
                    % attempt)
        finally:
            # Always shut the collector down, even when an assertion fails.
            col.stop()
    finally:
        sys.stdout = saved_stdout
        sys.argv = saved_argv
test_stream_collector.py 文件源码
python
阅读 27
收藏 0
点赞 0
评论 0
评论列表
文章目录