More fixes on the way to reliable site updates.
This commit is contained in:
parent
d1b9cc8261
commit
e8358ee8f2
3 changed files with 79 additions and 50 deletions
|
@ -386,6 +386,9 @@ class ConnectionServer(object):
|
||||||
else:
|
else:
|
||||||
gevent.spawn(self.onInternetOffline)
|
gevent.spawn(self.onInternetOffline)
|
||||||
|
|
||||||
|
def isInternetOnline(self):
|
||||||
|
return self.has_internet
|
||||||
|
|
||||||
def onInternetOnline(self):
|
def onInternetOnline(self):
|
||||||
self.log.info("Internet online")
|
self.log.info("Internet online")
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,8 @@ class FileServer(ConnectionServer):
|
||||||
# self.log = logging.getLogger("FileServer")
|
# self.log = logging.getLogger("FileServer")
|
||||||
# The value of self.log will be overwritten in ConnectionServer.__init__()
|
# The value of self.log will be overwritten in ConnectionServer.__init__()
|
||||||
|
|
||||||
self.check_pool = gevent.pool.Pool(5)
|
self.update_pool = gevent.pool.Pool(5)
|
||||||
self.check_start_time = 0
|
self.update_start_time = 0
|
||||||
|
|
||||||
self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
|
self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
|
||||||
if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
|
if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
|
||||||
|
@ -153,13 +153,12 @@ class FileServer(ConnectionServer):
|
||||||
log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
|
log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
|
||||||
req = FileRequest(self, connection)
|
req = FileRequest(self, connection)
|
||||||
req.route(message["cmd"], message.get("req_id"), message.get("params"))
|
req.route(message["cmd"], message.get("req_id"), message.get("params"))
|
||||||
if not self.has_internet and not connection.is_private_ip:
|
if not connection.is_private_ip:
|
||||||
self.has_internet = True
|
self.setInternetStatus(self, True)
|
||||||
self.onInternetOnline()
|
|
||||||
|
|
||||||
def onInternetOnline(self):
|
def onInternetOnline(self):
|
||||||
log.info("Internet online")
|
log.info("Internet online")
|
||||||
gevent.spawn(self.checkSites, check_files=False, force_port_check=True)
|
gevent.spawn(self.updateSites, force_port_check=True)
|
||||||
|
|
||||||
# Reload the FileRequest class to prevent restarts in debug mode
|
# Reload the FileRequest class to prevent restarts in debug mode
|
||||||
def reload(self):
|
def reload(self):
|
||||||
|
@ -241,19 +240,19 @@ class FileServer(ConnectionServer):
|
||||||
# Returns False if Internet is immediately available
|
# Returns False if Internet is immediately available
|
||||||
# Returns True if we've spent some time waiting for Internet
|
# Returns True if we've spent some time waiting for Internet
|
||||||
def waitForInternetOnline(self):
|
def waitForInternetOnline(self):
|
||||||
if self.has_internet:
|
if self.isInternetOnline():
|
||||||
return False
|
return False
|
||||||
|
|
||||||
while not self.has_internet:
|
while not self.isInternetOnline():
|
||||||
time.sleep(15)
|
time.sleep(15)
|
||||||
if self.has_internet:
|
if self.isInternetOnline():
|
||||||
break
|
break
|
||||||
if not self.check_pool.full():
|
if not self.update_pool.full():
|
||||||
self.check_pool.spawn(self.updateRandomSite)
|
self.update_pool.spawn(self.updateRandomSite)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def updateRandomSite(self, site_addresses=None, check_files=False):
|
def updateRandomSite(self, site_addresses=None):
|
||||||
if not site_addresses:
|
if not site_addresses:
|
||||||
site_addresses = self.getSiteAddresses()
|
site_addresses = self.getSiteAddresses()
|
||||||
|
|
||||||
|
@ -269,39 +268,22 @@ class FileServer(ConnectionServer):
|
||||||
|
|
||||||
log.debug("Checking randomly chosen site: %s", site.address_short)
|
log.debug("Checking randomly chosen site: %s", site.address_short)
|
||||||
|
|
||||||
self.checkSite(site, check_files=check_files)
|
self.updateSite(site)
|
||||||
|
|
||||||
# Check site file integrity
|
def updateSite(self, site):
|
||||||
def checkSite(self, site, check_files=False):
|
|
||||||
if not site or not site.isServing():
|
if not site or not site.isServing():
|
||||||
return
|
return
|
||||||
|
site.considerUpdate()
|
||||||
|
|
||||||
quick_start = len(site.peers) >= 50
|
|
||||||
|
|
||||||
if quick_start:
|
|
||||||
log.debug("Checking site: %s (quick start)", site.address_short)
|
|
||||||
else:
|
|
||||||
log.debug("Checking site: %s", site.address_short)
|
|
||||||
|
|
||||||
if quick_start:
|
|
||||||
site.setDelayedStartupAnnounce()
|
|
||||||
else:
|
|
||||||
site.announce(mode="startup")
|
|
||||||
|
|
||||||
site.update(check_files=check_files) # Update site's content.json and download changed files
|
|
||||||
site.sendMyHashfield()
|
|
||||||
site.updateHashfield()
|
|
||||||
|
|
||||||
# Check sites integrity
|
|
||||||
@util.Noparallel()
|
@util.Noparallel()
|
||||||
def checkSites(self, check_files=False, force_port_check=False):
|
def updateSites(self, force_port_check=False):
|
||||||
log.info("Checking sites: check_files=%s, force_port_check=%s", check_files, force_port_check)
|
log.info("Checking sites: force_port_check=%s", force_port_check)
|
||||||
|
|
||||||
# Don't wait port opening on first startup. Do the instant check now.
|
# Don't wait port opening on first startup. Do the instant check now.
|
||||||
if len(self.sites) <= 2:
|
if len(self.sites) <= 2:
|
||||||
sites_checking = True
|
sites_checking = True
|
||||||
for address, site in list(self.sites.items()):
|
for address, site in list(self.sites.items()):
|
||||||
gevent.spawn(self.checkSite, site, check_files)
|
gevent.spawn(self.updateSite, site)
|
||||||
|
|
||||||
# Test and open port if not tested yet
|
# Test and open port if not tested yet
|
||||||
if not self.port_opened or force_port_check:
|
if not self.port_opened or force_port_check:
|
||||||
|
@ -313,7 +295,7 @@ class FileServer(ConnectionServer):
|
||||||
|
|
||||||
sites_processed = 0
|
sites_processed = 0
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
self.check_start_time = start_time
|
self.update_start_time = start_time
|
||||||
progress_print_time = time.time()
|
progress_print_time = time.time()
|
||||||
|
|
||||||
# Check sites integrity
|
# Check sites integrity
|
||||||
|
@ -327,7 +309,7 @@ class FileServer(ConnectionServer):
|
||||||
|
|
||||||
while 1:
|
while 1:
|
||||||
self.waitForInternetOnline()
|
self.waitForInternetOnline()
|
||||||
self.check_pool.spawn(self.checkSite, site, check_files)
|
self.update_pool.spawn(self.updateSite, site)
|
||||||
if not self.waitForInternetOnline():
|
if not self.waitForInternetOnline():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -345,8 +327,8 @@ class FileServer(ConnectionServer):
|
||||||
time_left
|
time_left
|
||||||
)
|
)
|
||||||
|
|
||||||
if (self.check_start_time != start_time) and self.check_pool.full():
|
if (self.update_start_time != start_time) and self.update_pool.full():
|
||||||
# Another check is running, throttling...
|
# Another updateSites() is running, throttling now...
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
else:
|
else:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
@ -370,7 +352,7 @@ class FileServer(ConnectionServer):
|
||||||
log.debug(
|
log.debug(
|
||||||
"Starting <%s> maintenance cycle: connections=%s, internet=%s",
|
"Starting <%s> maintenance cycle: connections=%s, internet=%s",
|
||||||
mode,
|
mode,
|
||||||
len(self.connections), self.has_internet
|
len(self.connections), self.isInternetOnline()
|
||||||
)
|
)
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
|
@ -445,13 +427,13 @@ class FileServer(ConnectionServer):
|
||||||
continue
|
continue
|
||||||
if last_activity_time > now - threshold:
|
if last_activity_time > now - threshold:
|
||||||
continue
|
continue
|
||||||
if self.check_pool.full():
|
if self.update_pool.full():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.info("No network activity for %.2fs. Running an update for a random site.",
|
log.info("No network activity for %.2fs. Running an update for a random site.",
|
||||||
now - last_activity_time
|
now - last_activity_time
|
||||||
)
|
)
|
||||||
self.check_pool.spawn(self.updateRandomSite)
|
self.update_pool.spawn(self.updateRandomSite)
|
||||||
|
|
||||||
# Periodic reloading of tracker files
|
# Periodic reloading of tracker files
|
||||||
def reloadTrackerFilesThread(self):
|
def reloadTrackerFilesThread(self):
|
||||||
|
@ -484,7 +466,7 @@ class FileServer(ConnectionServer):
|
||||||
log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
|
log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
|
||||||
|
|
||||||
if is_time_changed or is_ip_changed:
|
if is_time_changed or is_ip_changed:
|
||||||
self.checkSites(check_files=False, force_port_check=True)
|
self.updateSites(force_port_check=True)
|
||||||
|
|
||||||
last_time = time.time()
|
last_time = time.time()
|
||||||
last_my_ips = my_ips
|
last_my_ips = my_ips
|
||||||
|
@ -512,7 +494,7 @@ class FileServer(ConnectionServer):
|
||||||
DebugReloader.watcher.addCallback(self.reload)
|
DebugReloader.watcher.addCallback(self.reload)
|
||||||
|
|
||||||
if check_sites: # Open port, Update sites, Check files integrity
|
if check_sites: # Open port, Update sites, Check files integrity
|
||||||
gevent.spawn(self.checkSites)
|
gevent.spawn(self.updateSites)
|
||||||
|
|
||||||
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
|
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
|
||||||
thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")
|
thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")
|
||||||
|
|
|
@ -76,7 +76,7 @@ class Site(object):
|
||||||
ScaledTimeoutHandler(60 * 30, 60 * 2,
|
ScaledTimeoutHandler(60 * 30, 60 * 2,
|
||||||
handler=self.periodicMaintenanceHandler_announce,
|
handler=self.periodicMaintenanceHandler_announce,
|
||||||
scaler=self.getAnnounceRating),
|
scaler=self.getAnnounceRating),
|
||||||
ScaledTimeoutHandler(60 * 20, 60 * 10,
|
ScaledTimeoutHandler(60 * 30, 60 * 5,
|
||||||
handler=self.periodicMaintenanceHandler_general,
|
handler=self.periodicMaintenanceHandler_general,
|
||||||
scaler=self.getActivityRating)
|
scaler=self.getActivityRating)
|
||||||
]
|
]
|
||||||
|
@ -91,6 +91,8 @@ class Site(object):
|
||||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||||
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||||
self.content_updated = None # Content.js update time
|
self.content_updated = None # Content.js update time
|
||||||
|
self.last_check_files_time = 0
|
||||||
|
self.last_online_update = 0
|
||||||
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
|
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
|
||||||
self.page_requested = False # Page viewed in browser
|
self.page_requested = False # Page viewed in browser
|
||||||
self.websockets = [] # Active site websocket connections
|
self.websockets = [] # Active site websocket connections
|
||||||
|
@ -564,6 +566,44 @@ class Site(object):
|
||||||
|
|
||||||
self.updateWebsocket(updated=True)
|
self.updateWebsocket(updated=True)
|
||||||
|
|
||||||
|
def considerUpdate(self):
|
||||||
|
if not self.isServing():
|
||||||
|
return
|
||||||
|
|
||||||
|
online = self.connection_server.isInternetOnline()
|
||||||
|
|
||||||
|
if online and time.time() - self.last_online_update < 60 * 10:
|
||||||
|
with gevent.Timeout(10, exception=False):
|
||||||
|
self.announcer.announcePex()
|
||||||
|
return
|
||||||
|
|
||||||
|
# TODO: there should be a configuration options controlling:
|
||||||
|
# * whether to check files on the program startup
|
||||||
|
# * whether to check files during the run time and how often
|
||||||
|
check_files = self.last_check_files_time == 0
|
||||||
|
|
||||||
|
self.last_check_files_time = time.time()
|
||||||
|
|
||||||
|
# quick start, avoiding redundant announces
|
||||||
|
if len(self.peers) >= 50:
|
||||||
|
if len(self.getConnectedPeers()) > 4:
|
||||||
|
pass # Don't run announce() at all
|
||||||
|
else:
|
||||||
|
self.setDelayedStartupAnnounce()
|
||||||
|
else:
|
||||||
|
self.announce(mode="startup")
|
||||||
|
|
||||||
|
online = online and self.connection_server.isInternetOnline()
|
||||||
|
|
||||||
|
self.update(check_files=check_files)
|
||||||
|
self.sendMyHashfield()
|
||||||
|
self.updateHashfield()
|
||||||
|
|
||||||
|
online = online and self.connection_server.isInternetOnline()
|
||||||
|
|
||||||
|
if online:
|
||||||
|
self.last_online_update = time.time()
|
||||||
|
|
||||||
# Update site by redownload all content.json
|
# Update site by redownload all content.json
|
||||||
def redownloadContents(self):
|
def redownloadContents(self):
|
||||||
# Download all content.json again
|
# Download all content.json again
|
||||||
|
@ -927,6 +967,14 @@ class Site(object):
|
||||||
if (not force_safe) and self.settings["own"]:
|
if (not force_safe) and self.settings["own"]:
|
||||||
rating = min(rating, 0.6)
|
rating = min(rating, 0.6)
|
||||||
|
|
||||||
|
if self.content_updated is False: # Last check modification failed
|
||||||
|
rating += 0.1
|
||||||
|
elif self.bad_files:
|
||||||
|
rating += 0.1
|
||||||
|
|
||||||
|
if rating > 1.0:
|
||||||
|
rating = 1.0
|
||||||
|
|
||||||
return rating
|
return rating
|
||||||
|
|
||||||
def getAnnounceRating(self):
|
def getAnnounceRating(self):
|
||||||
|
@ -1210,14 +1258,10 @@ class Site(object):
|
||||||
with gevent.Timeout(10, exception=False):
|
with gevent.Timeout(10, exception=False):
|
||||||
self.announcer.announcePex()
|
self.announcer.announcePex()
|
||||||
|
|
||||||
|
self.update()
|
||||||
self.sendMyHashfield(3)
|
self.sendMyHashfield(3)
|
||||||
self.updateHashfield(3)
|
self.updateHashfield(3)
|
||||||
|
|
||||||
if self.content_updated is False: # Last check modification failed
|
|
||||||
self.update()
|
|
||||||
elif self.bad_files:
|
|
||||||
self.retryBadFiles()
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def periodicMaintenanceHandler_announce(self, startup=False, force=False):
|
def periodicMaintenanceHandler_announce(self, startup=False, force=False):
|
||||||
|
|
Loading…
Reference in a new issue