From a1b5dad1c8058f7fd11dd5fcbbb11eef217a9a36 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Sat, 16 Mar 2019 02:40:32 +0100 Subject: [PATCH] New Db connection type to avoid corruption --- src/Db/Db.py | 42 +++++++++++++++++++++++++++++++++--------- src/Db/DbCursor.py | 29 +++++++++++++++++++++++++---- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/src/Db/Db.py b/src/Db/Db.py index b48e52b5..044dc347 100644 --- a/src/Db/Db.py +++ b/src/Db/Db.py @@ -24,7 +24,19 @@ def dbCleanup(): if idle > 60 * 5 and db.close_idle: db.close() +def dbCommitCheck(): + while 1: + time.sleep(5) + for db in opened_dbs[:]: + if not db.need_commit: + continue + + db.commit("Interval") + db.need_commit = False + time.sleep(0.1) + gevent.spawn(dbCleanup) +gevent.spawn(dbCommitCheck) class Db(object): @@ -40,12 +52,15 @@ class Db(object): self.table_names = None self.collect_stats = False self.foreign_keys = False + self.need_commit = False self.query_stats = {} self.db_keyvalues = {} self.delayed_queue = [] self.delayed_queue_thread = None self.close_idle = close_idle self.last_query_time = time.time() + self.last_sleep_time = time.time() + self.num_execute_since_sleep = 0 def __repr__(self): return "" % (id(self), self.db_path, self.close_idle) @@ -59,21 +74,35 @@ class Db(object): self.log.debug("Created Db path: %s" % self.db_dir) if not os.path.isfile(self.db_path): self.log.debug("Db file not exist yet: %s" % self.db_path) - self.conn = sqlite3.connect(self.db_path, check_same_thread=False) + self.conn = sqlite3.connect(self.db_path, check_same_thread=False, isolation_level="DEFERRED") self.conn.row_factory = sqlite3.Row - self.conn.isolation_level = None + self.conn.set_progress_handler(self.progress, 100000) self.cur = self.getCursor() self.log.debug( "Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." % (self.db_path, time.time() - s, len(opened_dbs), sqlite3.version) ) + def progress(self, *args, **kwargs): + gevent.sleep() + # Execute query using dbcursor def execute(self, query, params=None): if not self.conn: self.connect() return self.cur.execute(query, params) + def commit(self, reason="Unknown"): + try: + s = time.time() + self.conn.commit() + self.log.debug("Commited in %.3fs (reason: %s)" % (time.time() - s, reason)) + return True + except Exception as err: + self.log.error("Commit error: %s" % err) + return False + + def insertOrUpdate(self, *args, **kwargs): if not self.conn: self.connect() @@ -104,7 +133,6 @@ class Db(object): else: cur.execute(*params[0], **params[1]) - cur.execute("END") if len(self.delayed_queue) > 10: self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s)) self.delayed_queue = [] @@ -116,6 +144,8 @@ class Db(object): self.processDelayed() if self in opened_dbs: opened_dbs.remove(self) + self.need_commit = False + self.commit("Closing") if self.cur: self.cur.close() if self.conn: @@ -131,12 +161,6 @@ class Db(object): self.connect() cur = DbCursor(self.conn, self) - if config.db_mode == "security": - cur.execute("PRAGMA journal_mode = WAL") - cur.execute("PRAGMA synchronous = NORMAL") - else: - cur.execute("PRAGMA journal_mode = MEMORY") - cur.execute("PRAGMA synchronous = OFF") if self.foreign_keys: cur.execute("PRAGMA foreign_keys = ON") diff --git a/src/Db/DbCursor.py b/src/Db/DbCursor.py index 88d898ca..7df7dacb 100644 --- a/src/Db/DbCursor.py +++ b/src/Db/DbCursor.py @@ -1,5 +1,6 @@ import time import re +import gevent # Special sqlite cursor @@ -18,10 +19,10 @@ class DbCursor: else: return "'%s'" % value.replace("'", "''") - def execute(self, query, params=None): - self.db.last_query_time = time.time() + def parseQuery(self, query, params): + query_type = query.split(" ", 1)[0].upper() if isinstance(params, dict) and "?" in query: # Make easier select and insert by allowing dict params - if query.startswith("SELECT") or query.startswith("DELETE") or query.startswith("UPDATE"): + if query_type in ("SELECT", "DELETE", "UPDATE"): # Convert param dict to SELECT * FROM table WHERE key = ? AND key2 = ? format query_wheres = [] values = [] @@ -39,7 +40,8 @@ class DbCursor: else: query_values = ",".join(["?"] * len(value)) values += value - query_wheres.append("%s %s (%s)" % + query_wheres.append( + "%s %s (%s)" % (field, operator, query_values) ) else: @@ -78,7 +80,21 @@ class DbCursor: new_params[key] = value params = new_params + return query, params + def execute(self, query, params=None): + query = query.strip() + self.db.last_query_time = time.time() + + if time.time() - self.db.last_sleep_time > 0.1: + if self.db.num_execute_since_sleep > 100: + gevent.sleep(0.001) + self.db.num_execute_since_sleep = 0 + self.db.last_sleep_time = time.time() + + self.db.num_execute_since_sleep += 1 + + query, params = self.parseQuery(query, params) s = time.time() @@ -98,6 +114,11 @@ class DbCursor: self.db.query_stats[query]["call"] += 1 self.db.query_stats[query]["time"] += time.time() - s + if not self.db.need_commit: + query_type = query.split(" ", 1)[0].upper() + if query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]: + self.db.need_commit = True + return res # Creates on updates a database row without incrementing the rowid