Redesign the Internet outage detection. Improvements in FileServer threads.
This commit is contained in:
parent
829fd46781
commit
d1b9cc8261
4 changed files with 173 additions and 30 deletions
|
@ -442,6 +442,7 @@ class Connection(object):
|
|||
def handleMessage(self, message):
|
||||
cmd = message["cmd"]
|
||||
|
||||
self.updateOnlineStatus(successful_activity=True)
|
||||
self.last_message_time = time.time()
|
||||
self.last_cmd_recv = cmd
|
||||
if cmd == "response": # New style response
|
||||
|
@ -504,6 +505,7 @@ class Connection(object):
|
|||
|
||||
# Send data to connection
|
||||
def send(self, message, streaming=False):
|
||||
self.updateOnlineStatus(outgoing_activity=True)
|
||||
self.last_send_time = time.time()
|
||||
if config.debug_socket:
|
||||
self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
|
||||
|
@ -543,6 +545,11 @@ class Connection(object):
|
|||
message = None
|
||||
with self.send_lock:
|
||||
self.sock.sendall(data)
|
||||
# XXX: Should not be used here:
|
||||
# self.updateOnlineStatus(successful_activity=True)
|
||||
# Looks like self.sock.sendall() returns normally, instead of
|
||||
# raising an Exception (at least, some times).
|
||||
# So the only way of detecting the network activity is self.handleMessage()
|
||||
except Exception as err:
|
||||
self.close("Send error: %s (cmd: %s)" % (err, stat_key))
|
||||
return False
|
||||
|
@ -633,3 +640,8 @@ class Connection(object):
|
|||
self.sock = None
|
||||
self.unpacker = None
|
||||
self.event_connected = None
|
||||
|
||||
def updateOnlineStatus(self, outgoing_activity=False, successful_activity=False):
|
||||
self.server.updateOnlineStatus(self,
|
||||
outgoing_activity=outgoing_activity,
|
||||
successful_activity=successful_activity)
|
||||
|
|
|
@ -41,7 +41,11 @@ class ConnectionServer(object):
|
|||
self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood
|
||||
self.broken_ssl_ips = {} # Peerids of broken ssl connections
|
||||
self.ips = {} # Connection by ip
|
||||
|
||||
self.has_internet = True # Internet outage detection
|
||||
self.last_outgoing_internet_activity_time = 0 # Last time the application tried to send any data
|
||||
self.last_successful_internet_activity_time = 0 # Last time the application successfully sent or received any data
|
||||
self.internet_outage_threshold = 60 * 2
|
||||
|
||||
self.stream_server = None
|
||||
self.stream_server_proxy = None
|
||||
|
@ -60,6 +64,8 @@ class ConnectionServer(object):
|
|||
self.num_outgoing = 0
|
||||
self.had_external_incoming = False
|
||||
|
||||
|
||||
|
||||
self.timecorrection = 0.0
|
||||
self.pool = Pool(500) # do not accept more than 500 connections
|
||||
|
||||
|
@ -252,8 +258,8 @@ class ConnectionServer(object):
|
|||
while self.running:
|
||||
run_i += 1
|
||||
self.ip_incoming = {} # Reset connected ips counter
|
||||
last_message_time = 0
|
||||
s = time.time()
|
||||
self.updateOnlineStatus(None)
|
||||
for connection in self.connections[:]: # Make a copy
|
||||
if connection.ip.endswith(".onion") or config.tor == "always":
|
||||
timeout_multipler = 2
|
||||
|
@ -261,9 +267,6 @@ class ConnectionServer(object):
|
|||
timeout_multipler = 1
|
||||
|
||||
idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
|
||||
if connection.last_message_time > last_message_time and not connection.is_private_ip:
|
||||
# Message from local IPs does not means internet connection
|
||||
last_message_time = connection.last_message_time
|
||||
|
||||
if connection.unpacker and idle > 30:
|
||||
# Delete the unpacker if not needed
|
||||
|
@ -311,18 +314,6 @@ class ConnectionServer(object):
|
|||
# Reset bad action counter every 30 min
|
||||
connection.bad_actions = 0
|
||||
|
||||
# Internet outage detection
|
||||
if time.time() - last_message_time > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)):
|
||||
# Offline: Last message more than 60-600sec depending on connection number
|
||||
if self.has_internet and last_message_time:
|
||||
self.has_internet = False
|
||||
self.onInternetOffline()
|
||||
else:
|
||||
# Online
|
||||
if not self.has_internet:
|
||||
self.has_internet = True
|
||||
self.onInternetOnline()
|
||||
|
||||
self.timecorrection = self.getTimecorrection()
|
||||
|
||||
if time.time() - s > 0.01:
|
||||
|
@ -353,6 +344,48 @@ class ConnectionServer(object):
|
|||
))
|
||||
return num_closed
|
||||
|
||||
# Internet outage detection
|
||||
def updateOnlineStatus(self, connection, outgoing_activity=False, successful_activity=False):
|
||||
|
||||
now = time.time()
|
||||
|
||||
if connection and not connection.is_private_ip:
|
||||
if outgoing_activity:
|
||||
self.last_outgoing_internet_activity_time = now
|
||||
if successful_activity:
|
||||
self.last_successful_internet_activity_time = now
|
||||
self.setInternetStatus(True)
|
||||
return
|
||||
|
||||
if not self.last_outgoing_internet_activity_time:
|
||||
return
|
||||
|
||||
if (
|
||||
(self.last_successful_internet_activity_time < now - self.internet_outage_threshold)
|
||||
and
|
||||
(self.last_successful_internet_activity_time < self.last_outgoing_internet_activity_time)
|
||||
):
|
||||
self.setInternetStatus(False)
|
||||
return
|
||||
|
||||
# This is the old algorithm just in case we missed something
|
||||
idle = now - self.last_successful_internet_activity_time
|
||||
if idle > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)):
|
||||
# Offline: Last successful activity more than 60-600sec depending on connection number
|
||||
self.setInternetStatus(False)
|
||||
return
|
||||
|
||||
def setInternetStatus(self, status):
|
||||
if self.has_internet == status:
|
||||
return
|
||||
|
||||
self.has_internet = status
|
||||
|
||||
if self.has_internet:
|
||||
gevent.spawn(self.onInternetOnline)
|
||||
else:
|
||||
gevent.spawn(self.onInternetOffline)
|
||||
|
||||
def onInternetOnline(self):
|
||||
self.log.info("Internet online")
|
||||
|
||||
|
|
|
@ -238,17 +238,50 @@ class FileServer(ConnectionServer):
|
|||
|
||||
return res
|
||||
|
||||
# Returns False if Internet is immediately available
|
||||
# Returns True if we've spent some time waiting for Internet
|
||||
def waitForInternetOnline(self):
|
||||
if self.has_internet:
|
||||
return False
|
||||
|
||||
while not self.has_internet:
|
||||
time.sleep(15)
|
||||
if self.has_internet:
|
||||
break
|
||||
if not self.check_pool.full():
|
||||
self.check_pool.spawn(self.updateRandomSite)
|
||||
|
||||
return True
|
||||
|
||||
def updateRandomSite(self, site_addresses=None, check_files=False):
|
||||
if not site_addresses:
|
||||
site_addresses = self.getSiteAddresses()
|
||||
|
||||
site_addresses = random.sample(site_addresses, 1)
|
||||
if len(site_addresses) < 1:
|
||||
return
|
||||
|
||||
address = site_addresses[0]
|
||||
site = self.sites.get(address, None)
|
||||
|
||||
if not site or not site.isServing():
|
||||
return
|
||||
|
||||
log.debug("Checking randomly chosen site: %s", site.address_short)
|
||||
|
||||
self.checkSite(site, check_files=check_files)
|
||||
|
||||
# Check site file integrity
|
||||
def checkSite(self, site, check_files=False):
|
||||
if not site.isServing():
|
||||
if not site or not site.isServing():
|
||||
return
|
||||
|
||||
quick_start = len(site.peers) >= 50
|
||||
|
||||
if quick_start:
|
||||
log.debug("Checking site: %s (quick start)" % (site.address))
|
||||
log.debug("Checking site: %s (quick start)", site.address_short)
|
||||
else:
|
||||
log.debug("Checking site: %s" % (site.address))
|
||||
log.debug("Checking site: %s", site.address_short)
|
||||
|
||||
if quick_start:
|
||||
site.setDelayedStartupAnnounce()
|
||||
|
@ -291,7 +324,12 @@ class FileServer(ConnectionServer):
|
|||
|
||||
if (not site) or (not site.isServing()):
|
||||
continue
|
||||
check_thread = self.check_pool.spawn(self.checkSite, site, check_files) # Check in new thread
|
||||
|
||||
while 1:
|
||||
self.waitForInternetOnline()
|
||||
self.check_pool.spawn(self.checkSite, site, check_files)
|
||||
if not self.waitForInternetOnline():
|
||||
break
|
||||
|
||||
if time.time() - progress_print_time > 60:
|
||||
progress_print_time = time.time()
|
||||
|
@ -313,21 +351,25 @@ class FileServer(ConnectionServer):
|
|||
else:
|
||||
time.sleep(1)
|
||||
|
||||
log.info("Checking sites: finished in %.3fs" % (time.time() - start_time))
|
||||
log.info("Checking sites: finished in %.2fs" % (time.time() - start_time))
|
||||
|
||||
def sitesMaintenanceThread(self):
|
||||
import gc
|
||||
def sitesMaintenanceThread(self, mode="full"):
|
||||
startup = True
|
||||
|
||||
short_timeout = 2
|
||||
long_timeout = 60 * 2
|
||||
min_long_timeout = 10
|
||||
max_long_timeout = 60 * 10
|
||||
long_timeout = min_long_timeout
|
||||
short_cycle_time_limit = 60 * 2
|
||||
|
||||
while 1:
|
||||
time.sleep(long_timeout)
|
||||
gc.collect() # Explicit garbage collection
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
log.debug(
|
||||
"Starting maintenance cycle: connections=%s, internet=%s",
|
||||
"Starting <%s> maintenance cycle: connections=%s, internet=%s",
|
||||
mode,
|
||||
len(self.connections), self.has_internet
|
||||
)
|
||||
start_time = time.time()
|
||||
|
@ -341,7 +383,7 @@ class FileServer(ConnectionServer):
|
|||
if (not site) or (not site.isServing()):
|
||||
continue
|
||||
|
||||
log.debug("Running maintenance for site: %s", site.address)
|
||||
log.debug("Running maintenance for site: %s", site.address_short)
|
||||
|
||||
done = site.runPeriodicMaintenance(startup=startup)
|
||||
site = None
|
||||
|
@ -349,15 +391,68 @@ class FileServer(ConnectionServer):
|
|||
sites_processed += 1
|
||||
time.sleep(short_timeout)
|
||||
|
||||
log.debug("Maintenance cycle finished in %.3fs. Total sites: %d. Processed sites: %d",
|
||||
# If we host hundreds of sites, the full maintenance cycle may take very
|
||||
# long time, especially on startup ( > 1 hour).
|
||||
# This means we are not able to run the maintenance procedure for active
|
||||
# sites frequently enough using just a single maintenance thread.
|
||||
# So we run 2 maintenance threads:
|
||||
# * One running full cycles.
|
||||
# * And one running short cycles for the most active sites.
|
||||
# When the short cycle runs out of the time limit, it restarts
|
||||
# from the beginning of the site list.
|
||||
if mode == "short" and time.time() - start_time > short_cycle_time_limit:
|
||||
break
|
||||
|
||||
log.debug("<%s> maintenance cycle finished in %.2fs. Total sites: %d. Processed sites: %d. Timeout: %d",
|
||||
mode,
|
||||
time.time() - start_time,
|
||||
len(site_addresses),
|
||||
sites_processed
|
||||
sites_processed,
|
||||
long_timeout
|
||||
)
|
||||
|
||||
if sites_processed:
|
||||
long_timeout = max(int(long_timeout / 2), min_long_timeout)
|
||||
else:
|
||||
long_timeout = min(long_timeout + 1, max_long_timeout)
|
||||
|
||||
site_addresses = None
|
||||
startup = False
|
||||
|
||||
def keepAliveThread(self):
|
||||
# This thread is mostly useless on a loaded system, since it never does
|
||||
# any works, if we have active traffic.
|
||||
#
|
||||
# We should initiate some network activity to detect the Internet outage
|
||||
# and avoid false positives. We normally have some network activity
|
||||
# initiated by various parts on the application as well as network peers.
|
||||
# So it's not a problem.
|
||||
#
|
||||
# However, if it actually happens that we have no network traffic for
|
||||
# some time (say, we host just a couple of inactive sites, and no peers
|
||||
# are interested in connecting to them), we initiate some traffic by
|
||||
# performing the update for a random site. It's way better than just
|
||||
# silly pinging a random peer for no profit.
|
||||
while 1:
|
||||
threshold = self.internet_outage_threshold / 2.0
|
||||
time.sleep(threshold / 2.0)
|
||||
self.waitForInternetOnline()
|
||||
last_activity_time = max(
|
||||
self.last_successful_internet_activity_time,
|
||||
self.last_outgoing_internet_activity_time)
|
||||
now = time.time()
|
||||
if not len(self.sites):
|
||||
continue
|
||||
if last_activity_time > now - threshold:
|
||||
continue
|
||||
if self.check_pool.full():
|
||||
continue
|
||||
|
||||
log.info("No network activity for %.2fs. Running an update for a random site.",
|
||||
now - last_activity_time
|
||||
)
|
||||
self.check_pool.spawn(self.updateRandomSite)
|
||||
|
||||
# Periodic reloading of tracker files
|
||||
def reloadTrackerFilesThread(self):
|
||||
# TODO:
|
||||
|
@ -420,7 +515,9 @@ class FileServer(ConnectionServer):
|
|||
gevent.spawn(self.checkSites)
|
||||
|
||||
thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
|
||||
thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread)
|
||||
thread_sites_maintenance_full = gevent.spawn(self.sitesMaintenanceThread, mode="full")
|
||||
thread_sites_maintenance_short = gevent.spawn(self.sitesMaintenanceThread, mode="short")
|
||||
thread_keep_alive = gevent.spawn(self.keepAliveThread)
|
||||
thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)
|
||||
|
||||
ConnectionServer.listen(self)
|
||||
|
|
|
@ -1177,6 +1177,7 @@ class Site(object):
|
|||
self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (
|
||||
len(connected_peers), need_to_close, closed))
|
||||
|
||||
@util.Noparallel(queue=True)
|
||||
def runPeriodicMaintenance(self, startup=False, force=False):
|
||||
if not self.isServing():
|
||||
return False
|
||||
|
|
Loading…
Reference in a new issue