Support delayed commands in Db
This commit is contained in:
parent
b7ede15b06
commit
f55a1f1bf3
1 changed files with 43 additions and 0 deletions
43
src/Db/Db.py
43
src/Db/Db.py
|
@ -39,6 +39,8 @@ class Db(object):
|
||||||
self.foreign_keys = False
|
self.foreign_keys = False
|
||||||
self.query_stats = {}
|
self.query_stats = {}
|
||||||
self.db_keyvalues = {}
|
self.db_keyvalues = {}
|
||||||
|
self.delayed_queue = []
|
||||||
|
self.delayed_queue_thread = None
|
||||||
self.last_query_time = time.time()
|
self.last_query_time = time.time()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
@ -73,8 +75,49 @@ class Db(object):
|
||||||
self.connect()
|
self.connect()
|
||||||
return self.cur.execute(query, params)
|
return self.cur.execute(query, params)
|
||||||
|
|
||||||
|
def insertOrUpdate(self, *args, **kwargs):
|
||||||
|
self.last_query_time = time.time()
|
||||||
|
if not self.conn:
|
||||||
|
self.connect()
|
||||||
|
return self.cur.insertOrUpdate(*args, **kwargs)
|
||||||
|
|
||||||
|
def executeDelayed(self, *args, **kwargs):
|
||||||
|
if not self.delayed_queue_thread:
|
||||||
|
self.delayed_queue_thread = gevent.spawn_later(10, self.processDelayed)
|
||||||
|
self.delayed_queue.append(("execute", (args, kwargs)))
|
||||||
|
|
||||||
|
def insertOrUpdateDelayed(self, *args, **kwargs):
|
||||||
|
if not self.delayed_queue:
|
||||||
|
gevent.spawn_later(1, self.processDelayed)
|
||||||
|
self.delayed_queue.append(("insertOrUpdate", (args, kwargs)))
|
||||||
|
|
||||||
|
def processDelayed(self):
|
||||||
|
if not self.delayed_queue:
|
||||||
|
self.log.debug("processDelayed aborted")
|
||||||
|
return
|
||||||
|
self.last_query_time = time.time()
|
||||||
|
if not self.conn:
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
s = time.time()
|
||||||
|
cur = self.getCursor()
|
||||||
|
cur.execute("BEGIN")
|
||||||
|
for command, params in self.delayed_queue:
|
||||||
|
if command == "insertOrUpdate":
|
||||||
|
cur.insertOrUpdate(*params[0], **params[1])
|
||||||
|
else:
|
||||||
|
cur.execute(*params[0], **params[1])
|
||||||
|
|
||||||
|
cur.execute("END")
|
||||||
|
if len(self.delayed_queue) > 10:
|
||||||
|
self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s))
|
||||||
|
self.delayed_queue = []
|
||||||
|
self.delayed_queue_thread = None
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
s = time.time()
|
s = time.time()
|
||||||
|
if self.delayed_queue:
|
||||||
|
self.processDelayed()
|
||||||
if self in opened_dbs:
|
if self in opened_dbs:
|
||||||
opened_dbs.remove(self)
|
opened_dbs.remove(self)
|
||||||
if self.cur:
|
if self.cur:
|
||||||
|
|
Loading…
Reference in a new issue