version 0.2.6, database support, dbrebuild and dbquery startup commands, connection firstchar error bugfix, log python gevent msgpack lib versions, sitestorage class for site file operations, dbquery websocket api command

This commit is contained in:
HelloZeroNet 2015-03-19 21:19:14 +01:00
parent bce0f56d45
commit 3b8d49207e
16 changed files with 699 additions and 120 deletions

248
src/Db/Db.py Normal file
View file

@ -0,0 +1,248 @@
import sqlite3, json, time, logging, re, os
from DbCursor import DbCursor
class Db:
def __init__(self, schema, db_path):
self.db_path = db_path
self.db_dir = os.path.dirname(db_path)+"/"
self.schema = schema
self.conn = None
self.cur = None
self.log = logging.getLogger("Db:%s" % schema["db_name"])
self.table_names = None
self.collect_stats = False
self.query_stats = {}
self.db_keyvalues = {}
def connect(self):
self.log.debug("Connecting (sqlite version: %s)..." % sqlite3.version)
if not os.path.isdir(self.db_dir): # Directory not exits yet
os.makedirs(self.db_dir)
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.conn.isolation_level = None
self.cur = self.getCursor()
# We need more speed then security
self.cur.execute("PRAGMA journal_mode = WAL")
self.cur.execute("PRAGMA journal_mode = MEMORY")
self.cur.execute("PRAGMA synchronous = OFF")
# Execute query using dbcursor
def execute(self, query, params):
if not self.conn: self.connect()
return self.cur.execute(query, params)
def close(self):
self.log.debug("Closing")
if self.cur: self.cur.close()
if self.conn: self.conn.close()
# Gets a cursor object to database
# Return: Cursor class
def getCursor(self):
if not self.conn: self.connect()
return DbCursor(self.conn, self)
# Get the table version
# Return: Table version or None if not exits
def getTableVersion(self, table_name):
"""if not self.table_names: # Get exitsing table names
res = self.cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
self.table_names = [row["name"] for row in res]
if table_name not in self.table_names:
return False
else:"""
if not self.db_keyvalues: # Get db keyvalues
try:
res = self.cur.execute("SELECT * FROM keyvalue WHERE json_id=0") # json_id = 0 is internal keyvalues
except sqlite3.OperationalError, err: # Table not exits
self.log.debug("Query error: %s" % err)
return False
for row in res:
self.db_keyvalues[row["key"]] = row["value"]
return self.db_keyvalues.get("table.%s.version" % table_name, 0)
# Check Db tables
# Return: <list> Changed table names
def checkTables(self):
s = time.time()
changed_tables = []
cur = self.getCursor()
cur.execute("BEGIN")
# Check internal tables
changed = cur.needTable("keyvalue", [
["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
["key", "TEXT"],
["value", "INTEGER"],
["json_id", "INTEGER REFERENCES json (json_id)"],
],[
"CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
], version=1)
if changed: changed_tables.append("keyvalue")
changed = cur.needTable("json", [
["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
["path", "VARCHAR(255)"]
], [
"CREATE UNIQUE INDEX path ON json(path)"
], version=1)
if changed: changed_tables.append("json")
# Check schema tables
for table_name, table_settings in self.schema["tables"].items():
changed = cur.needTable(table_name, table_settings["cols"], table_settings["indexes"], version=table_settings["schema_changed"])
if changed: changed_tables.append(table_name)
cur.execute("COMMIT")
self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time()-s, changed_tables))
return changed_tables
# Load json file to db
# Return: True if matched
def loadJson(self, file_path, file = None, cur = None):
if not file_path.startswith(self.db_dir): return False # Not from the db dir: Skipping
relative_path = re.sub("^%s" % self.db_dir, "", file_path) # File path realative to db file
# Check if filename matches any of mappings in schema
matched_maps = []
for match, map_settings in self.schema["maps"].items():
if re.match(match, relative_path):
matched_maps.append(map_settings)
# No match found for the file
if not matched_maps: return False
# Load the json file
if not file: file = open(file_path)
data = json.load(file)
# No cursor specificed
if not cur:
cur = self.getCursor()
cur.execute("BEGIN")
cur.logging = False
commit_after_done = True
else:
commit_after_done = False
# Row for current json file
json_row = cur.getJsonRow(relative_path)
# Check matched mappings in schema
for map in matched_maps:
# Insert non-relational key values
if map.get("to_keyvalue"):
# Get current values
res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],))
current_keyvalue = {}
current_keyvalue_id = {}
for row in res:
current_keyvalue[row["key"]] = row["value"]
current_keyvalue_id[row["key"]] = row["keyvalue_id"]
for key in map["to_keyvalue"]:
if key not in current_keyvalue: # Keyvalue not exits yet in the db
cur.execute("INSERT INTO keyvalue ?",
{"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
)
elif data.get(key) != current_keyvalue[key]: # Keyvalue different value
cur.execute("UPDATE keyvalue SET value = ? WHERE keyvalue_id = ?", (data.get(key), current_keyvalue_id[key]))
"""for key in map.get("to_keyvalue", []):
cur.execute("INSERT OR REPLACE INTO keyvalue ?",
{"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
)
"""
# Insert data to tables
for table_settings in map.get("to_table", []):
if isinstance(table_settings, dict): # Custom settings
table_name = table_settings["table"] # Table name to insert datas
node = table_settings.get("node", table_name) # Node keyname in data json file
key_col = table_settings.get("key_col") # Map dict key as this col
val_col = table_settings.get("val_col") # Map dict value as this col
import_cols = table_settings.get("import_cols")
replaces = table_settings.get("replaces")
else: # Simple settings
table_name = table_settings
node = table_settings
key_col = None
val_col = None
import_cols = None
replaces = None
cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],))
if node not in data: continue
table_schema = self.schema["tables"][table_name]
if key_col: # Map as dict
for key, val in data[node].iteritems():
if val_col: # Single value
cur.execute("INSERT OR REPLACE INTO %s ?" % table_name,
{ key_col: key, val_col: val, "json_id": json_row["json_id"] }
)
else: # Multi value
if isinstance(val, dict): # Single row
row = val
if import_cols: row = { key: row[key] for key in import_cols } # Filter row by import_cols
row[key_col] = key
# Replace in value if necessary
if replaces:
for replace_key, replace in replaces.iteritems():
if replace_key in row:
for replace_from, replace_to in replace.iteritems():
row[replace_key] = row[replace_key].replace(replace_from, replace_to)
row["json_id"] = json_row["json_id"]
cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
else: # Multi row
for row in val:
row[key_col] = key
row["json_id"] = json_row["json_id"]
cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
else: # Map as list
for row in data[node]:
row["json_id"] = json_row["json_id"]
cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
if commit_after_done: cur.execute("COMMIT")
return True
if __name__ == "__main__":
s = time.time()
console_log = logging.StreamHandler()
logging.getLogger('').setLevel(logging.DEBUG)
logging.getLogger('').addHandler(console_log)
console_log.setLevel(logging.DEBUG)
dbjson = DbJson(json.load(open("zerotalk.schema.json")), "data/users/zerotalk.db")
dbjson.collect_stats = True
dbjson.checkTables()
cur = dbjson.getCursor()
cur.execute("BEGIN")
cur.logging = False
dbjson.loadJson("data/users/content.json", cur=cur)
for user_dir in os.listdir("data/users"):
if os.path.isdir("data/users/%s" % user_dir):
dbjson.loadJson("data/users/%s/data.json" % user_dir, cur=cur)
#print ".",
cur.logging = True
cur.execute("COMMIT")
print "Done in %.3fs" % (time.time()-s)
for query, stats in sorted(dbjson.query_stats.items()):
print "-", query, stats

106
src/Db/DbCursor.py Normal file
View file

@ -0,0 +1,106 @@
import time
# Special sqlite cursor
class DbCursor:
def __init__(self, conn, db):
self.conn = conn
self.db = db
self.cursor = conn.cursor()
self.logging = True
def execute(self, query, params=None):
if isinstance(params, dict): # Make easier select and insert by allowing dict params
if query.startswith("SELECT") or query.startswith("DELETE"): # Convert param dict to SELECT * FROM table WHERE key = ?, key2 = ? format
wheres = ", ".join([key+" = ?" for key in params])
query = query.replace("?", wheres)
params = params.values()
else: # Convert param dict to INSERT INTO table (key, key2) VALUES (?, ?) format
keys = ", ".join(params.keys())
values = ", ".join(['?' for key in params.keys()])
query = query.replace("?", "(%s) VALUES (%s)" % (keys, values))
params = tuple(params.values())
s = time.time()
# if query == "COMMIT": self.logging = True # Turn logging back on transaction commit
if params: # Query has parameters
res = self.cursor.execute(query, params)
if self.logging:
self.db.log.debug((query.replace("?", "%s") % params)+" (Done in %.4f)" % (time.time()-s))
else:
res = self.cursor.execute(query)
if self.logging: self.db.log.debug(query+" (Done in %.4f)" % (time.time()-s))
# Log query stats
if self.db.collect_stats:
if query not in self.db.query_stats:
self.db.query_stats[query] = {"call": 0, "time": 0.0}
self.db.query_stats[query]["call"] += 1
self.db.query_stats[query]["time"] += time.time()-s
# if query == "BEGIN": self.logging = False # Turn logging off on transaction commit
return res
# Create new table
# Return: True on success
def createTable(self, table, cols):
# TODO: Check current structure
"""table_changed = False
res = c.execute("PRAGMA table_info(%s)" % table)
if res:
for row in res:
print row["name"], row["type"], cols[row["name"]]
print row
else:
table_changed = True
if table_changed: # Table structure changed, drop and create again"""
self.execute("DROP TABLE IF EXISTS %s" % table)
col_definitions = []
for col_name, col_type in cols:
col_definitions.append("%s %s" % (col_name, col_type))
self.execute("CREATE TABLE %s (%s)" % (table, ",".join(col_definitions)))
return True
# Create indexes on table
# Return: True on success
def createIndexes(self, table, indexes):
# indexes.append("CREATE INDEX %s_id ON %s(%s_id)" % (table, table, table)) # Primary key index
for index in indexes:
self.execute(index)
# Create table if not exits
# Return: True if updated
def needTable(self, table, cols, indexes=None, version=1):
current_version = self.db.getTableVersion(table)
if int(current_version) < int(version): # Table need update or not extis
self.db.log.info("Table %s outdated...version: %s need: %s, rebuilding..." % (table, current_version, version))
self.createTable(table, cols)
if indexes: self.createIndexes(table, indexes)
self.execute("INSERT OR REPLACE INTO keyvalue ?",
{"json_id": 0, "key": "table.%s.version" % table, "value": version}
)
return True
else: # Not changed
return False
# Get or create a row for json file
# Return: The database row
def getJsonRow(self, file_path):
res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
row = res.fetchone()
if not row: # No row yet, create it
self.execute("INSERT INTO json ?", {"path": file_path})
res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
row = res.fetchone()
return row
def close(self):
self.cursor.close()

2
src/Db/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from Db import Db
from DbCursor import DbCursor