def __init__(self, config, flows, sender, logger, **kwargs):
    self.logger = logger
    self.sender = sender
    self.config = config
    # The collector executes queries against InfluxDB for network status.
    self.collector = StatsCollector(InfluxDBClient(host=INFLUXDB_HOST,
                                                   port=INFLUXDB_PORT,
                                                   username=INFLUXDB_USER,
                                                   password=INFLUXDB_PASS,
                                                   database=INFLUXDB_DB,
                                                   timeout=10))
    table_id = None
    self.fm_builder = FlowModMsgBuilder(0, self.config.flanc_auth["key"])
    try:
        table_id = config.tables['monitor']
    except KeyError as e:
        print("Monitoring table does not exist in the sdx_global.cfg file! - Add a table named %s." % str(e))
    # Build initial monitoring flows
    if flows is not None:
        self.monitor_flows_builder(flows)
Python InfluxDBClient() usage examples
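The snippets collected below all follow the same pattern: construct an InfluxDBClient with connection details, then write or query points. As a minimal orientation sketch, assuming a local InfluxDB instance on localhost:8086 with a database named "example" (host, credentials, and database name here are placeholders, not taken from any of the projects below):

from influxdb import InfluxDBClient

# Placeholder connection details for a local test instance.
client = InfluxDBClient(host="localhost", port=8086,
                        username="root", password="root",
                        database="example")

# Write a single point, then read it back.
client.write_points([{
    "measurement": "cpu_load_short",
    "tags": {"host": "server01", "region": "us-west"},
    "time": "2009-11-10T23:00:00Z",
    "fields": {"value": 0.64},
}])
result = client.query("SELECT * FROM cpu_load_short")
print(list(result.get_points()))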
Source file: historical_delivery.py
Project: kafka-spark-influx-csv-analysis
Author: bwsw
def instance_data_delivery(self):
    if self._config["historical"]["method"] == "influx":
        influx_options = self._config["historical"]["influx_options"]
        client = InfluxDBClient(influx_options["host"], influx_options["port"],
                                influx_options["username"], influx_options["password"],
                                influx_options["database"])
        return HistoryDataSingleton(client)
def instance_writer(self, output_config, struct, enumerate_input_field):
    output = output_config.content["output"]
    if output["method"] == "influx":
        client = InfluxDBClient(output["options"]["influx"]["host"],
                                output["options"]["influx"]["port"],
                                output["options"]["influx"]["username"],
                                output["options"]["influx"]["password"],
                                output["options"]["influx"]["database"])
        return InfluxWriter(client, output["options"]["influx"]["database"],
                            output["options"]["influx"]["measurement"],
                            struct, enumerate_input_field)
    elif output["method"] == "stdout":
        return StdOutWriter()
    raise errors.UnsupportedOutputFormat("Format {} not supported".format(output["method"]))
def test_invalid_port_fails(self):
    with self.assertRaises(ValueError):
        InfluxDBClient('host', '80/redir', 'username', 'password')
def test_write_points_udp(self):
    cli = InfluxDBClient(
        'localhost',
        self.influxd_inst.http_port,
        'root',
        '',
        database='db',
        use_udp=True,
        udp_port=self.influxd_inst.udp_port
    )
    cli.write_points(dummy_point)

    # The points are not immediately available after write_points:
    # UDP writes get no response, so we have to wait a little.
    time.sleep(3)  # 3 seconds seems to be a good choice.

    rsp = self.cli.query('SELECT * FROM cpu_load_short')
    self.assertEqual(
        # This is the content of dummy_point:
        [
            {'value': 0.64,
             'time': '2009-11-10T23:00:00Z',
             "host": "server01",
             "region": "us-west"}
        ],
        list(rsp['cpu_load_short'])
    )
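The same caveat applies outside the test harness: with use_udp=True the client sends writes over UDP and gets no acknowledgement, so a query issued immediately afterwards may not see the data yet. A minimal sketch, assuming a local InfluxDB whose configuration enables a UDP listener on port 8089 bound to an "example" database (port and database name are assumptions, not values from the test above):

import time
from influxdb import InfluxDBClient

# HTTP endpoint for queries, UDP endpoint for fire-and-forget writes.
client = InfluxDBClient('localhost', 8086, 'root', 'root',
                        database='example',
                        use_udp=True, udp_port=8089)

client.write_points([{
    "measurement": "cpu_load_short",
    "fields": {"value": 0.64},
}])

time.sleep(1)  # UDP gives no response, so give the server a moment to flush.
print(list(client.query("SELECT * FROM cpu_load_short").get_points()))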
def init(db):
    global CLI
    CLI = InfluxDBClient(
        db.conf["host"],
        db.conf["port"],
        db.conf["user"],
        db.conf["password"],
        db.conf["db"],
    )
def client(self):
    if not hasattr(self, '_client'):
        self._client = InfluxDBClient(
            self.HOST, self.PORT, self.ADMIN_USER, self.ADMIN_PASSWORD,
            self.DB_NAME)
    return self._client
def connect_to_influxdb(module):
    hostname = module.params['hostname']
    port = module.params['port']
    username = module.params['username']
    password = module.params['password']
    database_name = module.params['database_name']
    client = InfluxDBClient(
        host=hostname,
        port=port,
        username=username,
        password=password,
        database=database_name
    )
    return client
def __init__(self, type, model):
    self.influx = InfluxDBClient(config.INFLUX_HOST, config.INFLUX_PORT,
                                 config.INFLUX_USER, config.INFLUX_PASSWD,
                                 config.INFLUX_DB)
    self.sensor = None
    self._load_sensor(type, model)
def use_store(self):
    """
    Opens a database to save data
    """
    self.db = InfluxDBClient(
        self.settings.get('host', 'localhost'),
        self.settings.get('port', 8086),
        self.settings.get('user', 'root'),
        self.settings.get('password', 'root'),
        self.settings.get('database', 'mcp'),
    )
    return self.db
def test_query(mock_flux):
    db = influxdb.InfluxDBClient(database="fizz")
    db.query.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    client = InfluxAlchemy(db)
    query = client.query(Measurement.new("buzz"))
    assert str(query) == "SELECT * FROM buzz;"
def test_measurements(mock_flux):
    mock_res = mock.MagicMock()
    mock_res.get_points.return_value = [{"name": "fizz"}]
    mock_flux.return_value = mock_res
    db = influxdb.InfluxDBClient(database="fizz")
    client = InfluxAlchemy(db)
    measurements = list(client.measurements())
    mock_flux.assert_called_once_with("SHOW MEASUREMENTS;")
def test_tags(mock_flux):
    mock_res = mock.MagicMock()
    mock_res.get_points.return_value = [{'tagKey': 'sensor_id'}]
    mock_flux.return_value = mock_res
    db = influxdb.InfluxDBClient(database="fizz")
    client = InfluxAlchemy(db)
    assert client.tags(Measurement.new("environment")) == ["sensor_id"]
def test_fields(mock_flux):
    mock_res = mock.MagicMock()
    mock_res.get_points.return_value = [
        {'fieldKey': 'humidity', 'fieldType': 'float'},
        {'fieldKey': 'temperature', 'fieldType': 'float'}
    ]
    mock_flux.return_value = mock_res
    db = influxdb.InfluxDBClient(database="fizz")
    client = InfluxAlchemy(db)
    assert client.fields(Measurement.new("environment")) == ["humidity", "temperature"]
def test_repr(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    query = client.query(Measurement.new("fizz"))
    assert repr(query) == "SELECT * FROM fizz;"
def test_filter(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    meas = Measurement.new("fizz")
    query = client.query(meas).filter(meas.buzz == "goo")
    assert repr(query) == "SELECT * FROM fizz WHERE (buzz = 'goo');"
def test_filter_time_naive(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    meas = Measurement.new("fizz")
    d = datetime(2016, 10, 1)
    query = client.query(meas).filter(meas.time >= d)
    assert repr(query) == "SELECT * FROM fizz WHERE (time >= '2016-10-01T00:00:00+00:00');"
def test_filter_time_date(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    meas = Measurement.new("fizz")
    d = date(2016, 10, 1)
    query = client.query(meas).filter(meas.time >= d)
    assert repr(query) == "SELECT * FROM fizz WHERE (time >= '2016-10-01');"
def test_filter_time_aware(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    meas = Measurement.new("fizz")
    if sys.version_info.major >= 3:
        tz_vietnam = timezone(timedelta(hours=7, minutes=7))
    else:
        tz_vietnam = timezone('Asia/Ho_Chi_Minh')
    d_low = datetime(2016, 9, 1, tzinfo=tz_vietnam)
    d_high = datetime(2016, 10, 2, 8)
    query = client.query(meas).filter(meas.time.between(d_low, d_high))
    assert repr(query) == "SELECT * FROM fizz WHERE (time >= '2016-09-01T00:00:00+07:07' AND time <= '2016-10-02T08:00:00+00:00');"
def test_group_by(mock_qry):
    mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
    db = influxdb.InfluxDBClient(database="example")
    client = InfluxAlchemy(db)
    query = client.query(Measurement.new("fizz")).group_by("buzz")
    assert str(query) == "SELECT * FROM fizz GROUP BY buzz;"