diff --git a/src/Db/Db.py b/src/Db/Db.py index c8eb969c..c3268b11 100644 --- a/src/Db/Db.py +++ b/src/Db/Db.py @@ -39,6 +39,8 @@ class Db(object): self.foreign_keys = False self.query_stats = {} self.db_keyvalues = {} + self.delayed_queue = [] + self.delayed_queue_thread = None self.last_query_time = time.time() def __repr__(self): @@ -73,8 +75,49 @@ class Db(object): self.connect() return self.cur.execute(query, params) + def insertOrUpdate(self, *args, **kwargs): + self.last_query_time = time.time() + if not self.conn: + self.connect() + return self.cur.insertOrUpdate(*args, **kwargs) + + def executeDelayed(self, *args, **kwargs): + if not self.delayed_queue_thread: + self.delayed_queue_thread = gevent.spawn_later(10, self.processDelayed) + self.delayed_queue.append(("execute", (args, kwargs))) + + def insertOrUpdateDelayed(self, *args, **kwargs): + if not self.delayed_queue: + gevent.spawn_later(1, self.processDelayed) + self.delayed_queue.append(("insertOrUpdate", (args, kwargs))) + + def processDelayed(self): + if not self.delayed_queue: + self.log.debug("processDelayed aborted") + return + self.last_query_time = time.time() + if not self.conn: + self.connect() + + s = time.time() + cur = self.getCursor() + cur.execute("BEGIN") + for command, params in self.delayed_queue: + if command == "insertOrUpdate": + cur.insertOrUpdate(*params[0], **params[1]) + else: + cur.execute(*params[0], **params[1]) + + cur.execute("END") + if len(self.delayed_queue) > 10: + self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s)) + self.delayed_queue = [] + self.delayed_queue_thread = None + def close(self): s = time.time() + if self.delayed_queue: + self.processDelayed() if self in opened_dbs: opened_dbs.remove(self) if self.cur: