Add reason for startWorkers
This commit is contained in:
parent
752dabe554
commit
8594e4ce4a
1 changed files with 22 additions and 20 deletions
|
@ -97,7 +97,7 @@ class WorkerManager(object):
|
||||||
if task["peers"]:
|
if task["peers"]:
|
||||||
peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers]
|
peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers]
|
||||||
if peers_try:
|
if peers_try:
|
||||||
self.startWorkers(peers_try, force_num=5)
|
self.startWorkers(peers_try, force_num=5, reason="Task checker (optional, has peers)")
|
||||||
else:
|
else:
|
||||||
self.startFindOptional(find_more=True)
|
self.startFindOptional(find_more=True)
|
||||||
else:
|
else:
|
||||||
|
@ -106,10 +106,10 @@ class WorkerManager(object):
|
||||||
if task["peers"]: # Release the peer lock
|
if task["peers"]: # Release the peer lock
|
||||||
self.log.debug("Task peer lock release: %s" % task["inner_path"])
|
self.log.debug("Task peer lock release: %s" % task["inner_path"])
|
||||||
task["peers"] = []
|
task["peers"] = []
|
||||||
self.startWorkers()
|
self.startWorkers(reason="Task checker")
|
||||||
|
|
||||||
if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers():
|
if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers():
|
||||||
self.startWorkers()
|
self.startWorkers(reason="Task checker (need more workers)")
|
||||||
|
|
||||||
self.log.debug("checkTasks stopped running")
|
self.log.debug("checkTasks stopped running")
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
# New peers added to site
|
# New peers added to site
|
||||||
def onPeers(self):
|
def onPeers(self):
|
||||||
self.startWorkers()
|
self.startWorkers(reason="More peers found")
|
||||||
|
|
||||||
def getMaxWorkers(self):
|
def getMaxWorkers(self):
|
||||||
if len(self.tasks) > 50:
|
if len(self.tasks) > 50:
|
||||||
|
@ -176,22 +176,24 @@ class WorkerManager(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Start workers to process tasks
|
# Start workers to process tasks
|
||||||
def startWorkers(self, peers=None, force_num=0):
|
def startWorkers(self, peers=None, force_num=0, reason="Unknown"):
|
||||||
if not self.tasks:
|
if not self.tasks:
|
||||||
return False # No task for workers
|
return False # No task for workers
|
||||||
if len(self.workers) >= self.getMaxWorkers() and not peers:
|
max_workers = min(self.getMaxWorkers(), len(self.site.peers))
|
||||||
|
if len(self.workers) >= max_workers and not peers:
|
||||||
return False # Workers number already maxed and no starting peers defined
|
return False # Workers number already maxed and no starting peers defined
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
"Starting workers, tasks: %s, peers: %s, workers: %s" %
|
"Starting workers (%s), tasks: %s, peers: %s, workers: %s" %
|
||||||
(len(self.tasks), len(peers or []), len(self.workers))
|
(reason, len(self.tasks), len(peers or []), len(self.workers))
|
||||||
)
|
)
|
||||||
if not peers:
|
if not peers:
|
||||||
peers = self.site.getConnectedPeers()
|
peers = self.site.getConnectedPeers()
|
||||||
if len(peers) < self.getMaxWorkers():
|
if len(peers) < max_workers:
|
||||||
peers += self.site.getRecentPeers(self.getMaxWorkers())
|
peers += self.site.getRecentPeers(max_workers * 2)
|
||||||
if type(peers) is set:
|
if type(peers) is set:
|
||||||
peers = list(peers)
|
peers = list(peers)
|
||||||
|
|
||||||
|
|
||||||
# Sort by ping
|
# Sort by ping
|
||||||
peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)
|
peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)
|
||||||
|
|
||||||
|
@ -206,7 +208,7 @@ class WorkerManager(object):
|
||||||
worker = self.addWorker(peer)
|
worker = self.addWorker(peer)
|
||||||
|
|
||||||
if worker:
|
if worker:
|
||||||
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
|
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), max_workers))
|
||||||
|
|
||||||
# Find peers for optional hash in local hash tables and add to task peers
|
# Find peers for optional hash in local hash tables and add to task peers
|
||||||
def findOptionalTasks(self, optional_tasks, reset_task=False):
|
def findOptionalTasks(self, optional_tasks, reset_task=False):
|
||||||
|
@ -291,7 +293,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for peers in list(found.values()) for peer in peers])
|
found_peers = set([peer for peers in list(found.values()) for peer in peers])
|
||||||
self.startWorkers(found_peers, force_num=3)
|
self.startWorkers(found_peers, force_num=3, reason="Optional found in local peers")
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())):
|
if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())):
|
||||||
self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))
|
self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||||
|
@ -316,7 +318,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers, force_num=3)
|
self.startWorkers(found_peers, force_num=3, reason="Optional found in connected peers")
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids) or find_more:
|
if len(found) < len(optional_hash_ids) or find_more:
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
|
@ -352,7 +354,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers, force_num=3)
|
self.startWorkers(found_peers, force_num=3, reason="Optional found by findhash connected peers")
|
||||||
|
|
||||||
if len(thread_values) == len(threads):
|
if len(thread_values) == len(threads):
|
||||||
# Got result from all started thread
|
# Got result from all started thread
|
||||||
|
@ -384,7 +386,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers, force_num=3)
|
self.startWorkers(found_peers, force_num=3, reason="Option found using findhash random peers")
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids):
|
if len(found) < len(optional_hash_ids):
|
||||||
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
|
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||||
|
@ -426,7 +428,7 @@ class WorkerManager(object):
|
||||||
self.startFindOptional()
|
self.startFindOptional()
|
||||||
elif self.tasks and not self.workers and worker.task:
|
elif self.tasks and not self.workers and worker.task:
|
||||||
self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks))
|
self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks))
|
||||||
self.startWorkers()
|
self.startWorkers(reason="Removed worker")
|
||||||
|
|
||||||
# Tasks sorted by this
|
# Tasks sorted by this
|
||||||
def getPriorityBoost(self, inner_path):
|
def getPriorityBoost(self, inner_path):
|
||||||
|
@ -460,11 +462,11 @@ class WorkerManager(object):
|
||||||
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
||||||
task["peers"].append(peer)
|
task["peers"].append(peer)
|
||||||
self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
|
self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
|
||||||
self.startWorkers([peer])
|
self.startWorkers([peer], reason="Added new task (update received by peer)")
|
||||||
elif peer and peer in task["failed"]:
|
elif peer and peer in task["failed"]:
|
||||||
task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
|
task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
|
||||||
self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
|
self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
|
||||||
self.startWorkers([peer])
|
self.startWorkers([peer], reason="Added new task (peer failed before)")
|
||||||
return task
|
return task
|
||||||
else: # No task for that file yet
|
else: # No task for that file yet
|
||||||
evt = gevent.event.AsyncResult()
|
evt = gevent.event.AsyncResult()
|
||||||
|
@ -510,10 +512,10 @@ class WorkerManager(object):
|
||||||
self.startFindOptional(high_priority=priority > 0)
|
self.startFindOptional(high_priority=priority > 0)
|
||||||
|
|
||||||
if peers:
|
if peers:
|
||||||
self.startWorkers(peers)
|
self.startWorkers(peers, reason="Added new optional task")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.startWorkers(peers)
|
self.startWorkers(peers, reason="Added new task")
|
||||||
return task
|
return task
|
||||||
|
|
||||||
# Find a task using inner_path
|
# Find a task using inner_path
|
||||||
|
|
Loading…
Reference in a new issue