From 8594e4ce4aff9d079c4c220e1cecd5579de1e2b9 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Thu, 4 Apr 2019 13:27:06 +0200 Subject: [PATCH] Add reason for startWorkers --- src/Worker/WorkerManager.py | 42 +++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 9d266d29..9fef8217 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -97,7 +97,7 @@ class WorkerManager(object): if task["peers"]: peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers] if peers_try: - self.startWorkers(peers_try, force_num=5) + self.startWorkers(peers_try, force_num=5, reason="Task checker (optional, has peers)") else: self.startFindOptional(find_more=True) else: @@ -106,10 +106,10 @@ class WorkerManager(object): if task["peers"]: # Release the peer lock self.log.debug("Task peer lock release: %s" % task["inner_path"]) task["peers"] = [] - self.startWorkers() + self.startWorkers(reason="Task checker") if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers(): - self.startWorkers() + self.startWorkers(reason="Task checker (need more workers)") self.log.debug("checkTasks stopped running") @@ -140,7 +140,7 @@ class WorkerManager(object): # New peers added to site def onPeers(self): - self.startWorkers() + self.startWorkers(reason="More peers found") def getMaxWorkers(self): if len(self.tasks) > 50: @@ -176,22 +176,24 @@ class WorkerManager(object): return True # Start workers to process tasks - def startWorkers(self, peers=None, force_num=0): + def startWorkers(self, peers=None, force_num=0, reason="Unknown"): if not self.tasks: return False # No task for workers - if len(self.workers) >= self.getMaxWorkers() and not peers: + max_workers = min(self.getMaxWorkers(), len(self.site.peers)) + if len(self.workers) >= max_workers and not peers: return False # Workers number already maxed and no starting peers defined self.log.debug( - "Starting workers, tasks: %s, peers: %s, workers: %s" % - (len(self.tasks), len(peers or []), len(self.workers)) + "Starting workers (%s), tasks: %s, peers: %s, workers: %s" % + (reason, len(self.tasks), len(peers or []), len(self.workers)) ) if not peers: peers = self.site.getConnectedPeers() - if len(peers) < self.getMaxWorkers(): - peers += self.site.getRecentPeers(self.getMaxWorkers()) + if len(peers) < max_workers: + peers += self.site.getRecentPeers(max_workers * 2) if type(peers) is set: peers = list(peers) + # Sort by ping peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999) @@ -206,7 +208,7 @@ class WorkerManager(object): worker = self.addWorker(peer) if worker: - self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers())) + self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), max_workers)) # Find peers for optional hash in local hash tables and add to task peers def findOptionalTasks(self, optional_tasks, reset_task=False): @@ -291,7 +293,7 @@ class WorkerManager(object): if found: found_peers = set([peer for peers in list(found.values()) for peer in peers]) - self.startWorkers(found_peers, force_num=3) + self.startWorkers(found_peers, force_num=3, reason="Optional found in local peers") if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())): self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found))) @@ -316,7 +318,7 @@ class WorkerManager(object): if found: found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) - self.startWorkers(found_peers, force_num=3) + self.startWorkers(found_peers, force_num=3, reason="Optional found in connected peers") if len(found) < len(optional_hash_ids) or find_more: self.log.debug( @@ -352,7 +354,7 @@ class WorkerManager(object): if found: found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) - self.startWorkers(found_peers, force_num=3) + self.startWorkers(found_peers, force_num=3, reason="Optional found by findhash connected peers") if len(thread_values) == len(threads): # Got result from all started thread @@ -384,7 +386,7 @@ class WorkerManager(object): if found: found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) - self.startWorkers(found_peers, force_num=3) + self.startWorkers(found_peers, force_num=3, reason="Option found using findhash random peers") if len(found) < len(optional_hash_ids): self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found))) @@ -426,7 +428,7 @@ class WorkerManager(object): self.startFindOptional() elif self.tasks and not self.workers and worker.task: self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks)) - self.startWorkers() + self.startWorkers(reason="Removed worker") # Tasks sorted by this def getPriorityBoost(self, inner_path): @@ -460,11 +462,11 @@ class WorkerManager(object): if peer and task["peers"]: # This peer also has new version, add it to task possible peers task["peers"].append(peer) self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) - self.startWorkers([peer]) + self.startWorkers([peer], reason="Added new task (update received by peer)") elif peer and peer in task["failed"]: task["failed"].remove(peer) # New update arrived, remove the peer from failed peers self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) - self.startWorkers([peer]) + self.startWorkers([peer], reason="Added new task (peer failed before)") return task else: # No task for that file yet evt = gevent.event.AsyncResult() @@ -510,10 +512,10 @@ class WorkerManager(object): self.startFindOptional(high_priority=priority > 0) if peers: - self.startWorkers(peers) + self.startWorkers(peers, reason="Added new optional task") else: - self.startWorkers(peers) + self.startWorkers(peers, reason="Added new task") return task # Find a task using inner_path