Redesign the scheduling of site checking and verification

Save `check_files_timestamp` and `verify_files_timestamp` in sites.json and run checks only when the time interval has expired.
This commit is contained in:
Vadim Ushakov 2021-10-21 13:19:10 +07:00
parent 5ec970adb8
commit 5744e40505
2 changed files with 124 additions and 86 deletions

View file

@ -303,12 +303,12 @@ class FileServer(ConnectionServer):
log.debug("Checking randomly chosen site: %s", site.address_short)
self.updateSite(site, force=force)
self.updateSite(site)
def updateSite(self, site, check_files=False, force=False, dry_run=False):
def updateSite(self, site, check_files=False, verify_files=False):
if not site:
return False
return site.considerUpdate(check_files=check_files, force=force, dry_run=dry_run)
return site.update2(check_files=check_files, verify_files=verify_files)
def invalidateUpdateTime(self, invalid_interval):
for address in self.getSiteAddresses():
@ -316,24 +316,30 @@ class FileServer(ConnectionServer):
if site:
site.invalidateUpdateTime(invalid_interval)
def updateSites(self, check_files=False):
self.recheckPort()
def isSiteUpdateTimeValid(self, site_address):
    # Return True when the site exists and its last update is still
    # considered fresh; unknown addresses count as "not valid".
    site = self.getSite(site_address)
    return bool(site and site.isUpdateTimeValid())
def updateSites(self):
task_nr = self.update_sites_task_next_nr
self.update_sites_task_next_nr += 1
task_description = "updateSites [#%d, check_files=%s]" % (task_nr, check_files)
task_description = "updateSites [#%d]" % task_nr
log.info("%s: started", task_description)
# Don't wait port opening on first startup. Do the instant check now.
if len(self.getSites()) <= 2:
for address, site in list(self.getSites().items()):
self.updateSite(site, check_files=check_files)
self.updateSite(site, check_files=True)
self.recheckPort()
all_site_addresses = self.getSiteAddresses()
site_addresses = [
address for address in all_site_addresses
if self.updateSite(self.getSite(address), check_files=check_files, dry_run=True)
if not self.isSiteUpdateTimeValid(address)
]
log.info("%s: chosen %d sites (of %d)", task_description, len(site_addresses), len(all_site_addresses))
@ -346,33 +352,21 @@ class FileServer(ConnectionServer):
# Check sites integrity
for site_address in site_addresses:
if check_files:
self.sleep(10)
else:
site = None
self.sleep(1)
self.waitForInternetOnline()
if self.stopping:
break
site = self.getSite(site_address)
if not self.updateSite(site, check_files=check_files, dry_run=True):
if not site or site.isUpdateTimeValid():
sites_skipped += 1
continue
sites_processed += 1
while self.running:
self.waitForInternetOnline()
if self.stopping:
break
thread = self.update_pool.spawn(self.updateSite, site, check_files=check_files)
if check_files:
# Limit the concurrency
# ZeroNet may be laggy when running from HDD.
thread.join(timeout=60)
if not self.waitForInternetOnline():
break
thread = self.update_pool.spawn(self.updateSite, site)
if self.stopping:
break
@ -398,6 +392,60 @@ class FileServer(ConnectionServer):
else:
log.info("%s: finished in %.2fs", task_description, time.time() - start_time)
def peekSiteForVerification(self):
    # Pick a random site whose periodic file integrity task is due.
    # Returns (address, mode) where mode is "check" (quick size check)
    # or "verify" (full hash verification), or (None, None) if nothing
    # is due right now.
    check_files_interval = 60 * 60 * 24        # quick check: daily
    verify_files_interval = 60 * 60 * 24 * 10  # full verify: every 10 days

    addresses = self.getSiteAddresses()
    random.shuffle(addresses)
    for address in addresses:
        site = self.getSite(address)
        if not site:
            continue
        mode = site.isFileVerificationExpired(check_files_interval, verify_files_interval)
        if mode:
            return (address, mode)
    return (None, None)
def sitesVerificationThread(self):
    # Background worker: poll for a site whose periodic file check or
    # verification is due and run it on the update pool.
    log.info("sitesVerificationThread started")
    poll_interval = 10  # pause between polls while work may be pending
    idle_interval = 60  # longer pause when no site is due
    # mode name -> (check_files, verify_files) flags for updateSite()
    mode_flags = {
        "check": (True, False),
        "verify": (False, True),
    }
    self.sleep(idle_interval)
    while self.running:
        self.sleep(poll_interval)
        if self.stopping:
            break
        site_address, mode = self.peekSiteForVerification()
        if not site_address:
            # Nothing due — back off before polling again.
            self.sleep(idle_interval)
            continue
        site = self.getSite(site_address)
        if not site:
            continue
        if mode not in mode_flags:
            continue
        check_files, verify_files = mode_flags[mode]
        log.info("running <%s> for %s" % (mode, site.address_short))
        self.update_pool.spawn(
            self.updateSite, site,
            check_files=check_files, verify_files=verify_files)
    log.info("sitesVerificationThread stopped")
def sitesMaintenanceThread(self, mode="full"):
log.info("sitesMaintenanceThread(%s) started" % mode)
@ -603,14 +651,13 @@ class FileServer(ConnectionServer):
if not passive_mode:
thread_keep_alive = self.spawn(self.keepAliveThread)
thread_wakeup_watcher = self.spawn(self.wakeupWatcherThread)
thread_sites_verification = self.spawn(self.sitesVerificationThread)
thread_reload_tracker_files = self.spawn(self.reloadTrackerFilesThread)
thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full")
thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short")
self.sleep(0.1)
self.spawn(self.updateSites)
self.sleep(0.1)
self.spawn(self.updateSites, check_files=True)
ConnectionServer.listen(self)

View file

@ -183,7 +183,6 @@ class Site(object):
self.worker_manager = WorkerManager(self) # Handle site download from other peers
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
self.content_updated = None # Content.js update time
self.last_check_files_time = 0
self.last_online_update = 0
self.startup_announce_done = 0
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
@ -227,6 +226,10 @@ class Site(object):
settings = json.load(open("%s/sites.json" % config.data_dir)).get(self.address)
if settings:
self.settings = settings
if "check_files_timestamp" not in settings:
settings["check_files_timestamp"] = 0
if "verify_files_timestamp" not in settings:
settings["verify_files_timestamp"] = 0
if "cache" not in settings:
settings["cache"] = {}
if "size_files_optional" not in settings:
@ -242,8 +245,17 @@ class Site(object):
self.bad_files[inner_path] = min(self.bad_files[inner_path], 20)
else:
self.settings = {
"own": False, "serving": True, "permissions": [], "cache": {"bad_files": {}}, "size_files_optional": 0,
"added": int(time.time()), "downloaded": None, "optional_downloaded": 0, "size_optional": 0
"check_files_timestamp": 0,
"verify_files_timestamp": 0,
"own": False,
"serving": True,
"permissions": [],
"cache": {"bad_files": {}},
"size_files_optional": 0,
"added": int(time.time()),
"downloaded": None,
"optional_downloaded": 0,
"size_optional": 0
} # Default
if config.download_optional == "auto":
self.settings["autodownloadoptional"] = True
@ -281,6 +293,29 @@ class Site(object):
def getSizeLimit(self):
return self.settings.get("size_limit", int(config.size_limit))
def isFileVerificationExpired(self, check_files_interval, verify_files_interval):
    # Decide which periodic file integrity task (if any) is due for this
    # site, based on the timestamps persisted in sites.json.
    # Returns "verify" (full hash verification due), "check" (quick size
    # check due) or False. Passing None as an interval disables that task.
    now = time.time()

    def deadline(timestamp_key, interval):
        # A disabled task gets a deadline in the future, so it never fires.
        if interval is None:
            return now + 1
        return self.settings.get(timestamp_key, 0) + interval

    if deadline("verify_files_timestamp", verify_files_interval) < now:
        return "verify"
    if deadline("check_files_timestamp", check_files_interval) < now:
        return "check"
    return False
# Next size limit based on current size
def getNextSizeLimit(self):
size_limits = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000]
@ -690,9 +725,10 @@ class Site(object):
# Return: None
@util.Noparallel()
def update(self, announce=False, check_files=False, verify_files=False, since=None):
online = self.connection_server.isInternetOnline()
self.content_manager.loadContent("content.json", load_includes=False) # Reload content.json
self.content_updated = None # Reset content updated time
if verify_files:
check_files = True
@ -704,8 +740,11 @@ class Site(object):
if verify_files:
self.storage.updateBadFiles(quick_check=False)
self.settings["check_files_timestamp"] = time.time()
self.settings["verify_files_timestamp"] = time.time()
elif check_files:
self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size
self.settings["check_files_timestamp"] = time.time()
if not self.isServing():
self.updateWebsocket(updated=True)
@ -738,66 +777,18 @@ class Site(object):
self.sendMyHashfield()
self.updateHashfield()
self.refreshUpdateTime(valid=self.connection_server.isInternetOnline())
self.updateWebsocket(updated=True)
@util.Noparallel(queue=True, ignore_args=True)
def _considerUpdate_realJob(self, check_files=False, force=False):
if not self._considerUpdate_check(check_files=check_files, force=force, log_reason=True):
return False
online = self.connection_server.isInternetOnline()
# TODO: there should be a configuration options controlling:
# * whether to check files on the program startup
# * whether to check files during the run time and how often
check_files = check_files and (self.last_check_files_time == 0)
self.last_check_files_time = time.time()
if len(self.peers) < 50:
self.announce(mode="update")
online = online and self.connection_server.isInternetOnline()
self.update(check_files=check_files)
online = online and self.connection_server.isInternetOnline()
self.refreshUpdateTime(valid=online)
return True
self.updateWebsocket(updated=True)
def _considerUpdate_check(self, check_files=False, force=False, log_reason=False):
if not self.isServing():
return False
# To be called from FileServer
@util.Noparallel(queue=True, ignore_args=True)
def update2(self, check_files=False, verify_files=False):
if len(self.peers) < 50:
self.announce(mode="update")
online = self.connection_server.isInternetOnline()
run_update = False
msg = None
if force:
run_update = True
msg = "forcing site update"
elif not online:
run_update = True
msg = "network connection seems broken, trying to update a site to check if the network is up"
elif check_files:
run_update = True
msg = "checking site's files..."
elif not self.isUpdateTimeValid():
run_update = True
msg = "update time is not valid, updating now..."
if run_update and log_reason:
self.log.debug(msg)
return run_update
def considerUpdate(self, check_files=False, force=False, dry_run=False):
run_update = self._considerUpdate_check(check_files=check_files, force=force)
if run_update and not dry_run:
run_update = self._considerUpdate_realJob(check_files=check_files, force=force)
return run_update
self.update(check_files=check_files, verify_files=verify_files)
# Update site by redownload all content.json
def redownloadContents(self):