def test_write_points_udp(self):
cli = InfluxDBClient(
'localhost',
self.influxd_inst.http_port,
'root',
'',
database='db',
use_udp=True,
udp_port=self.influxd_inst.udp_port
)
cli.write_points(dummy_point)
# The points are not immediately available after write_points.
# This is to be expected because we are using udp (no response !).
# So we have to wait some time,
time.sleep(3) # 3 sec seems to be a good choice.
rsp = self.cli.query('SELECT * FROM cpu_load_short')
self.assertEqual(
# this is dummy_points :
[
{'value': 0.64,
'time': '2009-11-10T23:00:00Z',
"host": "server01",
"region": "us-west"}
],
list(rsp['cpu_load_short'])
)
评论列表
文章目录