From 8fd88c50f923fe2338fc793e7ea5a18473c56e38 Mon Sep 17 00:00:00 2001
From: Vadim Ushakov
Date: Wed, 28 Oct 2020 23:38:17 +0700
Subject: [PATCH] Redesign cleanupSites() and the related logic, and rename it
 to periodic maintenance.

---
 src/Config.py                |   2 +-
 src/File/FileServer.py       |  68 +++++++------------
 src/Peer/Peer.py             |  26 +++++++-
 src/Site/Site.py             | 125 +++++++++++++++++++++++++----------
 src/util/CircularIterator.py |  34 ++++++++++
 src/util/__init__.py         |   1 +
 6 files changed, 175 insertions(+), 81 deletions(-)
 create mode 100644 src/util/CircularIterator.py

diff --git a/src/Config.py b/src/Config.py
index 41c914f2..70b284bf 100644
--- a/src/Config.py
+++ b/src/Config.py
@@ -253,7 +253,7 @@ class Config(object):

         self.parser.add_argument('--size_limit', help='Default site size limit in MB', default=10, type=int, metavar='limit')
         self.parser.add_argument('--file_size_limit', help='Maximum per file size limit in MB', default=10, type=int, metavar='limit')
-        self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=8, type=int, metavar='connected_limit')
+        self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=10, type=int, metavar='connected_limit')
         self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit')
         self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers')

diff --git a/src/File/FileServer.py b/src/File/FileServer.py
index c1134800..fbc3cc85 100644
--- a/src/File/FileServer.py
+++ b/src/File/FileServer.py
@@ -10,6 +10,7 @@ from gevent.server import StreamServer

 import util
 from util import helper
+from util import CircularIterator
 from Config import config
 from .FileRequest import FileRequest
 from Peer import PeerPortchecker
@@ -18,7 +19,6 @@ from Connection import ConnectionServer
 from Plugin import PluginManager
 from Debug import Debug

-
 @PluginManager.acceptPlugins
 class FileServer(ConnectionServer):

@@ -259,55 +259,35 @@ class FileServer(ConnectionServer):
             check_thread.join(timeout=5)
         self.log.debug("Checksites done in %.3fs" % (time.time() - s))

-    def cleanupSites(self):
+    def sitesMaintenanceThread(self):
         import gc
         startup = True
-        time.sleep(5 * 60)  # Sites already cleaned up on startup
-        peers_protected = set([])
+
+        short_timeout = 2
+        long_timeout = 60 * 5
+
+        circular_iterator = CircularIterator()
+
         while 1:
-            # Sites health care every 20 min
+            if circular_iterator.isWrapped():
+                time.sleep(long_timeout)
+                circular_iterator.resetSuccessiveCount()
+                gc.collect()  # Explicit garbage collection
+
             self.log.debug(
-                "Running site cleanup, connections: %s, internet: %s, protected peers: %s" %
-                (len(self.connections), self.has_internet, len(peers_protected))
+                "Running site maintenance, connections: %s, internet: %s" %
+                (len(self.connections), self.has_internet)
             )

-            for address, site in list(self.sites.items()):
-                if not site.isServing():
-                    continue
+            site = circular_iterator.next(list(self.sites.values()))
+            if site:
+                done = site.runPeriodicMaintenance(startup=startup)
+                if done:
+                    time.sleep(short_timeout)
+                site = None

-                if not startup:
-                    site.cleanupPeers(peers_protected)
-
-                time.sleep(1)  # Prevent too quick request
-
-            peers_protected = set([])
-            for address, site in list(self.sites.items()):
-                if not site.isServing():
-                    continue
-
-                if site.peers:
-                    with gevent.Timeout(10, exception=False):
-                        site.announcer.announcePex()
-
-                # Last check modification failed
-                if site.content_updated is False:
-                    site.update()
-                elif site.bad_files:
-                    site.retryBadFiles()
-
-                # Keep active connections
-                connected_num = site.needConnections(check_site_on_reconnect=True)
-
-                if connected_num < config.connected_limit:
-                    # This site has small amount of peers, protect them from closing
-                    peers_protected.update([peer.key for peer in site.getConnectedPeers()])
-
-                time.sleep(1)  # Prevent too quick request
-
-            site = None
-            gc.collect()  # Implicit garbage collection
-            startup = False
-            time.sleep(60 * 20)
+            if circular_iterator.isWrapped():
+                startup = False

     def announceSite(self, site):
         site.announce(mode="update", pex=False)
@@ -399,7 +379,7 @@ class FileServer(ConnectionServer):

         thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread)
         thread_announce_sites = gevent.spawn(self.announceSites)
-        thread_cleanup_sites = gevent.spawn(self.cleanupSites)
+        thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread)
         thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)

         ConnectionServer.listen(self)

diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py
index 75eb190f..0a518fdc 100644
--- a/src/Peer/Peer.py
+++ b/src/Peer/Peer.py
@@ -46,6 +46,7 @@ class Peer(object):
         self.is_tracker_connection = False  # Tracker connection instead of normal peer
         self.reputation = 0  # More likely to connect if larger
         self.last_content_json_update = 0.0  # Modify date of last received content.json
+        self.protected = 0

         self.connection_error = 0  # Series of connection error
         self.hash_failed = 0  # Number of bad files from peer
@@ -74,9 +75,26 @@ class Peer(object):

         logger.log(self.log_level, "%s:%s %s" % (self.ip, self.port, text))

+    # A site marks its peers protected when it does not have enough connected
+    # peers. The periodic cleanup then skips the protected peers instead of
+    # disconnecting them.
+    def markProtected(self, interval=60*20):
+        self.protected = time.time() + interval
+
+    def isProtected(self):
+        if self.protected > 0:
+            if self.protected < time.time():
+                self.protected = 0
+        return self.protected > 0
+
     def isConnected(self):
+        if self.connection and not self.connection.connected:
+            self.connection = None
         return self.connection and self.connection.connected

+    def isTtlExpired(self, ttl):
+        return (time.time() - self.time_found) > ttl
+
     # Connect to host
     def connect(self, connection=None):
         if self.reputation < -10:
@@ -115,6 +133,11 @@ class Peer(object):
             self.connection = None
         return self.connection

+    def disconnect(self, reason="Unknown"):
+        if self.connection:
+            self.connection.close(reason)
+        self.connection = None
+
     # Check if we have connection to peer
     def findConnection(self):
         if self.connection and self.connection.connected:  # We have connection to peer
@@ -400,8 +423,7 @@ class Peer(object):
         if self.site and self in self.site.peers_recent:
             self.site.peers_recent.remove(self)

-        if self.connection:
-            self.connection.close(reason)
+        self.disconnect(reason)

     # - EVENTS -

diff --git a/src/Site/Site.py b/src/Site/Site.py
index fff592cd..b941b035 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -40,6 +40,9 @@ class Site(object):
         self.log = logging.getLogger("Site:%s" % self.address_short)
         self.addEventListeners()

+        self.periodic_maintenance_interval = 60 * 20
+        self.periodic_maintenance_timestamp = 0
+
         self.content = None  # Load content.json
         self.peers = {}  # Key: ip:port, Value: Peer.Peer
         self.peers_recent = collections.deque(maxlen=150)
@@ -853,6 +856,11 @@ class Site(object):
         if self.isServing():
             self.announcer.announce(*args, **kwargs)

+    # The engine tries to keep the number of active connections within
+    # these bounds:
+    # >= getPreferableActiveConnectionCount()
+    # <= getActiveConnectionCountLimit()
+
     def getPreferableActiveConnectionCount(self):
         if not self.isServing():
             return 0
@@ -874,8 +882,16 @@ class Site(object):
         if len(self.peers) < 50:
             count = max(count, 5)

+        count = min(count, config.connected_limit)
+
         return count

+    def getActiveConnectionCountLimit(self):
+        count_above_preferable = 2
+        limit = self.getPreferableActiveConnectionCount() + count_above_preferable
+        limit = min(limit, config.connected_limit)
+        return limit
+
     def tryConnectingToMorePeers(self, more=1, pex=True, try_harder=False):
         max_peers = more * 2 + 10
         if try_harder:
@@ -920,7 +936,7 @@ class Site(object):
         if num is None:
             num = self.getPreferableActiveConnectionCount()

-        need = min(len(self.peers), num, config.connected_limit)
+        need = min(len(self.peers), num)

         connected = self.bringConnections(
             need=need,
@@ -935,8 +951,15 @@ class Site(object):
             pex=pex,
             try_harder=True)

+        if connected < num:
+            self.markConnectedPeersProtected()
+
         return connected

+    def markConnectedPeersProtected(self):
+        for peer in self.getConnectedPeers():
+            peer.markProtected()
+
     # Return: Probably peers verified to be connectable recently
     def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True):
         peers = list(self.peers.values())
@@ -1022,53 +1045,87 @@ class Site(object):
             back.append(peer)
         return back

-    # Cleanup probably dead peers and close connection if too much
-    def cleanupPeers(self, peers_protected=[]):
+    def removeDeadPeers(self):
         peers = list(self.peers.values())
-        if len(peers) > 20:
-            # Cleanup old peers
-            removed = 0
-            if len(peers) > 1000:
-                ttl = 60 * 60 * 1
-            else:
-                ttl = 60 * 60 * 4
+        if len(peers) <= 20:
+            return

-            for peer in peers:
-                if peer.connection and peer.connection.connected:
-                    continue
-                if peer.connection and not peer.connection.connected:
-                    peer.connection = None  # Dead connection
-                if time.time() - peer.time_found > ttl:  # Not found on tracker or via pex in last 4 hour
-                    peer.remove("Time found expired")
-                    removed += 1
-                if removed > len(peers) * 0.1:  # Don't remove too much at once
-                    break
+        removed = 0
+        if len(peers) > 1000:
+            ttl = 60 * 60 * 1
+        elif len(peers) > 100:
+            ttl = 60 * 60 * 4
+        else:
+            ttl = 60 * 60 * 8

-            if removed:
-                self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
+        for peer in peers:
+            if peer.isConnected() or peer.isProtected():
+                continue
+            if peer.isTtlExpired(ttl):
+                peer.remove("TTL expired")
+                removed += 1
+            if removed > len(peers) * 0.1:  # Don't remove too many at once
+                break

-        # Close peers over the limit
-        closed = 0
-        connected_peers = [peer for peer in self.getConnectedPeers() if peer.connection.connected]  # Only fully connected peers
-        need_to_close = len(connected_peers) - config.connected_limit
+        if removed:
+            self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))

-        if closed < need_to_close:
-            # Try to keep connections with more sites
+    # Cleanup probably dead peers and close connections if there are too many
+    def cleanupPeers(self):
+        self.removeDeadPeers()
+
+        limit = self.getActiveConnectionCountLimit()
+        connected_peers = [peer for peer in self.getConnectedPeers() if peer.isConnected()]  # Only fully connected peers
+        need_to_close = len(connected_peers) - limit
+
+        if need_to_close > 0:
+            closed = 0
             for peer in sorted(connected_peers, key=lambda peer: min(peer.connection.sites, 5)):
-                if not peer.connection:
+                if not peer.isConnected():
                     continue
-                if peer.key in peers_protected:
+                if peer.isProtected():
                     continue
                 if peer.connection.sites > 5:
                     break
-                peer.connection.close("Cleanup peers")
-                peer.connection = None
+                peer.disconnect("Cleanup peers")
                 closed += 1
                 if closed >= need_to_close:
                     break

-        if need_to_close > 0:
-            self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (len(connected_peers), need_to_close, closed))
+            self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (
+                len(connected_peers), need_to_close, closed))
+
+    def runPeriodicMaintenance(self, startup=False, force=False):
+        if not self.isServing():
+            return False
+
+        scheduled_time = self.periodic_maintenance_timestamp + self.periodic_maintenance_interval
+
+        if time.time() < scheduled_time and not force:
+            return False
+
+        self.periodic_maintenance_timestamp = time.time()  # Mark as started to prevent reentrant runs
+
+        self.log.debug("runPeriodicMaintenance: startup=%s" % startup)
+
+        if not startup:
+            self.cleanupPeers()
+
+        if self.peers:
+            with gevent.Timeout(10, exception=False):
+                self.announcer.announcePex()
+
+        # The last content.json update failed, retry it
+        if self.content_updated is False:
+            self.update()
+        elif self.bad_files:
+            self.retryBadFiles()
+
+        self.needConnections(check_site_on_reconnect=True)
+
+        self.periodic_maintenance_timestamp = time.time()  # Restart the interval from the completion time
+
+        return True

     # Send hashfield to peers
     def sendMyHashfield(self, limit=5):
diff --git a/src/util/CircularIterator.py b/src/util/CircularIterator.py
new file mode 100644
index 00000000..3466092e
--- /dev/null
+++ b/src/util/CircularIterator.py
@@ -0,0 +1,34 @@
+import random
+
+class CircularIterator:
+    def __init__(self):
+        self.successive_count = 0
+        self.last_size = 0
+        self.index = -1
+
+    def next(self, items):
+        self.last_size = len(items)
+
+        if self.last_size == 0:
+            return None
+
+        if self.index < 0:
+            self.index = random.randint(0, self.last_size - 1)  # Start from a random position
+        else:
+            self.index += 1
+
+        self.index = self.index % self.last_size
+
+        self.successive_count += 1
+
+        return items[self.index]
+
+    def resetSuccessiveCount(self):
+        self.successive_count = 0
+
+    def getSuccessiveCount(self):
+        return self.successive_count
+
+    def isWrapped(self):
+        return self.successive_count >= self.last_size
+
diff --git a/src/util/__init__.py b/src/util/__init__.py
index ab8a8b88..f00c1459 100644
--- a/src/util/__init__.py
+++ b/src/util/__init__.py
@@ -1,4 +1,5 @@
 from .Cached import Cached
+from .CircularIterator import CircularIterator
 from .Event import Event
 from .Noparallel import Noparallel
 from .Pooled import Pooled
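
For reviewers: a minimal sketch of how the new CircularIterator is meant to be
driven, mirroring sitesMaintenanceThread() above. The get_sites() and
process_site() callbacks and the timeout values are illustrative placeholders,
not part of this patch.

    import time

    from util import CircularIterator

    def maintenance_loop(get_sites, process_site, short_timeout=2, long_timeout=60 * 5):
        iterator = CircularIterator()
        while True:
            # isWrapped() is True before the first pass and after each complete
            # pass over the list, so the loop rests before every pass.
            if iterator.isWrapped():
                time.sleep(long_timeout)
                iterator.resetSuccessiveCount()

            site = iterator.next(get_sites())  # Returns None if the list is empty
            if site and process_site(site):
                time.sleep(short_timeout)  # Rate-limit only if work was actually done

Because next() re-reads the list on every call, sites can be added or removed
between passes without restarting the loop; a pass simply visits each element
of the current list once, starting from a random offset.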