Cache file sizes and modification date in sqlite db
parent 227751e455, commit ecb5885dba
4 changed files with 267 additions and 2 deletions
src/Content/ContentDb.py (Normal file, 116 lines added)
@@ -0,0 +1,116 @@
import time

from Db import Db
from Config import config


class ContentDb(Db):
    def __init__(self):
        self.version = 4
        super(ContentDb, self).__init__({"db_name": "ContentDb"}, "%s/content.db" % config.data_dir)
        self.foreign_keys = True
        self.checkTables()
        self.site_ids = {}

    def checkTables(self):
        s = time.time()
        version = int(self.execute("PRAGMA user_version").fetchone()[0])
        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
        if version < self.version:
            self.createTables()
        else:
            self.execute("VACUUM")
        self.log.debug("Check tables in %.3fs" % (time.time() - s))

    def createTables(self):
        # Delete all tables
        self.execute("PRAGMA writable_schema = 1")
        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
        self.execute("PRAGMA writable_schema = 0")
        self.execute("VACUUM")
        self.execute("PRAGMA INTEGRITY_CHECK")
        # Create new tables
        self.execute("""
            CREATE TABLE site (
                site_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
                address TEXT NOT NULL
            );
        """)
        self.execute("CREATE UNIQUE INDEX site_address ON site (address);")

        self.execute("""
            CREATE TABLE content (
                content_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                site_id INTEGER REFERENCES site (site_id) ON DELETE CASCADE,
                inner_path TEXT,
                size INTEGER,
                size_files INTEGER,
                size_files_optional INTEGER,
                modified INTEGER
            );
        """)
        self.execute("CREATE UNIQUE INDEX content_key ON content (site_id, inner_path);")
        self.execute("CREATE INDEX content_modified ON content (site_id, modified);")

        self.execute("PRAGMA user_version = %s" % self.version)

    def needSite(self, site_address):
        if site_address not in self.site_ids:
            self.execute("INSERT OR IGNORE INTO site ?", {"address": site_address})
            for row in self.execute("SELECT * FROM site"):
                self.site_ids[row["address"]] = row["site_id"]
        return self.site_ids[site_address]

    def deleteSite(self, site_address):
        site_id = self.site_ids[site_address]
        self.execute("DELETE FROM site WHERE site_id = :site_id", {"site_id": site_id})
        del self.site_ids[site_address]

    def setContent(self, site_address, inner_path, content, size=0):
        self.execute("INSERT OR REPLACE INTO content ?", {
            "site_id": self.site_ids[site_address],
            "inner_path": inner_path,
            "size": size,
            "size_files": sum([val["size"] for key, val in content.get("files", {}).iteritems()]),
            "size_files_optional": sum([val["size"] for key, val in content.get("files_optional", {}).iteritems()]),
            "modified": int(content["modified"])
        })

    def deleteContent(self, site_address, inner_path):
        self.execute("DELETE FROM content WHERE ?", {"site_id": self.site_ids[site_address], "inner_path": inner_path})

    def loadDbDict(self, site_address):
        res = self.execute(
            "SELECT GROUP_CONCAT(inner_path, '|') AS inner_paths FROM content WHERE ?",
            {"site_id": self.site_ids[site_address]}
        )
        row = res.fetchone()
        if row and row["inner_paths"]:
            inner_paths = row["inner_paths"].split("|")
            return dict.fromkeys(inner_paths, False)
        else:
            return {}

    def getTotalSize(self, site_address, ignore=None):
        params = {"site_id": self.site_ids[site_address]}
        if ignore:
            params["not__inner_path"] = ignore
        res = self.execute("SELECT SUM(size) + SUM(size_files) AS size FROM content WHERE ?", params)
        return res.fetchone()["size"]

    def getOptionalSize(self, site_address):
        res = self.execute(
            "SELECT SUM(size_files_optional) AS size FROM content WHERE ?",
            {"site_id": self.site_ids[site_address]}
        )
        return res.fetchone()["size"]

    def listModified(self, site_address, since):
        res = self.execute(
            "SELECT inner_path, modified FROM content WHERE site_id = :site_id AND modified > :since",
            {"site_id": self.site_ids[site_address], "since": since}
        )
        return {row["inner_path"]: row["modified"] for row in res}


content_db = ContentDb()
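The point of the new tables is that size and freshness questions about a site's content.json files become single SQL queries instead of directory walks and json parsing. Below is a standalone sketch of that query pattern using only the standard sqlite3 module and an in-memory database; the site address, inner_path and numbers are placeholders for illustration, not values from the commit.

# Sketch of the ContentDb query pattern with plain sqlite3 (illustrative values only).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
    CREATE TABLE site (
        site_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
        address TEXT NOT NULL
    );
    CREATE UNIQUE INDEX site_address ON site (address);
    CREATE TABLE content (
        content_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
        site_id INTEGER REFERENCES site (site_id) ON DELETE CASCADE,
        inner_path TEXT,
        size INTEGER,
        size_files INTEGER,
        size_files_optional INTEGER,
        modified INTEGER
    );
    CREATE UNIQUE INDEX content_key ON content (site_id, inner_path);
    CREATE INDEX content_modified ON content (site_id, modified);
""")

conn.execute("INSERT INTO site (address) VALUES (?)", ("1ExampleSiteAddress",))
site_id = conn.execute(
    "SELECT site_id FROM site WHERE address = ?", ("1ExampleSiteAddress",)
).fetchone()["site_id"]
conn.execute(
    "INSERT INTO content (site_id, inner_path, size, size_files, modified) VALUES (?, ?, ?, ?, ?)",
    (site_id, "data/users/1ExampleUser/content.json", 512, 50 * 1024, 1450000000)
)

# getTotalSize equivalent: one aggregate query instead of re-reading every json file
total = conn.execute(
    "SELECT SUM(size) + SUM(size_files) AS size FROM content WHERE site_id = ?", (site_id,)
).fetchone()["size"]

# listModified equivalent: "what changed since" is served by the (site_id, modified) index
changed = conn.execute(
    "SELECT inner_path, modified FROM content WHERE site_id = ? AND modified > ?",
    (site_id, 1400000000)
).fetchall()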
src/Content/ContentDbDict.py (Normal file, 141 lines added)
@@ -0,0 +1,141 @@
import time
import os

import ContentDb


class ContentDbDict(dict):
    def __init__(self, site, *args, **kwargs):
        s = time.time()
        self.site = site
        self.site_address = site.address
        self.cached_keys = []
        self.log = self.site.log
        self.db = ContentDb.content_db
        self.db_id = self.db.needSite(site.address)
        self.num_loaded = 0
        super(ContentDbDict, self).__init__(self.db.loadDbDict(site.address))  # Load keys from database
        self.log.debug("ContentDb init: %.3fs, found files: %s" % (time.time() - s, len(self)))

    def loadItem(self, key):
        try:
            self.num_loaded += 1
            if self.num_loaded % 100 == 0:
                self.log.debug("Loaded json: %s (latest: %s)" % (self.num_loaded, key))
            content = self.site.storage.loadJson(key)
            dict.__setitem__(self, key, content)
        except IOError:
            dict.__delitem__(self, key)  # File not exists anymore
            raise KeyError(key)

        self.addCachedKey(key)
        self.checkLimit()

        return content

    def getItemSize(self, key):
        return self.site.storage.getSize(key)

    # Only keep last 50 accessed json in memory
    def checkLimit(self):
        if len(self.cached_keys) > 50:
            key_deleted = self.cached_keys.pop(0)
            dict.__setitem__(self, key_deleted, False)

    def addCachedKey(self, key):
        if key not in self.cached_keys and key != "content.json" and len(key) > 40:  # Always keep keys smaller than 40 char
            self.cached_keys.append(key)

    def __getitem__(self, key):
        val = dict.get(self, key)
        if val:  # Already loaded
            return val
        elif val is None:  # Unknown key
            raise KeyError(key)
        elif val is False:  # Loaded before, but purged from cache
            return self.loadItem(key)

    def __setitem__(self, key, val):
        dict.__setitem__(self, key, val)
        self.addCachedKey(key)
        self.checkLimit()
        self.db.setContent(self.site_address, key, val, size=self.getItemSize(key))

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        try:
            self.cached_keys.remove(key)
        except ValueError:
            pass
        self.db.deleteContent(self.site_address, key)

    def iteritems(self):
        for key, val in dict.iteritems(self):
            if not val:
                val = self.loadItem(key)
            yield key, val

    def items(self):
        back = []
        for key, val in dict.iteritems(self):
            if not val:
                try:
                    val = self.loadItem(key)
                except Exception:
                    continue
            back.append((key, val))
        return back

    def values(self):
        back = []
        for key, val in dict.iteritems(self):
            if not val:
                try:
                    val = self.loadItem(key)
                except Exception:
                    continue
            back.append(val)
        return back

    def get(self, key, default=None):
        try:
            return self.__getitem__(key)
        except KeyError:
            return default

    def execute(self, query, params={}):
        params["site_id"] = self.db_id
        return self.db.execute(query, params)


if __name__ == "__main__":
    import psutil
    process = psutil.Process(os.getpid())
    s_mem = process.memory_info()[0] / float(2 ** 20)
    root = "data-live/1MaiL5gfBM1cyb4a8e3iiL8L5gXmoAJu27"
    contents = ContentDbDict("1MaiL5gfBM1cyb4a8e3iiL8L5gXmoAJu27", root)
    print "Init len", len(contents)

    s = time.time()
    for dir_name in os.listdir(root + "/data/users/")[0:8000]:
        contents["data/users/%s/content.json" % dir_name]
    print "Load: %.3fs" % (time.time() - s)

    s = time.time()
    found = 0
    for key, val in contents.iteritems():
        found += 1
        assert key
        assert val
    print "Found:", found
    print "Iteritem: %.3fs" % (time.time() - s)

    s = time.time()
    found = 0
    for key in contents.keys():
        found += 1
        assert key in contents
    print "In: %.3fs" % (time.time() - s)

    print "Len:", len(contents.values()), len(contents.keys())

    print "Mem: +", process.memory_info()[0] / float(2 ** 20) - s_mem
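ContentDbDict is the piece that keeps memory bounded: every known content.json path is registered in the dict with a False placeholder, the real json is loaded only on first access, and once more than 50 entries are loaded the oldest ones are collapsed back to False. The following self-contained sketch shows just that placeholder pattern; the LazyDict name, the loader callable and the 3-item limit are illustrative, not part of the commit.

# Sketch of the False-placeholder lazy dict (illustrative names and limit).
class LazyDict(dict):
    def __init__(self, keys, loader, limit=3):
        super(LazyDict, self).__init__(dict.fromkeys(keys, False))  # False = known key, value not in memory
        self.loader = loader
        self.limit = limit
        self.cached_keys = []

    def __getitem__(self, key):
        val = dict.get(self, key)
        if val is None:                  # unknown key
            raise KeyError(key)
        if val is False:                 # known, but not loaded yet (or purged): load it now
            val = self.loader(key)
            dict.__setitem__(self, key, val)
            self.cached_keys.append(key)
            if len(self.cached_keys) > self.limit:
                oldest = self.cached_keys.pop(0)
                dict.__setitem__(self, oldest, False)  # purge oldest value back to a placeholder
        return val


# Usage: only the last 3 loaded values stay in memory, older ones collapse back to False
lazy = LazyDict(["a", "b", "c", "d"], loader=lambda key: {"name": key})
for key in ["a", "b", "c", "d"]:
    assert lazy[key]["name"] == key
assert dict.get(lazy, "a") is False  # "a" was purged after three newer loads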
src/Content/ContentManager.py
@@ -12,6 +12,7 @@ from Config import config
 from util import helper
 from util import Diff
 from Peer import PeerHashfield
+from ContentDbDict import ContentDbDict


 class ContentManager(object):
@@ -19,9 +20,14 @@ class ContentManager(object):
     def __init__(self, site):
         self.site = site
         self.log = self.site.log
-        self.contents = {}  # Known content.json (without files and includes)
+        self.contents = ContentDbDict(site)
         self.hashfield = PeerHashfield()
         self.has_optional_files = False
         self.site.onFileDone.append(lambda inner_path: self.addOptionalFile(inner_path))

+    def loadContents(self):
+        if len(self.contents) == 0:
+            self.log.debug("Content db not initialized, load files from filesystem")
+            self.loadContent(add_bad_files=False, delete_removed_files=False)
+        self.site.settings["size"] = self.getTotalSize()
src/Site/Site.py
@@ -52,6 +52,8 @@ class Site(object):
         self.storage = SiteStorage(self, allow_create=allow_create)  # Save and load site files
         self.loadSettings()  # Load settings from sites.json
-        self.content_manager = ContentManager(self)  # Load contents
+        self.content_manager = ContentManager(self)
+        self.content_manager.loadContents()  # Load content.json files
         self.connection_server = None
         if "main" in sys.modules and "file_server" in dir(sys.modules["main"]):  # Use global file server by default if possible
             self.connection_server = sys.modules["main"].file_server
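Together, the ContentManager and Site changes split construction from loading: __init__ now only wires up the db-backed contents dict, and Site calls loadContents() explicitly afterwards, so the filesystem scan runs only when the content db has no cached entries yet. A rough sketch of that two-step initialisation follows; FakeSite, FakeContentManager and the scan_filesystem stub are stand-ins for illustration, not code from the repository.

# Illustrative stand-ins for the two-phase init: construct first, load explicitly later.
class FakeContentManager(object):
    def __init__(self, site):
        self.site = site
        self.contents = {}                       # stands in for the db-backed ContentDbDict

    def scan_filesystem(self):
        # stub for the filesystem fallback: pretend one content.json was found on disk
        self.contents["content.json"] = {"modified": 0, "size": 123}

    def loadContents(self):
        if len(self.contents) == 0:              # content db empty: fall back to the slow disk scan
            self.scan_filesystem()

    def getTotalSize(self):
        return sum(val.get("size", 0) for val in self.contents.values())


class FakeSite(object):
    def __init__(self):
        self.settings = {}
        self.content_manager = FakeContentManager(self)   # cheap: no disk access yet
        self.content_manager.loadContents()                # explicit, possibly expensive step
        self.settings["size"] = self.content_manager.getTotalSize()


site = FakeSite()
assert site.settings["size"] == 123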