def __get_result_set_from_mock(mock, query):
result_wrapper = Mock()
influx_points = []
result = re.search(r"(\d{19}).*?(\d{19})", query)
start, end = result.groups()
# extract range
points = list(filter(lambda point: point["time"] > int(start) and point["time"] < int(end),
mock.points))
country_result = re.search(r"\"country\"=\'(\w+)\'", query)
if country_result:
country = country_result.groups()[0]
points = list(filter(lambda point: point["tags"]["country"] == country, points))
for point in points:
d = {**point["fields"], **point.get("tags", {})}
d["time"] = datetime.utcfromtimestamp(point["time"] // 1000000000).strftime('%Y-%m-%dT%H:%M:%SZ')
influx_points.append(d)
result_wrapper.get_points.return_value = influx_points
return result_wrapper
test_history_data_driver.py 文件源码
python
阅读 30
收藏 0
点赞 0
评论 0
评论列表
文章目录