diff --git a/src/Content/ContentDb.py b/src/Content/ContentDb.py
new file mode 100644
index 00000000..015c63e6
--- /dev/null
+++ b/src/Content/ContentDb.py
@@ -0,0 +1,116 @@
+import time
+
+from Db import Db
+from Config import config
+
+
+class ContentDb(Db):
+    def __init__(self):
+        self.version = 4
+        super(ContentDb, self).__init__({"db_name": "ContentDb"}, "%s/content.db" % config.data_dir)
+        self.foreign_keys = True
+        self.checkTables()
+        self.site_ids = {}
+
+    def checkTables(self):
+        s = time.time()
+        version = int(self.execute("PRAGMA user_version").fetchone()[0])
+        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
+        if version < self.version:
+            self.createTables()
+        else:
+            self.execute("VACUUM")
+        self.log.debug("Check tables in %.3fs" % (time.time() - s))
+
+    def createTables(self):
+        # Delete all tables
+        self.execute("PRAGMA writable_schema = 1")
+        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
+        self.execute("PRAGMA writable_schema = 0")
+        self.execute("VACUUM")
+        self.execute("PRAGMA INTEGRITY_CHECK")
+        # Create new tables
+        self.execute("""
+            CREATE TABLE site (
+                site_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
+                address TEXT NOT NULL
+            );
+        """)
+        self.execute("CREATE UNIQUE INDEX site_address ON site (address);")
+
+        self.execute("""
+            CREATE TABLE content (
+                content_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
+                site_id INTEGER REFERENCES site (site_id) ON DELETE CASCADE,
+                inner_path TEXT,
+                size INTEGER,
+                size_files INTEGER,
+                size_files_optional INTEGER,
+                modified INTEGER
+            );
+        """)
+        self.execute("CREATE UNIQUE INDEX content_key ON content (site_id, inner_path);")
+        self.execute("CREATE INDEX content_modified ON content (site_id, modified);")
+
+        self.execute("PRAGMA user_version = %s" % self.version)
+
+    def needSite(self, site_address):
+        if site_address not in self.site_ids:
+            self.execute("INSERT OR IGNORE INTO site ?", {"address": site_address})
+            for row in self.execute("SELECT * FROM site"):
+                self.site_ids[row["address"]] = row["site_id"]
+        return self.site_ids[site_address]
+
+    def deleteSite(self, site_address):
+        site_id = self.site_ids[site_address]
+        self.execute("DELETE FROM site WHERE site_id = :site_id", {"site_id": site_id})
+        del self.site_ids[site_address]
+
+    def setContent(self, site_address, inner_path, content, size=0):
+        self.execute("INSERT OR REPLACE INTO content ?", {
+            "site_id": self.site_ids[site_address],
+            "inner_path": inner_path,
+            "size": size,
+            "size_files": sum([val["size"] for key, val in content.get("files", {}).iteritems()]),
+            "size_files_optional": sum([val["size"] for key, val in content.get("files_optional", {}).iteritems()]),
+            "modified": int(content["modified"])
+        })
+
+    def deleteContent(self, site_address, inner_path):
+        self.execute("DELETE FROM content WHERE ?", {"site_id": self.site_ids[site_address], "inner_path": inner_path})
+
+    def loadDbDict(self, site_address):
+        res = self.execute(
+            "SELECT GROUP_CONCAT(inner_path, '|') AS inner_paths FROM content WHERE ?",
+            {"site_id": self.site_ids[site_address]}
+        )
+        row = res.fetchone()
+        if row and row["inner_paths"]:
+            inner_paths = row["inner_paths"].split("|")
+            return dict.fromkeys(inner_paths, False)
+        else:
+            return {}
+
+    def getTotalSize(self, site_address, ignore=None):
+        params = {"site_id": self.site_ids[site_address]}
+        if ignore:
+            params["not__inner_path"] = ignore
+        res = self.execute("SELECT SUM(size) + SUM(size_files) AS size FROM content WHERE ?", params)
+        return res.fetchone()["size"]
+
+    def getOptionalSize(self, site_address):
+        res = self.execute(
+            "SELECT SUM(size_files_optional) AS size FROM content WHERE ?",
+            {"site_id": self.site_ids[site_address]}
+        )
+        return res.fetchone()["size"]
+
+    def listModified(self, site_address, since):
+        res = self.execute(
+            "SELECT inner_path, modified FROM content WHERE site_id = :site_id AND modified > :since",
+            {"site_id": self.site_ids[site_address], "since": since}
+        )
+        return {row["inner_path"]: row["modified"] for row in res}
+
+
+content_db = ContentDb()
diff --git a/src/Content/ContentDbDict.py b/src/Content/ContentDbDict.py
new file mode 100644
index 00000000..2b1bc8f2
--- /dev/null
+++ b/src/Content/ContentDbDict.py
@@ -0,0 +1,141 @@
+import time
+import os
+
+import ContentDb
+
+
+class ContentDbDict(dict):
+    def __init__(self, site, *args, **kwargs):
+        s = time.time()
+        self.site = site
+        self.site_address = site.address
+        self.cached_keys = []
+        self.log = self.site.log
+        self.db = ContentDb.content_db
+        self.db_id = self.db.needSite(site.address)
+        self.num_loaded = 0
+        super(ContentDbDict, self).__init__(self.db.loadDbDict(site.address))  # Load keys from database
+        self.log.debug("ContentDb init: %.3fs, found files: %s" % (time.time() - s, len(self)))
+
+    def loadItem(self, key):
+        try:
+            self.num_loaded += 1
+            if self.num_loaded % 100 == 0:
+                self.log.debug("Loaded json: %s (latest: %s)" % (self.num_loaded, key))
+            content = self.site.storage.loadJson(key)
+            dict.__setitem__(self, key, content)
+        except IOError:
+            dict.__delitem__(self, key)  # File not exists anymore
+            raise KeyError(key)
+
+        self.addCachedKey(key)
+        self.checkLimit()
+
+        return content
+
+    def getItemSize(self, key):
+        return self.site.storage.getSize(key)
+
+    # Only keep last 50 accessed json in memory
+    def checkLimit(self):
+        if len(self.cached_keys) > 50:
+            key_deleted = self.cached_keys.pop(0)
+            dict.__setitem__(self, key_deleted, False)
+
+    def addCachedKey(self, key):
+        if key not in self.cached_keys and key != "content.json" and len(key) > 40:  # Always keep keys smaller than 40 char
+            self.cached_keys.append(key)
+
+    def __getitem__(self, key):
+        val = dict.get(self, key)
+        if val:  # Already loaded
+            return val
+        elif val is None:  # Unknown key
+            raise KeyError(key)
+        elif val is False:  # Loaded before, but purged from cache
+            return self.loadItem(key)
+
+    def __setitem__(self, key, val):
+        dict.__setitem__(self, key, val)
+        self.addCachedKey(key)
+        self.checkLimit()
+        self.db.setContent(self.site_address, key, val, size=self.getItemSize(key))
+
+    def __delitem__(self, key):
+        dict.__delitem__(self, key)
+        try:
+            self.cached_keys.remove(key)
+        except ValueError:
+            pass
+        self.db.deleteContent(self.site_address, key)
+
+    def iteritems(self):
+        for key, val in dict.iteritems(self):
+            if not val:
+                val = self.loadItem(key)
+            yield key, val
+
+    def items(self):
+        back = []
+        for key, val in dict.iteritems(self):
+            if not val:
+                try:
+                    val = self.loadItem(key)
+                except Exception:
+                    continue
+            back.append((key, val))
+        return back
+
+    def values(self):
+        back = []
+        for key, val in dict.iteritems(self):
+            if not val:
+                try:
+                    val = self.loadItem(key)
+                except Exception:
+                    continue
+            back.append(val)
+        return back
+
+    def get(self, key, default=None):
+        try:
+            return self.__getitem__(key)
+        except KeyError:
+            return default
+
+    def execute(self, query, params={}):
+        params["site_id"] = self.db_id
+        return self.db.execute(query, params)
+
+if __name__ == "__main__":
+    import psutil
+    process = psutil.Process(os.getpid())
+    s_mem = process.memory_info()[0] / float(2 ** 20)
+    root = "data-live/1MaiL5gfBM1cyb4a8e3iiL8L5gXmoAJu27"
+    contents = ContentDbDict("1MaiL5gfBM1cyb4a8e3iiL8L5gXmoAJu27", root)
+    print "Init len", len(contents)
+
+    s = time.time()
+    for dir_name in os.listdir(root + "/data/users/")[0:8000]:
+        contents["data/users/%s/content.json" % dir_name]
+    print "Load: %.3fs" % (time.time() - s)
+
+    s = time.time()
+    found = 0
+    for key, val in contents.iteritems():
+        found += 1
+        assert key
+        assert val
+    print "Found:", found
+    print "Iteritem: %.3fs" % (time.time() - s)
+
+    s = time.time()
+    found = 0
+    for key in contents.keys():
+        found += 1
+        assert key in contents
+    print "In: %.3fs" % (time.time() - s)
+
+    print "Len:", len(contents.values()), len(contents.keys())
+
+    print "Mem: +", process.memory_info()[0] / float(2 ** 20) - s_mem
diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py
index a46fe62d..4b4b7aa7 100644
--- a/src/Content/ContentManager.py
+++ b/src/Content/ContentManager.py
@@ -12,6 +12,7 @@ from Config import config
 from util import helper
 from util import Diff
 from Peer import PeerHashfield
+from ContentDbDict import ContentDbDict
 
 
 class ContentManager(object):
@@ -19,10 +20,15 @@
     def __init__(self, site):
         self.site = site
         self.log = self.site.log
-        self.contents = {}  # Known content.json (without files and includes)
+        self.contents = ContentDbDict(site)
         self.hashfield = PeerHashfield()
+        self.has_optional_files = False
         self.site.onFileDone.append(lambda inner_path: self.addOptionalFile(inner_path))
-        self.loadContent(add_bad_files=False, delete_removed_files=False)
+
+    def loadContents(self):
+        if len(self.contents) == 0:
+            self.log.debug("Content db not initialized, load files from filesystem")
+            self.loadContent(add_bad_files=False, delete_removed_files=False)
         self.site.settings["size"] = self.getTotalSize()
 
     # Load content.json to self.content
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 4a100f6f..0cfc5e0d 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -52,6 +52,7 @@
         self.storage = SiteStorage(self, allow_create=allow_create)  # Save and load site files
         self.loadSettings()  # Load settings from sites.json
-        self.content_manager = ContentManager(self)  # Load contents
+        self.content_manager = ContentManager(self)
+        self.content_manager.loadContents()  # Load content.json files
         self.connection_server = None
         if "main" in sys.modules and "file_server" in dir(sys.modules["main"]):  # Use global file server by default if possible
             self.connection_server = sys.modules["main"].file_server