Add explicit invalidation and expiration of site update timestamps

This commit is contained in:
Vadim Ushakov 2020-10-31 18:05:50 +07:00
parent e8358ee8f2
commit ea21b32b93
3 changed files with 142 additions and 72 deletions

View file

@ -43,6 +43,8 @@ class ConnectionServer(object):
self.ips = {} # Connection by ip self.ips = {} # Connection by ip
self.has_internet = True # Internet outage detection self.has_internet = True # Internet outage detection
self.internet_online_since = 0
self.internet_offline_since = 0
self.last_outgoing_internet_activity_time = 0 # Last time the application tried to send any data self.last_outgoing_internet_activity_time = 0 # Last time the application tried to send any data
self.last_successful_internet_activity_time = 0 # Last time the application successfully sent or received any data self.last_successful_internet_activity_time = 0 # Last time the application successfully sent or received any data
self.internet_outage_threshold = 60 * 2 self.internet_outage_threshold = 60 * 2
@ -382,8 +384,10 @@ class ConnectionServer(object):
self.has_internet = status self.has_internet = status
if self.has_internet: if self.has_internet:
self.internet_online_since = time.time()
gevent.spawn(self.onInternetOnline) gevent.spawn(self.onInternetOnline)
else: else:
self.internet_offline_since = time.time()
gevent.spawn(self.onInternetOffline) gevent.spawn(self.onInternetOffline)
def isInternetOnline(self): def isInternetOnline(self):

View file

@ -33,8 +33,11 @@ class FileServer(ConnectionServer):
# self.log = logging.getLogger("FileServer") # self.log = logging.getLogger("FileServer")
# The value of self.log will be overwritten in ConnectionServer.__init__() # The value of self.log will be overwritten in ConnectionServer.__init__()
self.recheck_port = True
self.update_pool = gevent.pool.Pool(5) self.update_pool = gevent.pool.Pool(5)
self.update_start_time = 0 self.update_start_time = 0
self.update_sites_task_next_nr = 1
self.supported_ip_types = ["ipv4"] # Outgoing ip_type support self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported(): if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
@ -158,7 +161,13 @@ class FileServer(ConnectionServer):
def onInternetOnline(self): def onInternetOnline(self):
log.info("Internet online") log.info("Internet online")
gevent.spawn(self.updateSites, force_port_check=True) invalid_interval=(
self.internet_offline_since - self.internet_outage_threshold - random.randint(60 * 5, 60 * 10),
time.time()
)
self.invalidateUpdateTime(invalid_interval)
self.recheck_port = True
gevent.spawn(self.updateSites)
# Reload the FileRequest class to prevent restarts in debug mode # Reload the FileRequest class to prevent restarts in debug mode
def reload(self): def reload(self):
@ -237,6 +246,17 @@ class FileServer(ConnectionServer):
return res return res
@util.Noparallel(queue=True)
def recheckPort(self):
if not self.recheck_port:
return
if not self.port_opened or self.recheck_port:
self.portCheck()
if not self.port_opened["ipv4"]:
self.tor_manager.startOnions()
self.recheck_port = False
# Returns False if Internet is immediately available # Returns False if Internet is immediately available
# Returns True if we've spent some time waiting for Internet # Returns True if we've spent some time waiting for Internet
def waitForInternetOnline(self): def waitForInternetOnline(self):
@ -250,6 +270,7 @@ class FileServer(ConnectionServer):
if not self.update_pool.full(): if not self.update_pool.full():
self.update_pool.spawn(self.updateRandomSite) self.update_pool.spawn(self.updateRandomSite)
self.recheckPort()
return True return True
def updateRandomSite(self, site_addresses=None): def updateRandomSite(self, site_addresses=None):
@ -270,46 +291,68 @@ class FileServer(ConnectionServer):
self.updateSite(site) self.updateSite(site)
def updateSite(self, site): def updateSite(self, site, check_files=False, dry_run=False):
if not site or not site.isServing(): if not site or not site.isServing():
return return False
site.considerUpdate() return site.considerUpdate(check_files=check_files, dry_run=dry_run)
@util.Noparallel() def getSite(self, address):
def updateSites(self, force_port_check=False): return self.sites.get(address, None)
log.info("Checking sites: force_port_check=%s", force_port_check)
def invalidateUpdateTime(self, invalid_interval):
for address in self.getSiteAddresses():
site = self.getSite(address)
if site:
site.invalidateUpdateTime(invalid_interval)
def updateSites(self, check_files=False):
task_nr = self.update_sites_task_next_nr
self.update_sites_task_next_nr += 1
task_description = "updateSites: #%d, check_files=%s" % (task_nr, check_files)
log.info("%s: started", task_description)
# Don't wait port opening on first startup. Do the instant check now. # Don't wait port opening on first startup. Do the instant check now.
if len(self.sites) <= 2: if len(self.sites) <= 2:
sites_checking = True
for address, site in list(self.sites.items()): for address, site in list(self.sites.items()):
gevent.spawn(self.updateSite, site) self.updateSite(site, check_files=check_files)
# Test and open port if not tested yet all_site_addresses = self.getSiteAddresses()
if not self.port_opened or force_port_check: site_addresses = [
self.portCheck() address for address in all_site_addresses
if not self.port_opened["ipv4"]: if self.updateSite(self.getSite(address), check_files=check_files, dry_run=True)
self.tor_manager.startOnions() ]
site_addresses = self.getSiteAddresses() log.info("%s: chosen %d sites (of %d)", task_description, len(site_addresses), len(all_site_addresses))
sites_processed = 0 sites_processed = 0
sites_skipped = 0
start_time = time.time() start_time = time.time()
self.update_start_time = start_time self.update_start_time = start_time
progress_print_time = time.time() progress_print_time = time.time()
# Check sites integrity # Check sites integrity
for site_address in site_addresses: for site_address in site_addresses:
site = self.sites.get(site_address, None) if check_files:
time.sleep(10)
else:
time.sleep(1)
site = self.getSite(site_address)
if not self.updateSite(site, check_files=check_files, dry_run=True):
sites_skipped += 1
continue
sites_processed += 1 sites_processed += 1
if (not site) or (not site.isServing()):
continue
while 1: while 1:
self.waitForInternetOnline() self.waitForInternetOnline()
self.update_pool.spawn(self.updateSite, site) thread = self.update_pool.spawn(self.updateSite,
site, check_files=check_files)
if check_files:
# Limit the concurrency
# ZeroNet may be laggy when running from HDD.
thread.join(timeout=60)
if not self.waitForInternetOnline(): if not self.waitForInternetOnline():
break break
@ -319,21 +362,17 @@ class FileServer(ConnectionServer):
time_per_site = time_spent / float(sites_processed) time_per_site = time_spent / float(sites_processed)
sites_left = len(site_addresses) - sites_processed sites_left = len(site_addresses) - sites_processed
time_left = time_per_site * sites_left time_left = time_per_site * sites_left
log.info("Checking sites: DONE: %d sites in %.2fs (%.2fs per site); LEFT: %d sites in %.2fs", log.info("%s: DONE: %d sites in %.2fs (%.2fs per site); SKIPPED: %d sites; LEFT: %d sites in %.2fs",
task_description,
sites_processed, sites_processed,
time_spent, time_spent,
time_per_site, time_per_site,
sites_skipped,
sites_left, sites_left,
time_left time_left
) )
if (self.update_start_time != start_time) and self.update_pool.full(): log.info("%s: finished in %.2fs", task_description, time.time() - start_time)
# Another updateSites() is running, throttling now...
time.sleep(5)
else:
time.sleep(1)
log.info("Checking sites: finished in %.2fs" % (time.time() - start_time))
def sitesMaintenanceThread(self, mode="full"): def sitesMaintenanceThread(self, mode="full"):
startup = True startup = True
@ -402,7 +441,7 @@ class FileServer(ConnectionServer):
startup = False startup = False
def keepAliveThread(self): def keepAliveThread(self):
# This thread is mostly useless on a loaded system, since it never does # This thread is mostly useless on a system under load, since it never does
# any work, if we have active traffic. # any work, if we have active traffic.
# #
# We should initiate some network activity to detect the Internet outage # We should initiate some network activity to detect the Internet outage
@ -466,7 +505,13 @@ class FileServer(ConnectionServer):
log.info("IP change detected from %s to %s" % (last_my_ips, my_ips)) log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
if is_time_changed or is_ip_changed: if is_time_changed or is_ip_changed:
self.updateSites(force_port_check=True) invalid_interval=(
last_time - self.internet_outage_threshold - random.randint(60 * 5, 60 * 10),
time.time()
)
self.invalidateUpdateTime(invalid_interval)
self.recheck_port = True
gevent.spawn(self.updateSites)
last_time = time.time() last_time = time.time()
last_my_ips = my_ips last_my_ips = my_ips
@ -494,7 +539,9 @@ class FileServer(ConnectionServer):
DebugReloader.watcher.addCallback(self.reload) DebugReloader.watcher.addCallback(self.reload)
if check_sites: # Open port, Update sites, Check files integrity if check_sites: # Open port, Update sites, Check files integrity
gevent.spawn(self.updateSites) gevent.spawn(self.updateSites, check_files=True)
gevent.spawn(self.updateSites)
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread) thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full") thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")

View file

@ -81,8 +81,6 @@ class Site(object):
scaler=self.getActivityRating) scaler=self.getActivityRating)
] ]
self.delayed_startup_announce = False
self.content = None # Load content.json self.content = None # Load content.json
self.peers = {} # Key: ip:port, Value: Peer.Peer self.peers = {} # Key: ip:port, Value: Peer.Peer
self.peers_recent = collections.deque(maxlen=150) self.peers_recent = collections.deque(maxlen=150)
@ -93,6 +91,7 @@ class Site(object):
self.content_updated = None # Content.js update time self.content_updated = None # Content.js update time
self.last_check_files_time = 0 self.last_check_files_time = 0
self.last_online_update = 0 self.last_online_update = 0
self.startup_announce_done = 0
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
self.page_requested = False # Page viewed in browser self.page_requested = False # Page viewed in browser
self.websockets = [] # Active site websocket connections self.websockets = [] # Active site websocket connections
@ -525,6 +524,28 @@ class Site(object):
time.sleep(0.1) time.sleep(0.1)
return queried return queried
def invalidateUpdateTime(self, invalid_interval):
a, b = invalid_interval
if b is None:
b = time.time()
if a is None:
a = b
if a <= self.last_online_update and self.last_online_update <= b:
self.last_online_update = 0
self.log.info("Update time invalidated")
def isUpdateTimeValid(self):
if not self.last_online_update:
return False
expirationThreshold = 60 * 60 * 6
return self.last_online_update > time.time() - expirationThreshold
def refreshUpdateTime(self, valid=True):
if valid:
self.last_online_update = time.time()
else:
self.last_online_update = 0
# Update content.json from peers and download changed files # Update content.json from peers and download changed files
# Return: None # Return: None
@util.Noparallel() @util.Noparallel()
@ -564,45 +585,56 @@ class Site(object):
else: else:
self.content_updated = time.time() self.content_updated = time.time()
self.sendMyHashfield()
self.updateHashfield()
self.refreshUpdateTime(valid=self.connection_server.isInternetOnline())
self.updateWebsocket(updated=True) self.updateWebsocket(updated=True)
def considerUpdate(self): @util.Noparallel(queue=True, ignore_args=True)
def considerUpdate(self, check_files=False, dry_run=False):
if not self.isServing(): if not self.isServing():
return return False
online = self.connection_server.isInternetOnline() online = self.connection_server.isInternetOnline()
if online and time.time() - self.last_online_update < 60 * 10: run_update = False
with gevent.Timeout(10, exception=False): msg = None
self.announcer.announcePex()
return if not online:
run_update = True
msg = "network connection seems broken, trying to update the site to check if the network is up"
elif check_files:
run_update = True
msg = "checking site's files..."
elif not self.isUpdateTimeValid():
run_update = True
msg = "update time is not valid, updating now..."
if not run_update:
return False
if dry_run:
return True
self.log.debug(msg)
# TODO: there should be a configuration options controlling: # TODO: there should be a configuration options controlling:
# * whether to check files on the program startup # * whether to check files on the program startup
# * whether to check files during the run time and how often # * whether to check files during the run time and how often
check_files = self.last_check_files_time == 0 check_files = check_files and (self.last_check_files_time == 0)
self.last_check_files_time = time.time() self.last_check_files_time = time.time()
# quick start, avoiding redundant announces if len(self.peers) < 50:
if len(self.peers) >= 50: self.announce(mode="update")
if len(self.getConnectedPeers()) > 4:
pass # Don't run announce() at all
else:
self.setDelayedStartupAnnounce()
else:
self.announce(mode="startup")
online = online and self.connection_server.isInternetOnline()
self.update(check_files=check_files) self.update(check_files=check_files)
self.sendMyHashfield()
self.updateHashfield()
online = online and self.connection_server.isInternetOnline() online = online and self.connection_server.isInternetOnline()
self.refreshUpdateTime(valid=online)
if online: return True
self.last_online_update = time.time()
# Update site by redownload all content.json # Update site by redownload all content.json
def redownloadContents(self): def redownloadContents(self):
@ -932,16 +964,6 @@ class Site(object):
peer.found(source) peer.found(source)
return peer return peer
def setDelayedStartupAnnounce(self):
self.delayed_startup_announce = True
def applyDelayedStartupAnnounce(self):
if self.delayed_startup_announce:
self.delayed_startup_announce = False
self.announce(mode="startup")
return True
return False
def announce(self, *args, **kwargs): def announce(self, *args, **kwargs):
if self.isServing(): if self.isServing():
self.announcer.announce(*args, **kwargs) self.announcer.announce(*args, **kwargs)
@ -1243,8 +1265,6 @@ class Site(object):
if not self.isServing(): if not self.isServing():
return False return False
self.applyDelayedStartupAnnounce()
if not self.peers: if not self.peers:
return False return False
@ -1259,8 +1279,6 @@ class Site(object):
self.announcer.announcePex() self.announcer.announcePex()
self.update() self.update()
self.sendMyHashfield(3)
self.updateHashfield(3)
return True return True
@ -1270,10 +1288,11 @@ class Site(object):
self.log.debug("periodicMaintenanceHandler_announce: startup=%s, force=%s" % (startup, force)) self.log.debug("periodicMaintenanceHandler_announce: startup=%s, force=%s" % (startup, force))
if self.applyDelayedStartupAnnounce(): if startup and len(self.peers) < 10:
return True self.announce(mode="startup")
else:
self.announce(mode="update", pex=False)
self.announce(mode="update", pex=False)
return True return True
# Send hashfield to peers # Send hashfield to peers