# plugins/Chart/ChartCollector.py -- periodic statistics sampler for the Chart plugin.
# (Reconstructed from patch a72da8a "Chart plugin" by shortcutme; Python 2 codebase.)
import time
import sys
import collections
import itertools
import logging

import gevent
from util import helper
from Config import config


class ChartCollector(object):
    """Collects global and per-site statistics and inserts them into ChartDb."""

    def __init__(self, db):
        self.db = db
        self.log = logging.getLogger("ChartCollector")
        # Previous samples of the "|change" collectors, keyed by
        # "global" or "site:<address>", used to compute deltas.
        self.last_values = collections.defaultdict(dict)
        # Attributes are set before spawning so the greenlet never sees a
        # half-initialized instance. Only the main process samples.
        if config.action == "main":
            gevent.spawn_later(60 * 3, self.collector)

    def setInitialLastValues(self, sites):
        # Recover last value of site bytes sent/received from the persisted site
        # settings, so the first delta after a restart is not the whole total.
        for site in sites:
            self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
            self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)

    def getCollectors(self):
        """Return {key: callable} of global collectors.

        Keys ending in "|change" are stored as deltas since the previous
        sample; keys starting with "peer" receive the unique peer set as
        their single argument (see collectDatas).
        """
        collectors = {}
        file_server = sys.modules["main"].file_server
        sites = file_server.sites
        content_db = sites.values()[0].content_manager.contents.db

        # Connection stats
        collectors["connection"] = lambda: len(file_server.connections)
        collectors["connection_in"] = (
            lambda: len([1 for connection in file_server.connections if connection.type == "in"])
        )
        collectors["connection_onion"] = (
            lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
        )
        collectors["connection_ping_avg"] = (
            lambda: round(1000 * helper.avg(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_ping_min"] = (
            lambda: round(1000 * min(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_rev_avg"] = (
            lambda: helper.avg(
                [connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
            )
        )

        # Request stats
        collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
        collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
        collectors["request_num_recv|change"] = lambda: file_server.num_recv
        collectors["request_num_sent|change"] = lambda: file_server.num_sent

        # Limit
        collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
        collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
        collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])

        # Peers. NOTE: "lambda (peers):" tuple-parameter syntax (Python 2 only,
        # removed by PEP 3113) replaced with the equivalent portable form.
        collectors["peer"] = lambda peers: len(peers)
        collectors["peer_onion"] = lambda peers: len([True for peer in peers if ".onion" in peer])

        # Size
        collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
        collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
        collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])

        return collectors

    def getSiteCollectors(self):
        """Return {key: callable(site)} of per-site collectors."""
        site_collectors = {}

        # Size
        site_collectors["site_size"] = lambda site: site.settings.get("size", 0)
        site_collectors["site_size_optional"] = lambda site: site.settings.get("size_optional", 0)
        site_collectors["site_optional_downloaded"] = lambda site: site.settings.get("optional_downloaded", 0)
        site_collectors["site_content"] = lambda site: len(site.content_manager.contents)

        # Data transfer
        site_collectors["site_bytes_recv|change"] = lambda site: site.settings.get("bytes_recv", 0)
        site_collectors["site_bytes_sent|change"] = lambda site: site.settings.get("bytes_sent", 0)

        # Peers
        site_collectors["site_peer"] = lambda site: len(site.peers)
        site_collectors["site_peer_onion"] = lambda site: len(
            [True for peer in site.peers.itervalues() if peer.ip.endswith(".onion")]
        )
        site_collectors["site_peer_connected"] = lambda site: len([True for peer in site.peers.itervalues() if peer.connection])

        return site_collectors

    def getUniquePeers(self):
        # Peer dict keys identify peers, so a set de-duplicates peers that are
        # shared between several sites.
        sites = sys.modules["main"].file_server.sites
        return set(itertools.chain.from_iterable(
            [site.peers.keys() for site in sites.values()]
        ))

    def collectDatas(self, collectors, last_values, site=None):
        """Run every collector and return {key: rounded value or None}.

        A collector that raises yields None for its key instead of aborting
        the whole sampling round.
        """
        if site is None:  # Only global "peer*" collectors need the peer set
            peers = self.getUniquePeers()
        datas = {}
        for key, collector in collectors.iteritems():
            try:
                if site:
                    value = collector(site)
                elif key.startswith("peer"):
                    value = collector(peers)
                else:
                    value = collector()
            except Exception as err:
                self.log.info("Collector %s error: %s" % (key, err))
                value = None

            if "|change" in key:  # Store the delta since the last sample
                key = key.replace("|change", "")
                # BUGFIX: only update the baseline and subtract on success.
                # The original stored None into last_values and then computed
                # `None - last_value`, raising TypeError and poisoning the
                # next round's delta.
                if value is not None:
                    last_value = last_values.get(key, 0)
                    last_values[key] = value
                    value = value - last_value

            if value is None:
                datas[key] = None
            else:
                datas[key] = round(value, 3)
        return datas

    def collectGlobal(self, collectors, last_values):
        """Sample all global collectors and bulk-insert one row per value."""
        now = int(time.time())
        s = time.time()
        datas = self.collectDatas(collectors, last_values["global"])
        values = []
        for key, value in datas.iteritems():
            values.append((self.db.getTypeId(key), value, now))
        self.log.debug("Global collectors done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.execute("BEGIN")
        cur.cursor.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
        cur.execute("END")
        cur.close()
        self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))

    def collectSites(self, sites, collectors, last_values):
        """Sample the per-site collectors for every site and bulk-insert the rows."""
        now = int(time.time())
        s = time.time()
        values = []
        for address, site in sites.iteritems():
            site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
            for key, value in site_datas.iteritems():
                values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
            time.sleep(0.000001)  # Yield to other greenlets between sites
        self.log.debug("Site collections done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.execute("BEGIN")
        cur.cursor.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
        cur.execute("END")
        cur.close()
        self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))

    def collector(self):
        """Main loop: global sample every 5 minutes, site samples every hour."""
        collectors = self.getCollectors()
        site_collectors = self.getSiteCollectors()
        sites = sys.modules["main"].file_server.sites
        i = 0
        while 1:
            self.collectGlobal(collectors, self.last_values)
            if i % 12 == 0:  # Only collect sites data every hour (12 * 5 min)
                self.collectSites(sites, site_collectors, self.last_values)
            time.sleep(60 * 5)
            i += 1
# plugins/Chart/ChartDb.py -- sqlite storage for the Chart plugin samples.
from Config import config
from Db import Db
import time


class ChartDb(Db):
    """Chart database: a `data` fact table plus `type`/`site` lookup tables.

    Type and site ids are cached in memory (self.types / self.sites) and
    created lazily on first use.
    """

    def __init__(self):
        self.version = 2
        super(ChartDb, self).__init__(self.getSchema(), "%s/chart.db" % config.data_dir)
        self.foreign_keys = True
        self.checkTables()
        # In-memory caches of the lookup tables (name/address -> rowid)
        self.sites = self.loadSites()
        self.types = self.loadTypes()

    def getSchema(self):
        """Return the schema dict consumed by the project's Db base class."""
        schema = {}
        schema["db_name"] = "Chart"
        schema["tables"] = {}
        schema["tables"]["data"] = {
            "cols": [
                ["data_id", "INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE"],
                ["type_id", "INTEGER NOT NULL"],
                ["site_id", "INTEGER"],  # NULL for global (non per-site) samples
                ["value", "INTEGER"],
                ["date_added", "DATETIME DEFAULT (CURRENT_TIMESTAMP)"]
            ],
            "indexes": [
                "CREATE INDEX site_id ON data (site_id)",
                "CREATE INDEX date_added ON data (date_added)"
            ],
            "schema_changed": 2
        }
        schema["tables"]["type"] = {
            "cols": [
                ["type_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
                ["name", "TEXT"]
            ],
            "schema_changed": 1
        }
        schema["tables"]["site"] = {
            "cols": [
                ["site_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
                ["address", "TEXT"]
            ],
            "schema_changed": 1
        }
        return schema

    def getTypeId(self, name):
        """Return the id for a stat type name, inserting it on first use."""
        if name not in self.types:
            self.execute("INSERT INTO type ?", {"name": name})
            self.types[name] = self.cur.cursor.lastrowid

        return self.types[name]

    def getSiteId(self, address):
        """Return the id for a site address, inserting it on first use."""
        if address not in self.sites:
            self.execute("INSERT INTO site ?", {"address": address})
            self.sites[address] = self.cur.cursor.lastrowid

        return self.sites[address]

    def loadSites(self):
        """Load the site lookup table into a dict: address -> site_id."""
        sites = {}
        for row in self.execute("SELECT * FROM site"):
            sites[row["address"]] = row["site_id"]
        return sites

    def loadTypes(self):
        """Load the type lookup table into a dict: name -> type_id."""
        types = {}
        for row in self.execute("SELECT * FROM type"):
            types[row["name"]] = row["type_id"]
        return types

    def deleteSite(self, address):
        """Remove a site and all of its data rows (called on site delete)."""
        if address in self.sites:
            site_id = self.sites[address]
            del self.sites[address]
            self.execute("DELETE FROM site WHERE ?", {"site_id": site_id})
            self.execute("DELETE FROM data WHERE ?", {"site_id": site_id})

    def archive(self):
        """Compact old global samples into one aggregated row per hour.

        Walks backwards week by week, starting 2 weeks ago, and stops at the
        first week that has nothing left to merge (already compacted).
        """
        week_back = 1
        num_archived_total = 0
        while 1:
            s = time.time()
            date_added_from = time.time() - 60 * 60 * 24 * 7 * (week_back + 1)
            date_added_to = date_added_from + 60 * 60 * 24 * 7
            res = self.execute("""
                SELECT
                 MAX(date_added) AS date_added,
                 SUM(value) AS value,
                 GROUP_CONCAT(data_id) AS data_ids,
                 type_id,
                 site_id,
                 COUNT(*) AS num
                FROM data
                WHERE
                 site_id IS NULL AND
                 date_added > :date_added_from AND
                 date_added < :date_added_to
                GROUP BY strftime('%Y-%m-%d %H', date_added, 'unixepoch', 'localtime'), type_id
            """, {"date_added_from": date_added_from, "date_added_to": date_added_to})

            num_archived = 0
            cur = self.getCursor()
            for row in res:
                # BUGFIX: removed a leftover `print dict(row)` debug statement.
                if row["num"] == 1:  # Already a single row for this hour
                    continue
                cur.execute("INSERT INTO data ?", {
                    "type_id": row["type_id"],
                    "site_id": row["site_id"],
                    "value": row["value"],
                    "date_added": row["date_added"]
                })
                # data_ids comes from GROUP_CONCAT of our own integer rowids,
                # so interpolating it is safe (no external input).
                cur.execute("DELETE FROM data WHERE data_id IN (%s)" % row["data_ids"])
                num_archived += row["num"]
            self.log.debug("Archived %s data from %s weeks ago in %.3fs" % (num_archived, week_back, time.time() - s))
            num_archived_total += num_archived
            week_back += 1
            time.sleep(0.1)
            if num_archived == 0:
                break
        # BUGFIX: the original `if week_back > 1:` was always true (week_back is
        # incremented at least once), so VACUUM ran even when nothing changed.
        # Only reclaim disk space when rows were actually deleted.
        if num_archived_total > 0:
            self.execute("VACUUM")
# plugins/Chart/ChartPlugin.py -- plugin hooks and websocket API for the Chart db.
# The companion plugins/Chart/__init__.py is a single line: `import ChartPlugin`.
import time
import itertools

import gevent

from Config import config
from util import helper
from Plugin import PluginManager
from ChartDb import ChartDb
from ChartCollector import ChartCollector

if "db" not in locals():  # Share db and collector between plugin reloads
    db = ChartDb()
    gevent.spawn_later(10 * 60 * 60, db.archive)  # First archive 10h after start
    helper.timer(60 * 60 * 6, db.archive)  # Then re-archive every 6 hours
    collector = ChartCollector(db)


@PluginManager.registerTo("SiteManager")
class SiteManagerPlugin(object):
    def load(self, *args, **kwargs):
        """After sites are loaded, seed the collector's delta baselines."""
        back = super(SiteManagerPlugin, self).load(*args, **kwargs)
        collector.setInitialLastValues(self.sites.values())
        return back

    def delete(self, address, *args, **kwargs):
        """Drop the chart rows of a site when the site itself is deleted."""
        db.deleteSite(address)
        return super(SiteManagerPlugin, self).delete(address, *args, **kwargs)


@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionChartDbQuery(self, to, query, params=None):
        """Run a read-only SELECT against the chart db (admin permission only).

        Returns a list of row dicts, or {"error": ...} on failure.
        """
        if "ADMIN" not in self.permissions:
            return {"error": "No permission"}

        # BUGFIX: `s` was only bound when config.debug/verbose was set but read
        # below; it is cheap, so always bind it to avoid a latent NameError.
        s = time.time()
        rows = []
        try:
            if not query.strip().upper().startswith("SELECT"):
                raise Exception("Only SELECT query supported")
            res = db.execute(query, params)
            # Convert result to dict inside the try: iterating the cursor can
            # also raise, and the client should get the error response then too.
            for row in res:
                rows.append(dict(row))
        except Exception as err:  # Response the error to client
            self.log.error("ChartDbQuery error: %s" % err)
            return {"error": str(err)}
        if config.verbose and time.time() - s > 0.1:  # Log slow query
            self.log.debug("Slow query: %s (%.3fs)" % (query, time.time() - s))
        return rows

    def actionChartGetPeerLocations(self, to):
        """Return geo locations of all known peers (admin permission only)."""
        if "ADMIN" not in self.permissions:
            return {"error": "No permission"}

        peers = {}
        for site in self.server.sites.values():
            peers.update(site.peers)
        peer_locations = self.getPeerLocations(peers)
        return peer_locations