def test_influx_output():
    """Write sensor samples to InfluxDB, then print them back two ways.

    The test runs in three phases:

    1. A ``ValueListSensor`` wrapped in a ``SensorPub`` is sampled
       periodically and its events are forwarded to an ``InfluxDBWriter``
       targeting the ``Sensor`` series.
    2. The stored points are fetched directly with ``InfluxDBClient`` and
       printed.
    3. The same query is replayed through an ``InfluxDBReader`` publisher
       whose events are printed by a ``CallableAsSubscriber``.

    NOTE(review): the statements after each ``run_forever()`` call only
    execute if that call returns; presumably the scheduler stops once its
    publisher has no more events -- confirm against ``Scheduler``.
    NOTE(review): assumes an InfluxDB server is reachable with the client's
    default connection settings and that the writer targets the
    ``antevents`` database -- confirm.
    """
    sample_interval = 0.2  # seconds between samples, i.e. five per second
    select_all = 'SELECT * FROM Sensor;'
    event_loop = asyncio.get_event_loop()

    # Phase 1: push the sensor readings into InfluxDB.
    sensor = ValueListSensor(1, value_stream)
    publisher = SensorPub(sensor)
    msg_format = Sensor(
        series_name='Sensor',
        fields=['val', 'ts'],
        tags=['sensor_id'],
    )
    writer = InfluxDBWriter(msg_format=msg_format, generate_timestamp=False)
    publisher.subscribe(writer)
    write_scheduler = Scheduler(event_loop)
    write_scheduler.schedule_periodic(publisher, sample_interval)
    write_scheduler.run_forever()

    # Phase 2: read the points back with a direct client query.
    client = InfluxDBClient(database='antevents')
    result_set = client.query(select_all)
    for point in result_set.get_points():
        print(point)

    # Phase 3: read the points back through a publisher.
    reader = InfluxDBReader(select_all)
    printer = CallableAsSubscriber(print)
    reader.subscribe(printer)
    read_scheduler = Scheduler(event_loop)
    read_scheduler.schedule_periodic(reader, sample_interval)
    read_scheduler.run_forever()

    print("That's all folks")
评论列表
文章目录