Influxdb #1

Merged
d_mcknight merged 50 commits from github/fork/Dinnerbone/influxdb into master 2023-06-21 20:14:15 -07:00
6 changed files with 67 additions and 20 deletions
Showing only changes of commit 2527b32a20 - Show all commits

View file

@ -16,6 +16,7 @@ ssl = false
verify_ssl = false verify_ssl = false
username = root username = root
password = root password = root
org = -
[tautulli-1] [tautulli-1]
url = tautulli.domain.tld:8181 url = tautulli.domain.tld:8181

View file

@ -5,6 +5,7 @@
requests==2.25.1 requests==2.25.1
geoip2==2.9.0 geoip2==2.9.0
influxdb==5.2.0 influxdb==5.2.0
influxdb-client==1.30.0
schedule==0.6.0 schedule==0.6.0
distro==1.4.0 distro==1.4.0
urllib3==1.26.5 urllib3==1.26.5

View file

@ -1,45 +1,87 @@
import re
from sys import exit from sys import exit
from logging import getLogger from logging import getLogger
from influxdb import InfluxDBClient from influxdb_client import InfluxDBClient, BucketRetentionRules
from requests.exceptions import ConnectionError from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb.exceptions import InfluxDBServerError from influxdb_client.client.exceptions import InfluxDBError
from urllib3.exceptions import NewConnectionError
class DBManager(object): class DBManager(object):
def __init__(self, server): def __init__(self, server):
self.server = server self.server = server
self.logger = getLogger() self.logger = getLogger()
self.bucket = "varken"
if self.server.url == "influxdb.domain.tld": if self.server.url == "influxdb.domain.tld":
self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration")
exit() exit()
self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username,
password=self.server.password, ssl=self.server.ssl, database='varken', url = self.server.url
verify_ssl=self.server.verify_ssl) if 'http' not in url:
scheme = 'http'
if self.server.ssl:
scheme = 'https'
url = "{}://{}:{}".format(scheme, self.server.url, self.server.port)
token = f'{self.server.username}:{self.server.password}'
self.influx = InfluxDBClient(url=url, token=token,
verify_ssl=self.server.verify_ssl, org=self.server.org)
try: try:
version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] version = self.influx.version()
self.logger.info('Influxdb version: %s', version) self.logger.info('Influxdb version: %s', version)
except ConnectionError: match = re.match(r'(\d+)\.', version)
self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname") if match:
self.version = int(match[1])
self.logger.info("Using InfluxDB API v%s", self.version)
else:
self.logger.critical("Unknown influxdb version")
exit(1)
except NewConnectionError:
self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid")
exit(1) exit(1)
databases = [db['name'] for db in self.influx.get_list_database()] if self.version >= 2:
self.create_v2_bucket()
else:
self.create_v1_database()
if 'varken' not in databases: def create_v2_bucket(self):
if not self.influx.buckets_api().find_bucket_by_name(self.bucket):
self.logger.info("Creating varken bucket")
retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30,
shard_group_duration_seconds=60 * 60)
self.influx.buckets_api().create_bucket(bucket_name=self.bucket,
retention_rules=retention)
def create_v1_database(self):
from influxdb import InfluxDBClient
client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username,
password=self.server.password, ssl=self.server.ssl, database=self.bucket,
verify_ssl=self.server.verify_ssl)
databases = [db['name'] for db in client.get_list_database()]
if self.bucket not in databases:
self.logger.info("Creating varken database") self.logger.info("Creating varken database")
self.influx.create_database('varken') client.create_database(self.bucket)
retention_policies = [policy['name'] for policy in retention_policies = [policy['name'] for policy in
self.influx.get_list_retention_policies(database='varken')] client.get_list_retention_policies(database=self.bucket)]
if 'varken 30d-1h' not in retention_policies: if 'varken 30d-1h' not in retention_policies:
self.logger.info("Creating varken retention policy (30d-1h)") self.logger.info("Creating varken retention policy (30d-1h)")
self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1',
database='varken', default=True, shard_duration='1h') database=self.bucket, default=True, shard_duration='1h')
self.bucket = f'{self.bucket}/varken 30d-1h'
def write_points(self, data): def write_points(self, data):
d = data d = data
self.logger.debug('Writing Data to InfluxDB %s', d) self.logger.debug('Writing Data to InfluxDB %s', d)
write_api = self.influx.write_api(write_options=SYNCHRONOUS)
try: try:
self.influx.write_points(d) write_api.write(bucket=self.bucket, record=data)
except (InfluxDBServerError, ConnectionError) as e: except (InfluxDBError, NewConnectionError) as e:
self.logger.error('Error writing data to influxdb. Dropping this set of data. ' self.logger.error('Error writing data to influxdb. Dropping this set of data. '
'Check your database! Error: %s', e) 'Check your database! Error: %s', e)

View file

@ -154,13 +154,15 @@ class INIParser(object):
username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username'))
password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password'))
org = env.get('VRKN_INFLUXDB_ORG', self.config.get('influxdb', 'org'))
except NoOptionError as e: except NoOptionError as e:
self.logger.error('Missing key in %s. Error: %s', "influxdb", e) self.logger.error('Missing key in %s. Error: %s', "influxdb", e)
self.rectify_ini() self.rectify_ini()
return return
self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl,
verify_ssl=verify_ssl) verify_ssl=verify_ssl, org=org)
# Check for all enabled services # Check for all enabled services
for service in self.services: for service in self.services:

View file

@ -18,6 +18,7 @@ class InfluxServer(NamedTuple):
url: str = 'localhost' url: str = 'localhost'
username: str = 'root' username: str = 'root'
verify_ssl: bool = False verify_ssl: bool = False
org: str = '-'
class SonarrServer(NamedTuple): class SonarrServer(NamedTuple):

View file

@ -2,7 +2,7 @@ from logging import getLogger
from requests import Session, Request from requests import Session, Request
from geoip2.errors import AddressNotFoundError from geoip2.errors import AddressNotFoundError
from datetime import datetime, timezone, date, timedelta from datetime import datetime, timezone, date, timedelta
from influxdb.exceptions import InfluxDBClientError from influxdb_client.client.exceptions import InfluxDBError
from varken.structures import TautulliStream from varken.structures import TautulliStream
from varken.helpers import hashit, connection_handler, itemgetter_with_default from varken.helpers import hashit, connection_handler, itemgetter_with_default
@ -363,7 +363,7 @@ class TautulliAPI(object):
) )
try: try:
self.dbmanager.write_points(influx_payload) self.dbmanager.write_points(influx_payload)
except InfluxDBClientError as e: except InfluxDBError as e:
if "beyond retention policy" in str(e): if "beyond retention policy" in str(e):
self.logger.debug('Only imported 30 days of data per retention policy') self.logger.debug('Only imported 30 days of data per retention policy')
else: else: