From e8358ee8f2df8e1b66b9a823a6ea6b28859300bc Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Sat, 31 Oct 2020 03:59:54 +0700 Subject: [PATCH] More fixes on the way to reliable site updates. --- src/Connection/ConnectionServer.py | 3 ++ src/File/FileServer.py | 70 +++++++++++------------------- src/Site/Site.py | 56 +++++++++++++++++++++--- 3 files changed, 79 insertions(+), 50 deletions(-) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 66b50608..2b4586e9 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -386,6 +386,9 @@ class ConnectionServer(object): else: gevent.spawn(self.onInternetOffline) + def isInternetOnline(self): + return self.has_internet + def onInternetOnline(self): self.log.info("Internet online") diff --git a/src/File/FileServer.py b/src/File/FileServer.py index c425b3f4..de14a9c9 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -33,8 +33,8 @@ class FileServer(ConnectionServer): # self.log = logging.getLogger("FileServer") # The value of self.log will be overwritten in ConnectionServer.__init__() - self.check_pool = gevent.pool.Pool(5) - self.check_start_time = 0 + self.update_pool = gevent.pool.Pool(5) + self.update_start_time = 0 self.supported_ip_types = ["ipv4"] # Outgoing ip_type support if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported(): @@ -153,13 +153,12 @@ class FileServer(ConnectionServer): log.debug("FileRequest: %s %s" % (str(connection), message["cmd"])) req = FileRequest(self, connection) req.route(message["cmd"], message.get("req_id"), message.get("params")) - if not self.has_internet and not connection.is_private_ip: - self.has_internet = True - self.onInternetOnline() + if not connection.is_private_ip: + self.setInternetStatus(self, True) def onInternetOnline(self): log.info("Internet online") - gevent.spawn(self.checkSites, check_files=False, force_port_check=True) + gevent.spawn(self.updateSites, force_port_check=True) # Reload the FileRequest class to prevent restarts in debug mode def reload(self): @@ -241,19 +240,19 @@ class FileServer(ConnectionServer): # Returns False if Internet is immediately available # Returns True if we've spent some time waiting for Internet def waitForInternetOnline(self): - if self.has_internet: + if self.isInternetOnline(): return False - while not self.has_internet: + while not self.isInternetOnline(): time.sleep(15) - if self.has_internet: + if self.isInternetOnline(): break - if not self.check_pool.full(): - self.check_pool.spawn(self.updateRandomSite) + if not self.update_pool.full(): + self.update_pool.spawn(self.updateRandomSite) return True - def updateRandomSite(self, site_addresses=None, check_files=False): + def updateRandomSite(self, site_addresses=None): if not site_addresses: site_addresses = self.getSiteAddresses() @@ -269,39 +268,22 @@ class FileServer(ConnectionServer): log.debug("Checking randomly chosen site: %s", site.address_short) - self.checkSite(site, check_files=check_files) + self.updateSite(site) - # Check site file integrity - def checkSite(self, site, check_files=False): + def updateSite(self, site): if not site or not site.isServing(): return + site.considerUpdate() - quick_start = len(site.peers) >= 50 - - if quick_start: - log.debug("Checking site: %s (quick start)", site.address_short) - else: - log.debug("Checking site: %s", site.address_short) - - if quick_start: - site.setDelayedStartupAnnounce() - else: - site.announce(mode="startup") - - site.update(check_files=check_files) # Update site's content.json and download changed files - site.sendMyHashfield() - site.updateHashfield() - - # Check sites integrity @util.Noparallel() - def checkSites(self, check_files=False, force_port_check=False): - log.info("Checking sites: check_files=%s, force_port_check=%s", check_files, force_port_check) + def updateSites(self, force_port_check=False): + log.info("Checking sites: force_port_check=%s", force_port_check) # Don't wait port opening on first startup. Do the instant check now. if len(self.sites) <= 2: sites_checking = True for address, site in list(self.sites.items()): - gevent.spawn(self.checkSite, site, check_files) + gevent.spawn(self.updateSite, site) # Test and open port if not tested yet if not self.port_opened or force_port_check: @@ -313,7 +295,7 @@ class FileServer(ConnectionServer): sites_processed = 0 start_time = time.time() - self.check_start_time = start_time + self.update_start_time = start_time progress_print_time = time.time() # Check sites integrity @@ -327,7 +309,7 @@ class FileServer(ConnectionServer): while 1: self.waitForInternetOnline() - self.check_pool.spawn(self.checkSite, site, check_files) + self.update_pool.spawn(self.updateSite, site) if not self.waitForInternetOnline(): break @@ -345,8 +327,8 @@ class FileServer(ConnectionServer): time_left ) - if (self.check_start_time != start_time) and self.check_pool.full(): - # Another check is running, throttling... + if (self.update_start_time != start_time) and self.update_pool.full(): + # Another updateSites() is running, throttling now... time.sleep(5) else: time.sleep(1) @@ -370,7 +352,7 @@ class FileServer(ConnectionServer): log.debug( "Starting <%s> maintenance cycle: connections=%s, internet=%s", mode, - len(self.connections), self.has_internet + len(self.connections), self.isInternetOnline() ) start_time = time.time() @@ -445,13 +427,13 @@ class FileServer(ConnectionServer): continue if last_activity_time > now - threshold: continue - if self.check_pool.full(): + if self.update_pool.full(): continue log.info("No network activity for %.2fs. Running an update for a random site.", now - last_activity_time ) - self.check_pool.spawn(self.updateRandomSite) + self.update_pool.spawn(self.updateRandomSite) # Periodic reloading of tracker files def reloadTrackerFilesThread(self): @@ -484,7 +466,7 @@ class FileServer(ConnectionServer): log.info("IP change detected from %s to %s" % (last_my_ips, my_ips)) if is_time_changed or is_ip_changed: - self.checkSites(check_files=False, force_port_check=True) + self.updateSites(force_port_check=True) last_time = time.time() last_my_ips = my_ips @@ -512,7 +494,7 @@ class FileServer(ConnectionServer): DebugReloader.watcher.addCallback(self.reload) if check_sites: # Open port, Update sites, Check files integrity - gevent.spawn(self.checkSites) + gevent.spawn(self.updateSites) thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread) thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full") diff --git a/src/Site/Site.py b/src/Site/Site.py index 13ee730b..6d72e119 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -76,7 +76,7 @@ class Site(object): ScaledTimeoutHandler(60 * 30, 60 * 2, handler=self.periodicMaintenanceHandler_announce, scaler=self.getAnnounceRating), - ScaledTimeoutHandler(60 * 20, 60 * 10, + ScaledTimeoutHandler(60 * 30, 60 * 5, handler=self.periodicMaintenanceHandler_general, scaler=self.getActivityRating) ] @@ -91,6 +91,8 @@ class Site(object): self.worker_manager = WorkerManager(self) # Handle site download from other peers self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) self.content_updated = None # Content.js update time + self.last_check_files_time = 0 + self.last_online_update = 0 self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] self.page_requested = False # Page viewed in browser self.websockets = [] # Active site websocket connections @@ -564,6 +566,44 @@ class Site(object): self.updateWebsocket(updated=True) + def considerUpdate(self): + if not self.isServing(): + return + + online = self.connection_server.isInternetOnline() + + if online and time.time() - self.last_online_update < 60 * 10: + with gevent.Timeout(10, exception=False): + self.announcer.announcePex() + return + + # TODO: there should be a configuration options controlling: + # * whether to check files on the program startup + # * whether to check files during the run time and how often + check_files = self.last_check_files_time == 0 + + self.last_check_files_time = time.time() + + # quick start, avoiding redundant announces + if len(self.peers) >= 50: + if len(self.getConnectedPeers()) > 4: + pass # Don't run announce() at all + else: + self.setDelayedStartupAnnounce() + else: + self.announce(mode="startup") + + online = online and self.connection_server.isInternetOnline() + + self.update(check_files=check_files) + self.sendMyHashfield() + self.updateHashfield() + + online = online and self.connection_server.isInternetOnline() + + if online: + self.last_online_update = time.time() + # Update site by redownload all content.json def redownloadContents(self): # Download all content.json again @@ -927,6 +967,14 @@ class Site(object): if (not force_safe) and self.settings["own"]: rating = min(rating, 0.6) + if self.content_updated is False: # Last check modification failed + rating += 0.1 + elif self.bad_files: + rating += 0.1 + + if rating > 1.0: + rating = 1.0 + return rating def getAnnounceRating(self): @@ -1210,14 +1258,10 @@ class Site(object): with gevent.Timeout(10, exception=False): self.announcer.announcePex() + self.update() self.sendMyHashfield(3) self.updateHashfield(3) - if self.content_updated is False: # Last check modification failed - self.update() - elif self.bad_files: - self.retryBadFiles() - return True def periodicMaintenanceHandler_announce(self, startup=False, force=False):