Move file writes and reads to separate thread
This commit is contained in:
parent
5d113757df
commit
58214c0ac3
3 changed files with 71 additions and 3 deletions
|
@ -14,10 +14,15 @@ from Db.Db import Db
|
||||||
from Debug import Debug
|
from Debug import Debug
|
||||||
from Config import config
|
from Config import config
|
||||||
from util import helper
|
from util import helper
|
||||||
|
from util import ThreadPool
|
||||||
from Plugin import PluginManager
|
from Plugin import PluginManager
|
||||||
from Translate import translate as _
|
from Translate import translate as _
|
||||||
|
|
||||||
|
|
||||||
|
thread_pool_fs_read = ThreadPool.ThreadPool(config.threads_fs_read)
|
||||||
|
thread_pool_fs_write = ThreadPool.ThreadPool(config.threads_fs_write)
|
||||||
|
|
||||||
|
|
||||||
@PluginManager.acceptPlugins
|
@PluginManager.acceptPlugins
|
||||||
class SiteStorage(object):
|
class SiteStorage(object):
|
||||||
def __init__(self, site, allow_create=True):
|
def __init__(self, site, allow_create=True):
|
||||||
|
@ -44,6 +49,7 @@ class SiteStorage(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Create new databaseobject with the site's schema
|
# Create new databaseobject with the site's schema
|
||||||
|
@util.Noparallel()
|
||||||
def openDb(self, close_idle=False):
|
def openDb(self, close_idle=False):
|
||||||
schema = self.getDbSchema()
|
schema = self.getDbSchema()
|
||||||
db_path = self.getPath(schema["db_file"])
|
db_path = self.getPath(schema["db_file"])
|
||||||
|
@ -95,6 +101,7 @@ class SiteStorage(object):
|
||||||
return self.getDb().updateJson(path, file, cur)
|
return self.getDb().updateJson(path, file, cur)
|
||||||
|
|
||||||
# Return possible db files for the site
|
# Return possible db files for the site
|
||||||
|
@thread_pool_fs_read.wrap
|
||||||
def getDbFiles(self):
|
def getDbFiles(self):
|
||||||
found = 0
|
found = 0
|
||||||
for content_inner_path, content in self.site.content_manager.contents.items():
|
for content_inner_path, content in self.site.content_manager.contents.items():
|
||||||
|
@ -120,6 +127,7 @@ class SiteStorage(object):
|
||||||
|
|
||||||
# Rebuild sql cache
|
# Rebuild sql cache
|
||||||
@util.Noparallel()
|
@util.Noparallel()
|
||||||
|
@thread_pool_fs_write.wrap
|
||||||
def rebuildDb(self, delete_db=True):
|
def rebuildDb(self, delete_db=True):
|
||||||
self.log.info("Rebuilding db...")
|
self.log.info("Rebuilding db...")
|
||||||
self.has_db = self.isFile("dbschema.json")
|
self.has_db = self.isFile("dbschema.json")
|
||||||
|
@ -227,11 +235,12 @@ class SiteStorage(object):
|
||||||
return open(file_path, mode, **kwargs)
|
return open(file_path, mode, **kwargs)
|
||||||
|
|
||||||
# Open file object
|
# Open file object
|
||||||
|
@thread_pool_fs_read.wrap
|
||||||
def read(self, inner_path, mode="rb"):
|
def read(self, inner_path, mode="rb"):
|
||||||
return open(self.getPath(inner_path), mode).read()
|
return open(self.getPath(inner_path), mode).read()
|
||||||
|
|
||||||
# Write content to file
|
@thread_pool_fs_write.wrap
|
||||||
def write(self, inner_path, content):
|
def writeThread(self, inner_path, content):
|
||||||
file_path = self.getPath(inner_path)
|
file_path = self.getPath(inner_path)
|
||||||
# Create dir if not exist
|
# Create dir if not exist
|
||||||
file_dir = os.path.dirname(file_path)
|
file_dir = os.path.dirname(file_path)
|
||||||
|
@ -247,7 +256,10 @@ class SiteStorage(object):
|
||||||
else:
|
else:
|
||||||
with open(file_path, "wb") as file:
|
with open(file_path, "wb") as file:
|
||||||
file.write(content)
|
file.write(content)
|
||||||
del content
|
|
||||||
|
# Write content to file
|
||||||
|
def write(self, inner_path, content):
|
||||||
|
self.writeThread(inner_path, content)
|
||||||
self.onUpdated(inner_path)
|
self.onUpdated(inner_path)
|
||||||
|
|
||||||
# Remove file from filesystem
|
# Remove file from filesystem
|
||||||
|
@ -275,6 +287,7 @@ class SiteStorage(object):
|
||||||
raise rename_err
|
raise rename_err
|
||||||
|
|
||||||
# List files from a directory
|
# List files from a directory
|
||||||
|
@thread_pool_fs_read.wrap
|
||||||
def walk(self, dir_inner_path, ignore=None):
|
def walk(self, dir_inner_path, ignore=None):
|
||||||
directory = self.getPath(dir_inner_path)
|
directory = self.getPath(dir_inner_path)
|
||||||
for root, dirs, files in os.walk(directory):
|
for root, dirs, files in os.walk(directory):
|
||||||
|
@ -307,6 +320,7 @@ class SiteStorage(object):
|
||||||
dirs[:] = dirs_filtered
|
dirs[:] = dirs_filtered
|
||||||
|
|
||||||
# list directories in a directory
|
# list directories in a directory
|
||||||
|
@thread_pool_fs_read.wrap
|
||||||
def list(self, dir_inner_path):
|
def list(self, dir_inner_path):
|
||||||
directory = self.getPath(dir_inner_path)
|
directory = self.getPath(dir_inner_path)
|
||||||
return os.listdir(directory)
|
return os.listdir(directory)
|
||||||
|
@ -331,11 +345,13 @@ class SiteStorage(object):
|
||||||
self.closeDb()
|
self.closeDb()
|
||||||
|
|
||||||
# Load and parse json file
|
# Load and parse json file
|
||||||
|
@thread_pool_fs_read.wrap
|
||||||
def loadJson(self, inner_path):
|
def loadJson(self, inner_path):
|
||||||
with self.open(inner_path, "r", encoding="utf8") as file:
|
with self.open(inner_path, "r", encoding="utf8") as file:
|
||||||
return json.load(file)
|
return json.load(file)
|
||||||
|
|
||||||
# Write formatted json file
|
# Write formatted json file
|
||||||
|
@thread_pool_fs_write.wrap
|
||||||
def writeJson(self, inner_path, data):
|
def writeJson(self, inner_path, data):
|
||||||
# Write to disk
|
# Write to disk
|
||||||
self.write(inner_path, helper.jsonDumps(data).encode("utf8"))
|
self.write(inner_path, helper.jsonDumps(data).encode("utf8"))
|
||||||
|
@ -499,6 +515,7 @@ class SiteStorage(object):
|
||||||
self.log.debug("Checked files in %.2fs... Found bad files: %s, Quick:%s" % (time.time() - s, len(bad_files), quick_check))
|
self.log.debug("Checked files in %.2fs... Found bad files: %s, Quick:%s" % (time.time() - s, len(bad_files), quick_check))
|
||||||
|
|
||||||
# Delete site's all file
|
# Delete site's all file
|
||||||
|
@thread_pool_fs_write.wrap
|
||||||
def deleteFiles(self):
|
def deleteFiles(self):
|
||||||
self.log.debug("Deleting files from content.json...")
|
self.log.debug("Deleting files from content.json...")
|
||||||
files = [] # Get filenames
|
files = [] # Get filenames
|
||||||
|
|
29
src/Test/TestThreadPool.py
Normal file
29
src/Test/TestThreadPool.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
import gevent
|
||||||
|
|
||||||
|
from util import ThreadPool
|
||||||
|
|
||||||
|
|
||||||
|
class TestThreadPool:
|
||||||
|
def testExecutionOrder(self):
|
||||||
|
pool = ThreadPool.ThreadPool(4)
|
||||||
|
|
||||||
|
events = []
|
||||||
|
|
||||||
|
@pool.wrap
|
||||||
|
def blocker():
|
||||||
|
events.append("S")
|
||||||
|
out = 0
|
||||||
|
for i in range(1000000):
|
||||||
|
out += 1
|
||||||
|
events.append("D")
|
||||||
|
return out
|
||||||
|
|
||||||
|
threads = []
|
||||||
|
for i in range(4):
|
||||||
|
threads.append(gevent.spawn(blocker))
|
||||||
|
gevent.joinall(threads)
|
||||||
|
|
||||||
|
assert events == ["S"] * 4 + ["D"] * 4, events
|
||||||
|
|
||||||
|
res = blocker()
|
||||||
|
assert res == 1000000
|
22
src/util/ThreadPool.py
Normal file
22
src/util/ThreadPool.py
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
import gevent.threadpool
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadPool:
|
||||||
|
def __init__(self, max_size):
|
||||||
|
self.setMaxSize(max_size)
|
||||||
|
|
||||||
|
def setMaxSize(self, max_size):
|
||||||
|
self.max_size = max_size
|
||||||
|
if max_size > 0:
|
||||||
|
self.pool = gevent.threadpool.ThreadPool(max_size)
|
||||||
|
else:
|
||||||
|
self.pool = None
|
||||||
|
|
||||||
|
def wrap(self, func):
|
||||||
|
if self.pool is None:
|
||||||
|
return func
|
||||||
|
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return self.pool.apply(func, args, kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
Loading…
Reference in a new issue