From 7be718751bfe3118013bfdef233e04319eaebc9b Mon Sep 17 00:00:00 2001 From: "Nicholas St. Germain" Date: Sat, 1 Dec 2018 21:31:58 -0600 Subject: [PATCH] reworked scheduler to pass server to instance to remove duplication --- Varken/helpers.py | 3 +- Varken/radarr.py | 126 ++++++++++++----------- Varken/sonarr.py | 192 +++++++++++++++++------------------ Varken/tautulli.py | 216 ++++++++++++++++++++-------------------- data/varken.example.ini | 1 - varken.py | 23 ++--- 6 files changed, 270 insertions(+), 291 deletions(-) diff --git a/Varken/helpers.py b/Varken/helpers.py index 434f2ee..c3d8d2e 100644 --- a/Varken/helpers.py +++ b/Varken/helpers.py @@ -116,7 +116,7 @@ class TautulliServer(NamedTuple): id: int = None url: str = None fallback_ip: str = None - apikey: str = None + api_key: str = None verify_ssl: bool = None get_activity: bool = False get_activity_run_seconds: int = 30 @@ -319,7 +319,6 @@ class TautulliStream(NamedTuple): transcode_progress: int = None subtitle_language: str = None stream_subtitle_container: str = None - _cache_time: int = None def geoip_download(): diff --git a/Varken/radarr.py b/Varken/radarr.py index 33f7c0b..c31e2e0 100644 --- a/Varken/radarr.py +++ b/Varken/radarr.py @@ -7,11 +7,11 @@ from Varken.helpers import Movie, Queue class RadarrAPI(object): - def __init__(self, servers, influx_server): + def __init__(self, server, influx_server): self.now = datetime.now(timezone.utc).astimezone().isoformat() self.influx = InfluxDBClient(influx_server.url, influx_server.port, influx_server.username, influx_server.password, 'plex2') - self.servers = servers + self.server = server # Create session to reduce server web thread load, and globally define pageSize for all requests self.session = requests.Session() @@ -20,87 +20,85 @@ class RadarrAPI(object): self.influx.write_points(payload) @logging - def get_missing(self, notimplemented): + def get_missing(self): endpoint = '/api/movie' self.now = datetime.now(timezone.utc).astimezone().isoformat() influx_payload = [] - for server in self.servers: - missing = [] - headers = {'X-Api-Key': server.api_key} - get = self.session.get(server.url + endpoint, headers=headers, verify=server.verify_ssl).json() - movies = [Movie(**movie) for movie in get] + missing = [] + headers = {'X-Api-Key': self.server.api_key} + get = self.session.get(self.server.url + endpoint, headers=headers, verify=self.server.verify_ssl).json() + movies = [Movie(**movie) for movie in get] - for movie in movies: - if server.get_missing: - if not movie.downloaded and movie.isAvailable: - ma = True - else: - ma = False - movie_name = '{} ({})'.format(movie.title, movie.year) - missing.append((movie_name, ma, movie.tmdbId)) + for movie in movies: + if self.server.get_missing: + if not movie.downloaded and movie.isAvailable: + ma = True + else: + ma = False + movie_name = '{} ({})'.format(movie.title, movie.year) + missing.append((movie_name, ma, movie.tmdbId)) - for title, ma, mid in missing: - influx_payload.append( - { - "measurement": "Radarr", - "tags": { - "Missing": True, - "Missing_Available": ma, - "tmdbId": mid, - "server": server.id - }, - "time": self.now, - "fields": { - "name": title - } + for title, ma, mid in missing: + influx_payload.append( + { + "measurement": "Radarr", + "tags": { + "Missing": True, + "Missing_Available": ma, + "tmdbId": mid, + "server": self.server.id + }, + "time": self.now, + "fields": { + "name": title } - ) + } + ) self.influx_push(influx_payload) @logging - def get_queue(self, notimplemented): + def get_queue(self): endpoint = '/api/queue' self.now = datetime.now(timezone.utc).astimezone().isoformat() influx_payload = [] - for server in self.servers: - queue = [] - headers = {'X-Api-Key': server.api_key} - get = self.session.get(server.url + endpoint, headers=headers, verify=server.verify_ssl).json() - for movie in get: - movie['movie'] = Movie(**movie['movie']) - download_queue = [Queue(**movie) for movie in get] + queue = [] + headers = {'X-Api-Key': self.server.api_key} + get = self.session.get(self.server.url + endpoint, headers=headers, verify=self.server.verify_ssl).json() + for movie in get: + movie['movie'] = Movie(**movie['movie']) + download_queue = [Queue(**movie) for movie in get] - for queue_item in download_queue: - name = '{} ({})'.format(queue_item.movie.title, queue_item.movie.year) + for queue_item in download_queue: + name = '{} ({})'.format(queue_item.movie.title, queue_item.movie.year) - if queue_item.protocol.upper() == 'USENET': - protocol_id = 1 - else: - protocol_id = 0 + if queue_item.protocol.upper() == 'USENET': + protocol_id = 1 + else: + protocol_id = 0 - queue.append((name, queue_item.quality['quality']['name'], queue_item.protocol.upper(), - protocol_id, queue_item.id)) + queue.append((name, queue_item.quality['quality']['name'], queue_item.protocol.upper(), + protocol_id, queue_item.id)) - for movie, quality, protocol, protocol_id, qid in queue: - influx_payload.append( - { - "measurement": "Radarr", - "tags": { - "type": "Queue", - "tmdbId": qid, - "server": server.id - }, - "time": self.now, - "fields": { - "name": movie, - "quality": quality, - "protocol": protocol, - "protocol_id": protocol_id - } + for movie, quality, protocol, protocol_id, qid in queue: + influx_payload.append( + { + "measurement": "Radarr", + "tags": { + "type": "Queue", + "tmdbId": qid, + "server": self.server.id + }, + "time": self.now, + "fields": { + "name": movie, + "quality": quality, + "protocol": protocol, + "protocol_id": protocol_id } - ) + } + ) self.influx_push(influx_payload) diff --git a/Varken/sonarr.py b/Varken/sonarr.py index 9ee89c2..7fb7a04 100644 --- a/Varken/sonarr.py +++ b/Varken/sonarr.py @@ -7,147 +7,141 @@ from Varken.helpers import TVShow, Queue class SonarrAPI(object): - def __init__(self, servers, influx_server): + def __init__(self, server, influx_server): # Set Time of initialization self.now = datetime.now(timezone.utc).astimezone().isoformat() self.today = str(date.today()) self.influx = InfluxDBClient(influx_server.url, influx_server.port, influx_server.username, influx_server.password, 'plex') - self.servers = servers + self.server = server # Create session to reduce server web thread load, and globally define pageSize for all requests self.session = requests.Session() self.session.params = {'pageSize': 1000} @logging - def get_missing(self, days_past): + def get_missing(self): endpoint = '/api/calendar' - last_days = str(date.today() + timedelta(days=-days_past)) + last_days = str(date.today() + timedelta(days=-self.server.missing_days)) self.now = datetime.now(timezone.utc).astimezone().isoformat() params = {'start': last_days, 'end': self.today} influx_payload = [] + missing = [] + headers = {'X-Api-Key': self.server.api_key} - for server in self.servers: - missing = [] - headers = {'X-Api-Key': server.api_key} + get = self.session.get(self.server.url + endpoint, params=params, headers=headers, + verify=self.server.verify_ssl).json() + # Iteratively create a list of TVShow Objects from response json + tv_shows = [TVShow(**show) for show in get] - get = self.session.get(server.url + endpoint, params=params, headers=headers, - verify=server.verify_ssl).json() - # Iteratively create a list of TVShow Objects from response json - tv_shows = [TVShow(**show) for show in get] + # Add show to missing list if file does not exist + for show in tv_shows: + if not show.hasFile: + sxe = 'S{:0>2}E{:0>2}'.format(show.seasonNumber, show.episodeNumber) + missing.append((show.series['title'], sxe, show.airDate, show.title, show.id)) - # Add show to missing list if file does not exist - for show in tv_shows: - if not show.hasFile: - sxe = 'S{:0>2}E{:0>2}'.format(show.seasonNumber, show.episodeNumber) - missing.append((show.series['title'], sxe, show.airDate, show.title, show.id)) - - for series_title, sxe, air_date, episode_title, sonarr_id in missing: - influx_payload.append( - { - "measurement": "Sonarr", - "tags": { - "type": "Missing", - "sonarrId": sonarr_id, - "server": server.id - }, - "time": self.now, - "fields": { - "name": series_title, - "epname": episode_title, - "sxe": sxe, - "airs": air_date - } + for series_title, sxe, air_date, episode_title, sonarr_id in missing: + influx_payload.append( + { + "measurement": "Sonarr", + "tags": { + "type": "Missing", + "sonarrId": sonarr_id, + "server": self.server.id + }, + "time": self.now, + "fields": { + "name": series_title, + "epname": episode_title, + "sxe": sxe, + "airs": air_date } - ) + } + ) self.influx_push(influx_payload) + @logging - def get_future(self, future_days): + def get_future(self): endpoint = '/api/calendar/' self.now = datetime.now(timezone.utc).astimezone().isoformat() - future = str(date.today() + timedelta(days=future_days)) + future = str(date.today() + timedelta(days=self.server.future_days)) influx_payload = [] + air_days = [] + headers = {'X-Api-Key': self.server.api_key} + params = {'start': self.today, 'end': future} - for server in self.servers: - air_days = [] + get = self.session.get(self.server.url + endpoint, params=params, headers=headers, + verify=self.server.verify_ssl).json() + tv_shows = [TVShow(**show) for show in get] - headers = {'X-Api-Key': server.api_key} - params = {'start': self.today, 'end': future} + for show in tv_shows: + sxe = 'S{:0>2}E{:0>2}'.format(show.seasonNumber, show.episodeNumber) + air_days.append((show.series['title'], show.hasFile, sxe, show.title, show.airDate, show.id)) - get = self.session.get(server.url + endpoint, params=params, headers=headers, - verify=server.verify_ssl).json() - tv_shows = [TVShow(**show) for show in get] - - for show in tv_shows: - sxe = 'S{:0>2}E{:0>2}'.format(show.seasonNumber, show.episodeNumber) - air_days.append((show.series['title'], show.hasFile, sxe, show.title, show.airDate, show.id)) - - for series_title, dl_status, sxe, episode_title, air_date, sonarr_id in air_days: - influx_payload.append( - { - "measurement": "Sonarr", - "tags": { - "type": "Future", - "sonarrId": sonarr_id, - "server": server.id - }, - "time": self.now, - "fields": { - "name": series_title, - "epname": episode_title, - "sxe": sxe, - "airs": air_date, - "downloaded": dl_status - } + for series_title, dl_status, sxe, episode_title, air_date, sonarr_id in air_days: + influx_payload.append( + { + "measurement": "Sonarr", + "tags": { + "type": "Future", + "sonarrId": sonarr_id, + "server": self.server.id + }, + "time": self.now, + "fields": { + "name": series_title, + "epname": episode_title, + "sxe": sxe, + "airs": air_date, + "downloaded": dl_status } - ) + } + ) self.influx_push(influx_payload) @logging - def get_queue(self, notimplemented): + def get_queue(self): influx_payload = [] endpoint = '/api/queue' self.now = datetime.now(timezone.utc).astimezone().isoformat() + queue = [] + headers = {'X-Api-Key': self.server.api_key} - for server in self.servers: - queue = [] - headers = {'X-Api-Key': server.api_key} + get = self.session.get(self.server.url + endpoint, headers=headers, verify=self.server.verify_ssl).json() + download_queue = [Queue(**show) for show in get] - get = self.session.get(server.url + endpoint, headers=headers, verify=server.verify_ssl).json() - download_queue = [Queue(**show) for show in get] + for show in download_queue: + sxe = 'S{:0>2}E{:0>2}'.format(show.episode['seasonNumber'], show.episode['episodeNumber']) + if show.protocol.upper() == 'USENET': + protocol_id = 1 + else: + protocol_id = 0 - for show in download_queue: - sxe = 'S{:0>2}E{:0>2}'.format(show.episode['seasonNumber'], show.episode['episodeNumber']) - if show.protocol.upper() == 'USENET': - protocol_id = 1 - else: - protocol_id = 0 + queue.append((show.series['title'], show.episode['title'], show.protocol.upper(), + protocol_id, sxe, show.id)) - queue.append((show.series['title'], show.episode['title'], show.protocol.upper(), - protocol_id, sxe, show.id)) + for series_title, episode_title, protocol, protocol_id, sxe, sonarr_id in queue: + influx_payload.append( + { + "measurement": "Sonarr", + "tags": { + "type": "Queue", + "sonarrId": sonarr_id, + "server": self.server.id - for series_title, episode_title, protocol, protocol_id, sxe, sonarr_id in queue: - influx_payload.append( - { - "measurement": "Sonarr", - "tags": { - "type": "Queue", - "sonarrId": sonarr_id, - "server": server.id - - }, - "time": self.now, - "fields": { - "name": series_title, - "epname": episode_title, - "sxe": sxe, - "protocol": protocol, - "protocol_id": protocol_id - } + }, + "time": self.now, + "fields": { + "name": series_title, + "epname": episode_title, + "sxe": sxe, + "protocol": protocol, + "protocol_id": protocol_id } - ) + } + ) self.influx_push(influx_payload) diff --git a/Varken/tautulli.py b/Varken/tautulli.py index da01739..ee23e08 100644 --- a/Varken/tautulli.py +++ b/Varken/tautulli.py @@ -7,12 +7,12 @@ from Varken.logger import logging class TautulliAPI(object): - def __init__(self, servers, influx_server): + def __init__(self, server, influx_server): # Set Time of initialization self.now = datetime.now(timezone.utc).astimezone().isoformat() self.influx = InfluxDBClient(influx_server.url, influx_server.port, influx_server.username, influx_server.password, 'plex2') - self.servers = servers + self.server = server self.session = requests.Session() self.endpoint = '/api/v2' @@ -21,128 +21,124 @@ class TautulliAPI(object): self.influx.write_points(payload) @logging - def get_activity(self, notimplemented): + def get_activity(self): self.now = datetime.now(timezone.utc).astimezone().isoformat() params = {'cmd': 'get_activity'} influx_payload = [] + params['apikey'] = self.server.api_key + g = self.session.get(self.server.url + self.endpoint, params=params, verify=self.server.verify_ssl) + get = g.json()['response']['data'] - for server in self.servers: - params['apikey'] = server.apikey - g = self.session.get(server.url + self.endpoint, params=params, verify=server.verify_ssl) - get = g.json()['response']['data'] + influx_payload.append( + { + "measurement": "Tautulli", + "tags": { + "type": "current_stream_stats", + "server": self.server.id + }, + "time": self.now, + "fields": { + "stream_count": int(get['stream_count']), + "total_bandwidth": int(get['total_bandwidth']), + "wan_bandwidth": int(get['wan_bandwidth']), + "lan_bandwidth": int(get['lan_bandwidth']), + "transcode_streams": int(get['stream_count_transcode']), + "direct_play_streams": int(get['stream_count_direct_play']), + "direct_streams": int(get['stream_count_direct_stream']) + } + } + ) + + self.influx_push(influx_payload) + + @logging + def get_sessions(self): + self.now = datetime.now(timezone.utc).astimezone().isoformat() + params = {'cmd': 'get_activity'} + influx_payload = [] + params['apikey'] = self.server.api_key + g = self.session.get(self.server.url + self.endpoint, params=params, verify=self.server.verify_ssl) + get = g.json()['response']['data']['sessions'] + sessions = [TautulliStream(**session) for session in get] + + for session in sessions: + try: + geodata = geo_lookup(session.ip_address_public) + except (ValueError, AddressNotFoundError): + if self.server.fallback_ip: + geodata = geo_lookup(self.server.fallback_ip) + else: + my_ip = requests.get('http://ip.42.pl/raw').text + geodata = geo_lookup(my_ip) + + if not all([geodata.location.latitude, geodata.location.longitude]): + latitude = 37.234332396 + longitude = -115.80666344 + else: + latitude = geodata.location.latitude + longitude = geodata.location.longitude + + decision = session.transcode_decision + if decision == 'copy': + decision = 'direct stream' + + video_decision = session.stream_video_decision + if video_decision == 'copy': + video_decision = 'direct stream' + elif video_decision == '': + video_decision = 'Music' + + quality = session.stream_video_resolution + if not quality: + quality = session.container.upper() + elif quality in ('SD', 'sd', '4k'): + quality = session.stream_video_resolution.upper() + else: + quality = session.stream_video_resolution + 'p' + + player_state = session.state.lower() + if player_state == 'playing': + player_state = 0 + elif player_state == 'paused': + player_state = 1 + elif player_state == 'buffering': + player_state = 3 influx_payload.append( { "measurement": "Tautulli", "tags": { - "type": "current_stream_stats", - "server": server.id + "type": "Session", + "session_id": session.session_id, + "name": session.friendly_name, + "title": session.full_title, + "platform": session.platform, + "product_version": session.product_version, + "quality": quality, + "video_decision": video_decision.title(), + "transcode_decision": decision.title(), + "media_type": session.media_type.title(), + "audio_codec": session.audio_codec.upper(), + "audio_profile": session.audio_profile.upper(), + "stream_audio_codec": session.stream_audio_codec.upper(), + "quality_profile": session.quality_profile, + "progress_percent": session.progress_percent, + "region_code": geodata.subdivisions.most_specific.iso_code, + "location": geodata.city.name, + "full_location": '{} - {}'.format(geodata.subdivisions.most_specific.name, + geodata.city.name), + "latitude": latitude, + "longitude": longitude, + "player_state": player_state, + "device_type": session.platform, + "server": self.server.id }, "time": self.now, "fields": { - "stream_count": int(get['stream_count']), - "total_bandwidth": int(get['total_bandwidth']), - "wan_bandwidth": int(get['wan_bandwidth']), - "lan_bandwidth": int(get['lan_bandwidth']), - "transcode_streams": int(get['stream_count_transcode']), - "direct_play_streams": int(get['stream_count_direct_play']), - "direct_streams": int(get['stream_count_direct_stream']) + "session_id": session.session_id, + "session_key": session.session_key } } ) self.influx_push(influx_payload) - - @logging - def get_sessions(self, notimplemented): - self.now = datetime.now(timezone.utc).astimezone().isoformat() - params = {'cmd': 'get_activity'} - influx_payload = [] - - for server in self.servers: - params['apikey'] = server.apikey - g = self.session.get(server.url + self.endpoint, params=params, verify=server.verify_ssl) - get = g.json()['response']['data']['sessions'] - sessions = [TautulliStream(**session) for session in get] - - for session in sessions: - try: - geodata = geo_lookup(session.ip_address_public) - except (ValueError, AddressNotFoundError): - if server.fallback_ip: - geodata = geo_lookup(server.fallback_ip) - else: - my_ip = requests.get('http://ip.42.pl/raw').text - geodata = geo_lookup(my_ip) - - if not all([geodata.location.latitude, geodata.location.longitude]): - latitude = 37.234332396 - longitude = -115.80666344 - else: - latitude = geodata.location.latitude - longitude = geodata.location.longitude - - decision = session.transcode_decision - if decision == 'copy': - decision = 'direct stream' - - video_decision = session.stream_video_decision - if video_decision == 'copy': - video_decision = 'direct stream' - elif video_decision == '': - video_decision = 'Music' - - quality = session.stream_video_resolution - if not quality: - quality = session.container.upper() - elif quality in ('SD', 'sd', '4k'): - quality = session.stream_video_resolution.upper() - else: - quality = session.stream_video_resolution + 'p' - - player_state = session.state.lower() - if player_state == 'playing': - player_state = 0 - elif player_state == 'paused': - player_state = 1 - elif player_state == 'buffering': - player_state = 3 - - influx_payload.append( - { - "measurement": "Tautulli", - "tags": { - "type": "Session", - "session_id": session.session_id, - "name": session.friendly_name, - "title": session.full_title, - "platform": session.platform, - "product_version": session.product_version, - "quality": quality, - "video_decision": video_decision.title(), - "transcode_decision": decision.title(), - "media_type": session.media_type.title(), - "audio_codec": session.audio_codec.upper(), - "audio_profile": session.audio_profile.upper(), - "stream_audio_codec": session.stream_audio_codec.upper(), - "quality_profile": session.quality_profile, - "progress_percent": session.progress_percent, - "region_code": geodata.subdivisions.most_specific.iso_code, - "location": geodata.city.name, - "full_location": '{} - {}'.format(geodata.subdivisions.most_specific.name, - geodata.city.name), - "latitude": latitude, - "longitude": longitude, - "player_state": player_state, - "device_type": session.platform, - "server": server.id - }, - "time": self.now, - "fields": { - "session_id": session.session_id, - "session_key": session.session_key - } - } - ) - - self.influx_push(influx_payload) diff --git a/data/varken.example.ini b/data/varken.example.ini index 0ebd511..4d28054 100644 --- a/data/varken.example.ini +++ b/data/varken.example.ini @@ -61,7 +61,6 @@ verify_ssl = true queue = true queue_run_seconds = 300 get_missing = true -get_missing_available = true get_missing_run_seconds = 300 [radarr-2] diff --git a/varken.py b/varken.py index 482e788..921b8fb 100644 --- a/varken.py +++ b/varken.py @@ -8,8 +8,8 @@ from Varken.tautulli import TautulliAPI from Varken.radarr import RadarrAPI -def threaded(job, days=None): - thread = threading.Thread(target=job, args=([days])) +def threaded(job): + thread = threading.Thread(target=job) thread.start() @@ -17,38 +17,31 @@ if __name__ == "__main__": CONFIG = INIParser() if CONFIG.sonarr_enabled: - SONARR = SonarrAPI(CONFIG.sonarr_servers, CONFIG.influx_server) - for server in CONFIG.sonarr_servers: + SONARR = SonarrAPI(server, CONFIG.influx_server) if server.queue: schedule.every(server.queue_run_seconds).seconds.do(threaded, SONARR.get_queue) if server.missing_days > 0: - schedule.every(server.missing_days_run_seconds).seconds.do(threaded, SONARR.get_missing, - server.missing_days) + schedule.every(server.missing_days_run_seconds).seconds.do(threaded, SONARR.get_missing) if server.future_days > 0: - schedule.every(server.future_days_run_seconds).seconds.do(threaded, SONARR.get_future, - server.future_days) + schedule.every(server.future_days_run_seconds).seconds.do(threaded, SONARR.get_future) if CONFIG.tautulli_enabled: - TAUTULLI = TautulliAPI(CONFIG.tautulli_servers, CONFIG.influx_server) - for server in CONFIG.tautulli_servers: + TAUTULLI = TautulliAPI(server, CONFIG.influx_server) if server.get_activity: schedule.every(server.get_activity_run_seconds).seconds.do(threaded, TAUTULLI.get_activity) if server.get_sessions: schedule.every(server.get_sessions_run_seconds).seconds.do(threaded, TAUTULLI.get_sessions) if CONFIG.radarr_enabled: - RADARR = RadarrAPI(CONFIG.radarr_servers, CONFIG.influx_server) - for server in CONFIG.radarr_servers: - if any([server.get_missing, server.get_missing_available]): + RADARR = RadarrAPI(server, CONFIG.influx_server) + if server.get_missing: schedule.every(server.get_missing_run_seconds).seconds.do(threaded, RADARR.get_missing) if server.queue: schedule.every(server.queue_run_seconds).seconds.do(threaded, RADARR.get_queue) - - while True: schedule.run_pending() sleep(1)