New Db connection type to avoid corruption

This commit is contained in:
shortcutme 2019-03-16 02:40:32 +01:00
parent 0e2f7fb122
commit a1b5dad1c8
No known key found for this signature in database
GPG key ID: 5B63BAE6CB9613AE
2 changed files with 58 additions and 13 deletions

View file

@ -24,7 +24,19 @@ def dbCleanup():
if idle > 60 * 5 and db.close_idle:
db.close()
def dbCommitCheck():
while 1:
time.sleep(5)
for db in opened_dbs[:]:
if not db.need_commit:
continue
db.commit("Interval")
db.need_commit = False
time.sleep(0.1)
gevent.spawn(dbCleanup)
gevent.spawn(dbCommitCheck)
class Db(object):
@ -40,12 +52,15 @@ class Db(object):
self.table_names = None
self.collect_stats = False
self.foreign_keys = False
self.need_commit = False
self.query_stats = {}
self.db_keyvalues = {}
self.delayed_queue = []
self.delayed_queue_thread = None
self.close_idle = close_idle
self.last_query_time = time.time()
self.last_sleep_time = time.time()
self.num_execute_since_sleep = 0
def __repr__(self):
return "<Db#%s:%s close_idle:%s>" % (id(self), self.db_path, self.close_idle)
@ -59,21 +74,35 @@ class Db(object):
self.log.debug("Created Db path: %s" % self.db_dir)
if not os.path.isfile(self.db_path):
self.log.debug("Db file not exist yet: %s" % self.db_path)
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn = sqlite3.connect(self.db_path, check_same_thread=False, isolation_level="DEFERRED")
self.conn.row_factory = sqlite3.Row
self.conn.isolation_level = None
self.conn.set_progress_handler(self.progress, 100000)
self.cur = self.getCursor()
self.log.debug(
"Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." %
(self.db_path, time.time() - s, len(opened_dbs), sqlite3.version)
)
def progress(self, *args, **kwargs):
gevent.sleep()
# Execute query using dbcursor
def execute(self, query, params=None):
if not self.conn:
self.connect()
return self.cur.execute(query, params)
def commit(self, reason="Unknown"):
try:
s = time.time()
self.conn.commit()
self.log.debug("Commited in %.3fs (reason: %s)" % (time.time() - s, reason))
return True
except Exception as err:
self.log.error("Commit error: %s" % err)
return False
def insertOrUpdate(self, *args, **kwargs):
if not self.conn:
self.connect()
@ -104,7 +133,6 @@ class Db(object):
else:
cur.execute(*params[0], **params[1])
cur.execute("END")
if len(self.delayed_queue) > 10:
self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s))
self.delayed_queue = []
@ -116,6 +144,8 @@ class Db(object):
self.processDelayed()
if self in opened_dbs:
opened_dbs.remove(self)
self.need_commit = False
self.commit("Closing")
if self.cur:
self.cur.close()
if self.conn:
@ -131,12 +161,6 @@ class Db(object):
self.connect()
cur = DbCursor(self.conn, self)
if config.db_mode == "security":
cur.execute("PRAGMA journal_mode = WAL")
cur.execute("PRAGMA synchronous = NORMAL")
else:
cur.execute("PRAGMA journal_mode = MEMORY")
cur.execute("PRAGMA synchronous = OFF")
if self.foreign_keys:
cur.execute("PRAGMA foreign_keys = ON")

View file

@ -1,5 +1,6 @@
import time
import re
import gevent
# Special sqlite cursor
@ -18,10 +19,10 @@ class DbCursor:
else:
return "'%s'" % value.replace("'", "''")
def execute(self, query, params=None):
self.db.last_query_time = time.time()
def parseQuery(self, query, params):
query_type = query.split(" ", 1)[0].upper()
if isinstance(params, dict) and "?" in query: # Make easier select and insert by allowing dict params
if query.startswith("SELECT") or query.startswith("DELETE") or query.startswith("UPDATE"):
if query_type in ("SELECT", "DELETE", "UPDATE"):
# Convert param dict to SELECT * FROM table WHERE key = ? AND key2 = ? format
query_wheres = []
values = []
@ -39,7 +40,8 @@ class DbCursor:
else:
query_values = ",".join(["?"] * len(value))
values += value
query_wheres.append("%s %s (%s)" %
query_wheres.append(
"%s %s (%s)" %
(field, operator, query_values)
)
else:
@ -78,7 +80,21 @@ class DbCursor:
new_params[key] = value
params = new_params
return query, params
def execute(self, query, params=None):
query = query.strip()
self.db.last_query_time = time.time()
if time.time() - self.db.last_sleep_time > 0.1:
if self.db.num_execute_since_sleep > 100:
gevent.sleep(0.001)
self.db.num_execute_since_sleep = 0
self.db.last_sleep_time = time.time()
self.db.num_execute_since_sleep += 1
query, params = self.parseQuery(query, params)
s = time.time()
@ -98,6 +114,11 @@ class DbCursor:
self.db.query_stats[query]["call"] += 1
self.db.query_stats[query]["time"] += time.time() - s
if not self.db.need_commit:
query_type = query.split(" ", 1)[0].upper()
if query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]:
self.db.need_commit = True
return res
# Creates on updates a database row without incrementing the rowid