From 5744e40505c5ab6daad597da5aadaf85dedb04c6 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Thu, 21 Oct 2021 13:19:10 +0700 Subject: [PATCH] Redesign the scheduling of site checking and verification Save `check_files_timestamp` and `verify_files_timestamp` in sites.json and run checks only when the time interval is expired. --- src/File/FileServer.py | 101 ++++++++++++++++++++++++++++---------- src/Site/Site.py | 109 +++++++++++++++++++---------------------- 2 files changed, 124 insertions(+), 86 deletions(-) diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 73c86bd4..a0f16b97 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -303,12 +303,12 @@ class FileServer(ConnectionServer): log.debug("Checking randomly chosen site: %s", site.address_short) - self.updateSite(site, force=force) + self.updateSite(site) - def updateSite(self, site, check_files=False, force=False, dry_run=False): + def updateSite(self, site, check_files=False, verify_files=False): if not site: return False - return site.considerUpdate(check_files=check_files, force=force, dry_run=dry_run) + return site.update2(check_files=check_files, verify_files=verify_files) def invalidateUpdateTime(self, invalid_interval): for address in self.getSiteAddresses(): @@ -316,24 +316,30 @@ class FileServer(ConnectionServer): if site: site.invalidateUpdateTime(invalid_interval) - def updateSites(self, check_files=False): - self.recheckPort() + def isSiteUpdateTimeValid(self, site_address): + site = self.getSite(site_address) + if not site: + return False + return site.isUpdateTimeValid() + def updateSites(self): task_nr = self.update_sites_task_next_nr self.update_sites_task_next_nr += 1 - task_description = "updateSites [#%d, check_files=%s]" % (task_nr, check_files) + task_description = "updateSites [#%d]" % task_nr log.info("%s: started", task_description) # Don't wait port opening on first startup. Do the instant check now. if len(self.getSites()) <= 2: for address, site in list(self.getSites().items()): - self.updateSite(site, check_files=check_files) + self.updateSite(site, check_files=True) + + self.recheckPort() all_site_addresses = self.getSiteAddresses() site_addresses = [ address for address in all_site_addresses - if self.updateSite(self.getSite(address), check_files=check_files, dry_run=True) + if not self.isSiteUpdateTimeValid(address) ] log.info("%s: chosen %d sites (of %d)", task_description, len(site_addresses), len(all_site_addresses)) @@ -346,33 +352,21 @@ class FileServer(ConnectionServer): # Check sites integrity for site_address in site_addresses: - if check_files: - self.sleep(10) - else: - self.sleep(1) + site = None + self.sleep(1) + self.waitForInternetOnline() if self.stopping: break site = self.getSite(site_address) - if not self.updateSite(site, check_files=check_files, dry_run=True): + if not site or site.isUpdateTimeValid(): sites_skipped += 1 continue sites_processed += 1 - while self.running: - self.waitForInternetOnline() - if self.stopping: - break - - thread = self.update_pool.spawn(self.updateSite, site, check_files=check_files) - if check_files: - # Limit the concurency - # ZeroNet may be laggy when running from HDD. - thread.join(timeout=60) - if not self.waitForInternetOnline(): - break + thread = self.update_pool.spawn(self.updateSite, site) if self.stopping: break @@ -398,6 +392,60 @@ class FileServer(ConnectionServer): else: log.info("%s: finished in %.2fs", task_description, time.time() - start_time) + def peekSiteForVerification(self): + check_files_interval = 60 * 60 * 24 + verify_files_interval = 60 * 60 * 24 * 10 + site_addresses = self.getSiteAddresses() + random.shuffle(site_addresses) + for site_address in site_addresses: + site = self.getSite(site_address) + if not site: + continue + mode = site.isFileVerificationExpired(check_files_interval, verify_files_interval) + if mode: + return (site_address, mode) + return (None, None) + + + def sitesVerificationThread(self): + log.info("sitesVerificationThread started") + short_timeout = 10 + long_timeout = 60 + + self.sleep(long_timeout) + + while self.running: + site = None + self.sleep(short_timeout) + + if self.stopping: + break + + site_address, mode = self.peekSiteForVerification() + if not site_address: + self.sleep(long_timeout) + continue + + site = self.getSite(site_address) + if not site: + continue + + if mode == "verify": + check_files = False + verify_files = True + elif mode == "check": + check_files = True + verify_files = False + else: + continue + + log.info("running <%s> for %s" % (mode, site.address_short)) + + thread = self.update_pool.spawn(self.updateSite, site, + check_files=check_files, verify_files=verify_files) + + log.info("sitesVerificationThread stopped") + def sitesMaintenanceThread(self, mode="full"): log.info("sitesMaintenanceThread(%s) started" % mode) @@ -603,14 +651,13 @@ class FileServer(ConnectionServer): if not passive_mode: thread_keep_alive = self.spawn(self.keepAliveThread) thread_wakeup_watcher = self.spawn(self.wakeupWatcherThread) + thread_sites_verification = self.spawn(self.sitesVerificationThread) thread_reload_tracker_files = self.spawn(self.reloadTrackerFilesThread) thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full") thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short") self.sleep(0.1) self.spawn(self.updateSites) - self.sleep(0.1) - self.spawn(self.updateSites, check_files=True) ConnectionServer.listen(self) diff --git a/src/Site/Site.py b/src/Site/Site.py index 9e333751..aa2bc66b 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -183,7 +183,6 @@ class Site(object): self.worker_manager = WorkerManager(self) # Handle site download from other peers self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) self.content_updated = None # Content.js update time - self.last_check_files_time = 0 self.last_online_update = 0 self.startup_announce_done = 0 self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] @@ -227,6 +226,10 @@ class Site(object): settings = json.load(open("%s/sites.json" % config.data_dir)).get(self.address) if settings: self.settings = settings + if "check_files_timestamp" not in settings: + settings["check_files_timestamp"] = 0 + if "verify_files_timestamp" not in settings: + settings["verify_files_timestamp"] = 0 if "cache" not in settings: settings["cache"] = {} if "size_files_optional" not in settings: @@ -242,8 +245,17 @@ class Site(object): self.bad_files[inner_path] = min(self.bad_files[inner_path], 20) else: self.settings = { - "own": False, "serving": True, "permissions": [], "cache": {"bad_files": {}}, "size_files_optional": 0, - "added": int(time.time()), "downloaded": None, "optional_downloaded": 0, "size_optional": 0 + "check_files_timestamp": 0, + "verify_files_timestamp": 0, + "own": False, + "serving": True, + "permissions": [], + "cache": {"bad_files": {}}, + "size_files_optional": 0, + "added": int(time.time()), + "downloaded": None, + "optional_downloaded": 0, + "size_optional": 0 } # Default if config.download_optional == "auto": self.settings["autodownloadoptional"] = True @@ -281,6 +293,29 @@ class Site(object): def getSizeLimit(self): return self.settings.get("size_limit", int(config.size_limit)) + def isFileVerificationExpired(self, check_files_interval, verify_files_interval): + now = time.time() + check_files_timestamp = self.settings.get("check_files_timestamp", 0) + verify_files_timestamp = self.settings.get("verify_files_timestamp", 0) + + if check_files_interval is None: + check_files_expiration = now + 1 + else: + check_files_expiration = check_files_timestamp + check_files_interval + + if verify_files_interval is None: + verify_files_expiration = now + 1 + else: + verify_files_expiration = verify_files_timestamp + verify_files_interval + + if verify_files_expiration < now: + return "verify" + + if check_files_expiration < now: + return "check" + + return False + # Next size limit based on current size def getNextSizeLimit(self): size_limits = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000] @@ -690,9 +725,10 @@ class Site(object): # Return: None @util.Noparallel() def update(self, announce=False, check_files=False, verify_files=False, since=None): + online = self.connection_server.isInternetOnline() + self.content_manager.loadContent("content.json", load_includes=False) # Reload content.json self.content_updated = None # Reset content updated time - if verify_files: check_files = True @@ -704,8 +740,11 @@ class Site(object): if verify_files: self.storage.updateBadFiles(quick_check=False) + self.settings["check_files_timestamp"] = time.time() + self.settings["verify_files_timestamp"] = time.time() elif check_files: self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size + self.settings["check_files_timestamp"] = time.time() if not self.isServing(): self.updateWebsocket(updated=True) @@ -738,66 +777,18 @@ class Site(object): self.sendMyHashfield() self.updateHashfield() - self.refreshUpdateTime(valid=self.connection_server.isInternetOnline()) - - self.updateWebsocket(updated=True) - - @util.Noparallel(queue=True, ignore_args=True) - def _considerUpdate_realJob(self, check_files=False, force=False): - if not self._considerUpdate_check(check_files=check_files, force=force, log_reason=True): - return False - - online = self.connection_server.isInternetOnline() - - # TODO: there should be a configuration options controlling: - # * whether to check files on the program startup - # * whether to check files during the run time and how often - check_files = check_files and (self.last_check_files_time == 0) - self.last_check_files_time = time.time() - - if len(self.peers) < 50: - self.announce(mode="update") - online = online and self.connection_server.isInternetOnline() - - self.update(check_files=check_files) - online = online and self.connection_server.isInternetOnline() self.refreshUpdateTime(valid=online) - return True + self.updateWebsocket(updated=True) - def _considerUpdate_check(self, check_files=False, force=False, log_reason=False): - if not self.isServing(): - return False + # To be called from FileServer + @util.Noparallel(queue=True, ignore_args=True) + def update2(self, check_files=False, verify_files=False): + if len(self.peers) < 50: + self.announce(mode="update") - online = self.connection_server.isInternetOnline() - - run_update = False - msg = None - - if force: - run_update = True - msg = "forcing site update" - elif not online: - run_update = True - msg = "network connection seems broken, trying to update a site to check if the network is up" - elif check_files: - run_update = True - msg = "checking site's files..." - elif not self.isUpdateTimeValid(): - run_update = True - msg = "update time is not valid, updating now..." - - if run_update and log_reason: - self.log.debug(msg) - - return run_update - - def considerUpdate(self, check_files=False, force=False, dry_run=False): - run_update = self._considerUpdate_check(check_files=check_files, force=force) - if run_update and not dry_run: - run_update = self._considerUpdate_realJob(check_files=check_files, force=force) - return run_update + self.update(check_files=check_files, verify_files=verify_files) # Update site by redownload all content.json def redownloadContents(self):