Force start peers for optional files
This commit is contained in:
parent
f997a69ebc
commit
365ba9b5f4
1 changed files with 15 additions and 9 deletions
|
@ -90,7 +90,7 @@ class WorkerManager(object):
|
||||||
if task["peers"]:
|
if task["peers"]:
|
||||||
peers_try = [peer for peer in task["peers"] if peer not in task["failed"]]
|
peers_try = [peer for peer in task["peers"] if peer not in task["failed"]]
|
||||||
if peers_try:
|
if peers_try:
|
||||||
self.startWorkers(peers_try)
|
self.startWorkers(peers_try, force_num=5)
|
||||||
self.startFindOptional(find_more=True)
|
self.startFindOptional(find_more=True)
|
||||||
else:
|
else:
|
||||||
if task["peers"]: # Release the peer lock
|
if task["peers"]: # Release the peer lock
|
||||||
|
@ -139,9 +139,9 @@ class WorkerManager(object):
|
||||||
return config.workers
|
return config.workers
|
||||||
|
|
||||||
# Add new worker
|
# Add new worker
|
||||||
def addWorker(self, peer, multiplexing=False):
|
def addWorker(self, peer, multiplexing=False, force=False):
|
||||||
key = peer.key
|
key = peer.key
|
||||||
if len(self.workers) > self.getMaxWorkers():
|
if len(self.workers) > self.getMaxWorkers() and not force:
|
||||||
return False
|
return False
|
||||||
if multiplexing: # Add even if we already have worker for this peer
|
if multiplexing: # Add even if we already have worker for this peer
|
||||||
key = "%s/%s" % (key, len(self.workers))
|
key = "%s/%s" % (key, len(self.workers))
|
||||||
|
@ -166,7 +166,7 @@ class WorkerManager(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Start workers to process tasks
|
# Start workers to process tasks
|
||||||
def startWorkers(self, peers=None):
|
def startWorkers(self, peers=None, force_num=0):
|
||||||
if not self.tasks:
|
if not self.tasks:
|
||||||
return False # No task for workers
|
return False # No task for workers
|
||||||
if len(self.workers) >= self.getMaxWorkers() and not peers:
|
if len(self.workers) >= self.getMaxWorkers() and not peers:
|
||||||
|
@ -185,7 +185,13 @@ class WorkerManager(object):
|
||||||
for peer in peers: # One worker for every peer
|
for peer in peers: # One worker for every peer
|
||||||
if peers and peer not in peers:
|
if peers and peer not in peers:
|
||||||
continue # If peers defined and peer not valid
|
continue # If peers defined and peer not valid
|
||||||
worker = self.addWorker(peer)
|
|
||||||
|
if force_num:
|
||||||
|
worker = self.addWorker(peer, force=True)
|
||||||
|
force_num -= 1
|
||||||
|
else:
|
||||||
|
worker = self.addWorker(peer)
|
||||||
|
|
||||||
if worker:
|
if worker:
|
||||||
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
|
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
|
||||||
|
|
||||||
|
@ -272,7 +278,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for peers in found.values() for peer in peers])
|
found_peers = set([peer for peers in found.values() for peer in peers])
|
||||||
self.startWorkers(found_peers)
|
self.startWorkers(found_peers, force_num=3)
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())):
|
if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())):
|
||||||
self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))
|
self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||||
|
@ -297,7 +303,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers)
|
self.startWorkers(found_peers, force_num=3)
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids) or find_more:
|
if len(found) < len(optional_hash_ids) or find_more:
|
||||||
self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))
|
self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||||
|
@ -330,7 +336,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers)
|
self.startWorkers(found_peers, force_num=3)
|
||||||
|
|
||||||
if len(thread_values) == len(threads):
|
if len(thread_values) == len(threads):
|
||||||
# Got result from all started thread
|
# Got result from all started thread
|
||||||
|
@ -359,7 +365,7 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||||
self.startWorkers(found_peers)
|
self.startWorkers(found_peers, force_num=3)
|
||||||
|
|
||||||
if len(found) < len(optional_hash_ids):
|
if len(found) < len(optional_hash_ids):
|
||||||
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
|
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||||
|
|
Loading…
Reference in a new issue