test_influxdb.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:antevents-python 作者: mpi-sws-rse 项目源码 文件源码
def test_influx_output():
    """Feed sensor samples into an InfluxDBWriter, then read the series back.

    The 'Sensor' series is read back twice: first by querying it directly
    through InfluxDBClient and printing every returned point, then through
    an InfluxDBReader publisher whose subscriber is ``print``.
    """
    loop = asyncio.get_event_loop()
    sensor = ValueListSensor(1, value_stream)
    publisher = SensorPub(sensor)
    msg_format = Sensor(
        series_name='Sensor',
        fields=['val', 'ts'],
        tags=['sensor_id'],
    )
    writer = InfluxDBWriter(msg_format=msg_format, generate_timestamp=False)
    publisher.subscribe(writer)

    # Write phase: a 0.2 s period, i.e. five samples per second.
    # NOTE(review): run_forever() presumably returns once the publisher has
    # no more values to emit -- confirm against the Scheduler implementation.
    sched = Scheduler(loop)
    sched.schedule_periodic(publisher, 0.2)
    sched.run_forever()

    # Read phase 1: query the stored series directly and dump each point.
    client = InfluxDBClient(database='antevents')
    for point in client.query('SELECT * FROM Sensor;').get_points():
        print(point)

    # Read phase 2: replay the same query through a publisher.
    publisher = InfluxDBReader('SELECT * FROM Sensor;')
    publisher.subscribe(CallableAsSubscriber(print))

    sched = Scheduler(loop)
    sched.schedule_periodic(publisher, 0.2)  # same 0.2 s period as the write phase
    sched.run_forever()
    print("That's all folks")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号