From d1b9cc826153248f4340284f531ab4258aa927e1 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Fri, 30 Oct 2020 23:28:16 +0700 Subject: [PATCH] Redesign the Internet outage detection. Improvements in FileServer threads. --- src/Connection/Connection.py | 12 +++ src/Connection/ConnectionServer.py | 65 +++++++++++---- src/File/FileServer.py | 125 +++++++++++++++++++++++++---- src/Site/Site.py | 1 + 4 files changed, 173 insertions(+), 30 deletions(-) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 22bcf29c..27ae3734 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -442,6 +442,7 @@ class Connection(object): def handleMessage(self, message): cmd = message["cmd"] + self.updateOnlineStatus(successful_activity=True) self.last_message_time = time.time() self.last_cmd_recv = cmd if cmd == "response": # New style response @@ -504,6 +505,7 @@ class Connection(object): # Send data to connection def send(self, message, streaming=False): + self.updateOnlineStatus(outgoing_activity=True) self.last_send_time = time.time() if config.debug_socket: self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % ( @@ -543,6 +545,11 @@ class Connection(object): message = None with self.send_lock: self.sock.sendall(data) + # XXX: Should not be used here: + # self.updateOnlineStatus(successful_activity=True) + # Looks like self.sock.sendall() returns normally, instead of + # raising an Exception (at least, some times). + # So the only way of detecting the network activity is self.handleMessage() except Exception as err: self.close("Send error: %s (cmd: %s)" % (err, stat_key)) return False @@ -633,3 +640,8 @@ class Connection(object): self.sock = None self.unpacker = None self.event_connected = None + + def updateOnlineStatus(self, outgoing_activity=False, successful_activity=False): + self.server.updateOnlineStatus(self, + outgoing_activity=outgoing_activity, + successful_activity=successful_activity) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 8d377aca..66b50608 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -41,7 +41,11 @@ class ConnectionServer(object): self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood self.broken_ssl_ips = {} # Peerids of broken ssl connections self.ips = {} # Connection by ip + self.has_internet = True # Internet outage detection + self.last_outgoing_internet_activity_time = 0 # Last time the application tried to send any data + self.last_successful_internet_activity_time = 0 # Last time the application successfully sent or received any data + self.internet_outage_threshold = 60 * 2 self.stream_server = None self.stream_server_proxy = None @@ -60,6 +64,8 @@ class ConnectionServer(object): self.num_outgoing = 0 self.had_external_incoming = False + + self.timecorrection = 0.0 self.pool = Pool(500) # do not accept more than 500 connections @@ -252,8 +258,8 @@ class ConnectionServer(object): while self.running: run_i += 1 self.ip_incoming = {} # Reset connected ips counter - last_message_time = 0 s = time.time() + self.updateOnlineStatus(None) for connection in self.connections[:]: # Make a copy if connection.ip.endswith(".onion") or config.tor == "always": timeout_multipler = 2 @@ -261,9 +267,6 @@ class ConnectionServer(object): timeout_multipler = 1 idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time) - if connection.last_message_time > last_message_time and not connection.is_private_ip: - # Message from local IPs does not means internet connection - last_message_time = connection.last_message_time if connection.unpacker and idle > 30: # Delete the unpacker if not needed @@ -311,18 +314,6 @@ class ConnectionServer(object): # Reset bad action counter every 30 min connection.bad_actions = 0 - # Internet outage detection - if time.time() - last_message_time > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)): - # Offline: Last message more than 60-600sec depending on connection number - if self.has_internet and last_message_time: - self.has_internet = False - self.onInternetOffline() - else: - # Online - if not self.has_internet: - self.has_internet = True - self.onInternetOnline() - self.timecorrection = self.getTimecorrection() if time.time() - s > 0.01: @@ -353,6 +344,48 @@ class ConnectionServer(object): )) return num_closed + # Internet outage detection + def updateOnlineStatus(self, connection, outgoing_activity=False, successful_activity=False): + + now = time.time() + + if connection and not connection.is_private_ip: + if outgoing_activity: + self.last_outgoing_internet_activity_time = now + if successful_activity: + self.last_successful_internet_activity_time = now + self.setInternetStatus(True) + return + + if not self.last_outgoing_internet_activity_time: + return + + if ( + (self.last_successful_internet_activity_time < now - self.internet_outage_threshold) + and + (self.last_successful_internet_activity_time < self.last_outgoing_internet_activity_time) + ): + self.setInternetStatus(False) + return + + # This is the old algorithm just in case we missed something + idle = now - self.last_successful_internet_activity_time + if idle > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)): + # Offline: Last successful activity more than 60-600sec depending on connection number + self.setInternetStatus(False) + return + + def setInternetStatus(self, status): + if self.has_internet == status: + return + + self.has_internet = status + + if self.has_internet: + gevent.spawn(self.onInternetOnline) + else: + gevent.spawn(self.onInternetOffline) + def onInternetOnline(self): self.log.info("Internet online") diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 8294d179..c425b3f4 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -238,17 +238,50 @@ class FileServer(ConnectionServer): return res + # Returns False if Internet is immediately available + # Returns True if we've spent some time waiting for Internet + def waitForInternetOnline(self): + if self.has_internet: + return False + + while not self.has_internet: + time.sleep(15) + if self.has_internet: + break + if not self.check_pool.full(): + self.check_pool.spawn(self.updateRandomSite) + + return True + + def updateRandomSite(self, site_addresses=None, check_files=False): + if not site_addresses: + site_addresses = self.getSiteAddresses() + + site_addresses = random.sample(site_addresses, 1) + if len(site_addresses) < 1: + return + + address = site_addresses[0] + site = self.sites.get(address, None) + + if not site or not site.isServing(): + return + + log.debug("Checking randomly chosen site: %s", site.address_short) + + self.checkSite(site, check_files=check_files) + # Check site file integrity def checkSite(self, site, check_files=False): - if not site.isServing(): + if not site or not site.isServing(): return quick_start = len(site.peers) >= 50 if quick_start: - log.debug("Checking site: %s (quick start)" % (site.address)) + log.debug("Checking site: %s (quick start)", site.address_short) else: - log.debug("Checking site: %s" % (site.address)) + log.debug("Checking site: %s", site.address_short) if quick_start: site.setDelayedStartupAnnounce() @@ -291,7 +324,12 @@ class FileServer(ConnectionServer): if (not site) or (not site.isServing()): continue - check_thread = self.check_pool.spawn(self.checkSite, site, check_files) # Check in new thread + + while 1: + self.waitForInternetOnline() + self.check_pool.spawn(self.checkSite, site, check_files) + if not self.waitForInternetOnline(): + break if time.time() - progress_print_time > 60: progress_print_time = time.time() @@ -313,21 +351,25 @@ class FileServer(ConnectionServer): else: time.sleep(1) - log.info("Checking sites: finished in %.3fs" % (time.time() - start_time)) + log.info("Checking sites: finished in %.2fs" % (time.time() - start_time)) - def sitesMaintenanceThread(self): - import gc + def sitesMaintenanceThread(self, mode="full"): startup = True short_timeout = 2 - long_timeout = 60 * 2 + min_long_timeout = 10 + max_long_timeout = 60 * 10 + long_timeout = min_long_timeout + short_cycle_time_limit = 60 * 2 while 1: time.sleep(long_timeout) - gc.collect() # Explicit garbage collection + + start_time = time.time() log.debug( - "Starting maintenance cycle: connections=%s, internet=%s", + "Starting <%s> maintenance cycle: connections=%s, internet=%s", + mode, len(self.connections), self.has_internet ) start_time = time.time() @@ -341,7 +383,7 @@ class FileServer(ConnectionServer): if (not site) or (not site.isServing()): continue - log.debug("Running maintenance for site: %s", site.address) + log.debug("Running maintenance for site: %s", site.address_short) done = site.runPeriodicMaintenance(startup=startup) site = None @@ -349,15 +391,68 @@ class FileServer(ConnectionServer): sites_processed += 1 time.sleep(short_timeout) - log.debug("Maintenance cycle finished in %.3fs. Total sites: %d. Processed sites: %d", + # If we host hundreds of sites, the full maintenance cycle may take very + # long time, especially on startup ( > 1 hour). + # This means we are not able to run the maintenance procedure for active + # sites frequently enough using just a single maintenance thread. + # So we run 2 maintenance threads: + # * One running full cycles. + # * And one running short cycles for the most active sites. + # When the short cycle runs out of the time limit, it restarts + # from the beginning of the site list. + if mode == "short" and time.time() - start_time > short_cycle_time_limit: + break + + log.debug("<%s> maintenance cycle finished in %.2fs. Total sites: %d. Processed sites: %d. Timeout: %d", + mode, time.time() - start_time, len(site_addresses), - sites_processed + sites_processed, + long_timeout ) + if sites_processed: + long_timeout = max(int(long_timeout / 2), min_long_timeout) + else: + long_timeout = min(long_timeout + 1, max_long_timeout) + site_addresses = None startup = False + def keepAliveThread(self): + # This thread is mostly useless on a loaded system, since it never does + # any works, if we have active traffic. + # + # We should initiate some network activity to detect the Internet outage + # and avoid false positives. We normally have some network activity + # initiated by various parts on the application as well as network peers. + # So it's not a problem. + # + # However, if it actually happens that we have no network traffic for + # some time (say, we host just a couple of inactive sites, and no peers + # are interested in connecting to them), we initiate some traffic by + # performing the update for a random site. It's way better than just + # silly pinging a random peer for no profit. + while 1: + threshold = self.internet_outage_threshold / 2.0 + time.sleep(threshold / 2.0) + self.waitForInternetOnline() + last_activity_time = max( + self.last_successful_internet_activity_time, + self.last_outgoing_internet_activity_time) + now = time.time() + if not len(self.sites): + continue + if last_activity_time > now - threshold: + continue + if self.check_pool.full(): + continue + + log.info("No network activity for %.2fs. Running an update for a random site.", + now - last_activity_time + ) + self.check_pool.spawn(self.updateRandomSite) + # Periodic reloading of tracker files def reloadTrackerFilesThread(self): # TODO: @@ -420,7 +515,9 @@ class FileServer(ConnectionServer): gevent.spawn(self.checkSites) thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread) - thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread) + thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full") + thread_sites_maintenance_short = gevent.spawn(self.sitesMaintenanceThread, mode="short") + thread_keep_alive = gevent.spawn(self.keepAliveThread) thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher) ConnectionServer.listen(self) diff --git a/src/Site/Site.py b/src/Site/Site.py index b01e9aae..13ee730b 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -1177,6 +1177,7 @@ class Site(object): self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % ( len(connected_peers), need_to_close, closed)) + @util.Noparallel(queue=True) def runPeriodicMaintenance(self, startup=False, force=False): if not self.isServing(): return False