diff --git a/src/Config.py b/src/Config.py
index 605d0e07..3136372c 100644
--- a/src/Config.py
+++ b/src/Config.py
@@ -3,7 +3,7 @@ import ConfigParser
 class Config(object):
     def __init__(self):
-        self.version = "0.2.5"
+        self.version = "0.2.6"
         self.parser = self.createArguments()
         argv = sys.argv[:] # Copy command line arguments
         argv = self.parseConfig(argv) # Add arguments from config file
@@ -50,6 +50,15 @@ class Config(object):
         action = subparsers.add_parser("siteVerify", help='Verify site files using sha512: address')
         action.add_argument('address', help='Site to verify')

+        # dbRebuild
+        action = subparsers.add_parser("dbRebuild", help='Rebuild site database cache')
+        action.add_argument('address', help='Site to rebuild')
+
+        # dbQuery
+        action = subparsers.add_parser("dbQuery", help='Query site SQL cache')
+        action.add_argument('address', help='Site to query')
+        action.add_argument('query', help='SQL query')
+
         # PeerPing
         action = subparsers.add_parser("peerPing", help='Send Ping command to peer')
         action.add_argument('peer_ip', help='Peer ip')
diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py
index 998b4966..d0816446 100644
--- a/src/Connection/Connection.py
+++ b/src/Connection/Connection.py
@@ -72,7 +72,12 @@ class Connection:
     # Handle incoming connection
     def handleIncomingConnection(self, sock):
         self.type = "in"
-        firstchar = sock.recv(1) # Find out if pure socket or zeromq
+        try:
+            firstchar = sock.recv(1) # Find out if pure socket or zeromq
+        except Exception, err:
+            self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
+            self.close()
+            return False
         if firstchar == "\xff": # Backward compatibility: forward data to zmq
             if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ")
diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py
index 994dffbc..12b42a50 100644
--- a/src/Connection/ConnectionServer.py
+++ b/src/Connection/ConnectionServer.py
@@ -39,7 +39,7 @@ class ConnectionServer:
     def start(self):
         self.running = True
         try:
-            self.log.debug("Binding to: %s:%s" % (self.ip, self.port))
+            self.log.debug("Binding to: %s:%s (msgpack: %s)" % (self.ip, self.port, ".".join(map(str, msgpack.version))))
             self.stream_server.serve_forever() # Start normal connection server
         except Exception, err:
             self.log.info("StreamServer bind error, must be running already: %s" % err)
diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py
index b4332a34..83b4e533 100644
--- a/src/Content/ContentManager.py
+++ b/src/Content/ContentManager.py
@@ -17,7 +17,7 @@ class ContentManager:
     def loadContent(self, content_inner_path = "content.json", add_bad_files = True, load_includes = True):
         content_inner_path = content_inner_path.strip("/") # Remove / from beginning
         old_content = self.contents.get(content_inner_path)
-        content_path = self.site.getPath(content_inner_path)
+        content_path = self.site.storage.getPath(content_inner_path)
         content_dir = self.toDir(content_inner_path)

         if os.path.isfile(content_path):
@@ -51,7 +51,7 @@ class ContentManager:
         if load_includes and "includes" in new_content:
             for relative_path, info in new_content["includes"].items():
                 include_inner_path = content_dir+relative_path
-                if os.path.isfile(self.site.getPath(include_inner_path)): # Content.json exists, load it
+                if self.site.storage.isFile(include_inner_path): # Content.json exists, load it
                     success = self.loadContent(include_inner_path, add_bad_files=add_bad_files)
                     if success: changed += success # Add changed files
                 else: # Content.json does not exist yet, add to changed files
@@ -81,7 +81,7 @@ class ContentManager:
         total_size = 0
         for inner_path, content in self.contents.iteritems():
             if inner_path == ignore: continue
-            total_size += os.path.getsize(self.site.getPath(inner_path)) # Size of content.json
+            total_size += self.site.storage.getSize(inner_path) # Size of content.json
             for file, info in content.get("files", {}).iteritems():
                 total_size += info["size"]
         return total_size
@@ -142,13 +142,13 @@ class ContentManager:
             content["signs_required"] = 1
             content["ignore"] = ""

-        directory = self.toDir(self.site.getPath(inner_path))
+        directory = self.toDir(self.site.storage.getPath(inner_path))
         self.log.info("Opening site data directory: %s..." % directory)

         hashed_files = {}
         for root, dirs, files in os.walk(directory):
             for file_name in files:
-                file_path = self.site.getPath("%s/%s" % (root.strip("/"), file_name))
+                file_path = self.site.storage.getPath("%s/%s" % (root.strip("/"), file_name))
                 file_inner_path = re.sub(re.escape(directory), "", file_path)

                 if file_name == "content.json" or (content.get("ignore") and re.match(content["ignore"], file_inner_path)) or file_name.startswith("."): # Ignore content.json, defined regexp and files starting with .
@@ -205,7 +205,7 @@ class ContentManager:

         if filewrite:
             self.log.info("Saving to %s..." % inner_path)
-            json.dump(new_content, open(self.site.getPath(inner_path), "w"), indent=2, sort_keys=True)
+            json.dump(new_content, open(self.site.storage.getPath(inner_path), "w"), indent=2, sort_keys=True)

         self.log.info("File %s signed!" % inner_path)
@@ -378,10 +378,10 @@ def testVerify():
     content_manager = ContentManager(site)
     print "Loaded contents:", content_manager.contents.keys()

-    file = open(site.getPath("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/content.json"))
+    file = open(site.storage.getPath("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/content.json"))
     print "content.json valid:", content_manager.verifyFile("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/content.json", file, ignore_same=False)

-    file = open(site.getPath("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/messages.json"))
+    file = open(site.storage.getPath("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/messages.json"))
     print "messages.json valid:", content_manager.verifyFile("data/users/1KRxE1s3oDyNDawuYWpzbLUwNm8oDbeEp6/messages.json", file, ignore_same=False)
diff --git a/src/Db/Db.py b/src/Db/Db.py
new file mode 100644
index 00000000..cd3d4e71
--- /dev/null
+++ b/src/Db/Db.py
@@ -0,0 +1,248 @@
+import sqlite3, json, time, logging, re, os
+from DbCursor import DbCursor
+
+class Db:
+    def __init__(self, schema, db_path):
+        self.db_path = db_path
+        self.db_dir = os.path.dirname(db_path)+"/"
+        self.schema = schema
+        self.conn = None
+        self.cur = None
+        self.log = logging.getLogger("Db:%s" % schema["db_name"])
+        self.table_names = None
+        self.collect_stats = False
+        self.query_stats = {}
+        self.db_keyvalues = {}
+
+
+    def connect(self):
+        self.log.debug("Connecting (sqlite version: %s)..." % sqlite3.version)
+        if not os.path.isdir(self.db_dir): # Directory does not exist yet
+            os.makedirs(self.db_dir)
+        self.conn = sqlite3.connect(self.db_path)
+        self.conn.row_factory = sqlite3.Row
+        self.conn.isolation_level = None
+        self.cur = self.getCursor()
+        # We need more speed than security
+        self.cur.execute("PRAGMA journal_mode = WAL")
+        self.cur.execute("PRAGMA journal_mode = MEMORY")
+        self.cur.execute("PRAGMA synchronous = OFF")
+
+
+    # Execute query using DbCursor
+    def execute(self, query, params):
+        if not self.conn: self.connect()
+        return self.cur.execute(query, params)
+
+
+    def close(self):
+        self.log.debug("Closing")
+        if self.cur: self.cur.close()
+        if self.conn: self.conn.close()
+
+
+    # Get a cursor object to the database
+    # Return: Cursor class
+    def getCursor(self):
+        if not self.conn: self.connect()
+        return DbCursor(self.conn, self)
+
+
+    # Get the table version
+    # Return: Table version or 0 if it does not exist
+    def getTableVersion(self, table_name):
+        """if not self.table_names: # Get existing table names
+            res = self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
+            self.table_names = [row["name"] for row in res]
+        if table_name not in self.table_names:
+            return False
+
+        else:"""
+        if not self.db_keyvalues: # Get db keyvalues
+            try:
+                res = self.cur.execute("SELECT * FROM keyvalue WHERE json_id=0") # json_id = 0 is internal keyvalues
+            except sqlite3.OperationalError, err: # Table does not exist
+                self.log.debug("Query error: %s" % err)
+                return False
+
+            for row in res:
+                self.db_keyvalues[row["key"]] = row["value"]
+
+        return self.db_keyvalues.get("table.%s.version" % table_name, 0)
+
+
+    # Check Db tables
+    # Return: Changed table names
+    def checkTables(self):
+        s = time.time()
+        changed_tables = []
+        cur = self.getCursor()
+
+        cur.execute("BEGIN")
+
+        # Check internal tables
+        changed = cur.needTable("keyvalue", [
+            ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
+            ["key", "TEXT"],
+            ["value", "INTEGER"],
+            ["json_id", "INTEGER REFERENCES json (json_id)"],
+        ],[
+            "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
+        ], version=1)
+        if changed: changed_tables.append("keyvalue")
+
+        changed = cur.needTable("json", [
+            ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
+            ["path", "VARCHAR(255)"]
+        ], [
+            "CREATE UNIQUE INDEX path ON json(path)"
+        ], version=1)
+        if changed: changed_tables.append("json")
+
+        # Check schema tables
+        for table_name, table_settings in self.schema["tables"].items():
+            changed = cur.needTable(table_name, table_settings["cols"], table_settings["indexes"], version=table_settings["schema_changed"])
+            if changed: changed_tables.append(table_name)
+
+        cur.execute("COMMIT")
+        self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time()-s, changed_tables))
+
+        return changed_tables
cur.execute("BEGIN") + cur.logging = False + commit_after_done = True + else: + commit_after_done = False + + # Row for current json file + json_row = cur.getJsonRow(relative_path) + + # Check matched mappings in schema + for map in matched_maps: + # Insert non-relational key values + if map.get("to_keyvalue"): + # Get current values + res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],)) + current_keyvalue = {} + current_keyvalue_id = {} + for row in res: + current_keyvalue[row["key"]] = row["value"] + current_keyvalue_id[row["key"]] = row["keyvalue_id"] + + for key in map["to_keyvalue"]: + if key not in current_keyvalue: # Keyvalue not exits yet in the db + cur.execute("INSERT INTO keyvalue ?", + {"key": key, "value": data.get(key), "json_id": json_row["json_id"]} + ) + elif data.get(key) != current_keyvalue[key]: # Keyvalue different value + cur.execute("UPDATE keyvalue SET value = ? WHERE keyvalue_id = ?", (data.get(key), current_keyvalue_id[key])) + + """for key in map.get("to_keyvalue", []): + cur.execute("INSERT OR REPLACE INTO keyvalue ?", + {"key": key, "value": data.get(key), "json_id": json_row["json_id"]} + ) + """ + + # Insert data to tables + for table_settings in map.get("to_table", []): + if isinstance(table_settings, dict): # Custom settings + table_name = table_settings["table"] # Table name to insert datas + node = table_settings.get("node", table_name) # Node keyname in data json file + key_col = table_settings.get("key_col") # Map dict key as this col + val_col = table_settings.get("val_col") # Map dict value as this col + import_cols = table_settings.get("import_cols") + replaces = table_settings.get("replaces") + else: # Simple settings + table_name = table_settings + node = table_settings + key_col = None + val_col = None + import_cols = None + replaces = None + + cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],)) + + if node not in data: continue + + table_schema = self.schema["tables"][table_name] + if key_col: # Map as dict + for key, val in data[node].iteritems(): + if val_col: # Single value + cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, + { key_col: key, val_col: val, "json_id": json_row["json_id"] } + ) + else: # Multi value + if isinstance(val, dict): # Single row + row = val + if import_cols: row = { key: row[key] for key in import_cols } # Filter row by import_cols + row[key_col] = key + # Replace in value if necessary + if replaces: + for replace_key, replace in replaces.iteritems(): + if replace_key in row: + for replace_from, replace_to in replace.iteritems(): + row[replace_key] = row[replace_key].replace(replace_from, replace_to) + + row["json_id"] = json_row["json_id"] + cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row) + else: # Multi row + for row in val: + row[key_col] = key + row["json_id"] = json_row["json_id"] + cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row) + else: # Map as list + for row in data[node]: + row["json_id"] = json_row["json_id"] + cur.execute("INSERT OR REPLACE INTO %s ?" 
+
+
+if __name__ == "__main__":
+    s = time.time()
+    console_log = logging.StreamHandler()
+    logging.getLogger('').setLevel(logging.DEBUG)
+    logging.getLogger('').addHandler(console_log)
+    console_log.setLevel(logging.DEBUG)
+    dbjson = Db(json.load(open("zerotalk.schema.json")), "data/users/zerotalk.db")
+    dbjson.collect_stats = True
+    dbjson.checkTables()
+    cur = dbjson.getCursor()
+    cur.execute("BEGIN")
+    cur.logging = False
+    dbjson.loadJson("data/users/content.json", cur=cur)
+    for user_dir in os.listdir("data/users"):
+        if os.path.isdir("data/users/%s" % user_dir):
+            dbjson.loadJson("data/users/%s/data.json" % user_dir, cur=cur)
+            # print ".",
+    cur.logging = True
+    cur.execute("COMMIT")
+    print "Done in %.3fs" % (time.time()-s)
+    for query, stats in sorted(dbjson.query_stats.items()):
+        print "-", query, stats
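The Db class above is driven entirely by the site's dbschema.json, which this diff does not include. Below is a hypothetical minimal schema, written as the Python dict that json.load() would return. Only the key names (db_name, db_file, tables, cols, indexes, schema_changed, maps, to_keyvalue, to_table) are taken from the code above; the table and column names are invented for illustration:

# Hypothetical dbschema.json content (not part of this diff), as a Python dict.
schema = {
    "db_name": "ZeroTalk",                       # Used for the "Db:<name>" logger
    "db_file": "data/users/zerotalk.db",         # Passed to getPath() by SiteStorage
    "maps": {
        ".+/data.json": {                        # Regexp matched against the file path
            "to_keyvalue": ["next_message_id"],  # Scalars go to the keyvalue table
            "to_table": ["topic"]                # Lists go to their own tables
        }
    },
    "tables": {
        "topic": {
            "cols": [
                ["topic_id", "INTEGER"],
                ["title", "TEXT"],
                ["json_id", "INTEGER REFERENCES json (json_id)"]
            ],
            "indexes": ["CREATE UNIQUE INDEX topic_key ON topic(topic_id, json_id)"],
            "schema_changed": 1                  # Bump to force a table rebuild
        }
    }
}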
diff --git a/src/Db/DbCursor.py b/src/Db/DbCursor.py
new file mode 100644
index 00000000..e3e32754
--- /dev/null
+++ b/src/Db/DbCursor.py
@@ -0,0 +1,106 @@
+import time
+
+# Special sqlite cursor
+class DbCursor:
+    def __init__(self, conn, db):
+        self.conn = conn
+        self.db = db
+        self.cursor = conn.cursor()
+        self.logging = True
+
+
+    def execute(self, query, params=None):
+        if isinstance(params, dict): # Make SELECT and INSERT easier by allowing dict params
+            if query.startswith("SELECT") or query.startswith("DELETE"): # Convert param dict to SELECT * FROM table WHERE key = ? format
+                wheres = ", ".join([key+" = ?" for key in params])
+                query = query.replace("?", wheres)
+                params = params.values()
+            else: # Convert param dict to INSERT INTO table (key, key2) VALUES (?, ?) format
+                keys = ", ".join(params.keys())
+                values = ", ".join(['?' for key in params.keys()])
+                query = query.replace("?", "(%s) VALUES (%s)" % (keys, values))
+                params = tuple(params.values())
+
+        s = time.time()
+        # if query == "COMMIT": self.logging = True # Turn logging back on at transaction commit
+
+        if params: # Query has parameters
+            res = self.cursor.execute(query, params)
+            if self.logging:
+                self.db.log.debug((query.replace("?", "%s") % params)+" (Done in %.4f)" % (time.time()-s))
+        else:
+            res = self.cursor.execute(query)
+            if self.logging: self.db.log.debug(query+" (Done in %.4f)" % (time.time()-s))
+
+        # Log query stats
+        if self.db.collect_stats:
+            if query not in self.db.query_stats:
+                self.db.query_stats[query] = {"call": 0, "time": 0.0}
+            self.db.query_stats[query]["call"] += 1
+            self.db.query_stats[query]["time"] += time.time()-s
+
+        # if query == "BEGIN": self.logging = False # Turn logging off at transaction begin
+        return res
+
+
+    # Create new table
+    # Return: True on success
+    def createTable(self, table, cols):
+        # TODO: Check current structure
+        """table_changed = False
+        res = c.execute("PRAGMA table_info(%s)" % table)
+        if res:
+            for row in res:
+                print row["name"], row["type"], cols[row["name"]]
+                print row
+        else:
+            table_changed = True
+
+        if table_changed: # Table structure changed, drop and create again"""
+        self.execute("DROP TABLE IF EXISTS %s" % table)
+        col_definitions = []
+        for col_name, col_type in cols:
+            col_definitions.append("%s %s" % (col_name, col_type))
+
+        self.execute("CREATE TABLE %s (%s)" % (table, ",".join(col_definitions)))
+        return True
+
+
+    # Create indexes on table
+    # Return: True on success
+    def createIndexes(self, table, indexes):
+        # indexes.append("CREATE INDEX %s_id ON %s(%s_id)" % (table, table, table)) # Primary key index
+        for index in indexes:
+            self.execute(index)
+
+
+    # Create table if it does not exist
+    # Return: True if updated
+    def needTable(self, table, cols, indexes=None, version=1):
+        current_version = self.db.getTableVersion(table)
+        if int(current_version) < int(version): # Table needs update or does not exist
+            self.db.log.info("Table %s outdated... version: %s, need: %s, rebuilding..." % (table, current_version, version))
+            self.createTable(table, cols)
+            if indexes: self.createIndexes(table, indexes)
+            self.execute("INSERT OR REPLACE INTO keyvalue ?",
+                {"json_id": 0, "key": "table.%s.version" % table, "value": version}
+            )
+            return True
+        else: # Not changed
+            return False
+
+
+    # Get or create a row for json file
+    # Return: The database row
+    def getJsonRow(self, file_path):
+        res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
+        row = res.fetchone()
+        if not row: # No row yet, create it
+            self.execute("INSERT INTO json ?", {"path": file_path})
+            res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
+            row = res.fetchone()
+        return row
+
+
+    def close(self):
+        self.cursor.close()
diff --git a/src/Db/__init__.py b/src/Db/__init__.py
new file mode 100644
index 00000000..1f7f580c
--- /dev/null
+++ b/src/Db/__init__.py
@@ -0,0 +1,2 @@
+from Db import Db
+from DbCursor import DbCursor
\ No newline at end of file
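DbCursor.execute() implements the dict-parameter shorthand used throughout Db.loadJson() by plain string substitution on the `?` placeholder. A standalone sketch of the equivalent transformation (the function name is hypothetical, only for illustration):

# Standalone sketch of DbCursor.execute()'s dict-param expansion.
# Note the SELECT/DELETE branch joins conditions with ", " -- sqlite rejects
# "WHERE a = ?, b = ?", so multi-key WHERE dicts would fail; the callers in
# this diff (e.g. getJsonRow) always pass a single key.
def expand(query, params):
    if query.startswith("SELECT") or query.startswith("DELETE"):
        wheres = ", ".join([key+" = ?" for key in params])
        return query.replace("?", wheres), tuple(params.values())
    else:
        keys = ", ".join(params.keys())
        values = ", ".join(['?' for key in params.keys()])
        return query.replace("?", "(%s) VALUES (%s)" % (keys, values)), tuple(params.values())

print expand("SELECT * FROM json WHERE ? LIMIT 1", {"path": "data.json"})
# -> ('SELECT * FROM json WHERE path = ? LIMIT 1', ('data.json',))
print expand("INSERT INTO json ?", {"path": "data.json"})
# -> ('INSERT INTO json (path) VALUES (?)', ('data.json',))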
diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py
index 749a71b4..9373f9fb 100644
--- a/src/File/FileRequest.py
+++ b/src/File/FileRequest.py
@@ -57,9 +57,8 @@ class FileRequest:
             if valid == True: # Valid and changed
                 self.log.info("Update for %s looks valid, saving..." % params["inner_path"])
                 buff.seek(0)
-                file = open(site.getPath(params["inner_path"]), "wb")
-                shutil.copyfileobj(buff, file) # Write buff to disk
-                file.close()
+                site.storage.write(params["inner_path"], buff)
+                site.onFileDone(params["inner_path"]) # Trigger filedone

                 if params["inner_path"].endswith("content.json"): # Download every changed file from peer
@@ -92,7 +91,7 @@ class FileRequest:
             self.response({"error": "Unknown site"})
             return False
         try:
-            file_path = site.getPath(params["inner_path"])
+            file_path = site.storage.getPath(params["inner_path"])
             if config.debug_socket: self.log.debug("Opening file: %s" % file_path)
             file = open(file_path, "rb")
             file.seek(params["location"])
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 654cb214..f10ca575 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -8,20 +8,15 @@ from Worker import WorkerManager
 from Crypt import CryptHash
 from Debug import Debug
 from Content import ContentManager
+from SiteStorage import SiteStorage
 import SiteManager

 class Site:
     def __init__(self, address, allow_create=True):
         self.address = re.sub("[^A-Za-z0-9]", "", address) # Make sure it's a correct address
         self.address_short = "%s..%s" % (self.address[:6], self.address[-4:]) # Short address for logging
-        self.directory = "data/%s" % self.address # Site data diretory
         self.log = logging.getLogger("Site:%s" % self.address_short)
-        if not os.path.isdir(self.directory):
-            if allow_create:
-                os.mkdir(self.directory) # Create directory if not found
-            else:
-                raise Exception("Directory not exists: %s" % self.directory)
         self.content = None # Load content.json
         self.peers = {} # Key: ip:port, Value: Peer.Peer
         self.peer_blacklist = SiteManager.peer_blacklist # Ignore these peers (eg. myself)
@@ -32,6 +27,7 @@ class Site:
         self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
         self.page_requested = False # Page viewed in browser

+        self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files
         self.loadSettings() # Load settings from sites.json
         self.content_manager = ContentManager(self) # Load contents

@@ -98,15 +94,6 @@ class Site:

-    # Sercurity check and return path of site's file
-    def getPath(self, inner_path):
-        inner_path = inner_path.replace("\\", "/") # Windows separator fix
-        inner_path = re.sub("^%s/" % re.escape(self.directory), "", inner_path) # Remove site directory if begins with it
-        file_path = self.directory+"/"+inner_path
-        allowed_dir = os.path.abspath(self.directory) # Only files within this directory allowed
-        if ".." in file_path or not os.path.dirname(os.path.abspath(file_path)).startswith(allowed_dir):
-            raise Exception("File not allowed: %s" % file_path)
-        return file_path

     # Download all files from content.json
@@ -185,7 +172,7 @@ class Site:
         if changed:
             for changed_file in changed:
                 self.bad_files[changed_file] = self.bad_files.get(changed_file, 0)+1
-        if not self.settings["own"]: self.checkFiles(quick_check=True) # Quick check files based on file size
+        if not self.settings["own"]: self.storage.checkFiles(quick_check=True) # Quick check files based on file size

         if self.bad_files:
             self.download()
@@ -193,8 +180,9 @@ class Site:
         return changed

+    # Publish worker
     def publisher(self, inner_path, peers, published, limit):
-        timeout = 5+int(os.path.getsize(self.getPath(inner_path))/1024) # Timeout: 5sec + size in kb
+        timeout = 5+int(self.storage.getSize(inner_path)/1024) # Timeout: 5sec + size in kb
         while 1:
             if not peers or len(published) >= limit: break # All peers done, or published enough
             peer = peers.pop(0)
@@ -206,7 +194,7 @@ class Site:
             result = peer.request("update", {
                 "site": self.address,
                 "inner_path": inner_path,
-                "body": open(self.getPath(inner_path), "rb").read(),
+                "body": self.storage.open(inner_path).read(),
                 "peer": (config.ip_external, config.fileserver_port)
             })
             if result: break
@@ -240,7 +228,7 @@ class Site:

     # Check and download the file if it does not exist
     def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0):
-        if os.path.isfile(self.getPath(inner_path)) and not update: # File exits, no need to do anything
+        if self.storage.isFile(inner_path) and not update: # File exists, no need to do anything
             return True
         elif self.settings["serving"] == False: # Site not serving
             return False
@@ -323,44 +311,6 @@ class Site:
             self.log.error("Announced to %s trackers, failed" % len(SiteManager.TRACKERS))

-    # Check and try to fix site files integrity
-    def checkFiles(self, quick_check=True):
-        self.log.debug("Checking files... Quick:%s" % quick_check)
-        bad_files = self.verifyFiles(quick_check)
-        if bad_files:
-            for bad_file in bad_files:
-                self.bad_files[bad_file] = self.bad_files.get("bad_file", 0)+1
-
-
-    def deleteFiles(self):
-        self.log.debug("Deleting files from content.json...")
-        files = [] # Get filenames
-        for content_inner_path, content in self.content_manager.contents.items():
-            files.append(content_inner_path)
-            for file_relative_path in content["files"].keys():
-                file_inner_path = self.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
-                files.append(file_inner_path)
-
-        for inner_path in files:
-            path = self.getPath(inner_path)
-            if os.path.isfile(path): os.unlink(path)
-
-        self.log.debug("Deleting empty dirs...")
-        for root, dirs, files in os.walk(self.directory, topdown=False):
-            for dir in dirs:
-                path = os.path.join(root,dir)
-                if os.path.isdir(path) and os.listdir(path) == []:
-                    os.removedirs(path)
-                    self.log.debug("Removing %s" % path)
-        if os.path.isdir(self.directory) and os.listdir(self.directory) == []: os.removedirs(self.directory) # Remove sites directory if empty
-
-        if os.path.isdir(self.directory):
-            self.log.debug("Some unknown file remained in site data dir: %s..." % self.directory)
-            return False # Some files not deleted
-        else:
-            self.log.debug("Site data directory deleted: %s..." % self.directory)
-            return True # All clean

     # - Events -

@@ -418,34 +368,3 @@ class Site:
         self.updateWebsocket(file_failed=inner_path)
-
-
-    # Verify all files sha512sum using content.json
-    def verifyFiles(self, quick_check=False): # Fast = using file size
-        bad_files = []
-        if not self.content_manager.contents.get("content.json"): # No content.json, download it first
-            self.needFile("content.json", update=True) # Force update to fix corrupt file
-            self.content_manager.loadContent() # Reload content.json
-        for content_inner_path, content in self.content_manager.contents.items():
-            if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file
-                self.log.error("[MISSING] %s" % content_inner_path)
-                bad_files.append(content_inner_path)
-            for file_relative_path in content["files"].keys():
-                file_inner_path = self.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
-                file_inner_path = file_inner_path.strip("/") # Strip leading /
-                file_path = self.getPath(file_inner_path)
-                if not os.path.isfile(file_path):
-                    self.log.error("[MISSING] %s" % file_inner_path)
-                    bad_files.append(file_inner_path)
-                    continue
-
-                if quick_check:
-                    ok = os.path.getsize(file_path) == content["files"][file_relative_path]["size"]
-                else:
-                    ok = self.content_manager.verifyFile(file_inner_path, open(file_path, "rb"))
-
-                if not ok:
-                    self.log.error("[ERROR] %s" % file_inner_path)
-                    bad_files.append(file_inner_path)
-            self.log.debug("%s verified: %s files, quick_check: %s, bad files: %s" % (content_inner_path, len(content["files"]), quick_check, bad_files))
-
-        return bad_files
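The new SiteStorage class below serializes queries against a running rebuild with a gevent AsyncResult: rebuildDb() stores one in event_db_busy, query() blocks on .get(), and .set(True) releases every waiter at once. A minimal sketch of that pattern, standalone and independent of the class (names and timings invented):

# Minimal sketch of the event_db_busy pattern used by SiteStorage below:
# one greenlet rebuilds while the others wait, then all of them proceed.
import gevent, gevent.event

event_db_busy = None

def rebuild():
    global event_db_busy
    event_db_busy = gevent.event.AsyncResult()
    gevent.sleep(0.1)        # Stand-in for the actual data import
    event_db_busy.set(True)  # Wake up every waiting query
    event_db_busy = None

def query(i):
    if event_db_busy:        # Db busy: block until the rebuild finishes
        event_db_busy.get()
    print "query %s ran" % i

gevent.joinall([gevent.spawn(rebuild)] + [gevent.spawn(query, i) for i in range(3)])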
diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py
new file mode 100644
index 00000000..db98359d
--- /dev/null
+++ b/src/Site/SiteStorage.py
@@ -0,0 +1,255 @@
+import os, re, shutil, json, time, sqlite3
+import gevent.event
+from Db import Db
+
+class SiteStorage:
+    def __init__(self, site, allow_create=True):
+        self.site = site
+        self.directory = "data/%s" % self.site.address # Site data directory
+        self.log = site.log
+        self.db = None # Db class
+        self.db_checked = False # Checked db tables since startup
+        self.event_db_busy = None # Gevent AsyncResult if db is working on rebuild
+        self.has_db = self.isFile("dbschema.json") # The site has a schema
+
+        if not os.path.isdir(self.directory):
+            if allow_create:
+                os.mkdir(self.directory) # Create directory if not found
+            else:
+                raise Exception("Directory does not exist: %s" % self.directory)
+
+
+    # Load db from dbschema.json
+    def openDb(self, check=True):
+        schema = self.loadJson("dbschema.json")
+        db_path = self.getPath(schema["db_file"])
+        if check:
+            if not os.path.isfile(db_path) or os.path.getsize(db_path) == 0: # Does not exist or empty
+                self.rebuildDb()
+        self.db = Db(schema, db_path)
+        if check and not self.db_checked:
+            changed_tables = self.db.checkTables()
+            if changed_tables: self.rebuildDb(delete_db=False) # TODO: Only update the changed table data
+
+
+    def closeDb(self):
+        if self.db: self.db.close()
+
+
+    # Return db class
+    def getDb(self):
+        if not self.db and self.has_db:
+            self.openDb()
+        return self.db
+
+
+    # Rebuild sql cache
+    def rebuildDb(self, delete_db=True):
+        self.event_db_busy = gevent.event.AsyncResult()
+        schema = self.loadJson("dbschema.json")
+        db_path = self.getPath(schema["db_file"])
+        if os.path.isfile(db_path) and delete_db:
+            if self.db: self.db.close() # Close db if open
+            self.log.info("Deleting %s" % db_path)
+            try:
+                os.unlink(db_path)
+            except Exception, err:
+                self.log.error("Delete error: %s" % err)
+        self.openDb(check=False)
+        self.log.info("Creating tables...")
+        self.db.checkTables()
+        self.log.info("Importing data...")
+        cur = self.db.getCursor()
+        cur.execute("BEGIN")
+        cur.logging = False
+        found = 0
+        s = time.time()
+        for content_inner_path, content in self.site.content_manager.contents.items():
+            content_path = self.getPath(content_inner_path)
+            if os.path.isfile(content_path):
+                if self.db.loadJson(content_path, cur=cur): found += 1
+            else:
+                self.log.error("[MISSING] %s" % content_inner_path)
+            for file_relative_path in content["files"].keys():
+                if not file_relative_path.endswith(".json"): continue # We are only interested in json files
+                file_inner_path = self.site.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
+                file_inner_path = file_inner_path.strip("/") # Strip leading /
+                file_path = self.getPath(file_inner_path)
+                if os.path.isfile(file_path):
+                    if self.db.loadJson(file_path, cur=cur): found += 1
+                else:
+                    self.log.error("[MISSING] %s" % file_inner_path)
+        cur.execute("END")
+        self.log.info("Imported %s data files in %.3fs" % (found, time.time()-s))
+        self.event_db_busy.set(True) # Event done, notify waiters
+        self.event_db_busy = None # Clear event
+
+
+    # Execute SQL query or rebuild the db on error
+    def query(self, query, params=None):
+        if self.event_db_busy: # Db not ready for queries
+            self.log.debug("Waiting for db...")
+            self.event_db_busy.get() # Wait for event
+        try:
+            res = self.getDb().execute(query, params)
+        except sqlite3.DatabaseError, err:
+            if err.__class__.__name__ == "DatabaseError":
+                self.log.error("Database error: %s, query: %s, trying to rebuild it..." % (err, query))
+                self.rebuildDb()
+                res = self.db.cur.execute(query, params)
+            else:
+                raise err
+        return res
+
+
+    # Open file object
+    def open(self, inner_path, mode="rb"):
+        return open(self.getPath(inner_path), mode)
+
+
+    # Read file content
+    def read(self, inner_path, mode="r"):
+        return open(self.getPath(inner_path), mode).read()
+
+
+    # Write content to file
+    def write(self, inner_path, content):
+        file_path = self.getPath(inner_path)
+        # Write file
+        if hasattr(content, 'read'): # File-like object
+            file = open(file_path, "wb")
+            shutil.copyfileobj(content, file) # Write buff to disk
+            file.close()
+        else: # Simple string
+            open(file_path, "wb").write(content)
+            del content
+
+        # Update sql cache
+        if inner_path == "dbschema.json":
+            self.has_db = self.isFile("dbschema.json")
+            self.getDb().checkTables() # Check if any table schema changed
+        elif inner_path != "content.json" and inner_path.endswith(".json") and self.has_db: # Load json file to db
+            self.log.debug("Loading json file to db: %s" % inner_path)
+            self.getDb().loadJson(file_path)
+
+
+    # Load and parse json file
+    def loadJson(self, inner_path):
+        return json.load(self.open(inner_path))
+
+
+    # Get file size
+    def getSize(self, inner_path):
+        return os.path.getsize(self.getPath(inner_path))
+
+
+    # File exists
+    def isFile(self, inner_path):
+        return os.path.isfile(self.getPath(inner_path))
+
+
+    # Dir exists
+    def isDir(self, inner_path):
+        return os.path.isdir(self.getPath(inner_path))
+
+
+    # Security check and return path of site's file
+    def getPath(self, inner_path):
+        inner_path = inner_path.replace("\\", "/") # Windows separator fix
+        inner_path = re.sub("^%s/" % re.escape(self.directory), "", inner_path) # Remove site directory if begins with it
+        file_path = self.directory+"/"+inner_path
+        allowed_dir = os.path.abspath(self.directory) # Only files within this directory allowed
+        if ".." in file_path or not os.path.dirname(os.path.abspath(file_path)).startswith(allowed_dir):
+            raise Exception("File not allowed: %s" % file_path)
+        return file_path
+
+
+    # Verify all files sha512sum using content.json
+    def verifyFiles(self, quick_check=False): # Fast = using file size
+        bad_files = []
+        if not self.site.content_manager.contents.get("content.json"): # No content.json, download it first
+            self.site.needFile("content.json", update=True) # Force update to fix corrupt file
+            self.site.content_manager.loadContent() # Reload content.json
+        for content_inner_path, content in self.site.content_manager.contents.items():
+            if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file
+                self.log.error("[MISSING] %s" % content_inner_path)
+                bad_files.append(content_inner_path)
+            for file_relative_path in content["files"].keys():
+                file_inner_path = self.site.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
+                file_inner_path = file_inner_path.strip("/") # Strip leading /
+                file_path = self.getPath(file_inner_path)
+                if not os.path.isfile(file_path):
+                    self.log.error("[MISSING] %s" % file_inner_path)
+                    bad_files.append(file_inner_path)
+                    continue
+
+                if quick_check:
+                    ok = os.path.getsize(file_path) == content["files"][file_relative_path]["size"]
+                else:
+                    ok = self.site.content_manager.verifyFile(file_inner_path, open(file_path, "rb"))
+
+                if not ok:
+                    self.log.error("[ERROR] %s" % file_inner_path)
+                    bad_files.append(file_inner_path)
+            self.log.debug("%s verified: %s files, quick_check: %s, bad files: %s" % (content_inner_path, len(content["files"]), quick_check, bad_files))
+
+        return bad_files
+
+
+    # Check and try to fix site files integrity
+    def checkFiles(self, quick_check=True):
+        self.log.debug("Checking files... Quick:%s" % quick_check)
+        bad_files = self.verifyFiles(quick_check)
+        if bad_files:
+            for bad_file in bad_files:
+                self.site.bad_files[bad_file] = self.site.bad_files.get(bad_file, 0)+1
+
+
+    # Delete all of the site's files
+    def deleteFiles(self):
+        if self.has_db:
+            self.log.debug("Deleting db file...")
+            self.closeDb()
+            try:
+                schema = self.loadJson("dbschema.json")
+                db_path = self.getPath(schema["db_file"])
+                if os.path.isfile(db_path): os.unlink(db_path)
+            except Exception, err:
+                self.log.error("Db file delete error: %s" % err)
+
+        self.log.debug("Deleting files from content.json...")
+        files = [] # Get filenames
+        for content_inner_path, content in self.site.content_manager.contents.items():
+            files.append(content_inner_path)
+            for file_relative_path in content["files"].keys():
+                file_inner_path = self.site.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
+                files.append(file_inner_path)
+
+        for inner_path in files:
+            path = self.getPath(inner_path)
+            if os.path.isfile(path): os.unlink(path)
+
+        self.log.debug("Deleting empty dirs...")
+        for root, dirs, files in os.walk(self.directory, topdown=False):
+            for dir in dirs:
+                path = os.path.join(root, dir)
+                if os.path.isdir(path) and os.listdir(path) == []:
+                    os.removedirs(path)
+                    self.log.debug("Removing %s" % path)
+        if os.path.isdir(self.directory) and os.listdir(self.directory) == []: os.removedirs(self.directory) # Remove sites directory if empty
+
+        if os.path.isdir(self.directory):
+            self.log.debug("Some unknown file remained in site data dir: %s..." % self.directory)
+            return False # Some files not deleted
+        else:
+            self.log.debug("Site data directory deleted: %s..." % self.directory)
+            return True # All clean
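A short usage sketch of the storage API added above; the address is a placeholder and the `comment` table is hypothetical, since tables come from each site's own dbschema.json:

# Usage sketch for the new SiteStorage query API (address/table invented).
from Site import Site

site = Site("1SiteAddressPlaceholderXXXXXXXXXX")  # Placeholder, not a real address
for row in site.storage.query("SELECT * FROM comment WHERE json_id = ?", (1,)):
    print dict(row)  # sqlite3.Row -> plain dict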
diff --git a/src/Site/__init__.py b/src/Site/__init__.py
index 24b2e268..cc830ae8 100644
--- a/src/Site/__init__.py
+++ b/src/Site/__init__.py
@@ -1 +1,2 @@
 from Site import Site
+from SiteStorage import SiteStorage
\ No newline at end of file
diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py
index cc9990f1..a47c7974 100644
--- a/src/Ui/UiRequest.py
+++ b/src/Ui/UiRequest.py
@@ -147,7 +147,7 @@ class UiRequest:
                 query_string=query_string,
                 wrapper_key=site.settings["wrapper_key"],
                 permissions=json.dumps(site.settings["permissions"]),
-                show_loadingscreen=json.dumps(not os.path.isfile(site.getPath(inner_path))),
+                show_loadingscreen=json.dumps(not site.storage.isFile(inner_path)),
                 homepage=config.homepage
             )

diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py
index fbffa5f8..a2ad559d 100644
--- a/src/Ui/UiWebsocket.py
+++ b/src/Ui/UiWebsocket.py
@@ -116,6 +116,8 @@ class UiWebsocket:
             func = self.actionFileGet
         elif cmd == "fileQuery":
             func = self.actionFileQuery
+        elif cmd == "dbQuery":
+            func = self.actionDbQuery
         # Admin commands
         elif cmd == "sitePause" and "ADMIN" in permissions:
             func = self.actionSitePause
@@ -271,7 +273,7 @@ class UiWebsocket:
         try:
             import base64
             content = base64.b64decode(content_base64)
-            open(self.site.getPath(inner_path), "wb").write(content)
+            self.site.storage.write(inner_path, content)
         except Exception, err:
             return self.response(to, "Write error: %s" % err)

@@ -284,17 +286,30 @@ class UiWebsocket:
     # Find data in json files
     def actionFileQuery(self, to, dir_inner_path, query):
         # s = time.time()
-        dir_path = self.site.getPath(dir_inner_path)
+        dir_path = self.site.storage.getPath(dir_inner_path)
         rows = list(QueryJson.query(dir_path, query))
         # self.log.debug("FileQuery %s %s done in %s" % (dir_inner_path, query, time.time()-s))
         return self.response(to, rows)
+
+
+    # SQL query
+    def actionDbQuery(self, to, query, params=None):
+        rows = []
+        try:
+            res = self.site.storage.query(query, params)
+        except Exception, err: # Return the error to the client
+            return self.response(to, {"error": str(err)})
+        # Convert result to dict
+        for row in res:
+            rows.append(dict(row))
+        return self.response(to, rows)

     # Return file content
     def actionFileGet(self, to, inner_path):
         try:
             self.site.needFile(inner_path, priority=1)
-            body = open(self.site.getPath(inner_path)).read()
+            body = self.site.storage.read(inner_path)
         except:
             body = None
         return self.response(to, body)
@@ -363,7 +378,7 @@ class UiWebsocket:
             site.saveSettings()
             site.worker_manager.running = False
             site.worker_manager.stopWorkers()
-            site.deleteFiles()
+            site.storage.deleteFiles()
             SiteManager.delete(address)
             site.updateWebsocket()
         else:
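For reference, an illustrative request/response pair for the new dbQuery websocket command; the framing (cmd/params/id/to/result keys) is inferred from the handler signature and self.response() above, not spelled out in this diff:

# Illustrative websocket messages for dbQuery (framing inferred, not verbatim).
request = {"cmd": "dbQuery", "params": ["SELECT * FROM json"], "id": 1}
response_ok = {"cmd": "response", "to": 1, "result": [{"json_id": 1, "path": "data.json"}]}
response_err = {"cmd": "response", "to": 1, "result": {"error": "no such table: json"}}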
self.manager.log.debug("%s: Hash correct: %s" % (self.key, task["inner_path"])) - if task["done"] == False: # Task not done yet + if correct == True and task["done"] == False: # Save if changed and task not done yet buff.seek(0) - file_path = task["site"].getPath(task["inner_path"]) + file_path = site.storage.getPath(task["inner_path"]) file_dir = os.path.dirname(file_path) if not os.path.isdir(file_dir): os.makedirs(file_dir) # Make directory for files - file = open(file_path, "wb") - shutil.copyfileobj(buff, file) # Write buff to disk - file.close() - task["workers_num"] -= 1 - self.manager.doneTask(task) + site.storage.write(task["inner_path"], buff) + if task["done"] == False: self.manager.doneTask(task) + task["workers_num"] -= 1 self.task = None else: # Hash failed self.manager.log.debug("%s: Hash failed: %s, failed peers: %s" % (self.key, task["inner_path"], len(task["failed"]))) diff --git a/src/main.py b/src/main.py index d5bc2f8e..346ea3db 100644 --- a/src/main.py +++ b/src/main.py @@ -44,7 +44,7 @@ else: import gevent import time -logging.debug("Starting... %s" % config) +logging.debug("Config: %s" % config) # Starts here when running zeronet.py def start(): @@ -56,6 +56,7 @@ def start(): # Start serving UiServer and PeerServer def main(): + logging.info("Version: %s, Python %s, Gevent: %s" % (config.version, sys.version, gevent.__version__)) global ui_server, file_server from File import FileServer from Ui import UiServer @@ -118,19 +119,38 @@ def siteVerify(address): for content_inner_path in site.content_manager.contents: logging.info("Verifing %s signature..." % content_inner_path) - if site.content_manager.verifyFile(content_inner_path, open(site.getPath(content_inner_path), "rb"), ignore_same=False) == True: + if site.content_manager.verifyFile(content_inner_path, site.storage.open(content_inner_path, "rb"), ignore_same=False) == True: logging.info("[OK] %s signed by address %s!" % (content_inner_path, address)) else: logging.error("[ERROR] %s not signed by address %s!" % (content_inner_path, address)) logging.info("Verifying site files...") - bad_files = site.verifyFiles() + bad_files = site.storage.verifyFiles() if not bad_files: logging.info("[OK] All file sha512sum matches!") else: logging.error("[ERROR] Error during verifying site files!") +def dbRebuild(address): + from Site import Site + logging.info("Rebuilding site sql cache: %s..." % address) + site = Site(address) + s = time.time() + site.storage.rebuildDb() + logging.info("Done in %.3fs" % (time.time()-s)) + + +def dbQuery(address, query): + from Site import Site + import json + site = Site(address) + result = [] + for row in site.storage.query(query): + result.append(dict(row)) + print json.dumps(result, indent=4) + + def siteAnnounce(address): from Site.Site import Site logging.info("Announcing site %s to tracker..." % address) diff --git a/zeronet.py b/zeronet.py index 4dcceb73..a88a4f0a 100644 --- a/zeronet.py +++ b/zeronet.py @@ -1,6 +1,7 @@ #!/usr/bin/env python def main(): + print " - Starging ZeroNet..." try: from src import main main.start()