Refactor task adding with less locking
This commit is contained in:
parent
5987274edf
commit
b2e7cbb927
1 changed files with 75 additions and 60 deletions
|
@ -460,72 +460,87 @@ class WorkerManager(object):
|
||||||
return 2
|
return 2
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def addTaskUpdate(self, task, peer, priority=0):
|
||||||
|
if priority > task["priority"]:
|
||||||
|
self.tasks.updateItem(task, "priority", priority)
|
||||||
|
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
||||||
|
task["peers"].append(peer)
|
||||||
|
self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
|
||||||
|
self.startWorkers([peer], reason="Added new task (update received by peer)")
|
||||||
|
elif peer and peer in task["failed"]:
|
||||||
|
task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
|
||||||
|
self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
|
||||||
|
self.startWorkers([peer], reason="Added new task (peer failed before)")
|
||||||
|
|
||||||
|
def addTaskCreate(self, inner_path, peer, priority=0, file_info=None):
|
||||||
|
evt = gevent.event.AsyncResult()
|
||||||
|
if peer:
|
||||||
|
peers = [peer] # Only download from this peer
|
||||||
|
else:
|
||||||
|
peers = None
|
||||||
|
if not file_info:
|
||||||
|
file_info = self.site.content_manager.getFileInfo(inner_path)
|
||||||
|
if file_info and file_info["optional"]:
|
||||||
|
optional_hash_id = helper.toHashId(file_info["sha512"])
|
||||||
|
else:
|
||||||
|
optional_hash_id = None
|
||||||
|
if file_info:
|
||||||
|
size = file_info.get("size", 0)
|
||||||
|
else:
|
||||||
|
size = 0
|
||||||
|
|
||||||
|
self.lock_add_task.acquire()
|
||||||
|
|
||||||
|
# Check again if we have task for this file
|
||||||
|
task = self.tasks.findTask(inner_path)
|
||||||
|
if task:
|
||||||
|
self.addTaskUpdate(task, peer, priority)
|
||||||
|
return task
|
||||||
|
|
||||||
|
priority += self.getPriorityBoost(inner_path)
|
||||||
|
|
||||||
|
if self.started_task_num == 0: # Boost priority for first requested file
|
||||||
|
priority += 1
|
||||||
|
|
||||||
|
task = {
|
||||||
|
"id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
|
||||||
|
"optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None,
|
||||||
|
"time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
|
||||||
|
}
|
||||||
|
|
||||||
|
self.tasks.append(task)
|
||||||
|
self.lock_add_task.release()
|
||||||
|
|
||||||
|
self.next_task_id += 1
|
||||||
|
self.started_task_num += 1
|
||||||
|
if config.verbose:
|
||||||
|
self.log.debug(
|
||||||
|
"New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
|
||||||
|
(task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.time_task_added = time.time()
|
||||||
|
|
||||||
|
if optional_hash_id:
|
||||||
|
if self.asked_peers:
|
||||||
|
del self.asked_peers[:] # Reset asked peers
|
||||||
|
self.startFindOptional(high_priority=priority > 0)
|
||||||
|
|
||||||
|
if peers:
|
||||||
|
self.startWorkers(peers, reason="Added new optional task")
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.startWorkers(peers, reason="Added new task")
|
||||||
|
return task
|
||||||
|
|
||||||
# Create new task and return asyncresult
|
# Create new task and return asyncresult
|
||||||
def addTask(self, inner_path, peer=None, priority=0, file_info=None):
|
def addTask(self, inner_path, peer=None, priority=0, file_info=None):
|
||||||
self.lock_add_task.acquire()
|
|
||||||
self.site.onFileStart(inner_path) # First task, trigger site download started
|
self.site.onFileStart(inner_path) # First task, trigger site download started
|
||||||
task = self.tasks.findTask(inner_path)
|
task = self.tasks.findTask(inner_path)
|
||||||
if task: # Already has task for that file
|
if task: # Already has task for that file
|
||||||
if priority > task["priority"]:
|
self.addTaskUpdate(task, peer, priority)
|
||||||
self.tasks.updateItem(task, "priority", priority)
|
|
||||||
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
|
||||||
task["peers"].append(peer)
|
|
||||||
self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
|
|
||||||
self.startWorkers([peer], reason="Added new task (update received by peer)")
|
|
||||||
elif peer and peer in task["failed"]:
|
|
||||||
task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
|
|
||||||
self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
|
|
||||||
self.startWorkers([peer], reason="Added new task (peer failed before)")
|
|
||||||
else: # No task for that file yet
|
else: # No task for that file yet
|
||||||
evt = gevent.event.AsyncResult()
|
task = self.addTaskCreate(inner_path, peer, priority, file_info)
|
||||||
if peer:
|
|
||||||
peers = [peer] # Only download from this peer
|
|
||||||
else:
|
|
||||||
peers = None
|
|
||||||
if not file_info:
|
|
||||||
file_info = self.site.content_manager.getFileInfo(inner_path)
|
|
||||||
if file_info and file_info["optional"]:
|
|
||||||
optional_hash_id = helper.toHashId(file_info["sha512"])
|
|
||||||
else:
|
|
||||||
optional_hash_id = None
|
|
||||||
if file_info:
|
|
||||||
size = file_info.get("size", 0)
|
|
||||||
else:
|
|
||||||
size = 0
|
|
||||||
priority += self.getPriorityBoost(inner_path)
|
|
||||||
|
|
||||||
if self.started_task_num == 0: # Boost priority for first requested file
|
|
||||||
priority += 1
|
|
||||||
|
|
||||||
task = {
|
|
||||||
"id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
|
|
||||||
"optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None,
|
|
||||||
"time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
|
|
||||||
}
|
|
||||||
|
|
||||||
self.tasks.append(task)
|
|
||||||
|
|
||||||
self.next_task_id += 1
|
|
||||||
self.started_task_num += 1
|
|
||||||
if config.verbose:
|
|
||||||
self.log.debug(
|
|
||||||
"New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
|
|
||||||
(task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
|
|
||||||
)
|
|
||||||
|
|
||||||
self.time_task_added = time.time()
|
|
||||||
|
|
||||||
if optional_hash_id:
|
|
||||||
if self.asked_peers:
|
|
||||||
del self.asked_peers[:] # Reset asked peers
|
|
||||||
self.startFindOptional(high_priority=priority > 0)
|
|
||||||
|
|
||||||
if peers:
|
|
||||||
self.startWorkers(peers, reason="Added new optional task")
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.startWorkers(peers, reason="Added new task")
|
|
||||||
self.lock_add_task.release()
|
|
||||||
return task
|
return task
|
||||||
|
|
||||||
def addTaskWorker(self, task, worker):
|
def addTaskWorker(self, task, worker):
|
||||||
|
|
Loading…
Reference in a new issue