diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 5bdcd83d..695d80e5 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -11,6 +11,7 @@ from util import helper from Plugin import PluginManager import util + @PluginManager.acceptPlugins class WorkerManager(object): @@ -22,6 +23,7 @@ class WorkerManager(object): # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids} self.started_task_num = 0 # Last added task num self.running = True + self.time_task_added = 0 self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short) self.process_taskchecker = gevent.spawn(self.checkTasks) @@ -215,8 +217,20 @@ class WorkerManager(object): # Start find peers for optional files @util.Noparallel(blocking=False, ignore_args=True) def startFindOptional(self, reset_task=False, find_more=False, high_priority=False): + # Wait for more file requests + if len(self.tasks) < 20 or high_priority: + time.sleep(0.01) + if len(self.tasks) > 90: + time.sleep(5) + else: + time.sleep(0.5) + optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] + if not optional_tasks: + return False optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) + time_tasks = self.time_task_added + self.log.debug( "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" % (optional_hash_ids, reset_task, find_more) @@ -240,6 +254,10 @@ class WorkerManager(object): threads.append(gevent.spawn(peer.updateHashfield)) gevent.joinall(threads, timeout=5) + if time_tasks != self.time_task_added: # New task added since start + optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] + optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) + found = self.findOptionalTasks(optional_tasks) self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % ( len(found), len(optional_hash_ids) @@ -249,17 +267,18 @@ class WorkerManager(object): found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers]) self.startWorkers(found_peers) - if len(found) < len(optional_hash_ids) or find_more: + if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())): self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found))) # Try to query connected peers threads = [] - peers = self.site.getConnectedPeers() + peers = [peer for peer in self.site.getConnectedPeers() if peer not in self.asked_peers] if not peers: peers = self.site.getConnectablePeers() for peer in peers: threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids))) + self.asked_peers.append(peer) for i in range(5): time.sleep(1) @@ -269,8 +288,8 @@ class WorkerManager(object): found_ips = helper.mergeDicts(thread_values) found = self.addOptionalPeers(found_ips) - self.log.debug("Found optional files after findhash connected peers: %s/%s" % ( - len(found), len(optional_hash_ids) + self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % ( + len(found), len(optional_hash_ids), len(threads) )) if found: @@ -284,11 +303,17 @@ class WorkerManager(object): if len(found) < len(optional_hash_ids): self.log.debug("No findHash result, try random peers: %s" % (optional_hash_ids - set(found))) # Try to query random peers - threads = [] - peers = self.site.getConnectablePeers() - for peer in peers[0:5]: + if time_tasks != self.time_task_added: # New task added since start + optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] + optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) + + threads = [] + peers = self.site.getConnectablePeers(ignore=self.asked_peers) + + for peer in peers: threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids))) + self.asked_peers.append(peer) gevent.joinall(threads, timeout=15) @@ -325,8 +350,13 @@ class WorkerManager(object): if worker.key in self.workers: del(self.workers[worker.key]) self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers())) - if len(self.workers) <= self.getMaxWorkers()/2 and any(task["optional_hash_id"] for task in self.tasks): - self.startFindOptional(find_more=True) + if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10: + important_task = (task for task in self.tasks if task["priority"] > 0) + if next(important_task, None) or len(self.asked_peers) == 0: + self.startFindOptional(find_more=True) + else: + self.startFindOptional() + # Tasks sorted by this def getPriorityBoost(self, inner_path): @@ -394,6 +424,7 @@ class WorkerManager(object): "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" % (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num) ) + self.time_task_added = time.time() if optional_hash_id: self.startFindOptional()