Add reason for Worker actions
This commit is contained in:
parent
0881e274a9
commit
8bf17d3a69
2 changed files with 11 additions and 13 deletions
|
@ -171,7 +171,7 @@ class Worker(object):
|
||||||
|
|
||||||
if not task["done"]:
|
if not task["done"]:
|
||||||
if write_err:
|
if write_err:
|
||||||
self.manager.failTask(task)
|
self.manager.failTask(task, reason="Write error")
|
||||||
self.num_failed += 1
|
self.num_failed += 1
|
||||||
self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err))
|
self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err))
|
||||||
elif is_valid:
|
elif is_valid:
|
||||||
|
@ -223,15 +223,15 @@ class Worker(object):
|
||||||
self.thread = gevent.spawn(self.downloader)
|
self.thread = gevent.spawn(self.downloader)
|
||||||
|
|
||||||
# Skip current task
|
# Skip current task
|
||||||
def skip(self):
|
def skip(self, reason="Unknown"):
|
||||||
self.manager.log.debug("%s: Force skipping" % self.key)
|
self.manager.log.debug("%s: Force skipping (reason: %s)" % (self.key, reason))
|
||||||
if self.thread:
|
if self.thread:
|
||||||
self.thread.kill(exception=Debug.Notify("Worker stopped"))
|
self.thread.kill(exception=Debug.Notify("Worker stopped"))
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
# Force stop the worker
|
# Force stop the worker
|
||||||
def stop(self):
|
def stop(self, reason="Unknown"):
|
||||||
self.manager.log.debug("%s: Force stopping" % self.key)
|
self.manager.log.debug("%s: Force stopping (reason: %s)" % (self.key, reason))
|
||||||
self.running = False
|
self.running = False
|
||||||
if self.thread:
|
if self.thread:
|
||||||
self.thread.kill(exception=Debug.Notify("Worker stopped"))
|
self.thread.kill(exception=Debug.Notify("Worker stopped"))
|
||||||
|
|
|
@ -47,7 +47,7 @@ class WorkerManager(object):
|
||||||
# Clean up workers
|
# Clean up workers
|
||||||
for worker in list(self.workers.values()):
|
for worker in list(self.workers.values()):
|
||||||
if worker.task and worker.task["done"]:
|
if worker.task and worker.task["done"]:
|
||||||
worker.skip() # Stop workers with task done
|
worker.skip(reason="Task done") # Stop workers with task done
|
||||||
|
|
||||||
if not self.tasks:
|
if not self.tasks:
|
||||||
continue
|
continue
|
||||||
|
@ -67,14 +67,12 @@ class WorkerManager(object):
|
||||||
workers = self.findWorkers(task)
|
workers = self.findWorkers(task)
|
||||||
if workers:
|
if workers:
|
||||||
for worker in workers:
|
for worker in workers:
|
||||||
worker.skip()
|
worker.skip(reason="Task timeout")
|
||||||
else:
|
else:
|
||||||
self.failTask(task)
|
self.failTask(task, reason="No workers")
|
||||||
|
|
||||||
elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left
|
elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left
|
||||||
self.log.debug("Timeout, Cleanup task: %s" % task)
|
self.failTask(task, reason="Timeout")
|
||||||
# Remove task
|
|
||||||
self.failTask(task)
|
|
||||||
|
|
||||||
elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
|
elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
|
||||||
# Find more workers: Task started more than 15 sec ago or no workers
|
# Find more workers: Task started more than 15 sec ago or no workers
|
||||||
|
@ -407,11 +405,11 @@ class WorkerManager(object):
|
||||||
def stopWorkers(self):
|
def stopWorkers(self):
|
||||||
num = 0
|
num = 0
|
||||||
for worker in list(self.workers.values()):
|
for worker in list(self.workers.values()):
|
||||||
worker.stop()
|
worker.stop(reason="Stopping all workers")
|
||||||
num += 1
|
num += 1
|
||||||
tasks = self.tasks[:] # Copy
|
tasks = self.tasks[:] # Copy
|
||||||
for task in tasks: # Mark all current task as failed
|
for task in tasks: # Mark all current task as failed
|
||||||
self.failTask(task)
|
self.failTask(task, reason="Stopping all workers")
|
||||||
return num
|
return num
|
||||||
|
|
||||||
# Find workers by task
|
# Find workers by task
|
||||||
|
|
Loading…
Reference in a new issue