From c84b413f58bfc810f1e56b7e6414367f81efadf6 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Tue, 3 Nov 2020 21:21:33 +0700 Subject: [PATCH] Refactor ConnectionServer, FileServer; fix bugs introduced in previous commits --- src/Connection/ConnectionServer.py | 89 +++++++++++++++-- src/File/FileServer.py | 148 ++++++++++++++++++----------- src/Site/Site.py | 66 ++++++++----- src/Site/SiteAnnouncer.py | 6 ++ 4 files changed, 222 insertions(+), 87 deletions(-) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 3ec0932d..b66d1739 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -8,6 +8,7 @@ import gevent import msgpack from gevent.server import StreamServer from gevent.pool import Pool +import gevent.event import util from util import helper @@ -35,6 +36,8 @@ class ConnectionServer(object): self.port_opened = {} self.peer_blacklist = SiteManager.peer_blacklist + self.managed_pools = {} + self.tor_manager = TorManager(self.ip, self.port) self.connections = [] # Connections self.whitelist = config.ip_local # No flood protection on this ips @@ -53,8 +56,12 @@ class ConnectionServer(object): self.stream_server_proxy = None self.running = False self.stopping = False + self.stopping_event = gevent.event.Event() self.thread_checker = None + self.thread_pool = Pool(None) + self.managed_pools["thread"] = self.thread_pool + self.stat_recv = defaultdict(lambda: defaultdict(int)) self.stat_sent = defaultdict(lambda: defaultdict(int)) self.bytes_recv = 0 @@ -70,6 +77,7 @@ class ConnectionServer(object): self.timecorrection = 0.0 self.pool = Pool(500) # do not accept more than 500 connections + self.managed_pools["incoming"] = self.pool # Bittorrent style peerid self.peer_id = "-UT3530-%s" % CryptHash.random(12, "base64") @@ -90,7 +98,7 @@ class ConnectionServer(object): return False self.running = True if check_connections: - self.thread_checker = gevent.spawn(self.checkConnections) + self.thread_checker = self.spawn(self.checkConnections) CryptConnection.manager.loadCerts() if config.tor != "disable": self.tor_manager.start() @@ -114,7 +122,7 @@ class ConnectionServer(object): return None if self.stream_server_proxy: - gevent.spawn(self.listenProxy) + self.spawn(self.listenProxy) try: self.stream_server.serve_forever() except Exception as err: @@ -126,18 +134,65 @@ class ConnectionServer(object): self.log.debug("Stopping %s" % self.stream_server) self.stopping = True self.running = False + self.stopping_event.set() + self.onStop() + + def onStop(self): + prev_sizes = {} + for i in range(60): + sizes = {} + total_size = 0 + + for name, pool in self.managed_pools.items(): + pool.join(timeout=1) + size = len(pool) + sizes[name] = size + total_size += size + + if total_size == 0: + break + + if prev_sizes != sizes: + s = "" + for name, size in sizes.items(): + s += "%s pool: %s, " % (name, size) + s += "total: %s" % total_size + + self.log.info("Waiting for tasks in managed pools to stop: %s", s) + + prev_sizes = sizes + + for name, pool in self.managed_pools.items(): + size = len(pool) + if size: + self.log.info("Killing %s tasks in %s pool", size, name) + pool.kill() + if self.thread_checker: gevent.kill(self.thread_checker) + self.thread_checker = None if self.stream_server: self.stream_server.stop() + # Sleeps the specified amount of time or until ConnectionServer is stopped + def sleep(self, t): + if t: + self.stopping_event.wait(timeout=t) + else: + time.sleep(t) + + # Spawns a thread that will be waited for on server being stooped (and killed after a timeout) + def spawn(self, *args, **kwargs): + thread = self.thread_pool.spawn(*args, **kwargs) + return thread + def closeConnections(self): self.log.debug("Closing all connection: %s" % len(self.connections)) for connection in self.connections[:]: connection.close("Close all connections") def handleIncomingConnection(self, sock, addr): - if config.offline: + if self.allowsAcceptingConnections(): sock.close() return False @@ -155,7 +210,7 @@ class ConnectionServer(object): self.ip_incoming[ip] += 1 if self.ip_incoming[ip] > 6: # Allow 6 in 1 minute from same ip self.log.debug("Connection flood detected from %s" % ip) - time.sleep(30) + self.sleep(30) sock.close() return False else: @@ -207,7 +262,7 @@ class ConnectionServer(object): return connection # No connection found - if create and not config.offline: # Allow to create new connection if not found + if create and self.allowsCreatingConnections(): if port == 0: raise Exception("This peer is not connectable") @@ -233,7 +288,7 @@ class ConnectionServer(object): raise err if len(self.connections) > config.global_connected_limit: - gevent.spawn(self.checkMaxConnections) + self.spawn(self.checkMaxConnections) return connection else: @@ -256,7 +311,7 @@ class ConnectionServer(object): def checkConnections(self): run_i = 0 - time.sleep(15) + self.sleep(15) while self.running: run_i += 1 self.ip_incoming = {} # Reset connected ips counter @@ -321,7 +376,7 @@ class ConnectionServer(object): if time.time() - s > 0.01: self.log.debug("Connection cleanup in %.3fs" % (time.time() - s)) - time.sleep(15) + self.sleep(15) self.log.debug("Checkconnections ended") @util.Noparallel(blocking=False) @@ -385,10 +440,10 @@ class ConnectionServer(object): if self.has_internet: self.internet_online_since = time.time() - gevent.spawn(self.onInternetOnline) + self.spawn(self.onInternetOnline) else: self.internet_offline_since = time.time() - gevent.spawn(self.onInternetOffline) + self.spawn(self.onInternetOffline) def isInternetOnline(self): return self.has_internet @@ -400,6 +455,20 @@ class ConnectionServer(object): self.had_external_incoming = False self.log.info("Internet offline") + def allowsCreatingConnections(self): + if config.offline: + return False + if self.stopping: + return False + return True + + def allowsAcceptingConnections(self): + if config.offline: + return False + if self.stopping: + return False + return True + def getTimecorrection(self): corrections = sorted([ connection.handshake.get("time") - connection.handshake_time + connection.last_ping_delay diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 66c8c135..b228680a 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -64,6 +64,8 @@ class FileServer(ConnectionServer): ConnectionServer.__init__(self, ip, port, self.handleRequest) log.debug("Supported IP types: %s" % self.supported_ip_types) + self.managed_pools["update"] = self.pool + if ip_type == "dual" and ip == "::": # Also bind to ipv4 addres in dual mode try: @@ -76,18 +78,28 @@ class FileServer(ConnectionServer): self.port_opened = {} - self.sites = self.site_manager.sites self.last_request = time.time() self.files_parsing = {} self.ui_server = None + def getSites(self): + sites = self.site_manager.list() + # We need to keep self.sites for the backward compatibility with plugins. + # Never. Ever. Use it. + # TODO: fix plugins + self.sites = sites + return sites + + def getSite(self, address): + return self.getSites().get(address, None) + def getSiteAddresses(self): # Avoid saving the site list on the stack, since a site may be deleted # from the original list while iterating. # Use the list of addresses instead. return [ site.address for site in - sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True) + sorted(list(self.getSites().values()), key=lambda site: site.settings.get("modified", 0), reverse=True) ] def getRandomPort(self, ip, port_range_from, port_range_to): @@ -110,7 +122,7 @@ class FileServer(ConnectionServer): log.info("Found unused random port: %s" % port) return port else: - time.sleep(0.1) + self.sleep(0.1) return False def isIpv6Supported(self): @@ -157,7 +169,7 @@ class FileServer(ConnectionServer): req = FileRequest(self, connection) req.route(message["cmd"], message.get("req_id"), message.get("params")) if not connection.is_private_ip: - self.setInternetStatus(self, True) + self.setInternetStatus(True) def onInternetOnline(self): log.info("Internet online") @@ -167,7 +179,7 @@ class FileServer(ConnectionServer): ) self.invalidateUpdateTime(invalid_interval) self.recheck_port = True - gevent.spawn(self.updateSites) + self.spawn(self.updateSites) # Reload the FileRequest class to prevent restarts in debug mode def reload(self): @@ -201,7 +213,7 @@ class FileServer(ConnectionServer): self.ui_server.updateWebsocket() if "ipv6" in self.supported_ip_types: - res_ipv6_thread = gevent.spawn(self.portchecker.portCheck, self.port, "ipv6") + res_ipv6_thread = self.spawn(self.portchecker.portCheck, self.port, "ipv6") else: res_ipv6_thread = None @@ -251,7 +263,7 @@ class FileServer(ConnectionServer): if not self.recheck_port: return - if not self.port_opened or self.recheck_port: + if not self.port_opened: self.portCheck() if not self.port_opened["ipv4"]: self.tor_manager.startOnions() @@ -259,21 +271,28 @@ class FileServer(ConnectionServer): # Returns False if Internet is immediately available # Returns True if we've spent some time waiting for Internet + # Returns None if FileServer is stopping or the Offline mode is enabled def waitForInternetOnline(self): + if config.offline or self.stopping: + return None + if self.isInternetOnline(): return False while not self.isInternetOnline(): - time.sleep(15) + self.sleep(30) + if config.offline or self.stopping: + return None if self.isInternetOnline(): break - if not self.update_pool.full(): - self.update_pool.spawn(self.updateRandomSite) + if len(self.update_pool) == 0: + thread = self.update_pool.spawn(self.updateRandomSite) + thread.join() self.recheckPort() return True - def updateRandomSite(self, site_addresses=None): + def updateRandomSite(self, site_addresses=None, force=False): if not site_addresses: site_addresses = self.getSiteAddresses() @@ -282,22 +301,19 @@ class FileServer(ConnectionServer): return address = site_addresses[0] - site = self.sites.get(address, None) + site = self.getSite(address) - if not site or not site.isServing(): + if not site: return log.debug("Checking randomly chosen site: %s", site.address_short) - self.updateSite(site) + self.updateSite(site, force=force) - def updateSite(self, site, check_files=False, dry_run=False): - if not site or not site.isServing(): + def updateSite(self, site, check_files=False, force=False, dry_run=False): + if not site: return False - return site.considerUpdate(check_files=check_files, dry_run=dry_run) - - def getSite(self, address): - return self.sites.get(address, None) + return site.considerUpdate(check_files=check_files, force=force, dry_run=dry_run) def invalidateUpdateTime(self, invalid_interval): for address in self.getSiteAddresses(): @@ -313,8 +329,8 @@ class FileServer(ConnectionServer): log.info("%s: started", task_description) # Don't wait port opening on first startup. Do the instant check now. - if len(self.sites) <= 2: - for address, site in list(self.sites.items()): + if len(self.getSites()) <= 2: + for address, site in list(self.getSites().items()): self.updateSite(site, check_files=check_files) all_site_addresses = self.getSiteAddresses() @@ -334,9 +350,12 @@ class FileServer(ConnectionServer): # Check sites integrity for site_address in site_addresses: if check_files: - time.sleep(10) + self.sleep(10) else: - time.sleep(1) + self.sleep(1) + + if self.stopping: + break site = self.getSite(site_address) if not self.updateSite(site, check_files=check_files, dry_run=True): @@ -345,10 +364,9 @@ class FileServer(ConnectionServer): sites_processed += 1 - while 1: + while self.running: self.waitForInternetOnline() - thread = self.update_pool.spawn(self.updateSite, - site, check_files=check_files) + thread = self.update_pool.spawn(self.updateSite, site, check_files=check_files) if check_files: # Limit the concurency # ZeroNet may be laggy when running from HDD. @@ -383,8 +401,11 @@ class FileServer(ConnectionServer): long_timeout = min_long_timeout short_cycle_time_limit = 60 * 2 - while 1: - time.sleep(long_timeout) + while self.running: + self.sleep(long_timeout) + + if self.stopping: + break start_time = time.time() @@ -400,8 +421,11 @@ class FileServer(ConnectionServer): sites_processed = 0 for site_address in site_addresses: - site = self.sites.get(site_address, None) - if (not site) or (not site.isServing()): + if self.stopping: + break + + site = self.getSite(site_address) + if not site: continue log.debug("Running maintenance for site: %s", site.address_short) @@ -410,7 +434,7 @@ class FileServer(ConnectionServer): site = None if done: sites_processed += 1 - time.sleep(short_timeout) + self.sleep(short_timeout) # If we host hundreds of sites, the full maintenance cycle may take very # long time, especially on startup ( > 1 hour). @@ -454,19 +478,24 @@ class FileServer(ConnectionServer): # are interested in connecting to them), we initiate some traffic by # performing the update for a random site. It's way better than just # silly pinging a random peer for no profit. - while 1: - threshold = self.internet_outage_threshold / 2.0 - time.sleep(threshold / 2.0) + while self.running: self.waitForInternetOnline() + + threshold = self.internet_outage_threshold / 2.0 + + self.sleep(threshold / 2.0) + if self.stopping: + break + last_activity_time = max( self.last_successful_internet_activity_time, self.last_outgoing_internet_activity_time) now = time.time() - if not len(self.sites): + if not len(self.getSites()): continue if last_activity_time > now - threshold: continue - if self.update_pool.full(): + if len(self.update_pool) == 0: continue log.info("No network activity for %.2fs. Running an update for a random site.", @@ -481,16 +510,18 @@ class FileServer(ConnectionServer): # We should check if the files have actually changed, # and do it more often. interval = 60 * 10 - while 1: - time.sleep(interval) + while self.running: + self.sleep(interval) + if self.stopping: + break config.loadTrackersFile() # Detects if computer back from wakeup def wakeupWatcher(self): last_time = time.time() last_my_ips = socket.gethostbyname_ex('')[2] - while 1: - time.sleep(30) + while self.running: + self.sleep(30) is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3 if is_time_changed: # If taken more than 3 minute then the computer was in sleep mode @@ -511,17 +542,28 @@ class FileServer(ConnectionServer): ) self.invalidateUpdateTime(invalid_interval) self.recheck_port = True - gevent.spawn(self.updateSites) + self.spawn(self.updateSites) last_time = time.time() last_my_ips = my_ips # Bind and start serving sites - def start(self, check_sites=True): + # If passive_mode is False, FileServer starts the full-featured file serving: + # * Checks for updates at startup. + # * Checks site's integrity. + # * Runs periodic update checks. + # * Watches for internet being up or down and for computer to wake up and runs update checks. + # If passive_mode is True, all the mentioned activity is disabled. + def start(self, passive_mode=False, check_sites=None, check_connections=True): + + # Backward compatibility for a misnamed argument: + if check_sites is not None: + passive_mode = not check_sites + if self.stopping: return False - ConnectionServer.start(self) + ConnectionServer.start(self, check_connections=check_connections) try: self.stream_server.start() @@ -532,22 +574,22 @@ class FileServer(ConnectionServer): sys.modules["main"].ui_server.stop() return False - self.sites = self.site_manager.list() if config.debug: # Auto reload FileRequest on change from Debug import DebugReloader DebugReloader.watcher.addCallback(self.reload) - if check_sites: # Open port, Update sites, Check files integrity - gevent.spawn(self.updateSites, check_files=True) + if not passive_mode: + self.spawn(self.updateSites) + thread_reaload_tracker_files = self.spawn(self.reloadTrackerFilesThread) + thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full") + thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short") + thread_keep_alive = self.spawn(self.keepAliveThread) + thread_wakeup_watcher = self.spawn(self.wakeupWatcher) - gevent.spawn(self.updateSites) + self.sleep(0.1) + self.spawn(self.updateSites, check_files=True) - thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread) - thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full") - thread_sites_maintenance_short = gevent.spawn(self.sitesMaintenanceThread, mode="short") - thread_keep_alive = gevent.spawn(self.keepAliveThread) - thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher) ConnectionServer.listen(self) diff --git a/src/Site/Site.py b/src/Site/Site.py index e808b706..1581a106 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -172,6 +172,8 @@ class Site(object): def isServing(self): if config.offline: return False + elif self.connection_server.stopping: + return False else: return self.settings["serving"] @@ -532,7 +534,7 @@ class Site(object): a = b if a <= self.last_online_update and self.last_online_update <= b: self.last_online_update = 0 - self.log.info("Update time invalidated") + self.log.debug("Update time invalidated") def isUpdateTimeValid(self): if not self.last_online_update: @@ -593,33 +595,12 @@ class Site(object): self.updateWebsocket(updated=True) @util.Noparallel(queue=True, ignore_args=True) - def considerUpdate(self, check_files=False, dry_run=False): - if not self.isServing(): + def _considerUpdate_realJob(self, check_files=False, force=False): + if not self._considerUpdate_check(check_files=check_files, force=force, log_reason=True): return False online = self.connection_server.isInternetOnline() - run_update = False - msg = None - - if not online: - run_update = True - msg = "network connection seems broken, trying to update the site to check if the network is up" - elif check_files: - run_update = True - msg = "checking site's files..." - elif not self.isUpdateTimeValid(): - run_update = True - msg = "update time is not valid, updating now..." - - if not run_update: - return False - - if dry_run: - return True - - self.log.debug(msg) - # TODO: there should be a configuration options controlling: # * whether to check files on the program startup # * whether to check files during the run time and how often @@ -628,6 +609,7 @@ class Site(object): if len(self.peers) < 50: self.announce(mode="update") + online = online and self.connection_server.isInternetOnline() self.update(check_files=check_files) @@ -636,6 +618,39 @@ class Site(object): return True + def _considerUpdate_check(self, check_files=False, force=False, log_reason=False): + if not self.isServing(): + return False + + online = self.connection_server.isInternetOnline() + + run_update = False + msg = None + + if force: + run_update = True + msg = "forcing site update" + elif not online: + run_update = True + msg = "network connection seems broken, trying to update a site to check if the network is up" + elif check_files: + run_update = True + msg = "checking site's files..." + elif not self.isUpdateTimeValid(): + run_update = True + msg = "update time is not valid, updating now..." + + if run_update and log_reason: + self.log.debug(msg) + + return run_update + + def considerUpdate(self, check_files=False, force=False, dry_run=False): + run_update = self._considerUpdate_check(check_files=check_files, force=force) + if run_update and not dry_run: + run_update = self._considerUpdate_realJob(check_files=check_files, force=force) + return run_update + # Update site by redownload all content.json def redownloadContents(self): # Download all content.json again @@ -1085,6 +1100,9 @@ class Site(object): # Keep connections def needConnections(self, num=None, check_site_on_reconnect=False, pex=True): + if not self.connection_server.allowsCreatingConnections(): + return + if num is None: num = self.getPreferableActiveConnectionCount() diff --git a/src/Site/SiteAnnouncer.py b/src/Site/SiteAnnouncer.py index 5a97807e..6a510583 100644 --- a/src/Site/SiteAnnouncer.py +++ b/src/Site/SiteAnnouncer.py @@ -127,6 +127,9 @@ class SiteAnnouncer(object): @util.Noparallel() def announce(self, force=False, mode="start", pex=True): + if not self.site.isServing(): + return + if time.time() - self.time_last_announce < 30 and not force: return # No reannouncing within 30 secs @@ -300,6 +303,9 @@ class SiteAnnouncer(object): @util.Noparallel(blocking=False) def announcePex(self, query_num=2, need_num=10): + if not self.site.isServing(): + return + self.updateWebsocket(pex="announcing") peers = self.site.getConnectedPeers()