TrackerZero: copy the Bootstrapper code to a new plugin TrackerZero

This commit is contained in:
Vadim Ushakov 2019-07-12 01:48:52 +07:00
parent 735061b79d
commit b7550474a5
3 changed files with 312 additions and 0 deletions

View file

@ -0,0 +1,156 @@
import time
import re
import gevent
from Config import config
from Db import Db
from util import helper
class TrackerZeroDb(Db.Db):
def __init__(self):
self.version = 7
self.hash_ids = {} # hash -> id cache
super(TrackerZeroDb, self).__init__({"db_name": "TrackerZero"}, "%s/tracker-zero.db" % config.data_dir)
self.foreign_keys = True
self.checkTables()
self.updateHashCache()
gevent.spawn(self.cleanup)
def cleanup(self):
while 1:
time.sleep(4 * 60)
timeout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 60 * 40))
self.execute("DELETE FROM peer WHERE date_announced < ?", [timeout])
def updateHashCache(self):
res = self.execute("SELECT * FROM hash")
self.hash_ids = {str(row["hash"]): row["hash_id"] for row in res}
self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))
def checkTables(self):
version = int(self.execute("PRAGMA user_version").fetchone()[0])
self.log.debug("Db version: %s, needed: %s" % (version, self.version))
if version < self.version:
self.createTables()
else:
self.execute("VACUUM")
def createTables(self):
# Delete all tables
self.execute("PRAGMA writable_schema = 1")
self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
self.execute("PRAGMA writable_schema = 0")
self.execute("VACUUM")
self.execute("PRAGMA INTEGRITY_CHECK")
# Create new tables
self.execute("""
CREATE TABLE peer (
peer_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
type TEXT,
address TEXT,
port INTEGER NOT NULL,
date_added DATETIME DEFAULT (CURRENT_TIMESTAMP),
date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
);
""")
self.execute("CREATE UNIQUE INDEX peer_key ON peer (address, port);")
self.execute("""
CREATE TABLE peer_to_hash (
peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
peer_id INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
hash_id INTEGER REFERENCES hash (hash_id)
);
""")
self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")
self.execute("""
CREATE TABLE hash (
hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
hash BLOB UNIQUE NOT NULL,
date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
);
""")
self.execute("PRAGMA user_version = %s" % self.version)
def getHashId(self, hash):
if hash not in self.hash_ids:
self.log.debug("New hash: %s" % repr(hash))
self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash})
self.hash_ids[hash] = self.cur.cursor.lastrowid
return self.hash_ids[hash]
def peerAnnounce(self, ip_type, address, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False):
hashes_ids_announced = []
for hash in hashes:
hashes_ids_announced.append(self.getHashId(hash))
# Check user
res = self.execute("SELECT peer_id FROM peer WHERE ? LIMIT 1", {"address": address, "port": port})
user_row = res.fetchone()
now = time.strftime("%Y-%m-%d %H:%M:%S")
if user_row:
peer_id = user_row["peer_id"]
self.execute("UPDATE peer SET date_announced = ? WHERE peer_id = ?", (now, peer_id))
else:
self.log.debug("New peer: %s signed: %s" % (address, onion_signed))
if ip_type == "onion" and not onion_signed:
return len(hashes)
self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now})
peer_id = self.cur.cursor.lastrowid
# Check user's hashes
res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
hash_ids_db = [row["hash_id"] for row in res]
if hash_ids_db != hashes_ids_announced:
hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
if ip_type != "onion" or onion_signed:
for hash_id in hash_ids_added:
self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
if hash_ids_removed and delete_missing_hashes:
self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})
return len(hash_ids_added) + len(hash_ids_removed)
else:
return 0
def peerList(self, hash, address=None, onions=[], port=None, limit=30, need_types=["ipv4", "onion"], order=True):
back = {"ipv4": [], "ipv6": [], "onion": []}
if limit == 0:
return back
hashid = self.getHashId(hash)
if order:
order_sql = "ORDER BY date_announced DESC"
else:
order_sql = ""
where_sql = "hash_id = :hashid"
if onions:
onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions if type(onion) is str]
where_sql += " AND address NOT IN (%s)" % ",".join(onions_escaped)
elif address:
where_sql += " AND NOT (address = :address AND port = :port)"
query = """
SELECT type, address, port
FROM peer_to_hash
LEFT JOIN peer USING (peer_id)
WHERE %s
%s
LIMIT :limit
""" % (where_sql, order_sql)
res = self.execute(query, {"hashid": hashid, "address": address, "port": port, "limit": limit})
for row in res:
if row["type"] in need_types:
if row["type"] == "onion":
packed = helper.packOnionAddress(row["address"], row["port"])
else:
packed = helper.packAddress(str(row["address"]), row["port"])
back[row["type"]].append(packed)
return back

View file

@ -0,0 +1,155 @@
import time
from util import helper
from Plugin import PluginManager
from .TrackerZeroDb import TrackerZeroDb
from Crypt import CryptRsa
from Config import config
if "db" not in locals().keys(): # Share during reloads
db = TrackerZeroDb()
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
def checkOnionSigns(self, onions, onion_signs, onion_sign_this):
if not onion_signs or len(onion_signs) != len(set(onions)):
return False
if time.time() - float(onion_sign_this) > 3 * 60:
return False # Signed out of allowed 3 minutes
onions_signed = []
# Check onion signs
for onion_publickey, onion_sign in onion_signs.items():
if CryptRsa.verify(onion_sign_this.encode(), onion_publickey, onion_sign):
onions_signed.append(CryptRsa.publickeyToOnion(onion_publickey))
else:
break
# Check if the same onion addresses signed as the announced onces
if sorted(onions_signed) == sorted(set(onions)):
return True
else:
return False
def actionAnnounce(self, params):
time_started = time.time()
s = time.time()
# Backward compatibility
if "ip4" in params["add"]:
params["add"].append("ipv4")
if "ip4" in params["need_types"]:
params["need_types"].append("ipv4")
hashes = params["hashes"]
all_onions_signed = self.checkOnionSigns(params.get("onions", []), params.get("onion_signs"), params.get("onion_sign_this"))
time_onion_check = time.time() - s
ip_type = helper.getIpType(self.connection.ip)
if ip_type == "onion" or self.connection.ip in config.ip_local:
is_port_open = False
elif ip_type in params["add"]:
is_port_open = True
else:
is_port_open = False
s = time.time()
# Separatley add onions to sites or at once if no onions present
i = 0
onion_to_hash = {}
for onion in params.get("onions", []):
if onion not in onion_to_hash:
onion_to_hash[onion] = []
onion_to_hash[onion].append(hashes[i])
i += 1
hashes_changed = 0
for onion, onion_hashes in onion_to_hash.items():
hashes_changed += db.peerAnnounce(
ip_type="onion",
address=onion,
port=params["port"],
hashes=onion_hashes,
onion_signed=all_onions_signed
)
time_db_onion = time.time() - s
s = time.time()
if is_port_open:
hashes_changed += db.peerAnnounce(
ip_type=ip_type,
address=self.connection.ip,
port=params["port"],
hashes=hashes,
delete_missing_hashes=params.get("delete")
)
time_db_ip = time.time() - s
s = time.time()
# Query sites
back = {}
peers = []
if params.get("onions") and not all_onions_signed and hashes_changed:
back["onion_sign_this"] = "%.0f" % time.time() # Send back nonce for signing
if len(hashes) > 500 or not hashes_changed:
limit = 5
order = False
else:
limit = 30
order = True
for hash in hashes:
if time.time() - time_started > 1: # 1 sec limit on request
self.connection.log("Announce time limit exceeded after %s/%s sites" % (len(peers), len(hashes)))
break
hash_peers = db.peerList(
hash,
address=self.connection.ip, onions=list(onion_to_hash.keys()), port=params["port"],
limit=min(limit, params["need_num"]), need_types=params["need_types"], order=order
)
if "ip4" in params["need_types"]: # Backward compatibility
hash_peers["ip4"] = hash_peers["ipv4"]
del(hash_peers["ipv4"])
peers.append(hash_peers)
time_peerlist = time.time() - s
back["peers"] = peers
self.connection.log(
"Announce %s sites (onions: %s, onion_check: %.3fs, db_onion: %.3fs, db_ip: %.3fs, peerlist: %.3fs, limit: %s)" %
(len(hashes), len(onion_to_hash), time_onion_check, time_db_onion, time_db_ip, time_peerlist, limit)
)
self.response(back)
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
def actionStatsTrackerZero(self):
self.sendHeader()
# Style
yield """
<style>
* { font-family: monospace; white-space: pre }
table td, table th { text-align: right; padding: 0px 10px }
</style>
"""
hash_rows = db.execute("SELECT * FROM hash").fetchall()
for hash_row in hash_rows:
peer_rows = db.execute(
"SELECT * FROM peer LEFT JOIN peer_to_hash USING (peer_id) WHERE hash_id = :hash_id",
{"hash_id": hash_row["hash_id"]}
).fetchall()
yield "<br>%s (added: %s, peers: %s)<br>" % (
str(hash_row["hash"]).encode("hex"), hash_row["date_added"], len(peer_rows)
)
for peer_row in peer_rows:
yield " - {ip4: <30} {onion: <30} added: {date_added}, announced: {date_announced}<br>".format(**dict(peer_row))

View file

@ -0,0 +1 @@
from . import TrackerZeroPlugin