diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index a6e00def..fe2a74dd 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -870,7 +870,7 @@ class ContentManager(object): if content_size_file > site_size_limit: # Save site size to display warning self.site.settings["size"] = site_size - task = self.site.worker_manager.findTask(inner_path) + task = self.site.worker_manager.tasks.findTask(inner_path) if task: # Dont try to download from other peers self.site.worker_manager.failTask(task) raise VerifyError("Content too large %s B > %s B, aborting task..." % (site_size, site_size_limit)) diff --git a/src/Test/TestWorkerTaskManager.py b/src/Test/TestWorkerTaskManager.py new file mode 100644 index 00000000..375100c9 --- /dev/null +++ b/src/Test/TestWorkerTaskManager.py @@ -0,0 +1,92 @@ +import pytest + +from Worker import WorkerTaskManager +from . import Spy + + +class TestUiWebsocket: + def checkSort(self, tasks): # Check if it has the same order as a list sorted separately + tasks_list = list(tasks) + tasks_list.sort(key=lambda task: task["id"]) + assert tasks_list != list(tasks) + tasks_list.sort(key=lambda task: (0 - (task["priority"] - task["workers_num"] * 10), task["id"])) + assert tasks_list == list(tasks) + + def testAppendSimple(self): + tasks = WorkerTaskManager.WorkerTaskManager() + tasks.append({"id": 1, "priority": 15, "workers_num": 1, "inner_path": "file1.json"}) + tasks.append({"id": 2, "priority": 1, "workers_num": 0, "inner_path": "file2.json"}) + tasks.append({"id": 3, "priority": 8, "workers_num": 0, "inner_path": "file3.json"}) + assert [task["inner_path"] for task in tasks] == ["file3.json", "file1.json", "file2.json"] + + self.checkSort(tasks) + + def testAppendMany(self): + tasks = WorkerTaskManager.WorkerTaskManager() + for i in range(1000): + tasks.append({"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i}) + assert tasks[0]["inner_path"] == "file39.json" + assert tasks[-1]["inner_path"] == "file980.json" + + self.checkSort(tasks) + + def testRemove(self): + tasks = WorkerTaskManager.WorkerTaskManager() + for i in range(1000): + tasks.append({"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i}) + + i = 333 + task = {"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i} + assert task in tasks + + tasks.remove(task) + + assert task not in tasks + + self.checkSort(tasks) + + def testModify(self): + tasks = WorkerTaskManager.WorkerTaskManager() + for i in range(1000): + tasks.append({"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i}) + + task = tasks[333] + task["priority"] += 10 + + with pytest.raises(AssertionError): + self.checkSort(tasks) + + with Spy.Spy(tasks, "indexSlow") as calls: + tasks.updateItem(task) + assert len(calls) == 1 + + assert task in tasks + + self.checkSort(tasks) + + # Check reorder optimization + + with Spy.Spy(tasks, "indexSlow") as calls: + tasks.updateItem(task, "priority", task["priority"] + 10) + assert len(calls) == 0 + + self.checkSort(tasks) + + def testIn(self): + tasks = WorkerTaskManager.WorkerTaskManager() + + i = 1 + task = {"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i} + + assert task not in tasks + + + def testFindTask(self): + tasks = WorkerTaskManager.WorkerTaskManager() + for i in range(1000): + tasks.append({"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i}) + + assert tasks.findTask("file999.json") + assert not tasks.findTask("file-unknown.json") + tasks.remove(tasks.findTask("file999.json")) + assert not tasks.findTask("file999.json") diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index 89c4ccf6..fb6ce443 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -80,7 +80,8 @@ class Worker(object): self.task = task site = task["site"] - task["workers_num"] += 1 + self.manager.addTaskWorker(task, self) + error_message = "Unknown error" try: buff = self.peer.getFile(site.address, task["inner_path"], task["size"]) @@ -114,6 +115,7 @@ class Worker(object): except Exception as err: self.manager.log.error("%s: Error writing: %s (%s)" % (self.key, task["inner_path"], err)) write_error = err + if task["done"] is False: if write_error: self.manager.failTask(task) @@ -121,10 +123,11 @@ class Worker(object): else: self.manager.doneTask(task) self.num_downloaded += 1 - task["workers_num"] -= 1 + + self.manager.removeTaskWorker(task, self) else: # Verify failed self.num_failed += 1 - task["workers_num"] -= 1 + self.manager.removeTaskWorker(task, self) if self.manager.started_task_num < 50 or config.verbose: self.manager.log.debug( "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" % @@ -162,4 +165,4 @@ class Worker(object): if self.thread: self.thread.kill(exception=Debug.Notify("Worker stopped")) del self.thread - self.manager.removeWorker(self) + self.manager.removeWorker(self) \ No newline at end of file diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index c35b4b93..b40852dc 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -5,6 +5,7 @@ import collections import gevent from .Worker import Worker +from .WorkerTaskManager import WorkerTaskManager from Config import config from util import helper from Plugin import PluginManager @@ -17,8 +18,9 @@ class WorkerManager(object): def __init__(self, site): self.site = site self.workers = {} # Key: ip:port, Value: Worker.Worker - self.tasks = [] - # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None, + self.tasks = WorkerTaskManager() + self.next_task_id = 1 + # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None, # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids} self.started_task_num = 0 # Last added task num self.asked_peers = [] @@ -115,9 +117,6 @@ class WorkerManager(object): # Returns the next free or less worked task def getTask(self, peer): - # Sort tasks by priority and worker numbers - self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 10, reverse=True) - for task in self.tasks: # Find a task if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task @@ -212,7 +211,7 @@ class WorkerManager(object): worker = self.addWorker(peer) if worker: - self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), max_workers)) + self.log.debug("Added worker: %s (rep: %s), workers: %s/%s" % (peer.key, peer.reputation, len(self.workers), max_workers)) # Find peers for optional hash in local hash tables and add to task peers def findOptionalTasks(self, optional_tasks, reset_task=False): @@ -463,9 +462,10 @@ class WorkerManager(object): # Create new task and return asyncresult def addTask(self, inner_path, peer=None, priority=0, file_info=None): self.site.onFileStart(inner_path) # First task, trigger site download started - task = self.findTask(inner_path) + task = self.tasks.findTask(inner_path) if task: # Already has task for that file - task["priority"] = max(priority, task["priority"]) + if priority > task["priority"]: + self.tasks.updateItem(task, "priority", priority) if peer and task["peers"]: # This peer also has new version, add it to task possible peers task["peers"].append(peer) self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) @@ -497,13 +497,14 @@ class WorkerManager(object): priority += 1 task = { - "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, + "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size } self.tasks.append(task) + self.next_task_id += 1 self.started_task_num += 1 if config.verbose: self.log.debug( @@ -525,12 +526,17 @@ class WorkerManager(object): self.startWorkers(peers, reason="Added new task") return task - # Find a task using inner_path - def findTask(self, inner_path): - for task in self.tasks: - if task["inner_path"] == inner_path: - return task - return None # Not found + def addTaskWorker(self, task, worker): + if task in self.tasks: + self.tasks.updateItem(task, "workers_num", task["workers_num"] + 1) + else: + task["workers_num"] += 1 + + def removeTaskWorker(self, task, worker): + if task in self.tasks: + self.tasks.updateItem(task, "workers_num", task["workers_num"] - 1) + else: + task["workers_num"] -= 1 # Wait for other tasks def checkComplete(self): @@ -567,4 +573,4 @@ class WorkerManager(object): self.site.onFileFail(task["inner_path"]) task["evt"].set(False) if not self.tasks: - self.started_task_num = 0 + self.started_task_num = 0 \ No newline at end of file diff --git a/src/Worker/WorkerTaskManager.py b/src/Worker/WorkerTaskManager.py new file mode 100644 index 00000000..791f5217 --- /dev/null +++ b/src/Worker/WorkerTaskManager.py @@ -0,0 +1,119 @@ +import bisect +from collections.abc import MutableSequence + + +class CustomSortedList(MutableSequence): + def __init__(self): + super().__init__() + self.items = [] # (priority, added index, actual value) + self.logging = False + + def __repr__(self): + return "<{0} {1}>".format(self.__class__.__name__, self.items) + + def __len__(self): + return len(self.items) + + def __getitem__(self, index): + if self.logging: + print("getitem", index) + if type(index) is int: + return self.items[index][2] + else: + return [item[2] for item in self.items[index]] + + def __delitem__(self, index): + if self.logging: + print("delitem", index) + del self.items[index] + + def __setitem__(self, index, value): + self.items[index] = self.valueToItem(value) + + def __str__(self): + return str(self[:]) + + def insert(self, index, value): + self.append(value) + + def append(self, value): + bisect.insort(self.items, self.valueToItem(value)) + + def updateItem(self, value, update_key=None, update_value=None): + self.remove(value) + if update_key: + value[update_key] = update_value + self.append(value) + + def sort(self, *args, **kwargs): + raise Exception("Sorted list can't be sorted") + + def valueToItem(self, value): + return (self.getPriority(value), self.getId(value), value) + + def getPriority(self, value): + return value + + def getId(self, value): + return id(value) + + def indexSlow(self, value): + for pos, item in enumerate(self.items): + if item[2] == value: + return pos + return None + + def index(self, value): + item = (self.getPriority(value), self.getId(value), value) + bisect_pos = bisect.bisect(self.items, item) - 1 + if bisect_pos >= 0 and self.items[bisect_pos][2] == value: + if self.logging: + print("Fast index for", value) + return bisect_pos + + # Item probably changed since added, switch to slow iteration + pos = self.indexSlow(value) + if pos is not None: + if self.logging: + print("Slow index for %s in pos %s bisect: %s" % (item[2], pos, bisect_pos)) + return pos + raise ValueError("%r not in list" % value) + + def __contains__(self, value): + try: + self.index(value) + return True + except ValueError: + return False + + +class WorkerTaskManager(CustomSortedList): + def __init__(self): + super().__init__() + self.inner_paths = {} + + def getPriority(self, value): + return 0 - (value["priority"] - value["workers_num"] * 10) + + def getId(self, value): + return value["id"] + + def __contains__(self, value): + return value["inner_path"] in self.inner_paths + + # Fast task search by inner_path + + def append(self, task): + if task["inner_path"] in self.inner_paths: + raise ValueError("File %s already has a task" % task["inner_path"]) + super().append(task) + # Create inner path cache for faster lookup by filename + self.inner_paths[task["inner_path"]] = task + + def __delitem__(self, index): + # Remove from inner path cache + del self.inner_paths[self.items[index][2]["inner_path"]] + super().__delitem__(index) + + def findTask(self, inner_path): + return self.inner_paths.get(inner_path, None)