Influxdb2 #2
7 changed files with 121 additions and 18 deletions
|
@ -17,7 +17,7 @@ ecosystem into InfluxDB using Grafana for a frontend
|
||||||
Requirements:
|
Requirements:
|
||||||
* [Python 3.6.7+](https://www.python.org/downloads/release/python-367/)
|
* [Python 3.6.7+](https://www.python.org/downloads/release/python-367/)
|
||||||
* [Python3-pip](https://pip.pypa.io/en/stable/installing/)
|
* [Python3-pip](https://pip.pypa.io/en/stable/installing/)
|
||||||
* [InfluxDB 1.8.x](https://www.influxdata.com/)
|
* [InfluxDB 1.8.x or 2.0.x](https://www.influxdata.com/)
|
||||||
* [Grafana](https://grafana.com/)
|
* [Grafana](https://grafana.com/)
|
||||||
|
|
||||||
<p align="center">
|
<p align="center">
|
||||||
|
|
11
Varken.py
11
Varken.py
|
@ -21,6 +21,7 @@ from varken.radarr import RadarrAPI
|
||||||
from varken.lidarr import LidarrAPI
|
from varken.lidarr import LidarrAPI
|
||||||
from varken.iniparser import INIParser
|
from varken.iniparser import INIParser
|
||||||
from varken.dbmanager import DBManager
|
from varken.dbmanager import DBManager
|
||||||
|
from varken.influxdb2manager import InfluxDB2Manager
|
||||||
from varken.helpers import GeoIPHandler
|
from varken.helpers import GeoIPHandler
|
||||||
from varken.tautulli import TautulliAPI
|
from varken.tautulli import TautulliAPI
|
||||||
from varken.sickchill import SickChillAPI
|
from varken.sickchill import SickChillAPI
|
||||||
|
@ -90,7 +91,15 @@ if __name__ == "__main__":
|
||||||
vl.logger.info("Varken v%s-%s %s", VERSION, BRANCH, BUILD_DATE)
|
vl.logger.info("Varken v%s-%s %s", VERSION, BRANCH, BUILD_DATE)
|
||||||
|
|
||||||
CONFIG = INIParser(DATA_FOLDER)
|
CONFIG = INIParser(DATA_FOLDER)
|
||||||
DBMANAGER = DBManager(CONFIG.influx_server)
|
|
||||||
|
if CONFIG.influx2_enabled:
|
||||||
|
# Use INFLUX version 2
|
||||||
|
vl.logger.info('Using INFLUXDBv2')
|
||||||
|
DBMANAGER = InfluxDB2Manager(CONFIG.influx_server)
|
||||||
|
else:
|
||||||
|
vl.logger.info('Using INFLUXDB')
|
||||||
|
DBMANAGER = DBManager(CONFIG.influx_server)
|
||||||
|
|
||||||
QUEUE = Queue()
|
QUEUE = Queue()
|
||||||
|
|
||||||
if CONFIG.sonarr_enabled:
|
if CONFIG.sonarr_enabled:
|
||||||
|
|
|
@ -7,6 +7,7 @@ ombi_server_ids = 1
|
||||||
sickchill_server_ids = false
|
sickchill_server_ids = false
|
||||||
unifi_server_ids = false
|
unifi_server_ids = false
|
||||||
maxmind_license_key = xxxxxxxxxxxxxxxx
|
maxmind_license_key = xxxxxxxxxxxxxxxx
|
||||||
|
influx2_enabled = false
|
||||||
|
|
||||||
[influxdb]
|
[influxdb]
|
||||||
url = influxdb.domain.tld
|
url = influxdb.domain.tld
|
||||||
|
@ -16,6 +17,15 @@ verify_ssl = false
|
||||||
username = root
|
username = root
|
||||||
password = root
|
password = root
|
||||||
|
|
||||||
|
[influx2]
|
||||||
|
url = influxdb2.domain.tld
|
||||||
|
org = ORG
|
||||||
|
token = TOKEN
|
||||||
|
timeout = 10000
|
||||||
|
ssl = false
|
||||||
|
verify_ssl = false
|
||||||
|
bucket = varken
|
||||||
|
|
||||||
[tautulli-1]
|
[tautulli-1]
|
||||||
url = tautulli.domain.tld:8181
|
url = tautulli.domain.tld:8181
|
||||||
fallback_ip = 1.1.1.1
|
fallback_ip = 1.1.1.1
|
||||||
|
|
|
@ -7,4 +7,5 @@ geoip2==2.9.0
|
||||||
influxdb==5.2.0
|
influxdb==5.2.0
|
||||||
schedule==0.6.0
|
schedule==0.6.0
|
||||||
distro==1.4.0
|
distro==1.4.0
|
||||||
urllib3==1.24.2
|
urllib3==1.24.2
|
||||||
|
influxdb-client==1.14.0
|
49
varken/influxdb2manager.py
Normal file
49
varken/influxdb2manager.py
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
from sys import exit
|
||||||
|
from logging import getLogger
|
||||||
|
import influxdb_client
|
||||||
|
from influxdb_client import InfluxDBClient
|
||||||
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
|
|
||||||
|
|
||||||
|
class InfluxDB2Manager(object):
|
||||||
|
def __init__(self, server):
|
||||||
|
self.server = server
|
||||||
|
self.logger = getLogger()
|
||||||
|
if self.server.url == "influxdb2.domain.tld":
|
||||||
|
self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration")
|
||||||
|
exit()
|
||||||
|
|
||||||
|
self.influx = InfluxDBClient(url=self.server.url, token=self.server.token, org=self.server.org,
|
||||||
|
timeout=self.server.timeout, verify_ssl=self.server.verify_ssl,
|
||||||
|
ssl_ca_cert=self.server.ssl)
|
||||||
|
self.influx_write_api = self.influx.write_api(write_options=SYNCHRONOUS)
|
||||||
|
|
||||||
|
# Create the bucket if needed
|
||||||
|
|
||||||
|
bucket_api = self.influx.buckets_api()
|
||||||
|
|
||||||
|
try:
|
||||||
|
bucket = bucket_api.find_bucket_by_name(self.server.bucket)
|
||||||
|
|
||||||
|
if bucket is None:
|
||||||
|
self.logger.info('Creating bucket %s', self.server.bucket)
|
||||||
|
|
||||||
|
org_api = influxdb_client.service.organizations_service.OrganizationsService(self.influx.api_client)
|
||||||
|
orgs = org_api.get_orgs()
|
||||||
|
for org in orgs.orgs:
|
||||||
|
if org.name == self.server.org:
|
||||||
|
my_org = org
|
||||||
|
|
||||||
|
self.influx.buckets_api().create_bucket(bucket_name=self.server.bucket, org_id=my_org.id)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error('Failed creating new InfluxDB bucket! Error: %s', e)
|
||||||
|
|
||||||
|
def write_points(self, data):
|
||||||
|
d = data
|
||||||
|
self.logger.info('Writing Data to InfluxDBv2 %s', d)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.influx_write_api.write(bucket=self.server.bucket, record=d)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error('Error writing data to influxdb2. Dropping this set of data. '
|
||||||
|
'Check your database! Error: %s', e)
|
|
@ -9,7 +9,7 @@ from configparser import ConfigParser, NoOptionError, NoSectionError
|
||||||
from varken.varkenlogger import BlacklistFilter
|
from varken.varkenlogger import BlacklistFilter
|
||||||
from varken.structures import SickChillServer, UniFiServer
|
from varken.structures import SickChillServer, UniFiServer
|
||||||
from varken.helpers import clean_sid_check, rfc1918_ip_check, boolcheck
|
from varken.helpers import clean_sid_check, rfc1918_ip_check, boolcheck
|
||||||
from varken.structures import SonarrServer, RadarrServer, OmbiServer, TautulliServer, InfluxServer
|
from varken.structures import SonarrServer, RadarrServer, OmbiServer, TautulliServer, InfluxServer, Influx2Server
|
||||||
|
|
||||||
|
|
||||||
class INIParser(object):
|
class INIParser(object):
|
||||||
|
@ -144,23 +144,47 @@ class INIParser(object):
|
||||||
if read_file:
|
if read_file:
|
||||||
self.config = self.read_file('varken.ini')
|
self.config = self.read_file('varken.ini')
|
||||||
self.config_blacklist()
|
self.config_blacklist()
|
||||||
|
|
||||||
# Parse InfluxDB options
|
# Parse InfluxDB options
|
||||||
try:
|
self.influx2_enabled = env.get('VRKN_GLOBAL_INFLUXDB2_ENABLED',
|
||||||
url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')),
|
self.config.getboolean('global', 'influx2_enabled'))
|
||||||
include_port=False, section='influxdb')
|
|
||||||
port = int(env.get('VRKN_INFLUXDB_PORT', self.config.getint('influxdb', 'port')))
|
|
||||||
ssl = boolcheck(env.get('VRKN_INFLUXDB_SSL', self.config.get('influxdb', 'ssl')))
|
|
||||||
verify_ssl = boolcheck(env.get('VRKN_INFLUXDB_VERIFY_SSL', self.config.get('influxdb', 'verify_ssl')))
|
|
||||||
|
|
||||||
username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username'))
|
if self.influx2_enabled:
|
||||||
password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password'))
|
# Use INFLUX version 2
|
||||||
except NoOptionError as e:
|
try:
|
||||||
self.logger.error('Missing key in %s. Error: %s', "influxdb", e)
|
url = self.url_check(env.get('VRKN_INFLUXDB2_URL', self.config.get('influx2', 'url')),
|
||||||
self.rectify_ini()
|
section='influx2')
|
||||||
return
|
ssl = boolcheck(env.get('VRKN_INFLUXDB2_SSL', self.config.get('influx2', 'ssl')))
|
||||||
|
verify_ssl = boolcheck(env.get('VRKN_INFLUXDB2_VERIFY_SSL', self.config.get('influx2', 'verify_ssl')))
|
||||||
|
|
||||||
self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl,
|
org = env.get('VRKN_INFLUXDB2_ORG', self.config.get('influx2', 'org'))
|
||||||
verify_ssl=verify_ssl)
|
bucket = env.get('VRKN_INFLUXDB2_BUCKET', self.config.get('influx2', 'bucket'))
|
||||||
|
token = env.get('VRKN_INFLUXDB2_TOKEN', self.config.get('influx2', 'token'))
|
||||||
|
timeout = env.get('VRKN_INFLUXDB2_TIMEOUT', self.config.get('influx2', 'timeout'))
|
||||||
|
except NoOptionError as e:
|
||||||
|
self.logger.error('Missing key in %s. Error: %s', "influx2", e)
|
||||||
|
self.rectify_ini()
|
||||||
|
return
|
||||||
|
|
||||||
|
self.influx_server = Influx2Server(url=url, token=token, org=org, timeout=timeout, ssl=ssl,
|
||||||
|
verify_ssl=verify_ssl, bucket=bucket)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
url = self.url_check(env.get('VRKN_INFLUXDB_URL', self.config.get('influxdb', 'url')),
|
||||||
|
include_port=False, section='influxdb')
|
||||||
|
port = int(env.get('VRKN_INFLUXDB_PORT', self.config.getint('influxdb', 'port')))
|
||||||
|
ssl = boolcheck(env.get('VRKN_INFLUXDB_SSL', self.config.get('influxdb', 'ssl')))
|
||||||
|
verify_ssl = boolcheck(env.get('VRKN_INFLUXDB_VERIFY_SSL', self.config.get('influxdb', 'verify_ssl')))
|
||||||
|
|
||||||
|
username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username'))
|
||||||
|
password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password'))
|
||||||
|
except NoOptionError as e:
|
||||||
|
self.logger.error('Missing key in %s. Error: %s', "influxdb", e)
|
||||||
|
self.rectify_ini()
|
||||||
|
return
|
||||||
|
|
||||||
|
self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl,
|
||||||
|
verify_ssl=verify_ssl)
|
||||||
|
|
||||||
# Check for all enabled services
|
# Check for all enabled services
|
||||||
for service in self.services:
|
for service in self.services:
|
||||||
|
|
|
@ -20,6 +20,16 @@ class InfluxServer(NamedTuple):
|
||||||
verify_ssl: bool = False
|
verify_ssl: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class Influx2Server(NamedTuple):
|
||||||
|
url: str = 'localhost'
|
||||||
|
org: str = 'server'
|
||||||
|
token: str = 'TOKEN'
|
||||||
|
bucket: str = 'varken'
|
||||||
|
timeout: int = 10000
|
||||||
|
ssl: bool = False
|
||||||
|
verify_ssl: bool = False
|
||||||
|
|
||||||
|
|
||||||
class SonarrServer(NamedTuple):
|
class SonarrServer(NamedTuple):
|
||||||
api_key: str = None
|
api_key: str = None
|
||||||
future_days: int = 0
|
future_days: int = 0
|
||||||
|
|
Loading…
Reference in a new issue