def v2_playbook_on_stats(self, stats):
    """Write the collected events to InfluxDB.

    The InfluxDB host comes from the ``influx_vip`` environment variable when
    it is set to a non-empty value, otherwise from ``self.host_vars``.  When
    the write fails the plugin disables itself and shows a warning instead of
    raising.

    Args:
        stats: Supplied by the caller; not used by this method.
    """
    target_host = os.getenv('influx_vip')
    if not target_host:
        target_host = self.host_vars['influx_vip']

    # Positional arguments: host, port, user, password, database name.
    client = InfluxDBClient(target_host, "8086", "None", "None", "events")
    try:
        client.write_points(self.events, time_precision='u')
    except Exception:
        # Disable the plugin if writes fail
        self.disabled = True
        self._display.warning(
            "Cannot write to InfluxDB, check the service state "
            "on %s." % target_host)
# Example source code for the Python class InfluxDBClient()
def check_db_status():
    """Make sure the configured database exists, creating it when missing.

    Returns:
        True when the database list could be fetched (and the database was
        created if it was absent); False when any step raised an exception.
    """
    try:
        client = InfluxDBClient(db_server, db_port, db_admin, db_admin_password)
        known_names = [entry['name'] for entry in client.get_list_database()]
        if db_name not in known_names:
            logger.info('Database <%s> not found, trying to create it', db_name)
            client.create_database(db_name)
    except Exception as e:
        logger.error('Error querying open-nti database: %s', e)
        return False
    return True
def get_latest_datapoints(**kwargs):
    """Query the most recent datapoint(s) recorded for a host.

    The query text depends on the module-level ``db_schema`` value (1, 2 or
    3).  For any other value an error is logged and an empty dict is returned
    without querying.

    Args:
        **kwargs: Must contain ``host``, which is interpolated into the query.

    Returns:
        Whatever ``dbclient.query`` returns for the selected schema, or ``{}``
        when ``db_schema`` is not recognised.

    Raises:
        KeyError: If ``host`` is missing and ``db_schema`` is 1, 2 or 3.
    """
    dbclient = InfluxDBClient(db_server, db_port, db_admin, db_admin_password)
    dbclient.switch_database(db_name)

    # NOTE(review): ``host`` is inserted into the query unescaped -- confirm
    # that callers only pass trusted values.
    if db_schema == 1:
        # The backslash is doubled so the literal still contains "\." without
        # relying on an invalid escape sequence (a warning on current Python).
        query = "select * from /%s\\./ ORDER BY time DESC limit 1 " % (kwargs['host'])
    elif db_schema == 2:
        query = "select * from \"%s\" WHERE device = '%s' GROUP BY * ORDER BY time DESC limit 1 " % ('jnpr.collector', kwargs['host'])
    elif db_schema == 3:
        query = "select * from // WHERE device = '%s' GROUP BY * ORDER BY time DESC limit 1 " % (kwargs['host'])
    else:
        logger.error("ERROR: Unknown db_schema: <%s>", db_schema)
        return {}

    return dbclient.query(query)
def main(host='localhost', port=8086, domain=None, key=None):
    """Stream GPS reports and periodically write a summary to InfluxDB.

    Reports whose ``class`` is ``'TPV'`` are collected from the GPS session.
    Whenever more than ``WAIT_TIME`` seconds have passed since the last write,
    the collected reports are summarised, written with ``write_db`` and the
    buffer is cleared.  On any exception a message is printed, the function
    sleeps for ``WAIT_TIME`` seconds and then returns.

    Args:
        host: InfluxDB host name.
        port: InfluxDB port.
        domain: Passed through to ``write_db``.
        key: Passed through to ``write_db``.
    """
    try:
        user = 'admin'
        password = 'admin'
        dbname = 'beastcraft'
        dbclient = InfluxDBClient(host, port, user, password, dbname)

        session = gps.gps(host='localhost', port='2947')
        session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)

        # Back-date the start so the first write happens without waiting a
        # full WAIT_TIME interval.
        start_time = time.time() - WAIT_TIME
        reports = []
        for report in session:
            report = report.__dict__
            if report['class'] == 'TPV':
                reports.append(report)

            if time.time() - start_time > WAIT_TIME:
                write_db(dbclient, summarise_rpt(reports), domain=domain, key=key)
                reports = []
                start_time = time.time()
    except Exception as e:
        # "except ... as" and print() with one argument are valid on both
        # Python 2.6+ and Python 3; the Python 2-only forms were not.
        print('%s retrieving GPS stats, retrying in %d seconds' % (repr(e), WAIT_TIME))
        time.sleep(WAIT_TIME)
def flow_query(field, client, interval, dp_name, cookie=None, table_id=0,
               flow=None):
    """Run a flow query and return its first ``non_negative_derivative``.

    The query is assembled by joining, with single spaces, the values of the
    dict returned by ``base_flow_query``.  When ``flow`` is given, the output
    of ``match_fields_query(flow)`` is appended to the ``WHERE`` entry first.

    Args:
        field: Passed to ``base_flow_query`` and used to select the series
            ``(field, None)`` from the query result.
        client: Object whose ``query`` method executes the statement.
        interval: Passed to ``base_flow_query``.
        dp_name: Passed to ``base_flow_query``.
        cookie: Passed to ``base_flow_query``.
        table_id: Passed to ``base_flow_query``.
        flow: Optional value handed to ``match_fields_query``.

    Returns:
        The ``non_negative_derivative`` entry of the first point, or ``None``
        when the selected series yields no points.
    """
    clauses = base_flow_query(field, interval, dp_name, cookie, table_id)
    if flow is not None:
        clauses["WHERE"] += match_fields_query(flow)

    statement = ' '.join('%s' % clause for clause in clauses.values())
    points = client.query(statement)[(field, None)]
    for point in points:
        # Only the first point matters; return as soon as one is seen.
        return point["non_negative_derivative"]
    return None
# The StatsCollector needs to receive
# an InfluxDBClient object to perform queries.
# e.g:
# INFLUXDB_DB = "sdx"
# INFLUXDB_HOST = "localhost"
# INFLUXDB_PORT = 8086
# INFLUXDB_USER = ""
# INFLUXDB_PASS = ""
# client = InfluxDBClient(
# host=INFLUXDB_HOST, port=INFLUXDB_PORT,
# username=INFLUXDB_USER, password=INFLUXDB_PASS,
# database=INFLUXDB_DB, timeout=10)
# c = StatsCollector(client)
def __init__(self):
    """Create the scale, the catalyser, restore state and open InfluxDB.

    ``load_state`` is called before the InfluxDB client is created, and the
    ``meowton`` database is created right after the client.
    """
    sensor_factors = [402600, 428500, 443400, 439700]
    self.scale = Scale(
        calibrate_weight=1074 * 1534 / 1645,
        calibrate_factors=sensor_factors,
        callback=self.measurement_event,
    )
    self.catalyser = Catalyser(callback=self.catalyser_event)

    self.last_save = time.time()
    self.db_timestamp = 0
    self.load_state()

    # Database setup: connect locally and create the database.
    database_name = "meowton"
    self.client = influxdb.InfluxDBClient('localhost', 8086, database=database_name)
    self.client.create_database(database_name)
    self.points_batch = []
def test_query_chunked(self):
    """Placeholder test: builds a client and a sample payload, checks nothing."""
    client = InfluxDBClient(database='db')
    sample_series = {
        'points': [
            [1415206250119, 40001, 667],
            [1415206244555, 30001, 7],
            [1415206228241, 20001, 788],
            [1415206212980, 10001, 555],
            [1415197271586, 10001, 23],
        ],
        'name': 'foo',
        'columns': ['time', 'sequence_number', 'val'],
    }
    # Nothing is asserted yet; both objects are dropped straight away.
    del client
    del sample_series
    # TODO ?
def setUpClass(cls):
    """Build a client and a SeriesHelper subclass and store both on
    ``TestSeriesHelper``.
    """
    super(TestSeriesHelper, cls).setUpClass()

    shared_client = InfluxDBClient(
        'host', 8086, 'username', 'password', 'database')
    TestSeriesHelper.client = shared_client

    class MySeriesHelper(SeriesHelper):
        class Meta:
            # Same object that was just stored on TestSeriesHelper.
            client = shared_client
            series_name = 'events.stats.{server_name}'
            fields = ['some_stat']
            tags = ['server_name', 'other_tag']
            bulk_size = 5
            autocommit = True

    TestSeriesHelper.MySeriesHelper = MySeriesHelper
def test_auto_commit(self):
    """
    Tests that write_points is called after the right number of events
    """
    class AutoCommitTest(SeriesHelper):
        class Meta:
            series_name = 'events.stats.{server_name}'
            fields = ['some_stat']
            tags = ['server_name', 'other_tag']
            bulk_size = 5
            client = InfluxDBClient()
            autocommit = True

    write_spy = mock.MagicMock()

    # The first point is created before the spy replaces write_points.
    AutoCommitTest(server_name='us.east-1', some_stat=159, other_tag='gg')
    AutoCommitTest._client.write_points = write_spy

    # Points two to four: still below bulk_size, so no write is expected.
    for stat in (158, 157, 156):
        AutoCommitTest(server_name='us.east-1', some_stat=stat, other_tag='gg')
    self.assertFalse(write_spy.called)

    # The fifth point reaches bulk_size.
    AutoCommitTest(server_name='us.east-1', some_stat=3443, other_tag='gg')
    self.assertTrue(write_spy.called)
def tryReconnect(self):
    """(Re)create the InfluxDB client and prepare the database.

    Creates the client, creates the database, installs the
    ``awesome_policy`` retention policy as default and switches to the
    database user.  A ``KeyboardInterrupt`` terminates the process with exit
    status 1; any other exception is logged and swallowed.
    """
    helper.internalLogger.info("Try reconnection to database" + self.dbname)
    try:
        # Instantiate a connection to the InfluxDB.
        self.client = InfluxDBClient(self.host, self.port, self.user, self.password, self.dbname)
        helper.internalLogger.info("Create database: " + self.dbname)
        self.client.create_database(self.dbname)
        helper.internalLogger.info("Create a retention policy")
        self.client.create_retention_policy('awesome_policy', '3d', 3, default=True)
        helper.internalLogger.info("Switch user: " + self.dbuser)
        # NOTE(review): ``dbpss`` is the attribute name the original code
        # reads -- confirm it matches what the constructor sets.
        self.client.switch_user(self.dbuser, self.dbpss)
    except KeyboardInterrupt:
        print("Ok ok, quitting")
        sys.exit(1)
    except Exception as e:
        helper.internalLogger.error('Unexpected error attempting to access to BD. It will be retried later.')
        # Log through the same logger used above ("einternalLogger" was a
        # typo that raised AttributeError inside this handler), and log the
        # exception instance rather than only its class.
        helper.internalLogger.exception(e)
def record():
    """Store the submitted form entry as a note in InfluxDB.

    Reads ``entry`` from the request form, writes it as a ``notes``
    measurement tagged with ``config.SLEEPER`` and flashes a message saying
    whether the entry was recorded.

    Returns:
        The rendered ``index`` template.
    """
    txt = request.form['entry']

    # Pre-bind the name so a failed connection skips the write below instead
    # of raising UnboundLocalError.
    influx = None
    try:
        influx = InfluxDBClient(config.INFLUX_HOST, config.INFLUX_PORT, config.INFLUX_USER, config.INFLUX_PASSWD, config.INFLUX_DB)
    except Exception as err:
        flash("Entry was not recorded. Influx connection error: %s" % str(err))

    if influx is not None:
        json_body = [
            {
                "measurement": "notes",
                "tags": {
                    "sleeper": config.SLEEPER,
                },
                "fields": {'note': txt},
            }
        ]
        try:
            influx.write_points(json_body)
            flash('Entry recorded.')
        except Exception as err:
            flash("Entry was not recorded. Influx write error: %s" % str(err))

    return render_template("index")
def reset_store(self):
    """
    Drops and re-creates the configured database.

    Connection parameters are read from ``self.settings`` with defaults
    (localhost, 8086, root/root, database ``mcp``).

    Returns:
        The new client, which is also stored as ``self.db``.
    """
    logging.info('Resetting InfluxDB database')

    settings = self.settings
    database = settings.get('database', 'mcp')
    self.db = InfluxDBClient(
        settings.get('host', 'localhost'),
        settings.get('port', 8086),
        settings.get('user', 'root'),
        settings.get('password', 'root'),
        database,
    )

    # Start from an empty database.
    self.db.drop_database(database)
    self.db.create_database(database)
    return self.db
def import2db(json_list):
    """
    Take a list of json data and import it into our database.

    Work in progress: a client is created and the list is iterated, but
    nothing is written yet.
    """
    client = InfluxDBClient("localhost", "8086", "", "", "db")
    for _entry in json_list:
        # work in progress -- the writes below are not enabled yet
        # print (json)
        # client.write_points(json_used)
        # client.write_points(json_allocated)
        # client.write_points(json_capacity)
        # client.write_points(json_used_pct)
        # client.write_points(json_allocated_pct)
        continue
def __init__(self, query, host="127.0.0.1", port=8086, database="thingflow",
             username="root", password="root",
             ssl=False, verify_ssl=False, timeout=None,
             use_udp=False, udp_port=4444, proxies=None,
             bulk_size=10):
    """Connect to InfluxDB and run ``query`` immediately.

    All connection arguments are forwarded to ``InfluxDBClient`` by keyword.
    ``bulk_size`` is accepted but not used by this constructor.

    Args:
        query: Query string; executed here and kept as ``self.query``.
        database: Database name; also kept as ``self.dbname``.
    """
    super().__init__()
    self.dbname = database

    connection_options = dict(
        host=host, port=port,
        username=username, password=password,
        database=database,
        ssl=ssl, verify_ssl=verify_ssl,
        timeout=timeout,
        use_udp=use_udp, udp_port=udp_port,
        proxies=proxies,
    )
    self.client = InfluxDBClient(**connection_options)

    self.query = query
    # The query runs right away; its points are kept for later use.
    result = self.client.query(query)
    self.points = result.get_points()
def get_incremental_starts(config, default_start):
    """Find the start time to use for each (account, region) pair.

    For every account/region in ``config`` the newest ``program_event_name``
    row is queried.  When there is no result, ``default_start`` is used;
    otherwise the first value of the first row is parsed with ``parse_date``
    and its tzinfo is removed.

    Args:
        config: Mapping with an ``influx`` section (``user``, ``password``,
            ``db`` and optional ``host``) and an ``accounts`` list whose
            entries carry ``name`` and ``regions``.
        default_start: Value used when no previous row exists.

    Returns:
        Dict keyed by ``(account name, region)``.
    """
    influx_cfg = config['influx']
    influx = InfluxDBClient(
        username=influx_cfg['user'],
        password=influx_cfg['password'],
        database=influx_cfg['db'],
        host=influx_cfg.get('host'))

    query_template = """
select * from program_event_name
where account = '%s'
and region = '%s'
order by time desc limit 1"""

    account_starts = {}
    for account in config.get('accounts'):
        for region in account.get('regions'):
            key = (account['name'], region)
            res = influx.query(query_template % key)
            if res is None or len(res) == 0:
                account_starts[key] = default_start
            else:
                # its all utc
                latest = res.raw['series'][0]['values'][0][0]
                account_starts[key] = parse_date(latest).replace(tzinfo=None)
    return account_starts
def __init__(self, query, host="127.0.0.1", port=8086, database="antevents",
             username="root", password="root",
             ssl=False, verify_ssl=False, timeout=None,
             use_udp=False, udp_port=4444, proxies=None,
             bulk_size=10):
    """Connect to InfluxDB and run ``query`` immediately.

    All connection arguments are forwarded to ``InfluxDBClient`` by keyword.
    ``bulk_size`` is accepted but not used by this constructor.

    Args:
        query: Query string; executed here and kept as ``self.query``.
        database: Database name; also kept as ``self.dbname``.
    """
    super().__init__()
    self.dbname = database

    connection_options = dict(
        host=host, port=port,
        username=username, password=password,
        database=database,
        ssl=ssl, verify_ssl=verify_ssl,
        timeout=timeout,
        use_udp=use_udp, udp_port=udp_port,
        proxies=proxies,
    )
    self.client = InfluxDBClient(**connection_options)

    self.query = query
    # The query runs right away; its points are kept for later use.
    result = self.client.query(query)
    self.points = result.get_points()
def test_influx_output():
    """Write sensor values through an InfluxDBWriter, then read them back.

    The stored rows are first printed via a direct client query and then
    replayed through an ``InfluxDBReader`` subscribed to ``print``.
    """
    event_loop = asyncio.get_event_loop()

    # Write phase: sensor -> publisher -> InfluxDB writer.
    sensor = ValueListSensor(1, value_stream)
    publisher = SensorPub(sensor)
    writer = InfluxDBWriter(
        msg_format=Sensor(series_name='Sensor', fields=['val', 'ts'], tags=['sensor_id']),
        generate_timestamp=False)
    publisher.subscribe(writer)
    write_scheduler = Scheduler(event_loop)
    write_scheduler.schedule_periodic(publisher, 0.2)  # sample five times every second
    write_scheduler.run_forever()

    # Now play back
    client = InfluxDBClient(database='antevents')
    for row in client.query('SELECT * FROM Sensor;').get_points():
        print(row)

    # Play back using a publisher
    reader = InfluxDBReader('SELECT * FROM Sensor;')
    reader.subscribe(CallableAsSubscriber(print))
    read_scheduler = Scheduler(event_loop)
    read_scheduler.schedule_periodic(reader, 0.2)  # sample five times every second
    read_scheduler.run_forever()
    print("That's all folks")
def use_database(self):
    """
    Opens a database to save data

    Connection parameters are read from ``self.settings`` with defaults
    (localhost, 8086, root/root).  The database is created after the client
    has been built.

    Returns:
        The client, which is also stored as ``self.db``.
    """
    logging.info("using InfluxDB database")

    # Resolve the name once so the client and create_database always agree.
    # Previously the fallback was 'smart-video-counter' for the client but
    # 'smart-counter' for the creation, so with no 'database' setting the
    # database the client points at was never created.
    database = self.settings.get('database', 'smart-video-counter')

    self.db = InfluxDBClient(
        self.settings.get('host', 'localhost'),
        self.settings.get('port', 8086),
        self.settings.get('user', 'root'),
        self.settings.get('password', 'root'),
        database,
    )
    self.db.create_database(database)
    return self.db
def reset_database(self):
    """
    Drops and re-creates the database used for points

    Connection parameters are read from ``self.settings`` with defaults
    (localhost, 8086, root/root).

    Returns:
        The client, which is also stored as ``self.db``.
    """
    logging.info("resetting InfluxDB database")

    # Resolve the name once so the client, the drop and the create all agree.
    # Previously the fallback was 'smart-video-counter' for the client but
    # 'smart-counter' for drop/create, so with no 'database' setting a
    # different database was reset than the one the client points at.
    database = self.settings.get('database', 'smart-video-counter')

    self.db = InfluxDBClient(
        self.settings.get('host', 'localhost'),
        self.settings.get('port', 8086),
        self.settings.get('user', 'root'),
        self.settings.get('password', 'root'),
        database,
    )
    self.db.drop_database(database)
    self.db.create_database(database)
    return self.db
def init(self, args):
    """Connect to InfluxDB, reset bookkeeping and start the resend thread.

    Args:
        args: Object providing ``influx_host``, ``influx_port``,
            ``username``, ``password``, ``db_name``, ``session`` and
            ``run_no``.
    """
    self._client = InfluxDBClient(
        args.influx_host,
        args.influx_port,
        args.username,
        args.password,
        args.db_name)

    # Create database if it doesn't exist
    for existing in self._client.get_list_database():
        if existing['name'] == args.db_name:
            break
    else:
        self._client.create_database(args.db_name)

    self.session = args.session
    self.run_no = args.run_no
    self._device_values = {}
    self._update_list = {}
    self._last_resend_time = None

    # Run self._resend_thread on a daemon thread.
    worker = Thread(target=self._resend_thread)
    worker.daemon = True
    worker.start()
def test_multi_fetch_non_existant_series(self):
    """Test that fetch_multi yields no data for series that do not exist"""
    start_epoch = int(self.start_time.strftime("%s"))
    end_epoch = int(self.end_time.strftime("%s"))

    # Leaf nodes built by hand for two fake paths.
    nodes = []
    for path in ('fake_path1', 'fake_path2'):
        reader = influxgraph.InfluxDBReader(
            InfluxDBClient(database=self.db_name), path)
        nodes.append(influxgraph.classes.leaf.InfluxDBLeafNode(path, reader))

    time_info, data = self.finder.fetch_multi(nodes, start_epoch, end_epoch)
    for metric_name in data:
        self.assertFalse(
            data[metric_name],
            msg="Expected no data for non-existant series %s - got %s" % (
                metric_name, data,))

    # Nodes returned by the finder for a fake query.
    fake_nodes = list(self.finder.find_nodes(Query('fake_pathy_path')))
    time_info, data = self.finder.fetch_multi(fake_nodes, start_epoch, end_epoch)
    self.assertFalse(data)
def __init__(self, host, port, user, password, db, ssl=False, verify_ssl=False, measure='flask', *args, **kw):
    """Initialise the base observer and create the InfluxDB client.

    Args:
        host, port, user, password, db, ssl, verify_ssl: Forwarded to
            ``InfluxDBClient`` by keyword.
        measure: Measurement name placed in the point template ``self._data``.
        *args, **kw: Forwarded to ``ObserverMetrics.__init__``.
    """
    ObserverMetrics.__init__(self, *args, **kw)

    # Point template: one measurement with empty tags and fields.
    point_template = {
        "measurement": measure,
        "tags": {},
        "fields": {},
    }
    self._data = [point_template]

    client_options = dict(
        host=host,
        port=port,
        username=user,
        password=password,
        database=db,
        ssl=ssl,
        verify_ssl=verify_ssl,
    )
    try:
        self.db = InfluxDBClient(**client_options)
    except InfluxDBClientError:
        self.logger.critical("Cannot connect to InfluxDB database '%s'" % db)
def load_influx_settings(self):
    """Load Influxdb server information stored in the core database.

    Reads ``server``, ``port``, ``username`` and ``password`` from the first
    row of the ``influx_settings`` table in the SQLite file
    ``utilities.DB_CORE`` and stores a client built from them as
    ``self.ifconn``.  Any failure is logged with ``Log.exception`` and not
    re-raised.
    """
    field_names = ['server', 'port', 'username', 'password']
    sql = 'SELECT {fields} FROM influx_settings LIMIT 1;'.format(
        fields=', '.join(field_names))

    # Bound before the try so the cleanup in ``finally`` cannot hit an
    # unbound name when sqlite3.connect itself fails.
    database = None
    try:
        database = sqlite3.connect(utilities.DB_CORE)
        db_elements = database.cursor().execute(sql).fetchone()
        # An empty table yields None here; zip() then raises TypeError,
        # which is logged by the handler below.
        settings = dict(zip(field_names, db_elements))
        self.ifconn = InfluxDBClient(
            settings["server"], settings["port"], settings["username"], settings["password"]
        )
        Log.info("Influxdb information loaded.")
    except Exception as excpt:
        Log.exception("Trying to load Influx server information: %s.", excpt)
    finally:
        if database is not None:
            database.close()
def __init__(self, log, host, port, user, password, dbname, dbuser, dbpassword, dbMeasurement):
    """Validate the arguments, store them and open the InfluxDB connection.

    Every argument except ``log`` is asserted to be non-None, and
    ``dbMeasurement`` must also be non-empty.  The client is created with
    the admin credentials, the database is created, and the client then
    switches to ``dbuser``/``dbpassword``.

    Raises:
        AssertionError: If a checked argument is None or ``dbMeasurement``
            is empty (only while assertions are enabled).
    """
    required = (
        (host, "InfluxConnector: host value is None!"),
        (port, "InfluxConnector: port value is None!"),
        (user, "InfluxConnector: user name value is None!"),
        (password, "InfluxConnector: password value is None!"),
        (dbname, "InfluxConnector: dbname name value is None!"),
        (dbuser, "InfluxConnector: dbuser name value is None!"),
        (dbpassword, "InfluxConnector: dbpassword value is None!"),
        (dbMeasurement, "InfluxConnector: measurement value is None!"),
    )
    for value, message in required:
        assert value is not None, message
    assert len(dbMeasurement) > 0, "InfluxConnector: measurement value is empty!"

    self.host = host
    self.port = port
    self.user = user
    self.password = password
    self.dbname = dbname
    self.dbuser = dbuser
    self.dbpassword = dbpassword
    self.dbMeasurement = dbMeasurement
    self.log = log

    client = InfluxDBClient(self.host, self.port, self.user, self.password, self.dbname)
    self.myInfluxDb = client
    client.create_database(self.dbname)  # repeated call possible, also when the database exists
    client.switch_user(self.dbuser, self.dbpassword)
def main(args):
    """Fetch build data from the ``telegraf`` database and generate reports.

    Args:
        args: Object providing ``influx_ip``, ``influx_port``, ``build_ref``,
            ``leapfrog_upgrade``, ``leapfiledir``, ``completefiledir``,
            ``ymlreport`` and ``subunitreport``.
    """
    influx = InfluxDBClient(args.influx_ip, args.influx_port, database="telegraf")

    build_options = {
        'leapfrog': args.leapfrog_upgrade,
        'leapfiledir': args.leapfiledir,
        'completefiledir': args.completefiledir,
    }
    stages = get_build_data(influx, args.build_ref, **build_options)

    report_options = {
        'data': stages,
        'max_downtime': 100,
        'ymlfile': args.ymlreport,
        'subunitfile': args.subunitreport,
    }
    generate_reports(**report_options)
def output_influxdb(self):
    """ Writes every entry of ``self.data_list`` to InfluxDB, one point per call """
    client = InfluxDBClient(self.db_host, self.db_port, self.db_user, self.db_password, self.db_name)
    # TODO: Refactor to batch to optimize writes to the DB
    for entry in self.data_list:
        # Build JSON body for the REST API call
        point = {
            'measurement': entry['command'],
            'tags': entry['tag'],
            'fields': entry['fields'],
            'time': entry['timestamp'],
        }
        client.write_points([point], time_precision='s')
def get_influxdb():
    """Return a client for the local ``disney`` database, creating the
    database first.
    """
    database_name = 'disney'
    # Positional arguments: host, port, user, password, database name.
    client = influxdb.InfluxDBClient('localhost', 8086, 'root', 'root', database_name)
    client.create_database(database_name)
    return client
def insert_datapoints(datapoints):
    """Log the given datapoints and write them to the configured database.

    Args:
        datapoints: Points passed unchanged to ``write_points``.
    """
    client = InfluxDBClient(db_server, db_port, db_admin, db_admin_password)
    client.switch_database(db_name)

    logger.info('Inserting into database the following datapoints:')
    logger.info(pformat(datapoints))

    # The write result is not used.
    client.write_points(datapoints)
def get_handle_db():
    """Return the module-level InfluxDB client, creating it on first use.

    The global ``HANDLE_DB`` is treated as "not created yet" while it equals
    the empty string.
    """
    global HANDLE_DB
    if HANDLE_DB == '':
        connection_settings = {
            'host': DOCKER_IP,
            'port': TEST_PORT_INFLUXDB_API,
            'database': DATABASE_NAME,
            'username': "juniper",
            'password': "juniper",
        }
        HANDLE_DB = influxdb.InfluxDBClient(**connection_settings)
    return HANDLE_DB
#############################################
def get_influxdb_handle():
    """Return the module-level InfluxDB client, creating it on first use.

    The global ``INFLUXDB_HANDLE`` is treated as "not created yet" while it
    equals the empty string.
    """
    global INFLUXDB_HANDLE
    if INFLUXDB_HANDLE == '':
        connection_settings = {
            'host': OPENNTI_IP,
            'port': OPENNTI_API_PORT,
            'database': OPENNTI_DATABASE_NAME,
            'username': "juniper",
            'password': "juniper",
        }
        INFLUXDB_HANDLE = influxdb.InfluxDBClient(**connection_settings)
    return INFLUXDB_HANDLE