From f943968087bb01ee2a95948d6facce3a4133e366 Mon Sep 17 00:00:00 2001 From: "Nicholas St. Germain" Date: Wed, 24 Apr 2019 18:17:11 -0500 Subject: [PATCH] Ultra-threaded concurrency. For SCIENCE! --- Varken.py | 53 +++++++++++++++++++++++------------------------------ 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/Varken.py b/Varken.py index 47c5e83..6c58195 100644 --- a/Varken.py +++ b/Varken.py @@ -29,17 +29,13 @@ from varken.varkenlogger import VarkenLogger PLATFORM_LINUX_DISTRO = ' '.join(x for x in linux_distribution() if x) -def thread(): - while schedule.jobs: - job = QUEUE.get() - if isinstance(job, tuple): - job, query = job[0], job[1] - a = job(query=query) - else: - a = job() - if a is not None: - schedule.clear(a) - QUEUE.task_done() +def thread(job): + worker = Thread(target=job) + if isinstance(job, tuple): + job, query = job[0], job[1] + worker = Thread(target=job, kwargs={'query': query}) + + worker.start() if __name__ == "__main__": @@ -109,49 +105,49 @@ if __name__ == "__main__": SONARR = SonarrAPI(server, DBMANAGER) if server.queue: at_time = schedule.every(server.queue_run_seconds).seconds - at_time.do(QUEUE.put, SONARR.get_queue).tag("sonarr-{}-get_queue".format(server.id)) + at_time.do(thread, SONARR.get_queue).tag("sonarr-{}-get_queue".format(server.id)) if server.missing_days > 0: at_time = schedule.every(server.missing_days_run_seconds).seconds - at_time.do(QUEUE.put, SONARR.get_missing).tag("sonarr-{}-get_missing".format(server.id)) + at_time.do(thread, SONARR.get_missing).tag("sonarr-{}-get_missing".format(server.id)) if server.future_days > 0: at_time = schedule.every(server.future_days_run_seconds).seconds - at_time.do(QUEUE.put, SONARR.get_future).tag("sonarr-{}-get_future".format(server.id)) + at_time.do(thread, SONARR.get_future).tag("sonarr-{}-get_future".format(server.id)) if CONFIG.tautulli_enabled: GEOIPHANDLER = GeoIPHandler(DATA_FOLDER) - schedule.every(12).to(24).hours.do(QUEUE.put, GEOIPHANDLER.update) + schedule.every(12).to(24).hours.do(thread, GEOIPHANDLER.update) for server in CONFIG.tautulli_servers: TAUTULLI = TautulliAPI(server, DBMANAGER, GEOIPHANDLER) if server.get_activity: at_time = schedule.every(server.get_activity_run_seconds).seconds - at_time.do(QUEUE.put, TAUTULLI.get_activity).tag("tautulli-{}-get_activity".format(server.id)) + at_time.do(thread, TAUTULLI.get_activity).tag("tautulli-{}-get_activity".format(server.id)) if server.get_stats: at_time = schedule.every(server.get_stats_run_seconds).seconds - at_time.do(QUEUE.put, TAUTULLI.get_stats).tag("tautulli-{}-get_stats".format(server.id)) + at_time.do(thread, TAUTULLI.get_stats).tag("tautulli-{}-get_stats".format(server.id)) if CONFIG.radarr_enabled: for server in CONFIG.radarr_servers: RADARR = RadarrAPI(server, DBMANAGER) if server.get_missing: at_time = schedule.every(server.get_missing_run_seconds).seconds - at_time.do(QUEUE.put, RADARR.get_missing).tag("radarr-{}-get_missing".format(server.id)) + at_time.do(thread, RADARR.get_missing).tag("radarr-{}-get_missing".format(server.id)) if server.queue: at_time = schedule.every(server.queue_run_seconds).seconds - at_time.do(QUEUE.put, RADARR.get_queue).tag("radarr-{}-get_queue".format(server.id)) + at_time.do(thread, RADARR.get_queue).tag("radarr-{}-get_queue".format(server.id)) if CONFIG.lidarr_enabled: for server in CONFIG.lidarr_servers: LIDARR = LidarrAPI(server, DBMANAGER) if server.queue: at_time = schedule.every(server.queue_run_seconds).seconds - at_time.do(QUEUE.put, LIDARR.get_queue, None).tag("lidarr-{}-get_queue".format(server.id)) + at_time.do(thread, LIDARR.get_queue).tag("lidarr-{}-get_queue".format(server.id)) if server.missing_days > 0: at_time = schedule.every(server.missing_days_run_seconds).seconds - at_time.do(QUEUE.put, (LIDARR.get_calendar, "Missing")).tag( + at_time.do(thread, (LIDARR.get_calendar, "Missing")).tag( "lidarr-{}-get_missing".format(server.id)) if server.future_days > 0: at_time = schedule.every(server.future_days_run_seconds).seconds - at_time.do(QUEUE.put, (LIDARR.get_calendar, "Future")).tag("lidarr-{}-get_future".format( + at_time.do(thread, (LIDARR.get_calendar, "Future")).tag("lidarr-{}-get_future".format( server.id)) if CONFIG.ombi_enabled: @@ -159,26 +155,26 @@ if __name__ == "__main__": OMBI = OmbiAPI(server, DBMANAGER) if server.request_type_counts: at_time = schedule.every(server.request_type_run_seconds).seconds - at_time.do(QUEUE.put, OMBI.get_request_counts).tag("ombi-{}-get_request_counts".format(server.id)) + at_time.do(thread, OMBI.get_request_counts).tag("ombi-{}-get_request_counts".format(server.id)) if server.request_total_counts: at_time = schedule.every(server.request_total_run_seconds).seconds - at_time.do(QUEUE.put, OMBI.get_all_requests).tag("ombi-{}-get_all_requests".format(server.id)) + at_time.do(thread, OMBI.get_all_requests).tag("ombi-{}-get_all_requests".format(server.id)) if server.issue_status_counts: at_time = schedule.every(server.issue_status_run_seconds).seconds - at_time.do(QUEUE.put, OMBI.get_issue_counts).tag("ombi-{}-get_issue_counts".format(server.id)) + at_time.do(thread, OMBI.get_issue_counts).tag("ombi-{}-get_issue_counts".format(server.id)) if CONFIG.sickchill_enabled: for server in CONFIG.sickchill_servers: SICKCHILL = SickChillAPI(server, DBMANAGER) if server.get_missing: at_time = schedule.every(server.get_missing_run_seconds).seconds - at_time.do(QUEUE.put, SICKCHILL.get_missing).tag("sickchill-{}-get_missing".format(server.id)) + at_time.do(thread, SICKCHILL.get_missing).tag("sickchill-{}-get_missing".format(server.id)) if CONFIG.unifi_enabled: for server in CONFIG.unifi_servers: UNIFI = UniFiAPI(server, DBMANAGER) at_time = schedule.every(server.get_usg_stats_run_seconds).seconds - at_time.do(QUEUE.put, UNIFI.get_usg_stats).tag("unifi-{}-get_usg_stats".format(server.id)) + at_time.do(thread, UNIFI.get_usg_stats).tag("unifi-{}-get_usg_stats".format(server.id)) # Run all on startup SERVICES_ENABLED = [CONFIG.ombi_enabled, CONFIG.radarr_enabled, CONFIG.tautulli_enabled, CONFIG.unifi_enabled, @@ -187,9 +183,6 @@ if __name__ == "__main__": vl.logger.error("All services disabled. Exiting") exit(1) - WORKER = Thread(target=thread) - WORKER.start() - schedule.run_all() while schedule.jobs: