Update task ids during startFindOptional if new task added
This commit is contained in:
parent
d60785ee33
commit
62d092e5ac
1 changed files with 40 additions and 9 deletions
|
@ -11,6 +11,7 @@ from util import helper
|
|||
from Plugin import PluginManager
|
||||
import util
|
||||
|
||||
|
||||
@PluginManager.acceptPlugins
|
||||
class WorkerManager(object):
|
||||
|
||||
|
@ -22,6 +23,7 @@ class WorkerManager(object):
|
|||
# "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
|
||||
self.started_task_num = 0 # Last added task num
|
||||
self.running = True
|
||||
self.time_task_added = 0
|
||||
self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
|
||||
self.process_taskchecker = gevent.spawn(self.checkTasks)
|
||||
|
||||
|
@ -215,8 +217,20 @@ class WorkerManager(object):
|
|||
# Start find peers for optional files
|
||||
@util.Noparallel(blocking=False, ignore_args=True)
|
||||
def startFindOptional(self, reset_task=False, find_more=False, high_priority=False):
|
||||
# Wait for more file requests
|
||||
if len(self.tasks) < 20 or high_priority:
|
||||
time.sleep(0.01)
|
||||
if len(self.tasks) > 90:
|
||||
time.sleep(5)
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
|
||||
optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
|
||||
if not optional_tasks:
|
||||
return False
|
||||
optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
|
||||
time_tasks = self.time_task_added
|
||||
|
||||
self.log.debug(
|
||||
"Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
|
||||
(optional_hash_ids, reset_task, find_more)
|
||||
|
@ -240,6 +254,10 @@ class WorkerManager(object):
|
|||
threads.append(gevent.spawn(peer.updateHashfield))
|
||||
gevent.joinall(threads, timeout=5)
|
||||
|
||||
if time_tasks != self.time_task_added: # New task added since start
|
||||
optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
|
||||
optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
|
||||
|
||||
found = self.findOptionalTasks(optional_tasks)
|
||||
self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
|
||||
len(found), len(optional_hash_ids)
|
||||
|
@ -249,17 +267,18 @@ class WorkerManager(object):
|
|||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||
self.startWorkers(found_peers)
|
||||
|
||||
if len(found) < len(optional_hash_ids) or find_more:
|
||||
if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())):
|
||||
self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||
|
||||
# Try to query connected peers
|
||||
threads = []
|
||||
peers = self.site.getConnectedPeers()
|
||||
peers = [peer for peer in self.site.getConnectedPeers() if peer not in self.asked_peers]
|
||||
if not peers:
|
||||
peers = self.site.getConnectablePeers()
|
||||
|
||||
for peer in peers:
|
||||
threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
|
||||
self.asked_peers.append(peer)
|
||||
|
||||
for i in range(5):
|
||||
time.sleep(1)
|
||||
|
@ -269,8 +288,8 @@ class WorkerManager(object):
|
|||
|
||||
found_ips = helper.mergeDicts(thread_values)
|
||||
found = self.addOptionalPeers(found_ips)
|
||||
self.log.debug("Found optional files after findhash connected peers: %s/%s" % (
|
||||
len(found), len(optional_hash_ids)
|
||||
self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % (
|
||||
len(found), len(optional_hash_ids), len(threads)
|
||||
))
|
||||
|
||||
if found:
|
||||
|
@ -284,11 +303,17 @@ class WorkerManager(object):
|
|||
if len(found) < len(optional_hash_ids):
|
||||
self.log.debug("No findHash result, try random peers: %s" % (optional_hash_ids - set(found)))
|
||||
# Try to query random peers
|
||||
threads = []
|
||||
peers = self.site.getConnectablePeers()
|
||||
|
||||
for peer in peers[0:5]:
|
||||
if time_tasks != self.time_task_added: # New task added since start
|
||||
optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
|
||||
optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
|
||||
|
||||
threads = []
|
||||
peers = self.site.getConnectablePeers(ignore=self.asked_peers)
|
||||
|
||||
for peer in peers:
|
||||
threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
|
||||
self.asked_peers.append(peer)
|
||||
|
||||
gevent.joinall(threads, timeout=15)
|
||||
|
||||
|
@ -325,8 +350,13 @@ class WorkerManager(object):
|
|||
if worker.key in self.workers:
|
||||
del(self.workers[worker.key])
|
||||
self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
|
||||
if len(self.workers) <= self.getMaxWorkers()/2 and any(task["optional_hash_id"] for task in self.tasks):
|
||||
if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10:
|
||||
important_task = (task for task in self.tasks if task["priority"] > 0)
|
||||
if next(important_task, None) or len(self.asked_peers) == 0:
|
||||
self.startFindOptional(find_more=True)
|
||||
else:
|
||||
self.startFindOptional()
|
||||
|
||||
|
||||
# Tasks sorted by this
|
||||
def getPriorityBoost(self, inner_path):
|
||||
|
@ -394,6 +424,7 @@ class WorkerManager(object):
|
|||
"New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
|
||||
(task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
|
||||
)
|
||||
self.time_task_added = time.time()
|
||||
|
||||
if optional_hash_id:
|
||||
self.startFindOptional()
|
||||
|
|
Loading…
Reference in a new issue