From 1aaf1fc6bf2211958cc8b21a4f7f66a1535b98f8 Mon Sep 17 00:00:00 2001 From: "Nicholas St. Germain" Date: Wed, 9 Jan 2019 02:00:50 -0600 Subject: [PATCH] initial in-thread kill --- Varken.py | 58 +++++++++++++++++++++++++++++++++--------------- varken/sonarr.py | 1 + varken/unifi.py | 5 +++-- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/Varken.py b/Varken.py index f0d41cc..5085c65 100644 --- a/Varken.py +++ b/Varken.py @@ -1,7 +1,7 @@ import platform import schedule - from time import sleep +from queue import Queue from sys import version from threading import Thread from os import access, R_OK, getenv @@ -26,9 +26,14 @@ from varken.varkenlogger import VarkenLogger PLATFORM_LINUX_DISTRO = ' '.join(x for x in linux_distribution() if x) -def threaded(job): - thread = Thread(target=job) - thread.start() +def thread(): + while schedule.jobs: + job = QUEUE.get() + a = job() + print(a) + if a is not None: + schedule.clear(a) + QUEUE.task_done() if __name__ == "__main__": @@ -85,55 +90,68 @@ if __name__ == "__main__": CONFIG = INIParser(DATA_FOLDER) DBMANAGER = DBManager(CONFIG.influx_server) + QUEUE = Queue() if CONFIG.sonarr_enabled: for server in CONFIG.sonarr_servers: SONARR = SonarrAPI(server, DBMANAGER) if server.queue: - schedule.every(server.queue_run_seconds).seconds.do(threaded, SONARR.get_queue) + at_time = schedule.every(server.queue_run_seconds).seconds + at_time.do(QUEUE.put, SONARR.get_queue).tag(f"sonarr-{server.id}-get_queue") if server.missing_days > 0: - schedule.every(server.missing_days_run_seconds).seconds.do(threaded, SONARR.get_missing) + at_time = schedule.every(server.missing_days_run_seconds).seconds + at_time.do(QUEUE.put, SONARR.get_missing).tag(f"sonarr-{server.id}-get_missing") if server.future_days > 0: - schedule.every(server.future_days_run_seconds).seconds.do(threaded, SONARR.get_future) + at_time = schedule.every(server.future_days_run_seconds).seconds + at_time.do(QUEUE.put, SONARR.get_future).tag(f"sonarr-{server.id}-get_future") if CONFIG.tautulli_enabled: GEOIPHANDLER = GeoIPHandler(DATA_FOLDER) - schedule.every(12).to(24).hours.do(threaded, GEOIPHANDLER.update) + schedule.every(12).to(24).hours.do(QUEUE.put, GEOIPHANDLER.update) for server in CONFIG.tautulli_servers: TAUTULLI = TautulliAPI(server, DBMANAGER, GEOIPHANDLER) if server.get_activity: - schedule.every(server.get_activity_run_seconds).seconds.do(threaded, TAUTULLI.get_activity) + at_time = schedule.every(server.get_activity_run_seconds).seconds + at_time.do(QUEUE.put, TAUTULLI.get_activity).tag(f"tautulli-{server.id}-get_activity") if server.get_stats: - schedule.every(server.get_stats_run_seconds).seconds.do(threaded, TAUTULLI.get_stats) + at_time = schedule.every(server.get_stats_run_seconds).seconds + at_time.do(QUEUE.put, TAUTULLI.get_stats).tag(f"tautulli-{server.id}-get_stats") if CONFIG.radarr_enabled: for server in CONFIG.radarr_servers: RADARR = RadarrAPI(server, DBMANAGER) if server.get_missing: - schedule.every(server.get_missing_run_seconds).seconds.do(threaded, RADARR.get_missing) + at_time = schedule.every(server.get_missing_run_seconds).seconds + at_time.do(QUEUE.put, RADARR.get_missing).tag(f"radarr-{server.id}-get_missing") if server.queue: - schedule.every(server.queue_run_seconds).seconds.do(threaded, RADARR.get_queue) + at_time = schedule.every(server.queue_run_seconds).seconds + at_time.do(QUEUE.put, RADARR.get_queue).tag(f"radarr-{server.id}-get_queue") if CONFIG.ombi_enabled: for server in CONFIG.ombi_servers: OMBI = OmbiAPI(server, DBMANAGER) if server.request_type_counts: - schedule.every(server.request_type_run_seconds).seconds.do(threaded, OMBI.get_request_counts) + at_time = schedule.every(server.request_type_run_seconds).seconds + at_time.do(QUEUE.put, OMBI.get_request_counts).tag(f"ombi-{server.id}-get_request_counts") if server.request_total_counts: - schedule.every(server.request_total_run_seconds).seconds.do(threaded, OMBI.get_all_requests) + at_time = schedule.every(server.request_total_run_seconds).seconds + at_time.do(QUEUE.put, OMBI.get_all_requests).tag(f"ombi-{server.id}-get_all_requests") if server.issue_status_counts: - schedule.every(server.issue_status_run_seconds).seconds.do(threaded, OMBI.get_issue_counts) + at_time = schedule.every(server.issue_status_run_seconds).seconds + at_time.do(QUEUE.put, OMBI.get_issue_counts).tag(f"ombi-{server.id}-get_issue_counts") if CONFIG.sickchill_enabled: for server in CONFIG.sickchill_servers: SICKCHILL = SickChillAPI(server, DBMANAGER) if server.get_missing: - schedule.every(server.get_missing_run_seconds).seconds.do(threaded, SICKCHILL.get_missing) + at_time = schedule.every(server.get_missing_run_seconds).seconds + at_time.do(QUEUE.put, SICKCHILL.get_missing).tag(f"sickchill-{server.id}-get_missing") if CONFIG.unifi_enabled: for server in CONFIG.unifi_servers: UNIFI = UniFiAPI(server, DBMANAGER) - schedule.every(server.get_usg_stats_run_seconds).seconds.do(threaded, UNIFI.get_usg_stats) + at_time = schedule.every(server.get_usg_stats_run_seconds).seconds + at_time.do(QUEUE.put, UNIFI.get_usg_stats).tag(f"unifi-{server.id}-get_usg_stats") # Run all on startup SERVICES_ENABLED = [CONFIG.ombi_enabled, CONFIG.radarr_enabled, CONFIG.tautulli_enabled, CONFIG.unifi_enabled, @@ -141,8 +159,12 @@ if __name__ == "__main__": if not [enabled for enabled in SERVICES_ENABLED if enabled]: vl.logger.error("All services disabled. Exiting") exit(1) + + WORKER = Thread(target=thread) + WORKER.start() + schedule.run_all() - while True: + while schedule.jobs: schedule.run_pending() sleep(1) diff --git a/varken/sonarr.py b/varken/sonarr.py index b90dad9..86bdfe8 100644 --- a/varken/sonarr.py +++ b/varken/sonarr.py @@ -1,4 +1,5 @@ from logging import getLogger +from schedule import CancelJob from requests import Session, Request from datetime import datetime, timezone, date, timedelta diff --git a/varken/unifi.py b/varken/unifi.py index 0701b59..11b33f2 100644 --- a/varken/unifi.py +++ b/varken/unifi.py @@ -1,5 +1,5 @@ -from time import time from logging import getLogger +from schedule import CancelJob from requests import Session, Request from datetime import datetime, timezone @@ -38,7 +38,8 @@ class UniFiAPI(object): get = connection_handler(self.session, req, self.server.verify_ssl) if not get: - return + self.logger.error("Canceling Job get_usg_stats for unifi-%s", self.server.id) + return f"unifi-{self.server.id}-get_usg_stats" devices = {device['name']: device for device in get['data']} if devices.get(self.server.usg_name):