180 lines
7.4 KiB
Python
180 lines
7.4 KiB
Python
import time
|
|
import sys
|
|
import collections
|
|
import itertools
|
|
import logging
|
|
|
|
import gevent
|
|
from util import helper
|
|
from Config import config
|
|
|
|
|
|
class ChartCollector(object):
|
|
def __init__(self, db):
|
|
self.db = db
|
|
if config.action == "main":
|
|
gevent.spawn_later(60 * 3, self.collector)
|
|
self.log = logging.getLogger("ChartCollector")
|
|
self.last_values = collections.defaultdict(dict)
|
|
|
|
def setInitialLastValues(self, sites):
|
|
# Recover last value of site bytes/sent
|
|
for site in sites:
|
|
self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
|
|
self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)
|
|
|
|
def getCollectors(self):
|
|
collectors = {}
|
|
file_server = sys.modules["main"].file_server
|
|
sites = file_server.sites
|
|
content_db = sites.values()[0].content_manager.contents.db
|
|
|
|
# Connection stats
|
|
collectors["connection"] = lambda: len(file_server.connections)
|
|
collectors["connection_in"] = (
|
|
lambda: len([1 for connection in file_server.connections if connection.type == "in"])
|
|
)
|
|
collectors["connection_onion"] = (
|
|
lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
|
|
)
|
|
collectors["connection_ping_avg"] = (
|
|
lambda: round(1000 * helper.avg(
|
|
[connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
|
|
))
|
|
)
|
|
collectors["connection_ping_min"] = (
|
|
lambda: round(1000 * min(
|
|
[connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
|
|
))
|
|
)
|
|
collectors["connection_rev_avg"] = (
|
|
lambda: helper.avg(
|
|
[connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
|
|
)
|
|
)
|
|
|
|
# Request stats
|
|
collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
|
|
collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
|
|
collectors["request_num_recv|change"] = lambda: file_server.num_recv
|
|
collectors["request_num_sent|change"] = lambda: file_server.num_sent
|
|
|
|
# Limit
|
|
collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
|
|
collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
|
|
collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])
|
|
|
|
# Peers
|
|
collectors["peer"] = lambda (peers): len(peers)
|
|
collectors["peer_onion"] = lambda (peers): len([True for peer in peers if ".onion" in peer])
|
|
|
|
# Size
|
|
collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
|
|
collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
|
|
collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])
|
|
|
|
return collectors
|
|
|
|
def getSiteCollectors(self):
|
|
site_collectors = {}
|
|
|
|
# Size
|
|
site_collectors["site_size"] = lambda(site): site.settings.get("size", 0)
|
|
site_collectors["site_size_optional"] = lambda(site): site.settings.get("size_optional", 0)
|
|
site_collectors["site_optional_downloaded"] = lambda(site): site.settings.get("optional_downloaded", 0)
|
|
site_collectors["site_content"] = lambda(site): len(site.content_manager.contents)
|
|
|
|
# Data transfer
|
|
site_collectors["site_bytes_recv|change"] = lambda(site): site.settings.get("bytes_recv", 0)
|
|
site_collectors["site_bytes_sent|change"] = lambda(site): site.settings.get("bytes_sent", 0)
|
|
|
|
# Peers
|
|
site_collectors["site_peer"] = lambda(site): len(site.peers)
|
|
site_collectors["site_peer_onion"] = lambda(site): len(
|
|
[True for peer in site.peers.itervalues() if peer.ip.endswith(".onion")]
|
|
)
|
|
site_collectors["site_peer_connected"] = lambda(site): len([True for peer in site.peers.itervalues() if peer.connection])
|
|
|
|
return site_collectors
|
|
|
|
def getUniquePeers(self):
|
|
sites = sys.modules["main"].file_server.sites
|
|
return set(itertools.chain.from_iterable(
|
|
[site.peers.keys() for site in sites.values()]
|
|
))
|
|
|
|
def collectDatas(self, collectors, last_values, site=None):
|
|
if site is None:
|
|
peers = self.getUniquePeers()
|
|
datas = {}
|
|
for key, collector in collectors.iteritems():
|
|
try:
|
|
if site:
|
|
value = collector(site)
|
|
elif key.startswith("peer"):
|
|
value = collector(peers)
|
|
else:
|
|
value = collector()
|
|
except Exception as err:
|
|
self.log.info("Collector %s error: %s" % (key, err))
|
|
value = None
|
|
|
|
if "|change" in key: # Store changes relative to last value
|
|
key = key.replace("|change", "")
|
|
last_value = last_values.get(key, 0)
|
|
last_values[key] = value
|
|
value = value - last_value
|
|
|
|
if value is None:
|
|
datas[key] = None
|
|
else:
|
|
datas[key] = round(value, 3)
|
|
return datas
|
|
|
|
def collectGlobal(self, collectors, last_values):
|
|
now = int(time.time())
|
|
s = time.time()
|
|
datas = self.collectDatas(collectors, last_values["global"])
|
|
values = []
|
|
for key, value in datas.iteritems():
|
|
values.append((self.db.getTypeId(key), value, now))
|
|
self.log.debug("Global collectors done in %.3fs" % (time.time() - s))
|
|
|
|
s = time.time()
|
|
cur = self.db.getCursor()
|
|
cur.execute("BEGIN")
|
|
cur.cursor.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
|
|
cur.execute("END")
|
|
cur.close()
|
|
self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))
|
|
|
|
def collectSites(self, sites, collectors, last_values):
|
|
now = int(time.time())
|
|
s = time.time()
|
|
values = []
|
|
for address, site in sites.iteritems():
|
|
site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
|
|
for key, value in site_datas.iteritems():
|
|
values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
|
|
time.sleep(0.000001)
|
|
self.log.debug("Site collections done in %.3fs" % (time.time() - s))
|
|
|
|
s = time.time()
|
|
cur = self.db.getCursor()
|
|
cur.execute("BEGIN")
|
|
cur.cursor.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
|
|
cur.execute("END")
|
|
cur.close()
|
|
self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))
|
|
|
|
def collector(self):
|
|
collectors = self.getCollectors()
|
|
site_collectors = self.getSiteCollectors()
|
|
sites = sys.modules["main"].file_server.sites
|
|
i = 0
|
|
while 1:
|
|
self.collectGlobal(collectors, self.last_values)
|
|
if i % 12 == 0: # Only collect sites data every hour
|
|
self.collectSites(sites, site_collectors, self.last_values)
|
|
time.sleep(60 * 5)
|
|
i += 1
|