def test_write_points_from_dataframe_with_tag_escaped(self):
now = pd.Timestamp('1970-01-01 00:00+00:00')
dataframe = pd.DataFrame(
data=[['blue', 1, "1", 1, 1.0, 'hot'],
['red,green=orange', 0, "2", 2, 2.0, 'cold']],
index=[now, now + timedelta(hours=1)],
columns=["tag_one", "tag_two", "column_one",
"column_two", "column_three",
"tag_three"])
expected_escaped_tags = (
b"foo,tag_one=blue "
b"column_one=\"1\",column_two=1i "
b"0\n"
b"foo,tag_one=red\\,green\\=orange "
b"column_one=\"2\",column_two=2i "
b"3600000000000\n"
)
with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
"http://localhost:8086/write",
status_code=204)
cli = DataFrameClient(database='db')
cli.write_points(dataframe, 'foo',
field_columns=['column_one', 'column_two'],
tag_columns=['tag_one'])
self.assertEqual(m.last_request.body, expected_escaped_tags)
评论列表
文章目录