Try to fix the "Recursive use of cursors" ProgrammingError by creating a new cursor for every execute and by moving the Lock from the cursor to the Db
This commit is contained in:
parent
afd23849a6
commit
fca9db7972
6 changed files with 51 additions and 27 deletions
|
@ -142,14 +142,14 @@ class ContentDbPlugin(object):
|
|||
if not user:
|
||||
user = UserManager.user_manager.create()
|
||||
auth_address = user.getAuthAddress(site.address)
|
||||
self.execute(
|
||||
res = self.execute(
|
||||
"UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
|
||||
{"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
|
||||
)
|
||||
|
||||
self.log.debug(
|
||||
"Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
|
||||
(site.address, time.time() - s, num, self.cur.cursor.rowcount)
|
||||
(site.address, time.time() - s, num, res.rowcount)
|
||||
)
|
||||
self.filled[site.address] = True
|
||||
|
||||
|
|
|
@ -72,12 +72,12 @@ class ContentManagerPlugin(object):
|
|||
return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
|
||||
|
||||
def optionalRemoved(self, inner_path, hash_id, size=None):
|
||||
self.contents.db.execute(
|
||||
res = self.contents.db.execute(
|
||||
"UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
|
||||
{"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
|
||||
)
|
||||
|
||||
if self.contents.db.cur.cursor.rowcount > 0:
|
||||
if res.rowcount > 0:
|
||||
back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
|
||||
# Re-add to hashfield if we have other file with the same hash_id
|
||||
if self.isDownloaded(hash_id=hash_id, force_check_db=True):
|
||||
|
|
|
@ -79,7 +79,7 @@ class ContentDbPlugin(object):
|
|||
cur = self.getCursor()
|
||||
try:
|
||||
cur.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id})
|
||||
cur.cursor.executemany(
|
||||
cur.executemany(
|
||||
"INSERT INTO peer (site_id, address, port, hashfield, reputation, time_added, time_found) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
self.iteratePeers(site)
|
||||
)
|
||||
|
|
|
@ -79,8 +79,8 @@ class BootstrapperDb(Db.Db):
|
|||
def getHashId(self, hash):
|
||||
if hash not in self.hash_ids:
|
||||
self.log.debug("New hash: %s" % repr(hash))
|
||||
self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash})
|
||||
self.hash_ids[hash] = self.cur.cursor.lastrowid
|
||||
res = self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash})
|
||||
self.hash_ids[hash] = res.lastrowid
|
||||
return self.hash_ids[hash]
|
||||
|
||||
def peerAnnounce(self, ip_type, address, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False):
|
||||
|
@ -100,8 +100,8 @@ class BootstrapperDb(Db.Db):
|
|||
self.log.debug("New peer: %s signed: %s" % (address, onion_signed))
|
||||
if ip_type == "onion" and not onion_signed:
|
||||
return len(hashes)
|
||||
self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now})
|
||||
peer_id = self.cur.cursor.lastrowid
|
||||
res = self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now})
|
||||
peer_id = res.lastrowid
|
||||
|
||||
# Check user's hashes
|
||||
res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
|
||||
|
|
|
@ -8,6 +8,7 @@ import atexit
|
|||
import sys
|
||||
|
||||
import gevent
|
||||
from gevent._threading import Lock
|
||||
|
||||
from Debug import Debug
|
||||
from .DbCursor import DbCursor
|
||||
|
@ -44,15 +45,18 @@ def dbCloseAll():
|
|||
for db in opened_dbs[:]:
|
||||
db.close()
|
||||
|
||||
|
||||
gevent.spawn(dbCleanup)
|
||||
gevent.spawn(dbCommitCheck)
|
||||
atexit.register(dbCloseAll)
|
||||
|
||||
|
||||
class DbTableError(Exception):
|
||||
def __init__(self, message, table):
|
||||
super().__init__(message)
|
||||
self.table = table
|
||||
|
||||
|
||||
class Db(object):
|
||||
|
||||
def __init__(self, schema, db_path, close_idle=False):
|
||||
|
@ -76,6 +80,7 @@ class Db(object):
|
|||
self.last_query_time = time.time()
|
||||
self.last_sleep_time = time.time()
|
||||
self.num_execute_since_sleep = 0
|
||||
self.lock = Lock()
|
||||
|
||||
def __repr__(self):
|
||||
return "<Db#%s:%s close_idle:%s>" % (id(self), self.db_path, self.close_idle)
|
||||
|
@ -278,7 +283,6 @@ class Db(object):
|
|||
except Exception as err:
|
||||
self.log.error("Error creating table %s: %s" % (table_name, Debug.formatException(err)))
|
||||
raise DbTableError(err, table_name)
|
||||
#return False
|
||||
|
||||
self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables))
|
||||
if changed_tables:
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import time
|
||||
import re
|
||||
import gevent
|
||||
from gevent._threading import Lock
|
||||
from util import helper
|
||||
|
||||
|
||||
|
@ -13,9 +11,7 @@ class DbCursor:
|
|||
def __init__(self, conn, db):
|
||||
self.conn = conn
|
||||
self.db = db
|
||||
self.cursor = conn.cursor()
|
||||
self.logging = False
|
||||
self.lock = Lock()
|
||||
|
||||
def quoteValue(self, value):
|
||||
if type(value) is int:
|
||||
|
@ -100,19 +96,20 @@ class DbCursor:
|
|||
query, params = self.parseQuery(query, params)
|
||||
|
||||
s = time.time()
|
||||
|
||||
cursor = self.conn.cursor()
|
||||
|
||||
try:
|
||||
self.lock.acquire(True)
|
||||
self.db.lock.acquire(True)
|
||||
if params: # Query has parameters
|
||||
res = self.cursor.execute(query, params)
|
||||
res = cursor.execute(query, params)
|
||||
if self.logging:
|
||||
self.db.log.debug(query + " " + str(params) + " (Done in %.4f)" % (time.time() - s))
|
||||
else:
|
||||
res = self.cursor.execute(query)
|
||||
res = cursor.execute(query)
|
||||
if self.logging:
|
||||
self.db.log.debug(query + " (Done in %.4f)" % (time.time() - s))
|
||||
finally:
|
||||
self.lock.release()
|
||||
self.db.lock.release()
|
||||
|
||||
# Log query stats
|
||||
if self.db.collect_stats:
|
||||
|
@ -121,12 +118,35 @@ class DbCursor:
|
|||
self.db.query_stats[query]["call"] += 1
|
||||
self.db.query_stats[query]["time"] += time.time() - s
|
||||
|
||||
if not self.db.need_commit:
|
||||
query_type = query.split(" ", 1)[0].upper()
|
||||
if query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]:
|
||||
self.db.need_commit = True
|
||||
query_type = query.split(" ", 1)[0].upper()
|
||||
is_update_query = query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]
|
||||
if not self.db.need_commit and is_update_query:
|
||||
self.db.need_commit = True
|
||||
|
||||
return res
|
||||
if is_update_query:
|
||||
return cursor
|
||||
else:
|
||||
return res
|
||||
|
||||
def executemany(self, query, params):
|
||||
while self.db.progress_sleeping:
|
||||
time.sleep(0.1)
|
||||
|
||||
self.db.last_query_time = time.time()
|
||||
|
||||
s = time.time()
|
||||
cursor = self.conn.cursor()
|
||||
|
||||
try:
|
||||
self.db.lock.acquire(True)
|
||||
cursor.executemany(query, params)
|
||||
finally:
|
||||
self.db.lock.release()
|
||||
|
||||
if self.logging:
|
||||
self.db.log.debug("%s x %s (Done in %.4f)" % (query, len(params), time.time() - s))
|
||||
|
||||
return cursor
|
||||
|
||||
# Creates on updates a database row without incrementing the rowid
|
||||
def insertOrUpdate(self, table, query_sets, query_wheres, oninsert={}):
|
||||
|
@ -135,11 +155,11 @@ class DbCursor:
|
|||
|
||||
params = query_sets
|
||||
params.update(query_wheres)
|
||||
self.execute(
|
||||
res = self.execute(
|
||||
"UPDATE %s SET %s WHERE %s" % (table, ", ".join(sql_sets), " AND ".join(sql_wheres)),
|
||||
params
|
||||
)
|
||||
if self.cursor.rowcount == 0:
|
||||
if res.rowcount == 0:
|
||||
params.update(oninsert) # Add insert-only fields
|
||||
self.execute("INSERT INTO %s ?" % table, params)
|
||||
|
||||
|
@ -215,4 +235,4 @@ class DbCursor:
|
|||
return row
|
||||
|
||||
def close(self):
|
||||
self.cursor.close()
|
||||
pass
|
||||
|
|
Loading…
Reference in a new issue