Redesign the site updating strategy in Site.py, SiteAnnouncer.py, FileServer.py

Vadim Ushakov 2020-10-30 14:36:08 +07:00
parent adf40dbb6b
commit 829fd46781
4 changed files with 285 additions and 129 deletions

src/Config.py

@@ -257,6 +257,8 @@ class Config(object):
self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit')
self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers')
self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet checks its own sites for updates more frequently. A third party could use this to reveal the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for all sites.', type='bool', choices=[True, False], default=False)
self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip')
self.parser.add_argument('--fileserver_port', help='FileServer bind port (0: randomize)', default=0, type=int, metavar='port')
self.parser.add_argument('--fileserver_port_range', help='FileServer randomization range', default="10000-40000", metavar='port')
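When fileserver_port is 0, the bind port is drawn at random from fileserver_port_range. A minimal sketch of consuming that option, assuming the from-to format of the default value; parse_port_range is a hypothetical helper, and the actual bind-testing loop is FileServer.getRandomPort() below:

import random

def parse_port_range(port_range):
    # Hypothetical helper: "10000-40000" -> (10000, 40000)
    range_from, range_to = map(int, port_range.split("-"))
    return range_from, range_to

port_range_from, port_range_to = parse_port_range("10000-40000")
candidate = random.randint(port_range_from, port_range_to)
# getRandomPort() repeats this draw and verifies the port can actually be bound.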

src/File/FileServer.py

@@ -10,7 +10,6 @@ from gevent.server import StreamServer
import util
from util import helper
from util import CircularIterator
from Config import config
from .FileRequest import FileRequest
from Peer import PeerPortchecker
@@ -19,16 +18,24 @@ from Connection import ConnectionServer
from Plugin import PluginManager
from Debug import Debug
log = logging.getLogger("FileServer")
@PluginManager.acceptPlugins
class FileServer(ConnectionServer):
def __init__(self, ip=config.fileserver_ip, port=config.fileserver_port, ip_type=config.fileserver_ip_type):
self.site_manager = SiteManager.site_manager
self.portchecker = PeerPortchecker.PeerPortchecker(self)
self.log = logging.getLogger("FileServer")
self.ip_type = ip_type
self.ip_external_list = []
# This is wrong:
# self.log = logging.getLogger("FileServer")
# The value of self.log will be overwritten in ConnectionServer.__init__()
self.check_pool = gevent.pool.Pool(5)
self.check_start_time = 0
self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
self.supported_ip_types.append("ipv6")
@@ -52,17 +59,17 @@ class FileServer(ConnectionServer):
config.arguments.fileserver_port = port
ConnectionServer.__init__(self, ip, port, self.handleRequest)
self.log.debug("Supported IP types: %s" % self.supported_ip_types)
log.debug("Supported IP types: %s" % self.supported_ip_types)
if ip_type == "dual" and ip == "::":
# Also bind to the ipv4 address in dual mode
try:
self.log.debug("Binding proxy to %s:%s" % ("::", self.port))
log.debug("Binding proxy to %s:%s" % ("::", self.port))
self.stream_server_proxy = StreamServer(
("0.0.0.0", self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100
)
except Exception as err:
self.log.info("StreamServer proxy create error: %s" % Debug.formatException(err))
log.info("StreamServer proxy create error: %s" % Debug.formatException(err))
self.port_opened = {}
@@ -71,8 +78,17 @@ class FileServer(ConnectionServer):
self.files_parsing = {}
self.ui_server = None
def getSiteAddresses(self):
# Avoid saving the site list on the stack, since a site may be deleted
# from the original list while iterating.
# Use the list of addresses instead.
return [
site.address for site in
sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True)
]
def getRandomPort(self, ip, port_range_from, port_range_to):
self.log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to))
log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to))
tried = []
for bind_retry in range(100):
port = random.randint(port_range_from, port_range_to)
@@ -84,11 +100,11 @@ class FileServer(ConnectionServer):
sock.bind((ip, port))
success = True
except Exception as err:
self.log.warning("Error binding to port %s: %s" % (port, err))
log.warning("Error binding to port %s: %s" % (port, err))
success = False
sock.close()
if success:
self.log.info("Found unused random port: %s" % port)
log.info("Found unused random port: %s" % port)
return port
else:
time.sleep(0.1)
@@ -104,16 +120,16 @@ class FileServer(ConnectionServer):
sock.connect((ipv6_testip, 80))
local_ipv6 = sock.getsockname()[0]
if local_ipv6 == "::1":
self.log.debug("IPv6 not supported, no local IPv6 address")
log.debug("IPv6 not supported, no local IPv6 address")
return False
else:
self.log.debug("IPv6 supported on IP %s" % local_ipv6)
log.debug("IPv6 supported on IP %s" % local_ipv6)
return True
except socket.error as err:
self.log.warning("IPv6 not supported: %s" % err)
log.warning("IPv6 not supported: %s" % err)
return False
except Exception as err:
self.log.error("IPv6 check error: %s" % err)
log.error("IPv6 check error: %s" % err)
return False
def listenProxy(self):
@@ -121,20 +137,20 @@ class FileServer(ConnectionServer):
self.stream_server_proxy.serve_forever()
except Exception as err:
if err.errno == 98: # Address already in use error
self.log.debug("StreamServer proxy listen error: %s" % err)
log.debug("StreamServer proxy listen error: %s" % err)
else:
self.log.info("StreamServer proxy listen error: %s" % err)
log.info("StreamServer proxy listen error: %s" % err)
# Handle requests to the fileserver
def handleRequest(self, connection, message):
if config.verbose:
if "params" in message:
self.log.debug(
log.debug(
"FileRequest: %s %s %s %s" %
(str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))
)
else:
self.log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
req = FileRequest(self, connection)
req.route(message["cmd"], message.get("req_id"), message.get("params"))
if not self.has_internet and not connection.is_private_ip:
@@ -142,7 +158,7 @@ class FileServer(ConnectionServer):
self.onInternetOnline()
def onInternetOnline(self):
self.log.info("Internet online")
log.info("Internet online")
gevent.spawn(self.checkSites, check_files=False, force_port_check=True)
# Reload the FileRequest class to prevent restarts in debug mode
@@ -153,7 +169,7 @@ class FileServer(ConnectionServer):
def portCheck(self):
if config.offline:
self.log.info("Offline mode: port check disabled")
log.info("Offline mode: port check disabled")
res = {"ipv4": None, "ipv6": None}
self.port_opened = res
return res
@@ -169,7 +185,7 @@ class FileServer(ConnectionServer):
}
self.ip_external_list = config.ip_external
self.port_opened.update(res)
self.log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"]))
log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"]))
return res
self.port_opened = {}
@@ -191,7 +207,7 @@ class FileServer(ConnectionServer):
else:
res_ipv6 = res_ipv6_thread.get()
if res_ipv6["opened"] and not helper.getIpType(res_ipv6["ip"]) == "ipv6":
self.log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"])
log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"])
res_ipv6["opened"] = False
self.ip_external_list = []
@@ -200,7 +216,7 @@ class FileServer(ConnectionServer):
self.ip_external_list.append(res_ip["ip"])
SiteManager.peer_blacklist.append((res_ip["ip"], self.port))
self.log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"]))
log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"]))
res = {"ipv4": res_ipv4["opened"], "ipv6": res_ipv6["opened"]}
@@ -213,7 +229,7 @@ class FileServer(ConnectionServer):
self.ip_external_list.append(ip)
res[helper.getIpType(ip)] = True # We have opened port if we have external ip
SiteManager.peer_blacklist.append((ip, self.port))
self.log.debug("External ip found on interfaces: %s" % ip)
log.debug("External ip found on interfaces: %s" % ip)
self.port_opened.update(res)
@@ -224,79 +240,123 @@ class FileServer(ConnectionServer):
# Check site file integrity
def checkSite(self, site, check_files=False):
if site.isServing():
site.announce(mode="startup") # Announce site to tracker
site.update(check_files=check_files) # Update site's content.json and download changed files
site.sendMyHashfield()
site.updateHashfield()
if not site.isServing():
return
quick_start = len(site.peers) >= 50
if quick_start:
log.debug("Checking site: %s (quick start)" % (site.address))
else:
log.debug("Checking site: %s" % (site.address))
if quick_start:
site.setDelayedStartupAnnounce()
else:
site.announce(mode="startup")
site.update(check_files=check_files) # Update site's content.json and download changed files
site.sendMyHashfield()
site.updateHashfield()
# Check sites integrity
@util.Noparallel()
def checkSites(self, check_files=False, force_port_check=False):
self.log.debug("Checking sites...")
s = time.time()
sites_checking = False
if not self.port_opened or force_port_check: # Test and open port if not tested yet
if len(self.sites) <= 2: # Don't wait for port opening on first startup
sites_checking = True
for address, site in list(self.sites.items()):
gevent.spawn(self.checkSite, site, check_files)
log.info("Checking sites: check_files=%s, force_port_check=%s", check_files, force_port_check)
# Don't wait for port opening on first startup. Do the instant check now.
if len(self.sites) <= 2:
sites_checking = True
for address, site in list(self.sites.items()):
gevent.spawn(self.checkSite, site, check_files)
# Test and open port if not tested yet
if not self.port_opened or force_port_check:
self.portCheck()
if not self.port_opened["ipv4"]:
self.tor_manager.startOnions()
if not sites_checking:
check_pool = gevent.pool.Pool(5)
# Check sites integrity
for site in sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True):
if not site.isServing():
continue
check_thread = check_pool.spawn(self.checkSite, site, check_files) # Check in new thread
time.sleep(2)
if site.settings.get("modified", 0) < time.time() - 60 * 60 * 24: # Not a very active site, wait a bit for the check to finish
check_thread.join(timeout=5)
self.log.debug("Checksites done in %.3fs" % (time.time() - s))
site_addresses = self.getSiteAddresses()
sites_processed = 0
start_time = time.time()
self.check_start_time = start_time
progress_print_time = time.time()
# Check sites integrity
for site_address in site_addresses:
site = self.sites.get(site_address, None)
sites_processed += 1
if (not site) or (not site.isServing()):
continue
check_thread = self.check_pool.spawn(self.checkSite, site, check_files) # Check in new thread
if time.time() - progress_print_time > 60:
progress_print_time = time.time()
time_spent = time.time() - start_time
time_per_site = time_spent / float(sites_processed)
sites_left = len(site_addresses) - sites_processed
time_left = time_per_site * sites_left
log.info("Checking sites: DONE: %d sites in %.2fs (%.2fs per site); LEFT: %d sites in %.2fs",
sites_processed,
time_spent,
time_per_site,
sites_left,
time_left
)
if (self.check_start_time != start_time) and self.check_pool.full():
# Another check is running, throttling...
time.sleep(5)
else:
time.sleep(1)
log.info("Checking sites: finished in %.3fs" % (time.time() - start_time))
def sitesMaintenanceThread(self):
import gc
startup = True
short_timeout = 2
long_timeout = 60 * 5
circular_iterator = CircularIterator()
long_timeout = 60 * 2
while 1:
if circular_iterator.isWrapped():
time.sleep(long_timeout)
circular_iterator.resetSuccessiveCount()
gc.collect() # Explicit garbage collection
time.sleep(long_timeout)
gc.collect() # Explicit garbage collection
self.log.debug(
"Running site cleanup, connections: %s, internet: %s" %
(len(self.connections), self.has_internet)
log.debug(
"Starting maintenance cycle: connections=%s, internet=%s",
len(self.connections), self.has_internet
)
start_time = time.time()
site_addresses = self.getSiteAddresses()
sites_processed = 0
for site_address in site_addresses:
site = self.sites.get(site_address, None)
if (not site) or (not site.isServing()):
continue
log.debug("Running maintenance for site: %s", site.address)
done = site.runPeriodicMaintenance(startup=startup)
site = None
if done:
sites_processed += 1
time.sleep(short_timeout)
log.debug("Maintenance cycle finished in %.3fs. Total sites: %d. Processed sites: %d",
time.time() - start_time,
len(site_addresses),
sites_processed
)
site = circular_iterator.next(list(self.sites.values()))
if site:
done = site.runPeriodicMaintenance(startup=startup)
if done:
time.sleep(short_timeout)
site = None
if circular_iterator.isWrapped():
startup = False
def announceSite(self, site):
site.announce(mode="update", pex=False)
active_site = time.time() - site.settings.get("modified", 0) < 24 * 60 * 60
if site.settings["own"] or active_site:
# Check connections more frequently on own and active sites to speed up first connections
site.needConnections(check_site_on_reconnect=True)
site.sendMyHashfield(3)
site.updateHashfield(3)
site_addresses = None
startup = False
# Periodic reloading of tracker files
def reloadTrackerFilesThread(self):
@@ -309,24 +369,6 @@ class FileServer(ConnectionServer):
time.sleep(interval)
config.loadTrackersFile()
# Announce sites every 20 min
def announceSites(self):
time.sleep(5 * 60) # Sites already announced on startup
while 1:
s = time.time()
for address, site in list(self.sites.items()):
if not site.isServing():
continue
gevent.spawn(self.announceSite, site).join(timeout=10)
time.sleep(1)
taken = time.time() - s
# Query all trackers one by one, evenly distributed over 20 minutes
sleep = max(0, 60 * 20 / len(config.trackers) - taken)
self.log.debug("Site announce tracker done in %.3fs, sleeping for %.3fs..." % (taken, sleep))
time.sleep(sleep)
# Detects if computer back from wakeup
def wakeupWatcher(self):
last_time = time.time()
@@ -336,7 +378,7 @@ class FileServer(ConnectionServer):
is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3
if is_time_changed:
# If it took more than 3 minutes, the computer was in sleep mode
self.log.info(
log.info(
"Wakeup detected: time warp from %0.f to %0.f (%0.f sleep seconds), acting like startup..." %
(last_time, time.time(), time.time() - last_time)
)
@@ -344,7 +386,7 @@ class FileServer(ConnectionServer):
my_ips = socket.gethostbyname_ex('')[2]
is_ip_changed = my_ips != last_my_ips
if is_ip_changed:
self.log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
if is_time_changed or is_ip_changed:
self.checkSites(check_files=False, force_port_check=True)
@@ -362,9 +404,9 @@ class FileServer(ConnectionServer):
try:
self.stream_server.start()
except Exception as err:
self.log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err))
log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err))
if "ui_server" in dir(sys.modules["main"]):
self.log.debug("Stopping UI Server.")
log.debug("Stopping UI Server.")
sys.modules["main"].ui_server.stop()
return False
@@ -378,21 +420,20 @@ class FileServer(ConnectionServer):
gevent.spawn(self.checkSites)
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
thread_announce_sites = gevent.spawn(self.announceSites)
thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread)
thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)
ConnectionServer.listen(self)
self.log.debug("Stopped.")
log.debug("Stopped.")
def stop(self):
if self.running and self.portchecker.upnp_port_opened:
self.log.debug('Closing port %d' % self.port)
log.debug('Closing port %d' % self.port)
try:
self.portchecker.portClose(self.port)
self.log.info('Closed port via upnp.')
log.info('Closed port via upnp.')
except Exception as err:
self.log.info("Failed at attempt to use upnp to close port: %s" % err)
log.info("Failed at attempt to use upnp to close port: %s" % err)
return ConnectionServer.stop(self)

src/Site/Site.py

@@ -28,6 +28,38 @@ from File import FileServer
from .SiteAnnouncer import SiteAnnouncer
from . import SiteManager
class ScaledTimeoutHandler:
def __init__(self, val_min, val_max, handler=None, scaler=None):
self.val_min = val_min
self.val_max = val_max
self.timestamp = 0
self.handler = handler
self.scaler = scaler
self.log = logging.getLogger("ScaledTimeoutHandler")
def isExpired(self, scale):
interval = scale * (self.val_max - self.val_min) + self.val_min
expired_at = self.timestamp + interval
now = time.time()
expired = (now > expired_at)
if expired:
self.log.debug(
"Expired: [%d..%d]: scale=%f, interval=%f",
self.val_min, self.val_max, scale, interval)
return expired
def done(self):
self.timestamp = time.time()
def run(self, *args, **kwargs):
do_run = kwargs["force"] or self.isExpired(self.scaler())
if do_run:
result = self.handler(*args, **kwargs)
if result:
self.done()
return result
else:
return None
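Note that val_min and val_max are the interval endpoints at scale 0.0 and 1.0 respectively, not numeric bounds: Site passes val_min=60*30 and val_max=60*2 for announcing below, so a rating near 1.0 shortens the interval. A minimal usage sketch, with a hypothetical handler and a constant scaler (run() expects a force keyword argument):

def do_announce(startup=False, force=False):
    print("announcing...")
    return True  # a truthy result makes run() call done() and reset the timer

handler = ScaledTimeoutHandler(
    60 * 30, 60 * 2,     # 30 min at scale 0.0, 2 min at scale 1.0
    handler=do_announce,
    scaler=lambda: 0.5)  # interval = 0.5 * (120 - 1800) + 1800 = 960 s

handler.run(force=True)       # force=True bypasses the isExpired() check
assert handler.timestamp > 0  # done() recorded the successful run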
@PluginManager.acceptPlugins
class Site(object):
@@ -40,8 +72,16 @@ class Site(object):
self.log = logging.getLogger("Site:%s" % self.address_short)
self.addEventListeners()
self.periodic_maintenance_interval = 60 * 20
self.periodic_maintenance_timestamp = 0
self.periodic_maintenance_handlers = [
ScaledTimeoutHandler(60 * 30, 60 * 2,
handler=self.periodicMaintenanceHandler_announce,
scaler=self.getAnnounceRating),
ScaledTimeoutHandler(60 * 20, 60 * 10,
handler=self.periodicMaintenanceHandler_general,
scaler=self.getActivityRating)
]
self.delayed_startup_announce = False
self.content = None # Load content.json
self.peers = {} # Key: ip:port, Value: Peer.Peer
@@ -852,10 +892,63 @@ class Site(object):
peer.found(source)
return peer
def setDelayedStartupAnnounce(self):
self.delayed_startup_announce = True
def applyDelayedStartupAnnounce(self):
if self.delayed_startup_announce:
self.delayed_startup_announce = False
self.announce(mode="startup")
return True
return False
def announce(self, *args, **kwargs):
if self.isServing():
self.announcer.announce(*args, **kwargs)
def getActivityRating(self, force_safe=False):
age = time.time() - self.settings.get("modified", 0)
if age < 60 * 60:
rating = 1.0
elif age < 60 * 60 * 5:
rating = 0.8
elif age < 60 * 60 * 24:
rating = 0.6
elif age < 60 * 60 * 24 * 3:
rating = 0.4
elif age < 60 * 60 * 24 * 7:
rating = 0.2
else:
rating = 0.0
force_safe = force_safe or config.expose_no_ownership
if (not force_safe) and self.settings["own"]:
rating = min(rating, 0.6)
return rating
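The rating is a step function of content age, and the owner's copy of a site is capped at 0.6 unless --expose_no_ownership is set, so an owned site never announces more eagerly than an ordinary recently modified one. A standalone sketch of the same mapping, with hypothetical sanity checks:

import time

def activity_rating(modified, own=False, expose_no_ownership=False, now=None):
    age = (now or time.time()) - modified
    hour = 60 * 60
    for limit, rating in [(hour, 1.0), (5 * hour, 0.8), (24 * hour, 0.6),
                          (3 * 24 * hour, 0.4), (7 * 24 * hour, 0.2)]:
        if age < limit:
            break
    else:
        rating = 0.0  # older than a week
    if own and not expose_no_ownership:
        rating = min(rating, 0.6)  # mask ownership
    return rating

now = time.time()
assert activity_rating(now - 30 * 60, own=True, now=now) == 0.6        # capped from 1.0
assert activity_rating(now - 2 * 24 * 3600, own=True, now=now) == 0.4  # cap only lowers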
def getAnnounceRating(self):
#  rare                      frequent
#  announces                 announces
#  0 ----------------------- 1
#  activity ------------->     -- active site   ==> frequent announces
#  <---------------- peers     -- many peers    ==> rare announces
#  trackers ------------->     -- many trackers ==> frequent announces to iterate over more trackers
activity_rating = self.getActivityRating(force_safe=True)
peer_count = len(self.peers)
peer_rating = 1.0 - min(peer_count, 50) / 50.0
tracker_count = self.announcer.getSupportedTrackerCount()
tracker_count = max(tracker_count, 1)
tracker_rating = 1.0 - (1.0 / tracker_count)
v = [activity_rating, peer_rating, tracker_rating]
return sum(v) / float(len(v))
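Averaging the three sub-ratings means a recently modified site with few peers and many supported trackers lands near the frequent end of the scale. A worked example with hypothetical numbers:

activity_rating = 1.0                    # modified less than an hour ago
peer_rating = 1.0 - min(10, 50) / 50.0   # 10 peers -> 0.8: few peers, announce more
tracker_rating = 1.0 - 1.0 / max(20, 1)  # 20 trackers -> 0.95: more trackers to rotate over
print((activity_rating + peer_rating + tracker_rating) / 3.0)  # ~0.917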
# The engine tries to maintain the number of active connections:
# >= getPreferableActiveConnectionCount()
# and
@@ -866,18 +959,7 @@ class Site(object):
return 0
age = time.time() - self.settings.get("modified", 0)
count = 0
if age < 60 * 60:
count = 10
elif age < 60 * 60 * 5:
count = 8
elif age < 60 * 60 * 24:
count = 6
elif age < 60 * 60 * 24 * 3:
count = 4
elif age < 60 * 60 * 24 * 7:
count = 2
count = int(10 * self.getActivityRating(force_safe=True))
if len(self.peers) < 50:
count = max(count, 5)
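So the preferable connection count is just the activity rating mapped onto 0..10, with a floor of 5 while a site still has few peers. Worked examples with hypothetical site states:

# rating 0.8 (modified ~2 hours ago), 100 peers -> count = int(10 * 0.8) = 8
# rating 0.2 (modified ~5 days ago), 30 peers   -> count = max(int(10 * 0.2), 5) = 5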
@@ -957,7 +1039,7 @@ class Site(object):
return connected
def markConnectedPeersProtected(self):
for peer in site.getConnectedPeers():
for peer in self.getConnectedPeers():
peer.markProtected()
# Return: Probably peers verified to be connectable recently
@@ -1099,32 +1181,54 @@ class Site(object):
if not self.isServing():
return False
scheduled_time = self.periodic_maintenance_timestamp + self.periodic_maintenance_interval
self.log.debug("runPeriodicMaintenance: startup=%s, force=%s" % (startup, force))
if time.time() < scheduled_time and not force:
result = False
for handler in self.periodic_maintenance_handlers:
result = result | bool(handler.run(startup=startup, force=force))
return result
def periodicMaintenanceHandler_general(self, startup=False, force=False):
if not self.isServing():
return False
self.periodic_maintenance_timestamp = time.time()
self.applyDelayedStartupAnnounce()
self.log.debug("runPeriodicMaintenance: startup=%s" % startup)
if not self.peers:
return False
self.log.debug("periodicMaintenanceHandler_general: startup=%s, force=%s" % (startup, force))
if not startup:
self.cleanupPeers()
if self.peers:
with gevent.Timeout(10, exception=False):
self.announcer.announcePex()
self.needConnections(check_site_on_reconnect=True)
# Last check modification failed
if self.content_updated is False:
with gevent.Timeout(10, exception=False):
self.announcer.announcePex()
self.sendMyHashfield(3)
self.updateHashfield(3)
if self.content_updated is False: # Last check modification failed
self.update()
elif self.bad_files:
self.retryBadFiles()
self.needConnections(check_site_on_reconnect=True)
return True
self.periodic_maintenance_timestamp = time.time()
def periodicMaintenanceHandler_announce(self, startup=False, force=False):
if not self.isServing():
return False
self.log.debug("periodicMaintenanceHandler_announce: startup=%s, force=%s" % (startup, force))
if self.applyDelayedStartupAnnounce():
return True
self.announce(mode="update", pex=False)
return True
# Send hashfield to peers

src/Site/SiteAnnouncer.py

@@ -33,6 +33,7 @@ class SiteAnnouncer(object):
self.peer_id = self.site.connection_server.peer_id
self.tracker_circular_iterator = CircularIterator()
self.time_last_announce = 0
self.supported_tracker_count = 0
def getTrackers(self):
return config.trackers
@@ -50,6 +51,12 @@ class SiteAnnouncer(object):
return trackers
# Returns a cached value of len(self.getSupportedTrackers()), which can be
# inaccurate.
# To be used from Site for estimating the available tracker count.
def getSupportedTrackerCount(self):
return self.supported_tracker_count
def shouldTrackerBeTemporarilyIgnored(self, tracker, mode, force):
if not tracker:
return True
@@ -71,6 +78,8 @@ class SiteAnnouncer(object):
def getAnnouncingTrackers(self, mode, force):
trackers = self.getSupportedTrackers()
self.supported_tracker_count = len(trackers)
if trackers and (mode == "update" or mode == "more"):
# Choose just 2 trackers to announce to
@@ -116,7 +125,7 @@ class SiteAnnouncer(object):
back.append("onion")
return back
@util.Noparallel(blocking=False)
@util.Noparallel()
def announce(self, force=False, mode="start", pex=True):
if time.time() - self.time_last_announce < 30 and not force:
return # No reannouncing within 30 secs