diff --git a/plugins/OptionalManager/ContentDbPlugin.py b/plugins/OptionalManager/ContentDbPlugin.py index fd28092b..ccfd6637 100644 --- a/plugins/OptionalManager/ContentDbPlugin.py +++ b/plugins/OptionalManager/ContentDbPlugin.py @@ -142,14 +142,14 @@ class ContentDbPlugin(object): if not user: user = UserManager.user_manager.create() auth_address = user.getAuthAddress(site.address) - self.execute( + res = self.execute( "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path", {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address} ) self.log.debug( "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" % - (site.address, time.time() - s, num, self.cur.cursor.rowcount) + (site.address, time.time() - s, num, res.rowcount) ) self.filled[site.address] = True diff --git a/plugins/OptionalManager/OptionalManagerPlugin.py b/plugins/OptionalManager/OptionalManagerPlugin.py index 909caa31..c33063dc 100644 --- a/plugins/OptionalManager/OptionalManagerPlugin.py +++ b/plugins/OptionalManager/OptionalManagerPlugin.py @@ -72,12 +72,12 @@ class ContentManagerPlugin(object): return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own) def optionalRemoved(self, inner_path, hash_id, size=None): - self.contents.db.execute( + res = self.contents.db.execute( "UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1", {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path} ) - if self.contents.db.cur.cursor.rowcount > 0: + if res.rowcount > 0: back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size) # Re-add to hashfield if we have other file with the same hash_id if self.isDownloaded(hash_id=hash_id, force_check_db=True): diff --git a/plugins/PeerDb/PeerDbPlugin.py b/plugins/PeerDb/PeerDbPlugin.py index addd1d6f..2ce5e39f 100644 --- a/plugins/PeerDb/PeerDbPlugin.py +++ b/plugins/PeerDb/PeerDbPlugin.py @@ -79,7 +79,7 @@ class ContentDbPlugin(object): cur = self.getCursor() try: cur.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id}) - cur.cursor.executemany( + cur.executemany( "INSERT INTO peer (site_id, address, port, hashfield, reputation, time_added, time_found) VALUES (?, ?, ?, ?, ?, ?, ?)", self.iteratePeers(site) ) diff --git a/plugins/disabled-Bootstrapper/BootstrapperDb.py b/plugins/disabled-Bootstrapper/BootstrapperDb.py index 3c47b76d..0866dc3e 100644 --- a/plugins/disabled-Bootstrapper/BootstrapperDb.py +++ b/plugins/disabled-Bootstrapper/BootstrapperDb.py @@ -79,8 +79,8 @@ class BootstrapperDb(Db.Db): def getHashId(self, hash): if hash not in self.hash_ids: self.log.debug("New hash: %s" % repr(hash)) - self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash}) - self.hash_ids[hash] = self.cur.cursor.lastrowid + res = self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash}) + self.hash_ids[hash] = res.lastrowid return self.hash_ids[hash] def peerAnnounce(self, ip_type, address, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False): @@ -100,8 +100,8 @@ class BootstrapperDb(Db.Db): self.log.debug("New peer: %s signed: %s" % (address, onion_signed)) if ip_type == "onion" and not onion_signed: return len(hashes) - self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now}) - peer_id = self.cur.cursor.lastrowid + res = self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now}) + peer_id = res.lastrowid # Check user's hashes res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id}) diff --git a/src/Db/Db.py b/src/Db/Db.py index 2d1b2e66..63cbe407 100644 --- a/src/Db/Db.py +++ b/src/Db/Db.py @@ -8,6 +8,7 @@ import atexit import sys import gevent +from gevent._threading import Lock from Debug import Debug from .DbCursor import DbCursor @@ -44,15 +45,18 @@ def dbCloseAll(): for db in opened_dbs[:]: db.close() + gevent.spawn(dbCleanup) gevent.spawn(dbCommitCheck) atexit.register(dbCloseAll) + class DbTableError(Exception): def __init__(self, message, table): super().__init__(message) self.table = table + class Db(object): def __init__(self, schema, db_path, close_idle=False): @@ -76,6 +80,7 @@ class Db(object): self.last_query_time = time.time() self.last_sleep_time = time.time() self.num_execute_since_sleep = 0 + self.lock = Lock() def __repr__(self): return "" % (id(self), self.db_path, self.close_idle) @@ -278,7 +283,6 @@ class Db(object): except Exception as err: self.log.error("Error creating table %s: %s" % (table_name, Debug.formatException(err))) raise DbTableError(err, table_name) - #return False self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables)) if changed_tables: diff --git a/src/Db/DbCursor.py b/src/Db/DbCursor.py index 07639130..201d29d4 100644 --- a/src/Db/DbCursor.py +++ b/src/Db/DbCursor.py @@ -1,7 +1,5 @@ import time import re -import gevent -from gevent._threading import Lock from util import helper @@ -13,9 +11,7 @@ class DbCursor: def __init__(self, conn, db): self.conn = conn self.db = db - self.cursor = conn.cursor() self.logging = False - self.lock = Lock() def quoteValue(self, value): if type(value) is int: @@ -100,19 +96,20 @@ class DbCursor: query, params = self.parseQuery(query, params) s = time.time() - + cursor = self.conn.cursor() + try: - self.lock.acquire(True) + self.db.lock.acquire(True) if params: # Query has parameters - res = self.cursor.execute(query, params) + res = cursor.execute(query, params) if self.logging: self.db.log.debug(query + " " + str(params) + " (Done in %.4f)" % (time.time() - s)) else: - res = self.cursor.execute(query) + res = cursor.execute(query) if self.logging: self.db.log.debug(query + " (Done in %.4f)" % (time.time() - s)) finally: - self.lock.release() + self.db.lock.release() # Log query stats if self.db.collect_stats: @@ -121,12 +118,35 @@ class DbCursor: self.db.query_stats[query]["call"] += 1 self.db.query_stats[query]["time"] += time.time() - s - if not self.db.need_commit: - query_type = query.split(" ", 1)[0].upper() - if query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]: - self.db.need_commit = True + query_type = query.split(" ", 1)[0].upper() + is_update_query = query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"] + if not self.db.need_commit and is_update_query: + self.db.need_commit = True - return res + if is_update_query: + return cursor + else: + return res + + def executemany(self, query, params): + while self.db.progress_sleeping: + time.sleep(0.1) + + self.db.last_query_time = time.time() + + s = time.time() + cursor = self.conn.cursor() + + try: + self.db.lock.acquire(True) + cursor.executemany(query, params) + finally: + self.db.lock.release() + + if self.logging: + self.db.log.debug("%s x %s (Done in %.4f)" % (query, len(params), time.time() - s)) + + return cursor # Creates on updates a database row without incrementing the rowid def insertOrUpdate(self, table, query_sets, query_wheres, oninsert={}): @@ -135,11 +155,11 @@ class DbCursor: params = query_sets params.update(query_wheres) - self.execute( + res = self.execute( "UPDATE %s SET %s WHERE %s" % (table, ", ".join(sql_sets), " AND ".join(sql_wheres)), params ) - if self.cursor.rowcount == 0: + if res.rowcount == 0: params.update(oninsert) # Add insert-only fields self.execute("INSERT INTO %s ?" % table, params) @@ -215,4 +235,4 @@ class DbCursor: return row def close(self): - self.cursor.close() + pass