From b2e7cbb927bd6c6c47cdc3c5c1cb8bddd0bef938 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Tue, 31 Dec 2019 12:51:52 +0100 Subject: [PATCH] Refactor task adding with less locking --- src/Worker/WorkerManager.py | 135 ++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 60 deletions(-) diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index c4d11b25..cf4b4bd8 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -460,72 +460,87 @@ class WorkerManager(object): return 2 return 0 + def addTaskUpdate(self, task, peer, priority=0): + if priority > task["priority"]: + self.tasks.updateItem(task, "priority", priority) + if peer and task["peers"]: # This peer also has new version, add it to task possible peers + task["peers"].append(peer) + self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) + self.startWorkers([peer], reason="Added new task (update received by peer)") + elif peer and peer in task["failed"]: + task["failed"].remove(peer) # New update arrived, remove the peer from failed peers + self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) + self.startWorkers([peer], reason="Added new task (peer failed before)") + + def addTaskCreate(self, inner_path, peer, priority=0, file_info=None): + evt = gevent.event.AsyncResult() + if peer: + peers = [peer] # Only download from this peer + else: + peers = None + if not file_info: + file_info = self.site.content_manager.getFileInfo(inner_path) + if file_info and file_info["optional"]: + optional_hash_id = helper.toHashId(file_info["sha512"]) + else: + optional_hash_id = None + if file_info: + size = file_info.get("size", 0) + else: + size = 0 + + self.lock_add_task.acquire() + + # Check again if we have task for this file + task = self.tasks.findTask(inner_path) + if task: + self.addTaskUpdate(task, peer, priority) + return task + + priority += self.getPriorityBoost(inner_path) + + if self.started_task_num == 0: # Boost priority for first requested file + priority += 1 + + task = { + "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, + "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None, + "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size + } + + self.tasks.append(task) + self.lock_add_task.release() + + self.next_task_id += 1 + self.started_task_num += 1 + if config.verbose: + self.log.debug( + "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" % + (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num) + ) + + self.time_task_added = time.time() + + if optional_hash_id: + if self.asked_peers: + del self.asked_peers[:] # Reset asked peers + self.startFindOptional(high_priority=priority > 0) + + if peers: + self.startWorkers(peers, reason="Added new optional task") + + else: + self.startWorkers(peers, reason="Added new task") + return task + # Create new task and return asyncresult def addTask(self, inner_path, peer=None, priority=0, file_info=None): - self.lock_add_task.acquire() self.site.onFileStart(inner_path) # First task, trigger site download started task = self.tasks.findTask(inner_path) if task: # Already has task for that file - if priority > task["priority"]: - self.tasks.updateItem(task, "priority", priority) - if peer and task["peers"]: # This peer also has new version, add it to task possible peers - task["peers"].append(peer) - self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) - self.startWorkers([peer], reason="Added new task (update received by peer)") - elif peer and peer in task["failed"]: - task["failed"].remove(peer) # New update arrived, remove the peer from failed peers - self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) - self.startWorkers([peer], reason="Added new task (peer failed before)") + self.addTaskUpdate(task, peer, priority) else: # No task for that file yet - evt = gevent.event.AsyncResult() - if peer: - peers = [peer] # Only download from this peer - else: - peers = None - if not file_info: - file_info = self.site.content_manager.getFileInfo(inner_path) - if file_info and file_info["optional"]: - optional_hash_id = helper.toHashId(file_info["sha512"]) - else: - optional_hash_id = None - if file_info: - size = file_info.get("size", 0) - else: - size = 0 - priority += self.getPriorityBoost(inner_path) - - if self.started_task_num == 0: # Boost priority for first requested file - priority += 1 - - task = { - "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, - "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None, - "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size - } - - self.tasks.append(task) - - self.next_task_id += 1 - self.started_task_num += 1 - if config.verbose: - self.log.debug( - "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" % - (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num) - ) - - self.time_task_added = time.time() - - if optional_hash_id: - if self.asked_peers: - del self.asked_peers[:] # Reset asked peers - self.startFindOptional(high_priority=priority > 0) - - if peers: - self.startWorkers(peers, reason="Added new optional task") - - else: - self.startWorkers(peers, reason="Added new task") - self.lock_add_task.release() + task = self.addTaskCreate(inner_path, peer, priority, file_info) return task def addTaskWorker(self, task, worker):