Refactor ConnectionServer, FileServer; fix bugs introduced in previous commits

Vadim Ushakov 2020-11-03 21:21:33 +07:00
parent ba16fdcae9
commit c84b413f58
4 changed files with 222 additions and 87 deletions

View file

@ -8,6 +8,7 @@ import gevent
import msgpack
from gevent.server import StreamServer
from gevent.pool import Pool
import gevent.event
import util
from util import helper
@ -35,6 +36,8 @@ class ConnectionServer(object):
self.port_opened = {}
self.peer_blacklist = SiteManager.peer_blacklist
self.managed_pools = {}
self.tor_manager = TorManager(self.ip, self.port)
self.connections = [] # Connections
self.whitelist = config.ip_local # No flood protection on this ips
@ -53,8 +56,12 @@ class ConnectionServer(object):
self.stream_server_proxy = None
self.running = False
self.stopping = False
self.stopping_event = gevent.event.Event()
self.thread_checker = None
self.thread_pool = Pool(None)
self.managed_pools["thread"] = self.thread_pool
self.stat_recv = defaultdict(lambda: defaultdict(int))
self.stat_sent = defaultdict(lambda: defaultdict(int))
self.bytes_recv = 0
@ -70,6 +77,7 @@ class ConnectionServer(object):
self.timecorrection = 0.0
self.pool = Pool(500) # do not accept more than 500 connections
self.managed_pools["incoming"] = self.pool
# Bittorrent style peerid
self.peer_id = "-UT3530-%s" % CryptHash.random(12, "base64")
@ -90,7 +98,7 @@ class ConnectionServer(object):
return False
self.running = True
if check_connections:
self.thread_checker = gevent.spawn(self.checkConnections)
self.thread_checker = self.spawn(self.checkConnections)
CryptConnection.manager.loadCerts()
if config.tor != "disable":
self.tor_manager.start()
@ -114,7 +122,7 @@ class ConnectionServer(object):
return None
if self.stream_server_proxy:
gevent.spawn(self.listenProxy)
self.spawn(self.listenProxy)
try:
self.stream_server.serve_forever()
except Exception as err:
@ -126,18 +134,65 @@ class ConnectionServer(object):
self.log.debug("Stopping %s" % self.stream_server)
self.stopping = True
self.running = False
self.stopping_event.set()
self.onStop()
def onStop(self):
prev_sizes = {}
for i in range(60):
sizes = {}
total_size = 0
for name, pool in self.managed_pools.items():
pool.join(timeout=1)
size = len(pool)
sizes[name] = size
total_size += size
if total_size == 0:
break
if prev_sizes != sizes:
s = ""
for name, size in sizes.items():
s += "%s pool: %s, " % (name, size)
s += "total: %s" % total_size
self.log.info("Waiting for tasks in managed pools to stop: %s", s)
prev_sizes = sizes
for name, pool in self.managed_pools.items():
size = len(pool)
if size:
self.log.info("Killing %s tasks in %s pool", size, name)
pool.kill()
if self.thread_checker:
gevent.kill(self.thread_checker)
self.thread_checker = None
if self.stream_server:
self.stream_server.stop()
# Sleeps the specified amount of time or until ConnectionServer is stopped
def sleep(self, t):
if t:
self.stopping_event.wait(timeout=t)
else:
time.sleep(t)
# Spawns a thread that will be waited for on server being stopped (and killed after a timeout)
def spawn(self, *args, **kwargs):
thread = self.thread_pool.spawn(*args, **kwargs)
return thread
def closeConnections(self):
self.log.debug("Closing all connection: %s" % len(self.connections))
for connection in self.connections[:]:
connection.close("Close all connections")
def handleIncomingConnection(self, sock, addr):
if config.offline:
if not self.allowsAcceptingConnections():
sock.close()
return False
@ -155,7 +210,7 @@ class ConnectionServer(object):
self.ip_incoming[ip] += 1
if self.ip_incoming[ip] > 6: # Allow 6 in 1 minute from same ip
self.log.debug("Connection flood detected from %s" % ip)
time.sleep(30)
self.sleep(30)
sock.close()
return False
else:
@ -207,7 +262,7 @@ class ConnectionServer(object):
return connection
# No connection found
if create and not config.offline: # Allow to create new connection if not found
if create and self.allowsCreatingConnections():
if port == 0:
raise Exception("This peer is not connectable")
@ -233,7 +288,7 @@ class ConnectionServer(object):
raise err
if len(self.connections) > config.global_connected_limit:
gevent.spawn(self.checkMaxConnections)
self.spawn(self.checkMaxConnections)
return connection
else:
@ -256,7 +311,7 @@ class ConnectionServer(object):
def checkConnections(self):
run_i = 0
time.sleep(15)
self.sleep(15)
while self.running:
run_i += 1
self.ip_incoming = {} # Reset connected ips counter
@ -321,7 +376,7 @@ class ConnectionServer(object):
if time.time() - s > 0.01:
self.log.debug("Connection cleanup in %.3fs" % (time.time() - s))
time.sleep(15)
self.sleep(15)
self.log.debug("Checkconnections ended")
@util.Noparallel(blocking=False)
@ -385,10 +440,10 @@ class ConnectionServer(object):
if self.has_internet:
self.internet_online_since = time.time()
gevent.spawn(self.onInternetOnline)
self.spawn(self.onInternetOnline)
else:
self.internet_offline_since = time.time()
gevent.spawn(self.onInternetOffline)
self.spawn(self.onInternetOffline)
def isInternetOnline(self):
return self.has_internet
@ -400,6 +455,20 @@ class ConnectionServer(object):
self.had_external_incoming = False
self.log.info("Internet offline")
def allowsCreatingConnections(self):
if config.offline:
return False
if self.stopping:
return False
return True
def allowsAcceptingConnections(self):
if config.offline:
return False
if self.stopping:
return False
return True
def getTimecorrection(self):
corrections = sorted([
connection.handshake.get("time") - connection.handshake_time + connection.last_ping_delay
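Note: the refactoring above replaces blocking time.sleep() calls and bare gevent.spawn() with an interruptible sleep() backed by stopping_event, and a spawn() that registers tasks in managed_pools so that onStop() can first join and then kill everything promptly. A minimal, self-contained sketch of the same pattern (class and method names here are illustrative only, not taken from this codebase):

import gevent
import gevent.event
from gevent.pool import Pool

class StoppableServer:
    def __init__(self):
        self.stopping = False
        self.stopping_event = gevent.event.Event()
        self.thread_pool = Pool(None)  # unbounded pool for background tasks
        self.managed_pools = {"thread": self.thread_pool}

    def spawn(self, *args, **kwargs):
        # Tasks spawned here are joined (and finally killed) on shutdown
        return self.thread_pool.spawn(*args, **kwargs)

    def sleep(self, t):
        # Returns as soon as stop() sets the event, instead of sleeping the full t seconds
        self.stopping_event.wait(timeout=t)

    def worker(self):
        while not self.stopping:
            self.sleep(15)  # wakes up immediately when stop() is called

    def stop(self):
        self.stopping = True
        self.stopping_event.set()
        for name, pool in self.managed_pools.items():
            pool.join(timeout=1)  # give tasks a chance to finish on their own
        for name, pool in self.managed_pools.items():
            if len(pool):
                pool.kill()  # force-kill whatever is still running

server = StoppableServer()
server.spawn(server.worker)
gevent.sleep(0.1)
server.stop()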

View file

@ -64,6 +64,8 @@ class FileServer(ConnectionServer):
ConnectionServer.__init__(self, ip, port, self.handleRequest)
log.debug("Supported IP types: %s" % self.supported_ip_types)
self.managed_pools["update"] = self.pool
if ip_type == "dual" and ip == "::":
# Also bind to ipv4 address in dual mode
try:
@ -76,18 +78,28 @@ class FileServer(ConnectionServer):
self.port_opened = {}
self.sites = self.site_manager.sites
self.last_request = time.time()
self.files_parsing = {}
self.ui_server = None
def getSites(self):
sites = self.site_manager.list()
# We need to keep self.sites for the backward compatibility with plugins.
# Never. Ever. Use it.
# TODO: fix plugins
self.sites = sites
return sites
def getSite(self, address):
return self.getSites().get(address, None)
def getSiteAddresses(self):
# Avoid saving the site list on the stack, since a site may be deleted
# from the original list while iterating.
# Use the list of addresses instead.
return [
site.address for site in
sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True)
sorted(list(self.getSites().values()), key=lambda site: site.settings.get("modified", 0), reverse=True)
]
def getRandomPort(self, ip, port_range_from, port_range_to):
@ -110,7 +122,7 @@ class FileServer(ConnectionServer):
log.info("Found unused random port: %s" % port)
return port
else:
time.sleep(0.1)
self.sleep(0.1)
return False
def isIpv6Supported(self):
@ -157,7 +169,7 @@ class FileServer(ConnectionServer):
req = FileRequest(self, connection)
req.route(message["cmd"], message.get("req_id"), message.get("params"))
if not connection.is_private_ip:
self.setInternetStatus(self, True)
self.setInternetStatus(True)
def onInternetOnline(self):
log.info("Internet online")
@ -167,7 +179,7 @@ class FileServer(ConnectionServer):
)
self.invalidateUpdateTime(invalid_interval)
self.recheck_port = True
gevent.spawn(self.updateSites)
self.spawn(self.updateSites)
# Reload the FileRequest class to prevent restarts in debug mode
def reload(self):
@ -201,7 +213,7 @@ class FileServer(ConnectionServer):
self.ui_server.updateWebsocket()
if "ipv6" in self.supported_ip_types:
res_ipv6_thread = gevent.spawn(self.portchecker.portCheck, self.port, "ipv6")
res_ipv6_thread = self.spawn(self.portchecker.portCheck, self.port, "ipv6")
else:
res_ipv6_thread = None
@ -251,7 +263,7 @@ class FileServer(ConnectionServer):
if not self.recheck_port:
return
if not self.port_opened or self.recheck_port:
if not self.port_opened:
self.portCheck()
if not self.port_opened["ipv4"]:
self.tor_manager.startOnions()
@ -259,21 +271,28 @@ class FileServer(ConnectionServer):
# Returns False if Internet is immediately available
# Returns True if we've spent some time waiting for Internet
# Returns None if FileServer is stopping or the Offline mode is enabled
def waitForInternetOnline(self):
if config.offline or self.stopping:
return None
if self.isInternetOnline():
return False
while not self.isInternetOnline():
time.sleep(15)
self.sleep(30)
if config.offline or self.stopping:
return None
if self.isInternetOnline():
break
if not self.update_pool.full():
self.update_pool.spawn(self.updateRandomSite)
if len(self.update_pool) == 0:
thread = self.update_pool.spawn(self.updateRandomSite)
thread.join()
self.recheckPort()
return True
def updateRandomSite(self, site_addresses=None):
def updateRandomSite(self, site_addresses=None, force=False):
if not site_addresses:
site_addresses = self.getSiteAddresses()
@ -282,22 +301,19 @@ class FileServer(ConnectionServer):
return
address = site_addresses[0]
site = self.sites.get(address, None)
site = self.getSite(address)
if not site or not site.isServing():
if not site:
return
log.debug("Checking randomly chosen site: %s", site.address_short)
self.updateSite(site)
self.updateSite(site, force=force)
def updateSite(self, site, check_files=False, dry_run=False):
if not site or not site.isServing():
def updateSite(self, site, check_files=False, force=False, dry_run=False):
if not site:
return False
return site.considerUpdate(check_files=check_files, dry_run=dry_run)
def getSite(self, address):
return self.sites.get(address, None)
return site.considerUpdate(check_files=check_files, force=force, dry_run=dry_run)
def invalidateUpdateTime(self, invalid_interval):
for address in self.getSiteAddresses():
@ -313,8 +329,8 @@ class FileServer(ConnectionServer):
log.info("%s: started", task_description)
# Don't wait port opening on first startup. Do the instant check now.
if len(self.sites) <= 2:
for address, site in list(self.sites.items()):
if len(self.getSites()) <= 2:
for address, site in list(self.getSites().items()):
self.updateSite(site, check_files=check_files)
all_site_addresses = self.getSiteAddresses()
@ -334,9 +350,12 @@ class FileServer(ConnectionServer):
# Check sites integrity
for site_address in site_addresses:
if check_files:
time.sleep(10)
self.sleep(10)
else:
time.sleep(1)
self.sleep(1)
if self.stopping:
break
site = self.getSite(site_address)
if not self.updateSite(site, check_files=check_files, dry_run=True):
@ -345,10 +364,9 @@ class FileServer(ConnectionServer):
sites_processed += 1
while 1:
while self.running:
self.waitForInternetOnline()
thread = self.update_pool.spawn(self.updateSite,
site, check_files=check_files)
thread = self.update_pool.spawn(self.updateSite, site, check_files=check_files)
if check_files:
# Limit the concurrency
# ZeroNet may be laggy when running from HDD.
@ -383,8 +401,11 @@ class FileServer(ConnectionServer):
long_timeout = min_long_timeout
short_cycle_time_limit = 60 * 2
while 1:
time.sleep(long_timeout)
while self.running:
self.sleep(long_timeout)
if self.stopping:
break
start_time = time.time()
@ -400,8 +421,11 @@ class FileServer(ConnectionServer):
sites_processed = 0
for site_address in site_addresses:
site = self.sites.get(site_address, None)
if (not site) or (not site.isServing()):
if self.stopping:
break
site = self.getSite(site_address)
if not site:
continue
log.debug("Running maintenance for site: %s", site.address_short)
@ -410,7 +434,7 @@ class FileServer(ConnectionServer):
site = None
if done:
sites_processed += 1
time.sleep(short_timeout)
self.sleep(short_timeout)
# If we host hundreds of sites, the full maintenance cycle may take very
# long time, especially on startup ( > 1 hour).
@ -454,19 +478,24 @@ class FileServer(ConnectionServer):
# are interested in connecting to them), we initiate some traffic by
# performing the update for a random site. It's way better than just
# silly pinging a random peer for no profit.
while 1:
threshold = self.internet_outage_threshold / 2.0
time.sleep(threshold / 2.0)
while self.running:
self.waitForInternetOnline()
threshold = self.internet_outage_threshold / 2.0
self.sleep(threshold / 2.0)
if self.stopping:
break
last_activity_time = max(
self.last_successful_internet_activity_time,
self.last_outgoing_internet_activity_time)
now = time.time()
if not len(self.sites):
if not len(self.getSites()):
continue
if last_activity_time > now - threshold:
continue
if self.update_pool.full():
if len(self.update_pool) != 0:
continue
log.info("No network activity for %.2fs. Running an update for a random site.",
@ -481,16 +510,18 @@ class FileServer(ConnectionServer):
# We should check if the files have actually changed,
# and do it more often.
interval = 60 * 10
while 1:
time.sleep(interval)
while self.running:
self.sleep(interval)
if self.stopping:
break
config.loadTrackersFile()
# Detects if computer back from wakeup
def wakeupWatcher(self):
last_time = time.time()
last_my_ips = socket.gethostbyname_ex('')[2]
while 1:
time.sleep(30)
while self.running:
self.sleep(30)
is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3
if is_time_changed:
# If it took more than 3 minutes, the computer was in sleep mode
@ -511,17 +542,28 @@ class FileServer(ConnectionServer):
)
self.invalidateUpdateTime(invalid_interval)
self.recheck_port = True
gevent.spawn(self.updateSites)
self.spawn(self.updateSites)
last_time = time.time()
last_my_ips = my_ips
# Bind and start serving sites
def start(self, check_sites=True):
# If passive_mode is False, FileServer starts the full-featured file serving:
# * Checks for updates at startup.
# * Checks sites' integrity.
# * Runs periodic update checks.
# * Watches for the internet going up or down and for the computer waking up, and runs update checks.
# If passive_mode is True, all the mentioned activity is disabled.
def start(self, passive_mode=False, check_sites=None, check_connections=True):
# Backward compatibility for a misnamed argument:
if check_sites is not None:
passive_mode = not check_sites
if self.stopping:
return False
ConnectionServer.start(self)
ConnectionServer.start(self, check_connections=check_connections)
try:
self.stream_server.start()
@ -532,22 +574,22 @@ class FileServer(ConnectionServer):
sys.modules["main"].ui_server.stop()
return False
self.sites = self.site_manager.list()
if config.debug:
# Auto reload FileRequest on change
from Debug import DebugReloader
DebugReloader.watcher.addCallback(self.reload)
if check_sites: # Open port, Update sites, Check files integrity
gevent.spawn(self.updateSites, check_files=True)
if not passive_mode:
self.spawn(self.updateSites)
thread_reaload_tracker_files = self.spawn(self.reloadTrackerFilesThread)
thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full")
thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short")
thread_keep_alive = self.spawn(self.keepAliveThread)
thread_wakeup_watcher = self.spawn(self.wakeupWatcher)
gevent.spawn(self.updateSites)
self.sleep(0.1)
self.spawn(self.updateSites, check_files=True)
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")
thread_sites_maintenance_short = gevent.spawn(self.sitesMaintenanceThread, mode="short")
thread_keep_alive = gevent.spawn(self.keepAliveThread)
thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)
ConnectionServer.listen(self)
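Note: start() now takes passive_mode (with the old check_sites argument mapped onto it for backward compatibility) and only spawns the update, maintenance, keep-alive and wakeup threads when passive_mode is False. A hedged usage sketch; the file_server variable and the call sites are assumptions for illustration, not part of this commit:

# Either: full-featured serving, with startup updates, maintenance and watcher threads
file_server.start(passive_mode=False)

# Or: passive mode, serving requests but skipping updates and maintenance threads
# file_server.start(passive_mode=True)

# Or: legacy call; check_sites=False is translated to passive_mode=True internally
# file_server.start(check_sites=False)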

View file

@ -172,6 +172,8 @@ class Site(object):
def isServing(self):
if config.offline:
return False
elif self.connection_server.stopping:
return False
else:
return self.settings["serving"]
@ -532,7 +534,7 @@ class Site(object):
a = b
if a <= self.last_online_update and self.last_online_update <= b:
self.last_online_update = 0
self.log.info("Update time invalidated")
self.log.debug("Update time invalidated")
def isUpdateTimeValid(self):
if not self.last_online_update:
@ -593,33 +595,12 @@ class Site(object):
self.updateWebsocket(updated=True)
@util.Noparallel(queue=True, ignore_args=True)
def considerUpdate(self, check_files=False, dry_run=False):
if not self.isServing():
def _considerUpdate_realJob(self, check_files=False, force=False):
if not self._considerUpdate_check(check_files=check_files, force=force, log_reason=True):
return False
online = self.connection_server.isInternetOnline()
run_update = False
msg = None
if not online:
run_update = True
msg = "network connection seems broken, trying to update the site to check if the network is up"
elif check_files:
run_update = True
msg = "checking site's files..."
elif not self.isUpdateTimeValid():
run_update = True
msg = "update time is not valid, updating now..."
if not run_update:
return False
if dry_run:
return True
self.log.debug(msg)
# TODO: there should be a configuration options controlling:
# * whether to check files on the program startup
# * whether to check files during the run time and how often
@ -628,6 +609,7 @@ class Site(object):
if len(self.peers) < 50:
self.announce(mode="update")
online = online and self.connection_server.isInternetOnline()
self.update(check_files=check_files)
@ -636,6 +618,39 @@ class Site(object):
return True
def _considerUpdate_check(self, check_files=False, force=False, log_reason=False):
if not self.isServing():
return False
online = self.connection_server.isInternetOnline()
run_update = False
msg = None
if force:
run_update = True
msg = "forcing site update"
elif not online:
run_update = True
msg = "network connection seems broken, trying to update a site to check if the network is up"
elif check_files:
run_update = True
msg = "checking site's files..."
elif not self.isUpdateTimeValid():
run_update = True
msg = "update time is not valid, updating now..."
if run_update and log_reason:
self.log.debug(msg)
return run_update
def considerUpdate(self, check_files=False, force=False, dry_run=False):
run_update = self._considerUpdate_check(check_files=check_files, force=force)
if run_update and not dry_run:
run_update = self._considerUpdate_realJob(check_files=check_files, force=force)
return run_update
# Update site by redownload all content.json
def redownloadContents(self):
# Download all content.json again
@ -1085,6 +1100,9 @@ class Site(object):
# Keep connections
def needConnections(self, num=None, check_site_on_reconnect=False, pex=True):
if not self.connection_server.allowsCreatingConnections():
return
if num is None:
num = self.getPreferableActiveConnectionCount()
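Note: considerUpdate() is split above into a cheap decision step (_considerUpdate_check) and the actual work (_considerUpdate_realJob), which is what makes dry_run inexpensive and force explicit. A rough sketch of how a caller might use the flags; file_server is assumed to be a running FileServer instance, and this loop is illustrative only, not copied from FileServer.updateSites:

addresses = file_server.getSiteAddresses()

# dry_run=True only evaluates _considerUpdate_check(), so nothing is downloaded yet
pending = [
    address for address in addresses
    if file_server.updateSite(file_server.getSite(address), dry_run=True)
]
print("Sites that would be updated: %s of %s" % (len(pending), len(addresses)))

# force=True runs the update even when the last update time is still considered valid
for address in pending:
    file_server.updateSite(file_server.getSite(address), force=True)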

View file

@ -127,6 +127,9 @@ class SiteAnnouncer(object):
@util.Noparallel()
def announce(self, force=False, mode="start", pex=True):
if not self.site.isServing():
return
if time.time() - self.time_last_announce < 30 and not force:
return # No reannouncing within 30 secs
@ -300,6 +303,9 @@ class SiteAnnouncer(object):
@util.Noparallel(blocking=False)
def announcePex(self, query_num=2, need_num=10):
if not self.site.isServing():
return
self.updateWebsocket(pex="announcing")
peers = self.site.getConnectedPeers()
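Note: the new guards in announce() and announcePex() rely on the extended Site.isServing() above, so a single stop signal now propagates through every layer. A hedged sketch of the chain; the objects here are illustrative stand-ins, not constructed as in the real code:

connection_server.stop()       # sets .stopping and stopping_event (ConnectionServer)
site.isServing()               # now returns False, because connection_server.stopping is True
site.considerUpdate()          # returns False early via _considerUpdate_check()
site.announcer.announce()      # returns immediately, no tracker traffic
site.needConnections()         # returns early, allowsCreatingConnections() is False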