diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 649794c0..461a5a3c 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -9,8 +9,6 @@ from Worker import Worker from util import helper import util -MAX_WORKERS = 10 # Max concurent workers - class WorkerManager: @@ -47,7 +45,7 @@ class WorkerManager: tasks = self.tasks[:] # Copy it so removing elements wont cause any problem for task in tasks: - size_extra_time = task["size"] / (1024*100) # 1 second for every 100k + size_extra_time = task["size"] / (1024 * 100) # 1 second for every 100k if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time: # Task taking too long time, skip it self.log.debug("Timeout, Skipping: %s" % task) # Skip to next file workers @@ -126,15 +124,20 @@ class WorkerManager: self.started_task_num = 0 self.site.updateWebsocket() - # New peers added to site def onPeers(self): self.startWorkers() + def getMaxWorkers(self): + if len(self.tasks) < 30: + return 10 + else: + return 20 + # Add new worker def addWorker(self, peer): key = peer.key - if key not in self.workers and len(self.workers) < MAX_WORKERS: + if key not in self.workers and len(self.workers) < self.getMaxWorkers(): # We dont have worker for that peer and workers num less than max worker = Worker(self, peer) self.workers[key] = worker @@ -148,7 +151,7 @@ class WorkerManager: def startWorkers(self, peers=None): if not self.tasks: return False # No task for workers - if len(self.workers) >= MAX_WORKERS and not peers: + if len(self.workers) >= self.getMaxWorkers() and not peers: return False # Workers number already maxed and no starting peers definied if not peers: peers = self.site.peers.values() # No peers definied, use any from site @@ -161,7 +164,7 @@ class WorkerManager: continue # If peers definied and peer not valid worker = self.addWorker(peer) if worker: - self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS)) + self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers())) # Find peers for optional hash in local hash tables and add to task peers def findOptionalTasks(self, optional_tasks): @@ -295,7 +298,6 @@ class WorkerManager: if len(found) < len(optional_hash_ids): self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found))) - # Stop all worker def stopWorkers(self): for worker in self.workers.values(): @@ -317,7 +319,7 @@ class WorkerManager: worker.running = False if worker.key in self.workers: del(self.workers[worker.key]) - self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS)) + self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers())) # Create new task and return asyncresult def addTask(self, inner_path, peer=None, priority=0):