def test_write_points_from_dataframe(self):
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
index=[now, now + timedelta(hours=1)],
columns=["column_one", "column_two",
"column_three"])
expected = (
b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n"
b"foo column_one=\"2\",column_two=2i,column_three=2.0 "
b"3600000000000\n"
)
with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)
cli = DataFrameClient(database='db')
cli.write_points(dataframe, 'foo')
self.assertEqual(m.last_request.body, expected)
cli.write_points(dataframe, 'foo', tags=None)
self.assertEqual(m.last_request.body, expected)
评论列表
文章目录