Allow more workers if more task

This commit is contained in:
HelloZeroNet 2016-03-19 18:07:14 +01:00
parent 437a9b79a8
commit 988f1435c5

View file

@ -9,8 +9,6 @@ from Worker import Worker
from util import helper from util import helper
import util import util
MAX_WORKERS = 10 # Max concurent workers
class WorkerManager: class WorkerManager:
@ -47,7 +45,7 @@ class WorkerManager:
tasks = self.tasks[:] # Copy it so removing elements wont cause any problem tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
for task in tasks: for task in tasks:
size_extra_time = task["size"] / (1024*100) # 1 second for every 100k size_extra_time = task["size"] / (1024 * 100) # 1 second for every 100k
if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time: # Task taking too long time, skip it if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time: # Task taking too long time, skip it
self.log.debug("Timeout, Skipping: %s" % task) self.log.debug("Timeout, Skipping: %s" % task)
# Skip to next file workers # Skip to next file workers
@ -126,15 +124,20 @@ class WorkerManager:
self.started_task_num = 0 self.started_task_num = 0
self.site.updateWebsocket() self.site.updateWebsocket()
# New peers added to site # New peers added to site
def onPeers(self): def onPeers(self):
self.startWorkers() self.startWorkers()
def getMaxWorkers(self):
if len(self.tasks) < 30:
return 10
else:
return 20
# Add new worker # Add new worker
def addWorker(self, peer): def addWorker(self, peer):
key = peer.key key = peer.key
if key not in self.workers and len(self.workers) < MAX_WORKERS: if key not in self.workers and len(self.workers) < self.getMaxWorkers():
# We dont have worker for that peer and workers num less than max # We dont have worker for that peer and workers num less than max
worker = Worker(self, peer) worker = Worker(self, peer)
self.workers[key] = worker self.workers[key] = worker
@ -148,7 +151,7 @@ class WorkerManager:
def startWorkers(self, peers=None): def startWorkers(self, peers=None):
if not self.tasks: if not self.tasks:
return False # No task for workers return False # No task for workers
if len(self.workers) >= MAX_WORKERS and not peers: if len(self.workers) >= self.getMaxWorkers() and not peers:
return False # Workers number already maxed and no starting peers definied return False # Workers number already maxed and no starting peers definied
if not peers: if not peers:
peers = self.site.peers.values() # No peers definied, use any from site peers = self.site.peers.values() # No peers definied, use any from site
@ -161,7 +164,7 @@ class WorkerManager:
continue # If peers definied and peer not valid continue # If peers definied and peer not valid
worker = self.addWorker(peer) worker = self.addWorker(peer)
if worker: if worker:
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS)) self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
# Find peers for optional hash in local hash tables and add to task peers # Find peers for optional hash in local hash tables and add to task peers
def findOptionalTasks(self, optional_tasks): def findOptionalTasks(self, optional_tasks):
@ -295,7 +298,6 @@ class WorkerManager:
if len(found) < len(optional_hash_ids): if len(found) < len(optional_hash_ids):
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found))) self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
# Stop all worker # Stop all worker
def stopWorkers(self): def stopWorkers(self):
for worker in self.workers.values(): for worker in self.workers.values():
@ -317,7 +319,7 @@ class WorkerManager:
worker.running = False worker.running = False
if worker.key in self.workers: if worker.key in self.workers:
del(self.workers[worker.key]) del(self.workers[worker.key])
self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS)) self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
# Create new task and return asyncresult # Create new task and return asyncresult
def addTask(self, inner_path, peer=None, priority=0): def addTask(self, inner_path, peer=None, priority=0):