Redesign cleanupSites() and the related machinery, and rename it to periodic maintenance.

Vadim Ushakov 2020-10-28 23:38:17 +07:00
parent 511a90a5c5
commit 8fd88c50f9
6 changed files with 175 additions and 81 deletions

src/Config.py

@@ -253,7 +253,7 @@ class Config(object):
         self.parser.add_argument('--size_limit', help='Default site size limit in MB', default=10, type=int, metavar='limit')
         self.parser.add_argument('--file_size_limit', help='Maximum per file size limit in MB', default=10, type=int, metavar='limit')
-        self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=8, type=int, metavar='connected_limit')
+        self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=10, type=int, metavar='connected_limit')
         self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit')
         self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers')
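The only change in this hunk raises the --connected_limit default from 8 to 10. A minimal sketch of the effect, assuming nothing beyond the flag definition shown above (the standalone parser here is illustrative, not the real Config class):

import argparse

parser = argparse.ArgumentParser()
# Same flag and new default as in the diff above
parser.add_argument('--connected_limit', help='Max connected peer per site', default=10, type=int, metavar='connected_limit')

print(parser.parse_args([]).connected_limit)                           # 10 (new default)
print(parser.parse_args(['--connected_limit', '16']).connected_limit)  # 16 (explicit override)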

src/File/FileServer.py

@@ -10,6 +10,7 @@ from gevent.server import StreamServer
 import util
 from util import helper
+from util import CircularIterator
 from Config import config
 from .FileRequest import FileRequest
 from Peer import PeerPortchecker
@@ -18,7 +19,6 @@ from Connection import ConnectionServer
 from Plugin import PluginManager
 from Debug import Debug

 @PluginManager.acceptPlugins
 class FileServer(ConnectionServer):
@@ -259,55 +259,35 @@ class FileServer(ConnectionServer):
         check_thread.join(timeout=5)
         self.log.debug("Checksites done in %.3fs" % (time.time() - s))

-    def cleanupSites(self):
+    def sitesMaintenanceThread(self):
         import gc
         startup = True
-        time.sleep(5 * 60)  # Sites already cleaned up on startup
-        peers_protected = set([])
+
+        short_timeout = 2
+        long_timeout = 60 * 5
+
+        circular_iterator = CircularIterator()
+
         while 1:
-            # Sites health care every 20 min
+            if circular_iterator.isWrapped():
+                time.sleep(long_timeout)
+                circular_iterator.resetSuccessiveCount()
+                gc.collect()  # Explicit garbage collection
+
             self.log.debug(
-                "Running site cleanup, connections: %s, internet: %s, protected peers: %s" %
-                (len(self.connections), self.has_internet, len(peers_protected))
+                "Running site cleanup, connections: %s, internet: %s" %
+                (len(self.connections), self.has_internet)
             )

-            for address, site in list(self.sites.items()):
-                if not site.isServing():
-                    continue
-
-                if not startup:
-                    site.cleanupPeers(peers_protected)
-
-                time.sleep(1)  # Prevent too quick request
-
-            peers_protected = set([])
-            for address, site in list(self.sites.items()):
-                if not site.isServing():
-                    continue
-
-                if site.peers:
-                    with gevent.Timeout(10, exception=False):
-                        site.announcer.announcePex()
-
-                # Last check modification failed
-                if site.content_updated is False:
-                    site.update()
-                elif site.bad_files:
-                    site.retryBadFiles()
-
-                # Keep active connections
-                connected_num = site.needConnections(check_site_on_reconnect=True)
-                if connected_num < config.connected_limit:
-                    # This site has small amount of peers, protect them from closing
-                    peers_protected.update([peer.key for peer in site.getConnectedPeers()])
-
-                time.sleep(1)  # Prevent too quick request
-
-            site = None
-            gc.collect()  # Implicit garbage collection
-            startup = False
-            time.sleep(60 * 20)
+            site = circular_iterator.next(list(self.sites.values()))
+            if site:
+                done = site.runPeriodicMaintenance(startup=startup)
+                if done:
+                    time.sleep(short_timeout)
+                site = None
+
+            if circular_iterator.isWrapped():
+                startup = False

     def announceSite(self, site):
         site.announce(mode="update", pex=False)
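Note on pacing: the old thread swept every site in one burst every 20 minutes; the new one visits a single site per pass. A back-of-the-envelope sketch under the values in the diff (short_timeout=2, long_timeout=300; due_sites is a hypothetical count of sites whose maintenance interval has elapsed):

# One wrap over the site list costs roughly the 300 s wrap sleep plus
# 2 s after each site that actually ran its maintenance; sites that are
# not yet due return False immediately and add no sleep.
def wrap_seconds(due_sites):
    return 300 + 2 * due_sites

print(wrap_seconds(50))  # ~400 s for a round in which 50 sites were due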
@@ -399,7 +379,7 @@ class FileServer(ConnectionServer):
         thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
         thread_announce_sites = gevent.spawn(self.announceSites)
-        thread_cleanup_sites = gevent.spawn(self.cleanupSites)
+        thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread)
         thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)

         ConnectionServer.listen(self)

src/Peer/Peer.py

@@ -46,6 +46,7 @@ class Peer(object):
         self.is_tracker_connection = False  # Tracker connection instead of normal peer
         self.reputation = 0  # More likely to connect if larger
         self.last_content_json_update = 0.0  # Modify date of last received content.json
+        self.protected = 0

         self.connection_error = 0  # Series of connection error
         self.hash_failed = 0  # Number of bad files from peer
@@ -74,9 +75,26 @@ class Peer(object):
         logger.log(self.log_level, "%s:%s %s" % (self.ip, self.port, text))

+    # A site marks its peers as protected when it does not have enough
+    # connected peers; protected peers are not dropped during a periodic
+    # cleanup.
+    def markProtected(self, interval=60*20):
+        self.protected = time.time() + interval
+
+    def isProtected(self):
+        if self.protected > 0:
+            if self.protected < time.time():
+                self.protected = 0
+        return self.protected > 0
+
     def isConnected(self):
+        if self.connection and not self.connection.connected:
+            self.connection = None
         return self.connection and self.connection.connected

+    def isTtlExpired(self, ttl):
+        return (time.time() - self.time_found) > ttl
+
     # Connect to host
     def connect(self, connection=None):
         if self.reputation < -10:
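A minimal sketch of the protection-window semantics added above (the constructor arguments are hypothetical; only the markProtected()/isProtected() behavior is taken from the diff):

import time
from Peer import Peer

peer = Peer("203.0.113.7", 15441)  # hypothetical ip/port
peer.markProtected(interval=10)    # shielded from cleanup for ~10 s
assert peer.isProtected()
time.sleep(11)
assert not peer.isProtected()      # window lapsed; the flag resets to 0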
@@ -115,6 +133,11 @@ class Peer(object):
             self.connection = None
         return self.connection

+    def disconnect(self, reason="Unknown"):
+        if self.connection:
+            self.connection.close(reason)
+        self.connection = None
+
     # Check if we have connection to peer
     def findConnection(self):
         if self.connection and self.connection.connected:  # We have connection to peer
@@ -400,8 +423,7 @@ class Peer(object):
         if self.site and self in self.site.peers_recent:
             self.site.peers_recent.remove(self)

-        if self.connection:
-            self.connection.close(reason)
+        self.disconnect(reason)

     # - EVENTS -

src/Site/Site.py

@@ -40,6 +40,9 @@ class Site(object):
         self.log = logging.getLogger("Site:%s" % self.address_short)
         self.addEventListeners()

+        self.periodic_maintenance_interval = 60 * 20
+        self.periodic_maintenance_timestamp = 0
+
         self.content = None  # Load content.json
         self.peers = {}  # Key: ip:port, Value: Peer.Peer
         self.peers_recent = collections.deque(maxlen=150)
@@ -853,6 +856,11 @@ class Site(object):
         if self.isServing():
             self.announcer.announce(*args, **kwargs)

+    # The engine tries to maintain the number of active connections:
+    #   >= getPreferableActiveConnectionCount()
+    # and
+    #   <= getActiveConnectionCountLimit()
+
     def getPreferableActiveConnectionCount(self):
         if not self.isServing():
             return 0
@@ -874,8 +882,16 @@ class Site(object):
         if len(self.peers) < 50:
             count = max(count, 5)

+        count = min(count, config.connected_limit)
+
         return count

+    def getActiveConnectionCountLimit(self):
+        count_above_preferable = 2
+        limit = self.getPreferableActiveConnectionCount() + count_above_preferable
+        limit = min(limit, config.connected_limit)
+        return limit
+
     def tryConnectingToMorePeers(self, more=1, pex=True, try_harder=False):
         max_peers = more * 2 + 10
         if try_harder:
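Illustrative arithmetic for the connection band these two methods define, assuming a site whose raw preferable count works out to 6 and the new connected_limit default of 10:

preferable = min(6, 10)  # lower bound: 6
limit = min(6 + 2, 10)   # upper bound: 8 (count_above_preferable == 2)
# The engine then aims to keep this site's active connections in [6, 8].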
@@ -920,7 +936,7 @@ class Site(object):
         if num is None:
             num = self.getPreferableActiveConnectionCount()

-        need = min(len(self.peers), num, config.connected_limit)
+        need = min(len(self.peers), num)

         connected = self.bringConnections(
             need=need,
@@ -935,8 +951,15 @@ class Site(object):
                 pex=pex,
                 try_harder=True)

+        if connected < num:
+            self.markConnectedPeersProtected()
+
         return connected

+    def markConnectedPeersProtected(self):
+        for peer in self.getConnectedPeers():
+            peer.markProtected()
+
     # Return: Probably peers verified to be connectable recently
     def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True):
         peers = list(self.peers.values())
@@ -1022,53 +1045,87 @@ class Site(object):
             back.append(peer)
         return back

-    # Cleanup probably dead peers and close connection if too much
-    def cleanupPeers(self, peers_protected=[]):
+    def removeDeadPeers(self):
         peers = list(self.peers.values())
-        if len(peers) > 20:
-            # Cleanup old peers
-            removed = 0
-            if len(peers) > 1000:
-                ttl = 60 * 60 * 1
-            else:
-                ttl = 60 * 60 * 4
+        if len(peers) <= 20:
+            return

-            for peer in peers:
-                if peer.connection and peer.connection.connected:
-                    continue
-                if peer.connection and not peer.connection.connected:
-                    peer.connection = None  # Dead connection
-                if time.time() - peer.time_found > ttl:  # Not found on tracker or via pex in last 4 hour
-                    peer.remove("Time found expired")
-                    removed += 1
-                if removed > len(peers) * 0.1:  # Don't remove too much at once
-                    break
+        removed = 0
+        if len(peers) > 1000:
+            ttl = 60 * 60 * 1
+        elif len(peers) > 100:
+            ttl = 60 * 60 * 4
+        else:
+            ttl = 60 * 60 * 8

-            if removed:
-                self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
+        for peer in peers:
+            if peer.isConnected() or peer.isProtected():
+                continue
+            if peer.isTtlExpired(ttl):
+                peer.remove("TTL expired")
+                removed += 1
+            if removed > len(peers) * 0.1:  # Don't remove too much at once
+                break

-        # Close peers over the limit
-        closed = 0
-        connected_peers = [peer for peer in self.getConnectedPeers() if peer.connection.connected]  # Only fully connected peers
-        need_to_close = len(connected_peers) - config.connected_limit
+        if removed:
+            self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))

-        if closed < need_to_close:
-            # Try to keep connections with more sites
+    # Cleanup probably dead peers and close connections if there are too many
+    def cleanupPeers(self):
+        self.removeDeadPeers()
+
+        limit = self.getActiveConnectionCountLimit()
+        connected_peers = [peer for peer in self.getConnectedPeers() if peer.isConnected()]  # Only fully connected peers
+        need_to_close = len(connected_peers) - limit
+
+        if need_to_close > 0:
+            closed = 0
             for peer in sorted(connected_peers, key=lambda peer: min(peer.connection.sites, 5)):
-                if not peer.connection:
+                if not peer.isConnected():
                     continue
-                if peer.key in peers_protected:
+                if peer.isProtected():
                     continue
                 if peer.connection.sites > 5:
                     break
-                peer.connection.close("Cleanup peers")
-                peer.connection = None
+                peer.disconnect("Cleanup peers")
                 closed += 1
                 if closed >= need_to_close:
                     break

-        if need_to_close > 0:
-            self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (len(connected_peers), need_to_close, closed))
+            self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (
+                len(connected_peers), need_to_close, closed))
+
+    def runPeriodicMaintenance(self, startup=False, force=False):
+        if not self.isServing():
+            return False
+
+        scheduled_time = self.periodic_maintenance_timestamp + self.periodic_maintenance_interval
+
+        if time.time() < scheduled_time and not force:
+            return False
+
+        self.periodic_maintenance_timestamp = time.time()
+
+        self.log.debug("runPeriodicMaintenance: startup=%s" % startup)
+
+        if not startup:
+            self.cleanupPeers()
+
+        if self.peers:
+            with gevent.Timeout(10, exception=False):
+                self.announcer.announcePex()
+
+        # Last check modification failed
+        if self.content_updated is False:
+            self.update()
+        elif self.bad_files:
+            self.retryBadFiles()
+
+        self.needConnections(check_site_on_reconnect=True)
+
+        self.periodic_maintenance_timestamp = time.time()
+
+        return True

     # Send hashfield to peers
     def sendMyHashfield(self, limit=5):
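A hedged usage sketch of the new throttle, assuming an already-constructed Site instance; the interval comes from periodic_maintenance_interval (20 minutes) set in the constructor above:

site.runPeriodicMaintenance()            # runs; stamps periodic_maintenance_timestamp
site.runPeriodicMaintenance()            # returns False: the interval has not elapsed
site.runPeriodicMaintenance(force=True)  # force=True bypasses the schedule check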

src/util/CircularIterator.py

@@ -0,0 +1,34 @@
+import random
+
+
+class CircularIterator:
+    def __init__(self):
+        self.successive_count = 0
+        self.last_size = 0
+        self.index = -1
+
+    def next(self, items):
+        self.last_size = len(items)
+
+        if self.last_size == 0:
+            return None
+
+        if self.index < 0:
+            self.index = random.randint(0, self.last_size)
+        else:
+            self.index += 1
+
+        self.index = self.index % self.last_size
+
+        self.successive_count += 1
+        return items[self.index]
+
+    def resetSuccessiveCount(self):
+        self.successive_count = 0
+
+    def getSuccessiveCount(self):
+        return self.successive_count
+
+    def isWrapped(self):
+        return self.successive_count >= self.last_size
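A minimal usage sketch of the iterator contract (names are illustrative): next() starts at a random offset and steps one element per call, and isWrapped() turns true once as many successive calls have been made as there are items:

from util import CircularIterator

it = CircularIterator()
sites = ["site_a", "site_b", "site_c"]
visited = []
while True:
    visited.append(it.next(sites))
    if it.isWrapped():
        break
assert sorted(visited) == sorted(sites)  # one full round, each item exactly once
it.resetSuccessiveCount()                # begin the next round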

src/util/__init__.py

@@ -1,4 +1,5 @@
 from .Cached import Cached
+from .CircularIterator import CircularIterator
 from .Event import Event
 from .Noparallel import Noparallel
 from .Pooled import Pooled