diff --git a/src/Config.py b/src/Config.py index 26fbafb2..15a0c87f 100644 --- a/src/Config.py +++ b/src/Config.py @@ -264,10 +264,22 @@ class Config(object): self.parser.add_argument('--size_limit', help='Default site size limit in MB', default=10, type=int, metavar='limit') self.parser.add_argument('--file_size_limit', help='Maximum per file size limit in MB', default=10, type=int, metavar='limit') - self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=10, type=int, metavar='connected_limit') - self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit') + self.parser.add_argument('--connected_limit', help='Max number of connected peers per site. Soft limit.', default=10, type=int, metavar='connected_limit') + self.parser.add_argument('--global_connected_limit', help='Max number of connections. Soft limit.', default=512, type=int, metavar='global_connected_limit') self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers') + self.parser.add_argument('--site_announce_interval_min', help='Site announce interval for the most active sites, in minutes.', default=4, type=int, metavar='site_announce_interval_min') + self.parser.add_argument('--site_announce_interval_max', help='Site announce interval for inactive sites, in minutes.', default=30, type=int, metavar='site_announce_interval_max') + + self.parser.add_argument('--site_peer_check_interval_min', help='Connectable peers check interval for the most active sites, in minutes.', default=5, type=int, metavar='site_peer_check_interval_min') + self.parser.add_argument('--site_peer_check_interval_max', help='Connectable peers check interval for inactive sites, in minutes.', default=20, type=int, metavar='site_peer_check_interval_max') + + self.parser.add_argument('--site_update_check_interval_min', help='Site update check interval for the most active sites, in minutes.', default=5, type=int, metavar='site_update_check_interval_min') + self.parser.add_argument('--site_update_check_interval_max', help='Site update check interval for inactive sites, in minutes.', default=45, type=int, metavar='site_update_check_interval_max') + + self.parser.add_argument('--site_connectable_peer_count_max', help='Search for as many connectable peers for the most active sites', default=10, type=int, metavar='site_connectable_peer_count_max') + self.parser.add_argument('--site_connectable_peer_count_min', help='Search for as many connectable peers for inactive sites', default=2, type=int, metavar='site_connectable_peer_count_min') + self.parser.add_argument('--send_back_lru_size', help='Size of the send back LRU cache', default=5000, type=int, metavar='send_back_lru_size') self.parser.add_argument('--send_back_limit', help='Send no more than so many files at once back to peer, when we discovered that the peer held older file versions', default=3, type=int, metavar='send_back_limit') diff --git a/src/Site/Site.py b/src/Site/Site.py index 73bb01dc..0cea4733 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -30,6 +30,9 @@ from .SiteAnnouncer import SiteAnnouncer from . import SiteManager from . import SiteHelpers +def lerp(val_min, val_max, scale): + return scale * (val_max - val_min) + val_min + class ScaledTimeoutHandler: def __init__(self, val_min, val_max, handler=None, scaler=None): self.val_min = val_min @@ -40,7 +43,7 @@ class ScaledTimeoutHandler: self.log = logging.getLogger("ScaledTimeoutHandler") def isExpired(self, scale): - interval = scale * (self.val_max - self.val_min) + self.val_min + interval = lerp(self.val_min, self.val_max, scale) expired_at = self.timestamp + interval now = time.time() expired = (now > expired_at) @@ -158,10 +161,19 @@ class Site(object): self.addEventListeners() self.periodic_maintenance_handlers = [ - ScaledTimeoutHandler(60 * 30, 60 * 2, + ScaledTimeoutHandler( + config.site_announce_interval_max * 60, + config.site_announce_interval_min * 60, handler=self.periodicMaintenanceHandler_announce, scaler=self.getAnnounceRating), - ScaledTimeoutHandler(60 * 30, 60 * 5, + ScaledTimeoutHandler( + config.site_peer_check_interval_max * 60, + config.site_peer_check_interval_min * 60, + handler=self.periodicMaintenanceHandler_peer_check, + scaler=self.getAnnounceRating), + ScaledTimeoutHandler( + config.site_update_check_interval_max * 60, + config.site_update_check_interval_min * 60, handler=self.periodicMaintenanceHandler_general, scaler=self.getActivityRating) ] @@ -1279,6 +1291,16 @@ class Site(object): v = [activity_rating, peer_rating, tracker_rating] return sum(v) / float(len(v)) + def getPreferableConnectablePeerCount(self): + if not self.isServing(): + return 0 + + count = lerp( + config.site_connectable_peer_count_min, + config.site_connectable_peer_count_max, + self.getActivityRating(force_safe=True)) + return count + # The engine tries to maintain the number of active connections: # >= getPreferableActiveConnectionCount() # and @@ -1496,6 +1518,33 @@ class Site(object): self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % ( len(connected_peers), need_to_close, closed)) + def lookForConnectablePeers(self): + num_tries = 2 + need_connectable_peers = self.getPreferableConnectablePeerCount() + + connectable_peers = 0 + reachable_peers = [] + + for peer in list(self.peers.values()): + if peer.isConnected() or peer.isConnectable(): + connectable_peers += 1 + elif peer.isReachable(): + reachable_peers.append(peer) + if connectable_peers >= need_connectable_peers: + return True + + random.shuffle(reachable_peers) + + for peer in reachable_peers: + if peer.isConnected() or peer.isConnectable() or peer.removed: + continue + peer.ping() + if peer.isConnected(): + peer.pex() + num_tries -= 1 + if num_tries < 1: + break + @util.Noparallel(queue=True) def runPeriodicMaintenance(self, startup=False, force=False): if not self.isServing(): @@ -1519,11 +1568,8 @@ class Site(object): self.log.debug("periodicMaintenanceHandler_general: startup=%s, force=%s" % (startup, force)) - if not startup: - self.cleanupPeers() - - self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True) - self.persistent_peer_req.result_connected.wait(timeout=2.0) + #self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True) + #self.persistent_peer_req.result_connected.wait(timeout=2.0) #self.announcer.announcePex() @@ -1533,6 +1579,22 @@ class Site(object): return True + def periodicMaintenanceHandler_peer_check(self, startup=False, force=False): + if not self.isServing(): + return False + + if not self.peers: + return False + + self.log.debug("periodicMaintenanceHandler_peer_check: startup=%s, force=%s" % (startup, force)) + + if not startup: + self.cleanupPeers() + + self.lookForConnectablePeers() + + return True + def periodicMaintenanceHandler_announce(self, startup=False, force=False): if not self.isServing(): return False diff --git a/src/Site/SiteHelpers.py b/src/Site/SiteHelpers.py index 53105f65..90a298cf 100644 --- a/src/Site/SiteHelpers.py +++ b/src/Site/SiteHelpers.py @@ -156,6 +156,7 @@ class PeerConnector(object): if not peer.isConnected(): peer.connect() if peer.isConnected(): + peer.ping() self.processReqs2() def peerConnectorController(self):