diff --git a/plugins/PeerDb/PeerDbPlugin.py b/plugins/PeerDb/PeerDbPlugin.py new file mode 100644 index 00000000..87a8eb85 --- /dev/null +++ b/plugins/PeerDb/PeerDbPlugin.py @@ -0,0 +1,88 @@ +import time +import sqlite3 +import random +import atexit + +import gevent +from Plugin import PluginManager + + +@PluginManager.registerTo("ContentDb") +class ContentDbPlugin(object): + def __init__(self, *args, **kwargs): + atexit.register(self.saveAllPeers) + super(ContentDbPlugin, self).__init__(*args, **kwargs) + + def getSchema(self): + schema = super(ContentDbPlugin, self).getSchema() + + schema["tables"]["peer"] = { + "cols": [ + ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"], + ["address", "TEXT NOT NULL"], + ["port", "INTEGER NOT NULL"], + ["hashfield", "BLOB"], + ["time_added", "INTEGER NOT NULL"] + ], + "indexes": [ + "CREATE UNIQUE INDEX peer_key ON peer (site_id, address, port)" + ], + "schema_changed": 1 + } + + return schema + + def loadPeers(self, site): + s = time.time() + site_id = self.site_ids.get(site.address) + res = self.execute("SELECT * FROM peer WHERE site_id = :site_id", {"site_id": site_id}) + num = 0 + num_hashfield = 0 + for row in res: + peer = site.addPeer(row["address"], row["port"]) + if not peer: # Already exist + continue + if row["hashfield"]: + peer.hashfield.replaceFromString(row["hashfield"]) + num_hashfield += 1 + peer.time_added = row["time_added"] + num += 1 + site.log.debug("%s peers (%s with hashfield) loaded in %.3fs" % (num, num_hashfield, time.time() - s)) + + def iteratePeers(self, site): + site_id = self.site_ids.get(site.address) + for key, peer in site.peers.iteritems(): + address, port = key.split(":") + if peer.has_hashfield: + hashfield = sqlite3.Binary(peer.hashfield.tostring()) + else: + hashfield = "" + yield (site_id, address, port, hashfield, int(peer.time_added)) + + def savePeers(self, site, spawn=False): + if spawn: + # Save peers every hour (+random some secs to not update very site at same time) + gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True) + s = time.time() + site_id = self.site_ids.get(site.address) + cur = self.getCursor() + cur.execute("BEGIN") + self.execute("DELETE FROM peer WHERE site_id = :site_id", {"site_id": site_id}) + self.cur.cursor.executemany( + "INSERT INTO peer (site_id, address, port, hashfield, time_added) VALUES (?, ?, ?, ?, ?)", + self.iteratePeers(site) + ) + cur.execute("END") + site.log.debug("Peers saved in %.3fs" % (time.time() - s)) + + def initSite(self, site): + super(ContentDbPlugin, self).initSite(site) + gevent.spawn_later(0.5, self.loadPeers, site) + gevent.spawn_later(60*60, self.savePeers, site, spawn=True) + + def saveAllPeers(self): + for site in self.sites.values(): + try: + self.savePeers(site) + except Exception, err: + site.log.error("Save peer error: %s" % err) diff --git a/plugins/PeerDb/__init__.py b/plugins/PeerDb/__init__.py new file mode 100644 index 00000000..967561dc --- /dev/null +++ b/plugins/PeerDb/__init__.py @@ -0,0 +1,2 @@ +import PeerDbPlugin +