diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py
index 2b4586e9..3ec0932d 100644
--- a/src/Connection/ConnectionServer.py
+++ b/src/Connection/ConnectionServer.py
@@ -43,6 +43,8 @@ class ConnectionServer(object):
         self.ips = {}  # Connection by ip
 
         self.has_internet = True  # Internet outage detection
+        self.internet_online_since = 0
+        self.internet_offline_since = 0
         self.last_outgoing_internet_activity_time = 0  # Last time the application tried to send any data
         self.last_successful_internet_activity_time = 0  # Last time the application successfully sent or received any data
         self.internet_outage_threshold = 60 * 2
@@ -382,8 +384,10 @@ class ConnectionServer(object):
         self.has_internet = status
 
         if self.has_internet:
+            self.internet_online_since = time.time()
             gevent.spawn(self.onInternetOnline)
         else:
+            self.internet_offline_since = time.time()
             gevent.spawn(self.onInternetOffline)
 
     def isInternetOnline(self):
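The two timestamps added above are written only when the detected connectivity state flips; they feed the FileServer changes below, where internet_offline_since anchors the interval passed to invalidateUpdateTime(). A minimal standalone sketch of that interval computation, with illustrative values that are not taken from the patch:

    import random
    import time

    internet_outage_threshold = 60 * 2              # outage declared after 2 minutes of silence
    internet_offline_since = time.time() - 10 * 60  # example: connection dropped ~10 minutes ago

    # Same formula as the new FileServer.onInternetOnline(): reach back to a
    # randomized point before the outage was detected, so update times recorded
    # shortly before the detection are treated as stale as well.
    invalid_interval = (
        internet_offline_since - internet_outage_threshold - random.randint(60 * 5, 60 * 10),
        time.time(),
    )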
diff --git a/src/File/FileServer.py b/src/File/FileServer.py
index de14a9c9..66c8c135 100644
--- a/src/File/FileServer.py
+++ b/src/File/FileServer.py
@@ -33,8 +33,11 @@ class FileServer(ConnectionServer):
         # self.log = logging.getLogger("FileServer")
         # The value of self.log will be overwritten in ConnectionServer.__init__()
 
+        self.recheck_port = True
+
         self.update_pool = gevent.pool.Pool(5)
         self.update_start_time = 0
+        self.update_sites_task_next_nr = 1
 
         self.supported_ip_types = ["ipv4"]  # Outgoing ip_type support
         if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
@@ -158,7 +161,13 @@ class FileServer(ConnectionServer):
 
     def onInternetOnline(self):
         log.info("Internet online")
-        gevent.spawn(self.updateSites, force_port_check=True)
+        invalid_interval = (
+            self.internet_offline_since - self.internet_outage_threshold - random.randint(60 * 5, 60 * 10),
+            time.time()
+        )
+        self.invalidateUpdateTime(invalid_interval)
+        self.recheck_port = True
+        gevent.spawn(self.updateSites)
 
     # Reload the FileRequest class to prevent restarts in debug mode
     def reload(self):
@@ -237,6 +246,17 @@ class FileServer(ConnectionServer):
 
         return res
 
+    @util.Noparallel(queue=True)
+    def recheckPort(self):
+        if not self.recheck_port:
+            return
+
+        if not self.port_opened or self.recheck_port:
+            self.portCheck()
+            if not self.port_opened["ipv4"]:
+                self.tor_manager.startOnions()
+        self.recheck_port = False
+
     # Returns False if Internet is immediately available
     # Returns True if we've spent some time waiting for Internet
     def waitForInternetOnline(self):
@@ -250,6 +270,7 @@ class FileServer(ConnectionServer):
 
             if not self.update_pool.full():
                 self.update_pool.spawn(self.updateRandomSite)
+            self.recheckPort()
         return True
 
     def updateRandomSite(self, site_addresses=None):
@@ -270,46 +291,68 @@ class FileServer(ConnectionServer):
 
             self.updateSite(site)
 
-    def updateSite(self, site):
+    def updateSite(self, site, check_files=False, dry_run=False):
         if not site or not site.isServing():
-            return
-        site.considerUpdate()
+            return False
+        return site.considerUpdate(check_files=check_files, dry_run=dry_run)
 
-    @util.Noparallel()
-    def updateSites(self, force_port_check=False):
-        log.info("Checking sites: force_port_check=%s", force_port_check)
+    def getSite(self, address):
+        return self.sites.get(address, None)
+
+    def invalidateUpdateTime(self, invalid_interval):
+        for address in self.getSiteAddresses():
+            site = self.getSite(address)
+            if site:
+                site.invalidateUpdateTime(invalid_interval)
+
+    def updateSites(self, check_files=False):
+        task_nr = self.update_sites_task_next_nr
+        self.update_sites_task_next_nr += 1
+
+        task_description = "updateSites: #%d, check_files=%s" % (task_nr, check_files)
+        log.info("%s: started", task_description)
 
         # Don't wait port opening on first startup. Do the instant check now.
         if len(self.sites) <= 2:
-            sites_checking = True
             for address, site in list(self.sites.items()):
-                gevent.spawn(self.updateSite, site)
+                self.updateSite(site, check_files=check_files)
 
-        # Test and open port if not tested yet
-        if not self.port_opened or force_port_check:
-            self.portCheck()
-            if not self.port_opened["ipv4"]:
-                self.tor_manager.startOnions()
+        all_site_addresses = self.getSiteAddresses()
+        site_addresses = [
+            address for address in all_site_addresses
+            if self.updateSite(self.getSite(address), check_files=check_files, dry_run=True)
+        ]
 
-        site_addresses = self.getSiteAddresses()
+        log.info("%s: chosen %d sites (of %d)", task_description, len(site_addresses), len(all_site_addresses))
 
         sites_processed = 0
+        sites_skipped = 0
         start_time = time.time()
         self.update_start_time = start_time
         progress_print_time = time.time()
 
         # Check sites integrity
         for site_address in site_addresses:
-            site = self.sites.get(site_address, None)
+            if check_files:
+                time.sleep(10)
+            else:
+                time.sleep(1)
+
+            site = self.getSite(site_address)
+            if not self.updateSite(site, check_files=check_files, dry_run=True):
+                sites_skipped += 1
+                continue
 
             sites_processed += 1
 
-            if (not site) or (not site.isServing()):
-                continue
-
             while 1:
                 self.waitForInternetOnline()
-                self.update_pool.spawn(self.updateSite, site)
+                thread = self.update_pool.spawn(self.updateSite,
+                    site, check_files=check_files)
+                if check_files:
+                    # Limit the concurrency.
+                    # ZeroNet may be laggy when running from HDD.
+                    thread.join(timeout=60)
                 if not self.waitForInternetOnline():
                     break
@@ -319,21 +362,17 @@ class FileServer(ConnectionServer):
                 time_per_site = time_spent / float(sites_processed)
                 sites_left = len(site_addresses) - sites_processed
                 time_left = time_per_site * sites_left
-                log.info("Checking sites: DONE: %d sites in %.2fs (%.2fs per site); LEFT: %d sites in %.2fs",
+                log.info("%s: DONE: %d sites in %.2fs (%.2fs per site); SKIPPED: %d sites; LEFT: %d sites in %.2fs",
+                    task_description,
                     sites_processed,
                     time_spent,
                     time_per_site,
+                    sites_skipped,
                     sites_left,
                     time_left
                 )
 
-            if (self.update_start_time != start_time) and self.update_pool.full():
-                # Another updateSites() is running, throttling now...
-                time.sleep(5)
-            else:
-                time.sleep(1)
-
-        log.info("Checking sites: finished in %.2fs" % (time.time() - start_time))
+        log.info("%s: finished in %.2fs", task_description, time.time() - start_time)
 
     def sitesMaintenanceThread(self, mode="full"):
         startup = True
@@ -402,7 +441,7 @@ class FileServer(ConnectionServer):
             startup = False
 
     def keepAliveThread(self):
-        # This thread is mostly useless on a loaded system, since it never does
+        # This thread is mostly useless on a system under load, since it never does
         # any works, if we have active traffic.
         #
         # We should initiate some network activity to detect the Internet outage
@@ -466,7 +505,13 @@ class FileServer(ConnectionServer):
                 log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
 
             if is_time_changed or is_ip_changed:
-                self.updateSites(force_port_check=True)
+                invalid_interval = (
+                    last_time - self.internet_outage_threshold - random.randint(60 * 5, 60 * 10),
+                    time.time()
+                )
+                self.invalidateUpdateTime(invalid_interval)
+                self.recheck_port = True
+                gevent.spawn(self.updateSites)
 
             last_time = time.time()
             last_my_ips = my_ips
@@ -494,7 +539,9 @@ class FileServer(ConnectionServer):
             DebugReloader.watcher.addCallback(self.reload)
 
         if check_sites:  # Open port, Update sites, Check files integrity
-            gevent.spawn(self.updateSites)
+            gevent.spawn(self.updateSites, check_files=True)
+
+        gevent.spawn(self.updateSites)
 
         thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
         thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")
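The reworked updateSites() above first asks every site whether it would update at all (dry_run=True) and only then pushes the chosen sites through the shared greenlet pool, joining each greenlet when the heavy check_files pass is requested. A condensed, self-contained sketch of that scheduling pattern; the update_sites() helper and the sites dict are illustrative stand-ins, not the real FileServer API:

    import gevent.pool

    def update_sites(sites, check_files=False):
        pool = gevent.pool.Pool(5)  # mirrors FileServer.update_pool

        # Pass 1: dry_run only answers "would this site update?"; nothing runs yet.
        chosen = [site for site in sites.values()
                  if site.considerUpdate(check_files=check_files, dry_run=True)]

        for site in chosen:
            # Re-check right before spawning; the answer may have changed meanwhile.
            if not site.considerUpdate(check_files=check_files, dry_run=True):
                continue
            thread = pool.spawn(site.considerUpdate, check_files=check_files)
            if check_files:
                # File verification is disk-heavy, so wait for each site in turn.
                thread.join(timeout=60)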
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 6d72e119..14077aae 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -81,8 +81,6 @@ class Site(object):
                 scaler=self.getActivityRating)
         ]
 
-        self.delayed_startup_announce = False
-
         self.content = None  # Load content.json
         self.peers = {}  # Key: ip:port, Value: Peer.Peer
         self.peers_recent = collections.deque(maxlen=150)
@@ -93,6 +91,7 @@ class Site(object):
         self.content_updated = None  # Content.js update time
         self.last_check_files_time = 0
         self.last_online_update = 0
+        self.startup_announce_done = 0
         self.notifications = []  # Pending notifications displayed once on page load [error|ok|info, message, timeout]
         self.page_requested = False  # Page viewed in browser
         self.websockets = []  # Active site websocket connections
@@ -525,6 +524,28 @@ class Site(object):
                 time.sleep(0.1)
         return queried
 
+    def invalidateUpdateTime(self, invalid_interval):
+        a, b = invalid_interval
+        if b is None:
+            b = time.time()
+        if a is None:
+            a = b
+        if a <= self.last_online_update and self.last_online_update <= b:
+            self.last_online_update = 0
+            self.log.info("Update time invalidated")
+
+    def isUpdateTimeValid(self):
+        if not self.last_online_update:
+            return False
+        expirationThreshold = 60 * 60 * 6
+        return self.last_online_update > time.time() - expirationThreshold
+
+    def refreshUpdateTime(self, valid=True):
+        if valid:
+            self.last_online_update = time.time()
+        else:
+            self.last_online_update = 0
+
     # Update content.json from peers and download changed files
     # Return: None
     @util.Noparallel()
@@ -564,45 +585,56 @@ class Site(object):
         else:
             self.content_updated = time.time()
 
+        self.sendMyHashfield()
+        self.updateHashfield()
+
+        self.refreshUpdateTime(valid=self.connection_server.isInternetOnline())
+
         self.updateWebsocket(updated=True)
 
-    def considerUpdate(self):
+    @util.Noparallel(queue=True, ignore_args=True)
+    def considerUpdate(self, check_files=False, dry_run=False):
         if not self.isServing():
-            return
+            return False
 
         online = self.connection_server.isInternetOnline()
 
-        if online and time.time() - self.last_online_update < 60 * 10:
-            with gevent.Timeout(10, exception=False):
-                self.announcer.announcePex()
-            return
+        run_update = False
+        msg = None
+
+        if not online:
+            run_update = True
+            msg = "network connection seems broken, trying to update the site to check if the network is up"
+        elif check_files:
+            run_update = True
+            msg = "checking site's files..."
+        elif not self.isUpdateTimeValid():
+            run_update = True
+            msg = "update time is not valid, updating now..."
+
+        if not run_update:
+            return False
+
+        if dry_run:
+            return True
+
+        self.log.debug(msg)
 
         # TODO: there should be a configuration options controlling:
         # * whether to check files on the program startup
         # * whether to check files during the run time and how often
-        check_files = self.last_check_files_time == 0
-
+        check_files = check_files and (self.last_check_files_time == 0)
         self.last_check_files_time = time.time()
 
-        # quick start, avoiding redundant announces
-        if len(self.peers) >= 50:
-            if len(self.getConnectedPeers()) > 4:
-                pass  # Don't run announce() at all
-            else:
-                self.setDelayedStartupAnnounce()
-        else:
-            self.announce(mode="startup")
-
-        online = online and self.connection_server.isInternetOnline()
+        if len(self.peers) < 50:
+            self.announce(mode="update")
 
         self.update(check_files=check_files)
-        self.sendMyHashfield()
-        self.updateHashfield()
 
         online = online and self.connection_server.isInternetOnline()
+        self.refreshUpdateTime(valid=online)
 
-        if online:
-            self.last_online_update = time.time()
+        return True
 
     # Update site by redownload all content.json
     def redownloadContents(self):
@@ -932,16 +964,6 @@ class Site(object):
                 peer.found(source)
             return peer
 
-    def setDelayedStartupAnnounce(self):
-        self.delayed_startup_announce = True
-
-    def applyDelayedStartupAnnounce(self):
-        if self.delayed_startup_announce:
-            self.delayed_startup_announce = False
-            self.announce(mode="startup")
-            return True
-        return False
-
     def announce(self, *args, **kwargs):
         if self.isServing():
             self.announcer.announce(*args, **kwargs)
@@ -1243,8 +1265,6 @@ class Site(object):
         if not self.isServing():
             return False
 
-        self.applyDelayedStartupAnnounce()
-
         if not self.peers:
             return False
 
@@ -1259,8 +1279,6 @@ class Site(object):
             self.announcer.announcePex()
 
         self.update()
-        self.sendMyHashfield(3)
-        self.updateHashfield(3)
 
         return True
 
@@ -1270,10 +1288,11 @@ class Site(object):
 
         self.log.debug("periodicMaintenanceHandler_announce: startup=%s, force=%s" % (startup, force))
 
-        if self.applyDelayedStartupAnnounce():
-            return True
+        if startup and len(self.peers) < 10:
+            self.announce(mode="startup")
+        else:
+            self.announce(mode="update", pex=False)
 
-        self.announce(mode="update", pex=False)
         return True
 
     # Send hashfield to peers
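Taken together, the three new Site helpers replace the old ad-hoc handling of last_online_update: refreshUpdateTime() stamps a completed pass, isUpdateTimeValid() applies a six-hour freshness window, and invalidateUpdateTime() retroactively discards stamps that fall inside an interval later deemed unreliable. A toy restatement of that interplay (a hypothetical UpdateTimeMixin, not the actual Site class), followed by a small usage example:

    import time

    class UpdateTimeMixin:
        def __init__(self):
            self.last_online_update = 0

        def refreshUpdateTime(self, valid=True):
            # Record "updated just now", or drop the stamp on failure.
            self.last_online_update = time.time() if valid else 0

        def isUpdateTimeValid(self, expiration_threshold=60 * 60 * 6):
            # Fresh only if updated within the last six hours (the patch default).
            if not self.last_online_update:
                return False
            return self.last_online_update > time.time() - expiration_threshold

        def invalidateUpdateTime(self, invalid_interval):
            # Drop the stamp if it falls inside the suspect interval; None means an open end.
            a, b = invalid_interval
            b = time.time() if b is None else b
            a = b if a is None else a
            if a <= self.last_online_update <= b:
                self.last_online_update = 0

    # An update recorded inside an interval later declared unreliable (for example,
    # one spanning an Internet outage) no longer counts as fresh.
    site = UpdateTimeMixin()
    site.refreshUpdateTime(valid=True)
    site.invalidateUpdateTime((time.time() - 60, None))
    assert not site.isUpdateTimeValid()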