From 58214c0ac30b84a7b19e0278440bf28b7f4e01ce Mon Sep 17 00:00:00 2001 From: shortcutme Date: Tue, 19 Nov 2019 02:16:20 +0100 Subject: [PATCH] Move file writes and reads to separate thread --- src/Site/SiteStorage.py | 23 ++++++++++++++++++++--- src/Test/TestThreadPool.py | 29 +++++++++++++++++++++++++++++ src/util/ThreadPool.py | 22 ++++++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 src/Test/TestThreadPool.py create mode 100644 src/util/ThreadPool.py diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index 3e4d9c74..49d9e7d1 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -14,10 +14,15 @@ from Db.Db import Db from Debug import Debug from Config import config from util import helper +from util import ThreadPool from Plugin import PluginManager from Translate import translate as _ +thread_pool_fs_read = ThreadPool.ThreadPool(config.threads_fs_read) +thread_pool_fs_write = ThreadPool.ThreadPool(config.threads_fs_write) + + @PluginManager.acceptPlugins class SiteStorage(object): def __init__(self, site, allow_create=True): @@ -44,6 +49,7 @@ class SiteStorage(object): return False # Create new databaseobject with the site's schema + @util.Noparallel() def openDb(self, close_idle=False): schema = self.getDbSchema() db_path = self.getPath(schema["db_file"]) @@ -95,6 +101,7 @@ class SiteStorage(object): return self.getDb().updateJson(path, file, cur) # Return possible db files for the site + @thread_pool_fs_read.wrap def getDbFiles(self): found = 0 for content_inner_path, content in self.site.content_manager.contents.items(): @@ -120,6 +127,7 @@ class SiteStorage(object): # Rebuild sql cache @util.Noparallel() + @thread_pool_fs_write.wrap def rebuildDb(self, delete_db=True): self.log.info("Rebuilding db...") self.has_db = self.isFile("dbschema.json") @@ -227,11 +235,12 @@ class SiteStorage(object): return open(file_path, mode, **kwargs) # Open file object + @thread_pool_fs_read.wrap def read(self, inner_path, mode="rb"): return open(self.getPath(inner_path), mode).read() - # Write content to file - def write(self, inner_path, content): + @thread_pool_fs_write.wrap + def writeThread(self, inner_path, content): file_path = self.getPath(inner_path) # Create dir if not exist file_dir = os.path.dirname(file_path) @@ -247,7 +256,10 @@ class SiteStorage(object): else: with open(file_path, "wb") as file: file.write(content) - del content + + # Write content to file + def write(self, inner_path, content): + self.writeThread(inner_path, content) self.onUpdated(inner_path) # Remove file from filesystem @@ -275,6 +287,7 @@ class SiteStorage(object): raise rename_err # List files from a directory + @thread_pool_fs_read.wrap def walk(self, dir_inner_path, ignore=None): directory = self.getPath(dir_inner_path) for root, dirs, files in os.walk(directory): @@ -307,6 +320,7 @@ class SiteStorage(object): dirs[:] = dirs_filtered # list directories in a directory + @thread_pool_fs_read.wrap def list(self, dir_inner_path): directory = self.getPath(dir_inner_path) return os.listdir(directory) @@ -331,11 +345,13 @@ class SiteStorage(object): self.closeDb() # Load and parse json file + @thread_pool_fs_read.wrap def loadJson(self, inner_path): with self.open(inner_path, "r", encoding="utf8") as file: return json.load(file) # Write formatted json file + @thread_pool_fs_write.wrap def writeJson(self, inner_path, data): # Write to disk self.write(inner_path, helper.jsonDumps(data).encode("utf8")) @@ -499,6 +515,7 @@ class SiteStorage(object): self.log.debug("Checked files in %.2fs... Found bad files: %s, Quick:%s" % (time.time() - s, len(bad_files), quick_check)) # Delete site's all file + @thread_pool_fs_write.wrap def deleteFiles(self): self.log.debug("Deleting files from content.json...") files = [] # Get filenames diff --git a/src/Test/TestThreadPool.py b/src/Test/TestThreadPool.py new file mode 100644 index 00000000..b237d93a --- /dev/null +++ b/src/Test/TestThreadPool.py @@ -0,0 +1,29 @@ +import gevent + +from util import ThreadPool + + +class TestThreadPool: + def testExecutionOrder(self): + pool = ThreadPool.ThreadPool(4) + + events = [] + + @pool.wrap + def blocker(): + events.append("S") + out = 0 + for i in range(1000000): + out += 1 + events.append("D") + return out + + threads = [] + for i in range(4): + threads.append(gevent.spawn(blocker)) + gevent.joinall(threads) + + assert events == ["S"] * 4 + ["D"] * 4, events + + res = blocker() + assert res == 1000000 diff --git a/src/util/ThreadPool.py b/src/util/ThreadPool.py new file mode 100644 index 00000000..8fbb12fd --- /dev/null +++ b/src/util/ThreadPool.py @@ -0,0 +1,22 @@ +import gevent.threadpool + + +class ThreadPool: + def __init__(self, max_size): + self.setMaxSize(max_size) + + def setMaxSize(self, max_size): + self.max_size = max_size + if max_size > 0: + self.pool = gevent.threadpool.ThreadPool(max_size) + else: + self.pool = None + + def wrap(self, func): + if self.pool is None: + return func + + def wrapper(*args, **kwargs): + return self.pool.apply(func, args, kwargs) + + return wrapper