From 8bf17d3a69fe7253b37908ae6009dde2fb4a496d Mon Sep 17 00:00:00 2001 From: shortcutme Date: Sat, 21 Dec 2019 02:57:25 +0100 Subject: [PATCH] Add reason for Worker actions --- src/Worker/Worker.py | 10 +++++----- src/Worker/WorkerManager.py | 14 ++++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index 4cdb83cb..81bbc7df 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -171,7 +171,7 @@ class Worker(object): if not task["done"]: if write_err: - self.manager.failTask(task) + self.manager.failTask(task, reason="Write error") self.num_failed += 1 self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err)) elif is_valid: @@ -223,15 +223,15 @@ class Worker(object): self.thread = gevent.spawn(self.downloader) # Skip current task - def skip(self): - self.manager.log.debug("%s: Force skipping" % self.key) + def skip(self, reason="Unknown"): + self.manager.log.debug("%s: Force skipping (reason: %s)" % (self.key, reason)) if self.thread: self.thread.kill(exception=Debug.Notify("Worker stopped")) self.start() # Force stop the worker - def stop(self): - self.manager.log.debug("%s: Force stopping" % self.key) + def stop(self, reason="Unknown"): + self.manager.log.debug("%s: Force stopping (reason: %s)" % (self.key, reason)) self.running = False if self.thread: self.thread.kill(exception=Debug.Notify("Worker stopped")) diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 9054f283..932f9d6a 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -47,7 +47,7 @@ class WorkerManager(object): # Clean up workers for worker in list(self.workers.values()): if worker.task and worker.task["done"]: - worker.skip() # Stop workers with task done + worker.skip(reason="Task done") # Stop workers with task done if not self.tasks: continue @@ -67,14 +67,12 @@ class WorkerManager(object): workers = self.findWorkers(task) if workers: for worker in workers: - worker.skip() + worker.skip(reason="Task timeout") else: - self.failTask(task) + self.failTask(task, reason="No workers") elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left - self.log.debug("Timeout, Cleanup task: %s" % task) - # Remove task - self.failTask(task) + self.failTask(task, reason="Timeout") elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers: # Find more workers: Task started more than 15 sec ago or no workers @@ -407,11 +405,11 @@ class WorkerManager(object): def stopWorkers(self): num = 0 for worker in list(self.workers.values()): - worker.stop() + worker.stop(reason="Stopping all workers") num += 1 tasks = self.tasks[:] # Copy for task in tasks: # Mark all current task as failed - self.failTask(task) + self.failTask(task, reason="Stopping all workers") return num # Find workers by task