Chart plugin
This commit is contained in:
parent
9b3a50c124
commit
a72da8af56
4 changed files with 365 additions and 0 deletions
180
plugins/Chart/ChartCollector.py
Normal file
180
plugins/Chart/ChartCollector.py
Normal file
|
@ -0,0 +1,180 @@
|
|||
import time
|
||||
import sys
|
||||
import collections
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
import gevent
|
||||
from util import helper
|
||||
from Config import config
|
||||
|
||||
|
||||
class ChartCollector(object):
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
if config.action == "main":
|
||||
gevent.spawn_later(60 * 3, self.collector)
|
||||
self.log = logging.getLogger("ChartCollector")
|
||||
self.last_values = collections.defaultdict(dict)
|
||||
|
||||
def setInitialLastValues(self, sites):
|
||||
# Recover last value of site bytes/sent
|
||||
for site in sites:
|
||||
self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
|
||||
self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)
|
||||
|
||||
def getCollectors(self):
|
||||
collectors = {}
|
||||
file_server = sys.modules["main"].file_server
|
||||
sites = file_server.sites
|
||||
content_db = sites.values()[0].content_manager.contents.db
|
||||
|
||||
# Connection stats
|
||||
collectors["connection"] = lambda: len(file_server.connections)
|
||||
collectors["connection_in"] = (
|
||||
lambda: len([1 for connection in file_server.connections if connection.type == "in"])
|
||||
)
|
||||
collectors["connection_onion"] = (
|
||||
lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
|
||||
)
|
||||
collectors["connection_ping_avg"] = (
|
||||
lambda: round(1000 * helper.avg(
|
||||
[connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
|
||||
))
|
||||
)
|
||||
collectors["connection_ping_min"] = (
|
||||
lambda: round(1000 * min(
|
||||
[connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
|
||||
))
|
||||
)
|
||||
collectors["connection_rev_avg"] = (
|
||||
lambda: helper.avg(
|
||||
[connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
|
||||
)
|
||||
)
|
||||
|
||||
# Request stats
|
||||
collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
|
||||
collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
|
||||
collectors["request_num_recv|change"] = lambda: file_server.num_recv
|
||||
collectors["request_num_sent|change"] = lambda: file_server.num_sent
|
||||
|
||||
# Limit
|
||||
collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
|
||||
collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
|
||||
collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])
|
||||
|
||||
# Peers
|
||||
collectors["peer"] = lambda (peers): len(peers)
|
||||
collectors["peer_onion"] = lambda (peers): len([True for peer in peers if ".onion" in peer])
|
||||
|
||||
# Size
|
||||
collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
|
||||
collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
|
||||
collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])
|
||||
|
||||
return collectors
|
||||
|
||||
def getSiteCollectors(self):
|
||||
site_collectors = {}
|
||||
|
||||
# Size
|
||||
site_collectors["site_size"] = lambda(site): site.settings.get("size", 0)
|
||||
site_collectors["site_size_optional"] = lambda(site): site.settings.get("size_optional", 0)
|
||||
site_collectors["site_optional_downloaded"] = lambda(site): site.settings.get("optional_downloaded", 0)
|
||||
site_collectors["site_content"] = lambda(site): len(site.content_manager.contents)
|
||||
|
||||
# Data transfer
|
||||
site_collectors["site_bytes_recv|change"] = lambda(site): site.settings.get("bytes_recv", 0)
|
||||
site_collectors["site_bytes_sent|change"] = lambda(site): site.settings.get("bytes_sent", 0)
|
||||
|
||||
# Peers
|
||||
site_collectors["site_peer"] = lambda(site): len(site.peers)
|
||||
site_collectors["site_peer_onion"] = lambda(site): len(
|
||||
[True for peer in site.peers.itervalues() if peer.ip.endswith(".onion")]
|
||||
)
|
||||
site_collectors["site_peer_connected"] = lambda(site): len([True for peer in site.peers.itervalues() if peer.connection])
|
||||
|
||||
return site_collectors
|
||||
|
||||
def getUniquePeers(self):
|
||||
sites = sys.modules["main"].file_server.sites
|
||||
return set(itertools.chain.from_iterable(
|
||||
[site.peers.keys() for site in sites.values()]
|
||||
))
|
||||
|
||||
def collectDatas(self, collectors, last_values, site=None):
|
||||
if site is None:
|
||||
peers = self.getUniquePeers()
|
||||
datas = {}
|
||||
for key, collector in collectors.iteritems():
|
||||
try:
|
||||
if site:
|
||||
value = collector(site)
|
||||
elif key.startswith("peer"):
|
||||
value = collector(peers)
|
||||
else:
|
||||
value = collector()
|
||||
except Exception as err:
|
||||
self.log.info("Collector %s error: %s" % (key, err))
|
||||
value = None
|
||||
|
||||
if "|change" in key: # Store changes relative to last value
|
||||
key = key.replace("|change", "")
|
||||
last_value = last_values.get(key, 0)
|
||||
last_values[key] = value
|
||||
value = value - last_value
|
||||
|
||||
if value is None:
|
||||
datas[key] = None
|
||||
else:
|
||||
datas[key] = round(value, 3)
|
||||
return datas
|
||||
|
||||
def collectGlobal(self, collectors, last_values):
|
||||
now = int(time.time())
|
||||
s = time.time()
|
||||
datas = self.collectDatas(collectors, last_values["global"])
|
||||
values = []
|
||||
for key, value in datas.iteritems():
|
||||
values.append((self.db.getTypeId(key), value, now))
|
||||
self.log.debug("Global collectors done in %.3fs" % (time.time() - s))
|
||||
|
||||
s = time.time()
|
||||
cur = self.db.getCursor()
|
||||
cur.execute("BEGIN")
|
||||
cur.cursor.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
|
||||
cur.execute("END")
|
||||
cur.close()
|
||||
self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))
|
||||
|
||||
def collectSites(self, sites, collectors, last_values):
|
||||
now = int(time.time())
|
||||
s = time.time()
|
||||
values = []
|
||||
for address, site in sites.iteritems():
|
||||
site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
|
||||
for key, value in site_datas.iteritems():
|
||||
values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
|
||||
time.sleep(0.000001)
|
||||
self.log.debug("Site collections done in %.3fs" % (time.time() - s))
|
||||
|
||||
s = time.time()
|
||||
cur = self.db.getCursor()
|
||||
cur.execute("BEGIN")
|
||||
cur.cursor.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
|
||||
cur.execute("END")
|
||||
cur.close()
|
||||
self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))
|
||||
|
||||
def collector(self):
|
||||
collectors = self.getCollectors()
|
||||
site_collectors = self.getSiteCollectors()
|
||||
sites = sys.modules["main"].file_server.sites
|
||||
i = 0
|
||||
while 1:
|
||||
self.collectGlobal(collectors, self.last_values)
|
||||
if i % 12 == 0: # Only collect sites data every hour
|
||||
self.collectSites(sites, site_collectors, self.last_values)
|
||||
time.sleep(60 * 5)
|
||||
i += 1
|
124
plugins/Chart/ChartDb.py
Normal file
124
plugins/Chart/ChartDb.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
from Config import config
|
||||
from Db import Db
|
||||
import time
|
||||
|
||||
|
||||
class ChartDb(Db):
|
||||
def __init__(self):
|
||||
self.version = 2
|
||||
super(ChartDb, self).__init__(self.getSchema(), "%s/chart.db" % config.data_dir)
|
||||
self.foreign_keys = True
|
||||
self.checkTables()
|
||||
self.sites = self.loadSites()
|
||||
self.types = self.loadTypes()
|
||||
|
||||
def getSchema(self):
|
||||
schema = {}
|
||||
schema["db_name"] = "Chart"
|
||||
schema["tables"] = {}
|
||||
schema["tables"]["data"] = {
|
||||
"cols": [
|
||||
["data_id", "INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE"],
|
||||
["type_id", "INTEGER NOT NULL"],
|
||||
["site_id", "INTEGER"],
|
||||
["value", "INTEGER"],
|
||||
["date_added", "DATETIME DEFAULT (CURRENT_TIMESTAMP)"]
|
||||
],
|
||||
"indexes": [
|
||||
"CREATE INDEX site_id ON data (site_id)",
|
||||
"CREATE INDEX date_added ON data (date_added)"
|
||||
],
|
||||
"schema_changed": 2
|
||||
}
|
||||
schema["tables"]["type"] = {
|
||||
"cols": [
|
||||
["type_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
|
||||
["name", "TEXT"]
|
||||
],
|
||||
"schema_changed": 1
|
||||
}
|
||||
schema["tables"]["site"] = {
|
||||
"cols": [
|
||||
["site_id", "INTEGER PRIMARY KEY NOT NULL UNIQUE"],
|
||||
["address", "TEXT"]
|
||||
],
|
||||
"schema_changed": 1
|
||||
}
|
||||
return schema
|
||||
|
||||
def getTypeId(self, name):
|
||||
if name not in self.types:
|
||||
self.execute("INSERT INTO type ?", {"name": name})
|
||||
self.types[name] = self.cur.cursor.lastrowid
|
||||
|
||||
return self.types[name]
|
||||
|
||||
def getSiteId(self, address):
|
||||
if address not in self.sites:
|
||||
self.execute("INSERT INTO site ?", {"address": address})
|
||||
self.sites[address] = self.cur.cursor.lastrowid
|
||||
|
||||
return self.sites[address]
|
||||
|
||||
def loadSites(self):
|
||||
sites = {}
|
||||
for row in self.execute("SELECT * FROM site"):
|
||||
sites[row["address"]] = row["site_id"]
|
||||
return sites
|
||||
|
||||
def loadTypes(self):
|
||||
types = {}
|
||||
for row in self.execute("SELECT * FROM type"):
|
||||
types[row["name"]] = row["type_id"]
|
||||
return types
|
||||
|
||||
def deleteSite(self, address):
|
||||
if address in self.sites:
|
||||
site_id = self.sites[address]
|
||||
del self.sites[address]
|
||||
self.execute("DELETE FROM site WHERE ?", {"site_id": site_id})
|
||||
self.execute("DELETE FROM data WHERE ?", {"site_id": site_id})
|
||||
|
||||
def archive(self):
|
||||
week_back = 1
|
||||
while 1:
|
||||
s = time.time()
|
||||
date_added_from = time.time() - 60 * 60 * 24 * 7 * (week_back + 1)
|
||||
date_added_to = date_added_from + 60 * 60 * 24 * 7
|
||||
res = self.execute("""
|
||||
SELECT
|
||||
MAX(date_added) AS date_added,
|
||||
SUM(value) AS value,
|
||||
GROUP_CONCAT(data_id) AS data_ids,
|
||||
type_id,
|
||||
site_id,
|
||||
COUNT(*) AS num
|
||||
FROM data
|
||||
WHERE
|
||||
site_id IS NULL AND
|
||||
date_added > :date_added_from AND
|
||||
date_added < :date_added_to
|
||||
GROUP BY strftime('%Y-%m-%d %H', date_added, 'unixepoch', 'localtime'), type_id
|
||||
""", {"date_added_from": date_added_from, "date_added_to": date_added_to})
|
||||
|
||||
num_archived = 0
|
||||
cur = self.getCursor()
|
||||
for row in res:
|
||||
print dict(row)
|
||||
if row["num"] == 1:
|
||||
continue
|
||||
cur.execute("INSERT INTO data ?", {
|
||||
"type_id": row["type_id"],
|
||||
"site_id": row["site_id"],
|
||||
"value": row["value"],
|
||||
"date_added": row["date_added"]
|
||||
})
|
||||
cur.execute("DELETE FROM data WHERE data_id IN (%s)" % row["data_ids"])
|
||||
num_archived += row["num"]
|
||||
self.log.debug("Archived %s data from %s weeks ago in %.3fs" % (num_archived, week_back, time.time() - s))
|
||||
week_back += 1
|
||||
time.sleep(0.1)
|
||||
if num_archived == 0:
|
||||
break
|
||||
if week_back > 1:
|
||||
self.execute("VACUUM")
|
60
plugins/Chart/ChartPlugin.py
Normal file
60
plugins/Chart/ChartPlugin.py
Normal file
|
@ -0,0 +1,60 @@
|
|||
import time
|
||||
import itertools
|
||||
|
||||
import gevent
|
||||
|
||||
from Config import config
|
||||
from util import helper
|
||||
from Plugin import PluginManager
|
||||
from ChartDb import ChartDb
|
||||
from ChartCollector import ChartCollector
|
||||
|
||||
if "db" not in locals().keys(): # Share on reloads
|
||||
db = ChartDb()
|
||||
gevent.spawn_later(10 * 60 * 60, db.archive)
|
||||
helper.timer(60 * 60 * 6, db.archive)
|
||||
collector = ChartCollector(db)
|
||||
|
||||
@PluginManager.registerTo("SiteManager")
|
||||
class SiteManagerPlugin(object):
|
||||
def load(self, *args, **kwargs):
|
||||
back = super(SiteManagerPlugin, self).load(*args, **kwargs)
|
||||
collector.setInitialLastValues(self.sites.values())
|
||||
return back
|
||||
|
||||
def delete(self, address, *args, **kwargs):
|
||||
db.deleteSite(address)
|
||||
return super(SiteManagerPlugin, self).delete(address, *args, **kwargs)
|
||||
|
||||
@PluginManager.registerTo("UiWebsocket")
|
||||
class UiWebsocketPlugin(object):
|
||||
def actionChartDbQuery(self, to, query, params=None):
|
||||
if not "ADMIN" in self.permissions:
|
||||
return {"error": "No permission"}
|
||||
|
||||
if config.debug or config.verbose:
|
||||
s = time.time()
|
||||
rows = []
|
||||
try:
|
||||
if not query.strip().upper().startswith("SELECT"):
|
||||
raise Exception("Only SELECT query supported")
|
||||
res = db.execute(query, params)
|
||||
except Exception, err: # Response the error to client
|
||||
self.log.error("ChartDbQuery error: %s" % err)
|
||||
return {"error": str(err)}
|
||||
# Convert result to dict
|
||||
for row in res:
|
||||
rows.append(dict(row))
|
||||
if config.verbose and time.time() - s > 0.1: # Log slow query
|
||||
self.log.debug("Slow query: %s (%.3fs)" % (query, time.time() - s))
|
||||
return rows
|
||||
|
||||
def actionChartGetPeerLocations(self, to):
|
||||
if not "ADMIN" in self.permissions:
|
||||
return {"error": "No permission"}
|
||||
|
||||
peers = {}
|
||||
for site in self.server.sites.values():
|
||||
peers.update(site.peers)
|
||||
peer_locations = self.getPeerLocations(peers)
|
||||
return peer_locations
|
1
plugins/Chart/__init__.py
Normal file
1
plugins/Chart/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
import ChartPlugin
|
Loading…
Reference in a new issue