From 12c2a5790d7b506ce1c21d123b421dc9ba0cb661 Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:45:37 -0500 Subject: [PATCH 01/11] Add influxdb 2 client --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 38e1312..eba91af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ geoip2==2.9.0 influxdb==5.2.0 schedule==0.6.0 distro==1.4.0 -urllib3==1.24.2 \ No newline at end of file +urllib3==1.24.2 +influxdb-client==1.14.0 \ No newline at end of file -- 2.45.2 From 59d6821a3ae04d4e696a152c7341ac910d7a031c Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:46:26 -0500 Subject: [PATCH 02/11] Add structure for influxdb 2 params This contains all the data needed for connecting and writing to an InfluxDB2 server --- varken/structures.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/varken/structures.py b/varken/structures.py index deb4017..c671691 100644 --- a/varken/structures.py +++ b/varken/structures.py @@ -20,6 +20,16 @@ class InfluxServer(NamedTuple): verify_ssl: bool = False +class Influx2Server(NamedTuple): + url: str = 'localhost' + org: str = 'server' + token: str = 'TOKEN' + bucket: str = 'varken' + timeout: int = 10000 + ssl: bool = False + verify_ssl: bool = False + + class SonarrServer(NamedTuple): api_key: str = None future_days: int = 0 -- 2.45.2 From 5b15299a7328e1e12b8d0903f0923a8c2f92e2bc Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:48:41 -0500 Subject: [PATCH 03/11] Parse influxdb 2 config data --- varken/iniparser.py | 51 ++++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/varken/iniparser.py b/varken/iniparser.py index e241f31..7e9f9ab 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -9,7 +9,7 @@ from configparser import ConfigParser, NoOptionError, NoSectionError from varken.varkenlogger import BlacklistFilter from varken.structures import SickChillServer, UniFiServer from varken.helpers import clean_sid_check, rfc1918_ip_check, boolcheck -from varken.structures import SonarrServer, RadarrServer, OmbiServer, TautulliServer, InfluxServer +from varken.structures import SonarrServer, RadarrServer, OmbiServer, TautulliServer, InfluxServer, Influx2Server class INIParser(object): @@ -144,23 +144,44 @@ class INIParser(object): if read_file: self.config = self.read_file('varken.ini') self.config_blacklist() + # Parse InfluxDB options - try: - url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')), - include_port=False, section='influxdb') - port = int(env.get('VRKN_INFLUXDB_PORT', self.config.getint('influxdb', 'port'))) - ssl = boolcheck(env.get('VRKN_INFLUXDB_SSL', self.config.get('influxdb', 'ssl'))) - verify_ssl = boolcheck(env.get('VRKN_INFLUXDB_VERIFY_SSL', self.config.get('influxdb', 'verify_ssl'))) + self.influx2_enabled = env.get('VRKN_GLOBAL_INFLUXDB2_ENABLED', self.config.getboolean('global', 'influx2_enabled')) + + if self.influx2_enabled: + # Use INFLUX version 2 + try: + url = self.url_check(env.get('VRKN_INFLUXDB2_URL', self.config.get('influx2', 'url')), section='influx2') + ssl = boolcheck(env.get('VRKN_INFLUXDB2_SSL', self.config.get('influx2', 'ssl'))) + verify_ssl = boolcheck(env.get('VRKN_INFLUXDB2_VERIFY_SSL', self.config.get('influx2', 'verify_ssl'))) - username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) - password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) - except NoOptionError as e: - self.logger.error('Missing key in %s. Error: %s', "influxdb", e) - self.rectify_ini() - return + org = env.get('VRKN_INFLUXDB2_ORG', self.config.get('influx2', 'org')) + token = env.get('VRKN_INFLUXDB2_TOKEN', self.config.get('influx2', 'token')) + timeout = env.get('VRKN_INFLUXDB2_TIMEOUT', self.config.get('influx2', 'timeout')) + except NoOptionError as e: + self.logger.error('Missing key in %s. Error: %s', "influx2", e) + self.rectify_ini() + return - self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, - verify_ssl=verify_ssl) + self.influx_server = Influx2Server(url=url, token=token, org=org, timeout=timeout, ssl=ssl, + verify_ssl=verify_ssl) + else: + try: + url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')), + include_port=False, section='influxdb') + port = int(env.get('VRKN_INFLUXDB_PORT', self.config.getint('influxdb', 'port'))) + ssl = boolcheck(env.get('VRKN_INFLUXDB_SSL', self.config.get('influxdb', 'ssl'))) + verify_ssl = boolcheck(env.get('VRKN_INFLUXDB_VERIFY_SSL', self.config.get('influxdb', 'verify_ssl'))) + + username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) + password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) + except NoOptionError as e: + self.logger.error('Missing key in %s. Error: %s', "influxdb", e) + self.rectify_ini() + return + + self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, + verify_ssl=verify_ssl) # Check for all enabled services for service in self.services: -- 2.45.2 From 6d170c1cdccba5459281f9bdf795d3624adba952 Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:49:41 -0500 Subject: [PATCH 04/11] Add influxdb2 manager class This stores the data needed for InfluxDB2, and has a single `write_points` function on this that takes an array of points to add to the database --- varken/influxdb2manager.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 varken/influxdb2manager.py diff --git a/varken/influxdb2manager.py b/varken/influxdb2manager.py new file mode 100644 index 0000000..d6e3452 --- /dev/null +++ b/varken/influxdb2manager.py @@ -0,0 +1,28 @@ +from sys import exit +from logging import getLogger +from requests.exceptions import ConnectionError +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import SYNCHRONOUS + + +class InfluxDB2Manager(object): + def __init__(self, server): + self.server = server + self.logger = getLogger() + if self.server.url == "influxdb2.domain.tld": + self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") + exit() + + self.influx = InfluxDBClient(url=self.server.url, token=self.server.token, org=self.server.org, + timeout=self.server.timeout, verify_ssl=self.server.verify_ssl, ssl_ca_cert=self.server.ssl) + self.influx_write_api = self.influx.write_api(write_options=SYNCHRONOUS) + + def write_points(self, data): + d = data + self.logger.info('Writing Data to InfluxDBv2 %s', d) + + try: + self.influx_write_api.write(bucket=self.server.bucket, record=d) + except (InfluxDBServerError, ConnectionError) as e: + self.logger.error('Error writing data to influxdb2. Dropping this set of data. ' + 'Check your database! Error: %s', e) -- 2.45.2 From 364253e107ee50eaa5f9be6a6eba953dbbd43e1c Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:50:12 -0500 Subject: [PATCH 05/11] Use the correct db manager for varken --- Varken.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Varken.py b/Varken.py index 3641cbc..55fc4fe 100644 --- a/Varken.py +++ b/Varken.py @@ -21,6 +21,7 @@ from varken.radarr import RadarrAPI from varken.lidarr import LidarrAPI from varken.iniparser import INIParser from varken.dbmanager import DBManager +from varken.influxdb2manager import InfluxDB2Manager from varken.helpers import GeoIPHandler from varken.tautulli import TautulliAPI from varken.sickchill import SickChillAPI @@ -90,7 +91,15 @@ if __name__ == "__main__": vl.logger.info("Varken v%s-%s %s", VERSION, BRANCH, BUILD_DATE) CONFIG = INIParser(DATA_FOLDER) - DBMANAGER = DBManager(CONFIG.influx_server) + + if CONFIG.influx2_enabled: + # Use INFLUX version 2 + vl.logger.info('Using INFLUXDBv2') + DBMANAGER = InfluxDB2Manager(CONFIG.influx_server) + else: + vl.logger.info('Using INFLUXDB') + DBMANAGER = DBManager(CONFIG.influx_server) + QUEUE = Queue() if CONFIG.sonarr_enabled: -- 2.45.2 From 06e6a0fb80f8e3ac977b1d7734c762703d902ab5 Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 15:50:48 -0500 Subject: [PATCH 06/11] Add influxdb2 to the example varken config file --- data/varken.example.ini | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/data/varken.example.ini b/data/varken.example.ini index fa072cf..aaa37b1 100644 --- a/data/varken.example.ini +++ b/data/varken.example.ini @@ -7,6 +7,7 @@ ombi_server_ids = 1 sickchill_server_ids = false unifi_server_ids = false maxmind_license_key = xxxxxxxxxxxxxxxx +influx2_enabled = false [influxdb] url = influxdb.domain.tld @@ -16,6 +17,15 @@ verify_ssl = false username = root password = root +[influx2] +url = influxdb2.domain.tld +org = ORG +token = TOKEN +timeout = 10000 +ssl = false +verify_ssl = false +bucket = varken + [tautulli-1] url = tautulli.domain.tld:8181 fallback_ip = 1.1.1.1 -- 2.45.2 From 1a712abe73a90849232e3b319e1ca203cbb7d17f Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 16:03:06 -0500 Subject: [PATCH 07/11] Create influx bucket if it doesn't exist --- varken/influxdb2manager.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/varken/influxdb2manager.py b/varken/influxdb2manager.py index d6e3452..0cb9df9 100644 --- a/varken/influxdb2manager.py +++ b/varken/influxdb2manager.py @@ -1,6 +1,7 @@ from sys import exit from logging import getLogger from requests.exceptions import ConnectionError +import influxdb_client from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS @@ -17,6 +18,23 @@ class InfluxDB2Manager(object): timeout=self.server.timeout, verify_ssl=self.server.verify_ssl, ssl_ca_cert=self.server.ssl) self.influx_write_api = self.influx.write_api(write_options=SYNCHRONOUS) + # Create the bucket if needed + + bucket_api = self.influx.buckets_api() + + bucket = bucket_api.find_bucket_by_name(self.server.bucket) + + if bucket is None: + self.logger.info('Creating bucket %s', self.server.bucket) + + org_api = influxdb_client.service.organizations_service.OrganizationsService(self.influx.api_client) + orgs = org_api.get_orgs() + for org in orgs.orgs: + if org.name == self.server.org: + my_org = org + + self.influx.buckets_api().create_bucket(bucket_name=self.server.bucket, org_id=my_org.id) + def write_points(self, data): d = data self.logger.info('Writing Data to InfluxDBv2 %s', d) -- 2.45.2 From ede4762818c1f019314de42bf0380e5fd4b3feef Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 16:05:45 -0500 Subject: [PATCH 08/11] Update InfluxDB type on README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dd25a3e..de4e654 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ ecosystem into InfluxDB using Grafana for a frontend Requirements: * [Python 3.6.7+](https://www.python.org/downloads/release/python-367/) * [Python3-pip](https://pip.pypa.io/en/stable/installing/) -* [InfluxDB 1.8.x](https://www.influxdata.com/) +* [InfluxDB 1.8.x or 2.0.x](https://www.influxdata.com/) * [Grafana](https://grafana.com/)

-- 2.45.2 From a78da1281b1848bc6ea6828700e9547da839b20b Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 16:38:42 -0500 Subject: [PATCH 09/11] Clean up linting errors --- varken/influxdb2manager.py | 8 ++++---- varken/iniparser.py | 14 ++++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/varken/influxdb2manager.py b/varken/influxdb2manager.py index 0cb9df9..3979ba4 100644 --- a/varken/influxdb2manager.py +++ b/varken/influxdb2manager.py @@ -1,8 +1,7 @@ from sys import exit from logging import getLogger -from requests.exceptions import ConnectionError import influxdb_client -from influxdb_client import InfluxDBClient, Point +from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS @@ -15,7 +14,8 @@ class InfluxDB2Manager(object): exit() self.influx = InfluxDBClient(url=self.server.url, token=self.server.token, org=self.server.org, - timeout=self.server.timeout, verify_ssl=self.server.verify_ssl, ssl_ca_cert=self.server.ssl) + timeout=self.server.timeout, verify_ssl=self.server.verify_ssl, + ssl_ca_cert=self.server.ssl) self.influx_write_api = self.influx.write_api(write_options=SYNCHRONOUS) # Create the bucket if needed @@ -41,6 +41,6 @@ class InfluxDB2Manager(object): try: self.influx_write_api.write(bucket=self.server.bucket, record=d) - except (InfluxDBServerError, ConnectionError) as e: + except Exception as e: self.logger.error('Error writing data to influxdb2. Dropping this set of data. ' 'Check your database! Error: %s', e) diff --git a/varken/iniparser.py b/varken/iniparser.py index 7e9f9ab..c00174c 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -146,12 +146,14 @@ class INIParser(object): self.config_blacklist() # Parse InfluxDB options - self.influx2_enabled = env.get('VRKN_GLOBAL_INFLUXDB2_ENABLED', self.config.getboolean('global', 'influx2_enabled')) - + self.influx2_enabled = env.get('VRKN_GLOBAL_INFLUXDB2_ENABLED', + self.config.getboolean('global', 'influx2_enabled')) + if self.influx2_enabled: # Use INFLUX version 2 try: - url = self.url_check(env.get('VRKN_INFLUXDB2_URL', self.config.get('influx2', 'url')), section='influx2') + url = self.url_check(env.get('VRKN_INFLUXDB2_URL', self.config.get('influx2', 'url')), + section='influx2') ssl = boolcheck(env.get('VRKN_INFLUXDB2_SSL', self.config.get('influx2', 'ssl'))) verify_ssl = boolcheck(env.get('VRKN_INFLUXDB2_VERIFY_SSL', self.config.get('influx2', 'verify_ssl'))) @@ -164,11 +166,11 @@ class INIParser(object): return self.influx_server = Influx2Server(url=url, token=token, org=org, timeout=timeout, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl) else: try: url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')), - include_port=False, section='influxdb') + include_port=False, section='influxdb') port = int(env.get('VRKN_INFLUXDB_PORT', self.config.getint('influxdb', 'port'))) ssl = boolcheck(env.get('VRKN_INFLUXDB_SSL', self.config.get('influxdb', 'ssl'))) verify_ssl = boolcheck(env.get('VRKN_INFLUXDB_VERIFY_SSL', self.config.get('influxdb', 'verify_ssl'))) @@ -181,7 +183,7 @@ class INIParser(object): return self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl) # Check for all enabled services for service in self.services: -- 2.45.2 From b4efd15f1e76eecf4b9cb20a2c35ab69bd9ec155 Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Sat, 13 Feb 2021 16:40:02 -0500 Subject: [PATCH 10/11] Wrap create bucket in try/catch --- varken/influxdb2manager.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/varken/influxdb2manager.py b/varken/influxdb2manager.py index 3979ba4..66eb12d 100644 --- a/varken/influxdb2manager.py +++ b/varken/influxdb2manager.py @@ -22,18 +22,21 @@ class InfluxDB2Manager(object): bucket_api = self.influx.buckets_api() - bucket = bucket_api.find_bucket_by_name(self.server.bucket) + try: + bucket = bucket_api.find_bucket_by_name(self.server.bucket) - if bucket is None: - self.logger.info('Creating bucket %s', self.server.bucket) + if bucket is None: + self.logger.info('Creating bucket %s', self.server.bucket) - org_api = influxdb_client.service.organizations_service.OrganizationsService(self.influx.api_client) - orgs = org_api.get_orgs() - for org in orgs.orgs: - if org.name == self.server.org: - my_org = org + org_api = influxdb_client.service.organizations_service.OrganizationsService(self.influx.api_client) + orgs = org_api.get_orgs() + for org in orgs.orgs: + if org.name == self.server.org: + my_org = org - self.influx.buckets_api().create_bucket(bucket_name=self.server.bucket, org_id=my_org.id) + self.influx.buckets_api().create_bucket(bucket_name=self.server.bucket, org_id=my_org.id) + except Exception as e: + self.logger.error('Failed creating new InfluxDB bucket! Error: %s', e) def write_points(self, data): d = data -- 2.45.2 From ac25883dc47ab5cc65e945725b749c80d8c02d75 Mon Sep 17 00:00:00 2001 From: Gabe Revells Date: Mon, 1 Mar 2021 10:42:35 -0500 Subject: [PATCH 11/11] Use bucket given in ini file --- varken/iniparser.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/varken/iniparser.py b/varken/iniparser.py index c00174c..dc5c3f7 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -158,6 +158,7 @@ class INIParser(object): verify_ssl = boolcheck(env.get('VRKN_INFLUXDB2_VERIFY_SSL', self.config.get('influx2', 'verify_ssl'))) org = env.get('VRKN_INFLUXDB2_ORG', self.config.get('influx2', 'org')) + bucket = env.get('VRKN_INFLUXDB2_BUCKET', self.config.get('influx2', 'bucket')) token = env.get('VRKN_INFLUXDB2_TOKEN', self.config.get('influx2', 'token')) timeout = env.get('VRKN_INFLUXDB2_TIMEOUT', self.config.get('influx2', 'timeout')) except NoOptionError as e: @@ -166,7 +167,7 @@ class INIParser(object): return self.influx_server = Influx2Server(url=url, token=token, org=org, timeout=timeout, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl, bucket=bucket) else: try: url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')), -- 2.45.2