From 66a950a48149f4a8be09d75ac408174c3483d0b2 Mon Sep 17 00:00:00 2001
From: shortcutme <tamas@zeronet.io>
Date: Mon, 25 Nov 2019 14:43:28 +0100
Subject: [PATCH] New, much faster worker task sorting

---
 src/Content/ContentManager.py     |   2 +-
 src/Test/TestWorkerTaskManager.py |  92 +++++++++++++++++++++++
 src/Worker/Worker.py              |  11 ++-
 src/Worker/WorkerManager.py       |  38 ++++++----
 src/Worker/WorkerTaskManager.py   | 119 ++++++++++++++++++++++++++++++
 5 files changed, 241 insertions(+), 21 deletions(-)
 create mode 100644 src/Test/TestWorkerTaskManager.py
 create mode 100644 src/Worker/WorkerTaskManager.py

diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py
index a6e00def..fe2a74dd 100644
--- a/src/Content/ContentManager.py
+++ b/src/Content/ContentManager.py
@@ -870,7 +870,7 @@ class ContentManager(object):
             if content_size_file > site_size_limit:
                 # Save site size to display warning
                 self.site.settings["size"] = site_size
-                task = self.site.worker_manager.findTask(inner_path)
+                task = self.site.worker_manager.tasks.findTask(inner_path)
                 if task:  # Dont try to download from other peers
                     self.site.worker_manager.failTask(task)
                 raise VerifyError("Content too large %s B > %s B, aborting task..." % (site_size, site_size_limit))
diff --git a/src/Test/TestWorkerTaskManager.py b/src/Test/TestWorkerTaskManager.py
new file mode 100644
index 00000000..375100c9
--- /dev/null
+++ b/src/Test/TestWorkerTaskManager.py
@@ -0,0 +1,102 @@
+import pytest
+
+from Worker import WorkerTaskManager
+from . import Spy
+
+
+class TestUiWebsocket:
+    # NOTE(review): class name looks copied from the UiWebsocket tests, these tests cover WorkerTaskManager
+
+    def createTask(self, i):  # Task dict with the same field pattern for every test
+        return {"id": i, "priority": i % 20, "workers_num": i % 3, "inner_path": "file%s.json" % i}
+
+    def createTasks(self, num=1000):  # Manager filled with the tasks 0..num-1
+        tasks = WorkerTaskManager.WorkerTaskManager()
+        for i in range(num):
+            tasks.append(self.createTask(i))
+        return tasks
+
+    def checkSort(self, tasks):  # Check if it has the same order as a list sorted separately
+        current_order = list(tasks)
+        # Plain id order has to differ, otherwise the comparison below would prove nothing
+        assert sorted(current_order, key=lambda task: task["id"]) != current_order
+        expected_order = sorted(
+            current_order,
+            key=lambda task: (0 - (task["priority"] - task["workers_num"] * 10), task["id"])
+        )
+        assert expected_order == current_order
+
+    def testAppendSimple(self):
+        tasks = WorkerTaskManager.WorkerTaskManager()
+        for task_id, priority, workers_num in [(1, 15, 1), (2, 1, 0), (3, 8, 0)]:
+            tasks.append({
+                "id": task_id, "priority": priority, "workers_num": workers_num,
+                "inner_path": "file%s.json" % task_id
+            })
+
+        # Sort keys: file3 = -8, file1 = -(15 - 10) = -5, file2 = -1
+        inner_paths = [task["inner_path"] for task in tasks]
+        assert inner_paths == ["file3.json", "file1.json", "file2.json"]
+
+        self.checkSort(tasks)
+
+    def testAppendMany(self):
+        tasks = self.createTasks()
+        assert tasks[0]["inner_path"] == "file39.json"
+        assert tasks[-1]["inner_path"] == "file980.json"
+
+        self.checkSort(tasks)
+
+    def testRemove(self):
+        tasks = self.createTasks()
+
+        task = self.createTask(333)  # Equal copy of a stored task, not the stored object
+        assert task in tasks
+
+        tasks.remove(task)
+
+        assert task not in tasks
+
+        self.checkSort(tasks)
+
+    def testModify(self):
+        tasks = self.createTasks()
+
+        task = tasks[333]
+        task["priority"] += 10
+
+        # Changed without telling the manager: the order is expected to be wrong now
+        with pytest.raises(AssertionError):
+            self.checkSort(tasks)
+
+        # The lookup has to fall back to the slow search once to find the changed task
+        with Spy.Spy(tasks, "indexSlow") as calls:
+            tasks.updateItem(task)
+            assert len(calls) == 1
+
+        assert task in tasks
+
+        self.checkSort(tasks)
+
+        # Check reorder optimization
+
+        with Spy.Spy(tasks, "indexSlow") as calls:
+            tasks.updateItem(task, "priority", task["priority"] + 10)
+            assert len(calls) == 0
+
+        self.checkSort(tasks)
+
+    def testIn(self):
+        tasks = WorkerTaskManager.WorkerTaskManager()
+
+        task = self.createTask(1)
+
+        assert task not in tasks
+
+    def testFindTask(self):
+        tasks = self.createTasks()
+
+        assert tasks.findTask("file999.json")
+        assert not tasks.findTask("file-unknown.json")
+        tasks.remove(tasks.findTask("file999.json"))
+        assert not tasks.findTask("file999.json")
diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py
index 89c4ccf6..fb6ce443 100644
--- a/src/Worker/Worker.py
+++ b/src/Worker/Worker.py
@@ -80,7 +80,8 @@ class Worker(object):
 
             self.task = task
             site = task["site"]
-            task["workers_num"] += 1
+            self.manager.addTaskWorker(task, self)
+
             error_message = "Unknown error"
             try:
                 buff = self.peer.getFile(site.address, task["inner_path"], task["size"])
@@ -114,6 +115,7 @@ class Worker(object):
                     except Exception as err:
                         self.manager.log.error("%s: Error writing: %s (%s)" % (self.key, task["inner_path"], err))
                         write_error = err
+
                 if task["done"] is False:
                     if write_error:
                         self.manager.failTask(task)
@@ -121,10 +123,11 @@ class Worker(object):
                     else:
                         self.manager.doneTask(task)
                         self.num_downloaded += 1
-                task["workers_num"] -= 1
+
+                self.manager.removeTaskWorker(task, self)
             else:  # Verify failed
                 self.num_failed += 1
-                task["workers_num"] -= 1
+                self.manager.removeTaskWorker(task, self)
                 if self.manager.started_task_num < 50 or config.verbose:
                     self.manager.log.debug(
                         "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" %
@@ -162,4 +165,4 @@ class Worker(object):
         if self.thread:
             self.thread.kill(exception=Debug.Notify("Worker stopped"))
         del self.thread
-        self.manager.removeWorker(self)
+        self.manager.removeWorker(self)
\ No newline at end of file
diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py
index c35b4b93..b40852dc 100644
--- a/src/Worker/WorkerManager.py
+++ b/src/Worker/WorkerManager.py
@@ -5,6 +5,7 @@ import collections
 import gevent
 
 from .Worker import Worker
+from .WorkerTaskManager import WorkerTaskManager
 from Config import config
 from util import helper
 from Plugin import PluginManager
@@ -17,8 +18,9 @@ class WorkerManager(object):
     def __init__(self, site):
         self.site = site
         self.workers = {}  # Key: ip:port, Value: Worker.Worker
-        self.tasks = []
-        # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
+        self.tasks = WorkerTaskManager()
+        self.next_task_id = 1
+        # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
         # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
         self.started_task_num = 0  # Last added task num
         self.asked_peers = []
@@ -115,9 +117,6 @@ class WorkerManager(object):
 
     # Returns the next free or less worked task
     def getTask(self, peer):
-        # Sort tasks by priority and worker numbers
-        self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 10, reverse=True)
-
         for task in self.tasks:  # Find a task
             if task["peers"] and peer not in task["peers"]:
                 continue  # This peer not allowed to pick this task
@@ -212,7 +211,7 @@ class WorkerManager(object):
                 worker = self.addWorker(peer)
 
             if worker:
-                self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), max_workers))
+                self.log.debug("Added worker: %s (rep: %s), workers: %s/%s" % (peer.key, peer.reputation, len(self.workers), max_workers))
 
     # Find peers for optional hash in local hash tables and add to task peers
     def findOptionalTasks(self, optional_tasks, reset_task=False):
@@ -463,9 +462,10 @@ class WorkerManager(object):
     # Create new task and return asyncresult
     def addTask(self, inner_path, peer=None, priority=0, file_info=None):
         self.site.onFileStart(inner_path)  # First task, trigger site download started
-        task = self.findTask(inner_path)
+        task = self.tasks.findTask(inner_path)
         if task:  # Already has task for that file
-            task["priority"] = max(priority, task["priority"])
+            if priority > task["priority"]:
+                self.tasks.updateItem(task, "priority", priority)
             if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
                 task["peers"].append(peer)
                 self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
@@ -497,13 +497,14 @@ class WorkerManager(object):
                 priority += 1
 
             task = {
-                "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
+                "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
                 "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None,
                 "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
             }
 
             self.tasks.append(task)
 
+            self.next_task_id += 1
             self.started_task_num += 1
             if config.verbose:
                 self.log.debug(
@@ -525,12 +526,19 @@ class WorkerManager(object):
                 self.startWorkers(peers, reason="Added new task")
             return task
 
-    # Find a task using inner_path
-    def findTask(self, inner_path):
-        for task in self.tasks:
-            if task["inner_path"] == inner_path:
-                return task
-        return None  # Not found
+    # Count one more worker on the task and move it to its new place in the task list
+    def addTaskWorker(self, task, worker):
+        if self.tasks.findTask(task["inner_path"]) is task:
+            self.tasks.updateItem(task, "workers_num", task["workers_num"] + 1)
+        else:  # Task is not (or no longer) the queued one for this file: nothing to re-sort
+            task["workers_num"] += 1
+
+    # Count one worker less on the task and move it to its new place in the task list
+    def removeTaskWorker(self, task, worker):
+        if self.tasks.findTask(task["inner_path"]) is task:
+            self.tasks.updateItem(task, "workers_num", task["workers_num"] - 1)
+        else:  # Task is not (or no longer) the queued one for this file: nothing to re-sort
+            task["workers_num"] -= 1
 
     # Wait for other tasks
     def checkComplete(self):
@@ -567,4 +573,4 @@ class WorkerManager(object):
             self.site.onFileFail(task["inner_path"])
             task["evt"].set(False)
             if not self.tasks:
-                self.started_task_num = 0
+                self.started_task_num = 0
\ No newline at end of file
diff --git a/src/Worker/WorkerTaskManager.py b/src/Worker/WorkerTaskManager.py
new file mode 100644
index 00000000..791f5217
--- /dev/null
+++ b/src/Worker/WorkerTaskManager.py
@@ -0,0 +1,139 @@
+import bisect
+from collections.abc import MutableSequence
+
+
+# List that keeps its values ordered all the time: values are stored as
+# (priority, id, value) tuples and positioned using bisect
+class CustomSortedList(MutableSequence):
+    def __init__(self):
+        super().__init__()
+        self.items = []  # (priority, added index, actual value)
+        self.logging = False  # Print item access details to stdout
+
+    def __repr__(self):
+        return "<{0} {1}>".format(self.__class__.__name__, self.items)
+
+    def __len__(self):
+        return len(self.items)
+
+    # Return the value at index or the list of values for a slice
+    def __getitem__(self, index):
+        if self.logging:
+            print("getitem", index)
+        if isinstance(index, slice):
+            return [item[2] for item in self.items[index]]
+        return self.items[index][2]  # Any other index type is left to the list to validate
+
+    def __delitem__(self, index):
+        if self.logging:
+            print("delitem", index)
+        del self.items[index]
+
+    # Replace the value at index; the new value goes where its priority puts it,
+    # because storing it at a fixed position could break the ordering
+    def __setitem__(self, index, value):
+        del self[index]
+        self.append(value)
+
+    def __str__(self):
+        return str(self[:])
+
+    # Add value, index is ignored: the position comes from the ordering
+    def insert(self, index, value):
+        self.append(value)
+
+    def append(self, value):
+        bisect.insort(self.items, self.valueToItem(value))
+
+    # Re-position value after its priority changed. If update_key is specified
+    # then value[update_key] is set to update_value between removing and re-adding,
+    # so the fast lookup still works. Raises ValueError if value is not in the list
+    def updateItem(self, value, update_key=None, update_value=None):
+        self.remove(value)
+        if update_key is not None:
+            value[update_key] = update_value
+        self.append(value)
+
+    def sort(self, *args, **kwargs):
+        raise Exception("Sorted list can't be sorted")
+
+    # Create the tuple stored in self.items
+    def valueToItem(self, value):
+        return (self.getPriority(value), self.getId(value), value)
+
+    # Primary sort key, lower comes first
+    def getPriority(self, value):
+        return value
+
+    # Secondary sort key, decides between values with same priority
+    def getId(self, value):
+        return id(value)
+
+    # Find value by checking every item, returns None if not found
+    def indexSlow(self, value):
+        for pos, item in enumerate(self.items):
+            if item[2] == value:
+                return pos
+        return None
+
+    # Find the position of value, raises ValueError if not found
+    def index(self, value):
+        item = self.valueToItem(value)
+        bisect_pos = bisect.bisect(self.items, item) - 1
+        if bisect_pos >= 0 and self.items[bisect_pos][2] == value:
+            if self.logging:
+                print("Fast index for", value)
+            return bisect_pos
+
+        # Item probably changed since added, switch to slow iteration
+        pos = self.indexSlow(value)
+        if pos is not None:
+            if self.logging:
+                print("Slow index for %s in pos %s bisect: %s" % (item[2], pos, bisect_pos))
+            return pos
+        # Wrapped in tuple to make the formatting work for tuple values as well
+        raise ValueError("%r not in list" % (value,))
+
+    def __contains__(self, value):
+        try:
+            self.index(value)
+        except ValueError:
+            return False
+        return True
+
+
+# Task list ordered by priority and number of workers, with lookup by inner_path
+class WorkerTaskManager(CustomSortedList):
+    def __init__(self):
+        super().__init__()
+        self.inner_paths = {}  # Key: inner_path, Value: task
+
+    # Higher priority and less workers moves the task towards the front
+    def getPriority(self, value):
+        return 0 - (value["priority"] - value["workers_num"] * 10)
+
+    def getId(self, value):
+        return value["id"]
+
+    # True only for the task stored under this inner_path (or an equal copy):
+    # a different task of the same file would raise ValueError on remove / updateItem
+    def __contains__(self, value):
+        return self.inner_paths.get(value["inner_path"]) == value
+
+    def append(self, task):
+        if task["inner_path"] in self.inner_paths:
+            raise ValueError("File %s already has a task" % task["inner_path"])
+        super().append(task)
+        # Create inner path cache for faster lookup by filename
+        self.inner_paths[task["inner_path"]] = task
+
+    def __delitem__(self, index):
+        # Remove from inner path cache (slices remove more than one task)
+        removed_items = self.items[index] if isinstance(index, slice) else [self.items[index]]
+        for item in removed_items:
+            del self.inner_paths[item[2]["inner_path"]]
+        super().__delitem__(index)
+
+    # Fast task search by inner_path
+    def findTask(self, inner_path):
+        return self.inner_paths.get(inner_path, None)