Add new configuration variables and temporarily disable Site.persistent_peer_req

New configuration options:

site_announce_interval_min
site_announce_interval_max

site_peer_check_interval_min
site_peer_check_interval_max

site_update_check_interval_min
site_update_check_interval_max

site_connectable_peer_count_max
site_connectable_peer_count_min

Site.persistent_peer_req is temporarily disabled since it makes excessive pressure on the network when working over TOR and needs some reworking.
This commit is contained in:
Vadim Ushakov 2021-10-27 20:57:44 +07:00
parent 77e0bb3650
commit 168c436b73
3 changed files with 85 additions and 10 deletions

View file

@ -264,10 +264,22 @@ class Config(object):
self.parser.add_argument('--size_limit', help='Default site size limit in MB', default=10, type=int, metavar='limit')
self.parser.add_argument('--file_size_limit', help='Maximum per file size limit in MB', default=10, type=int, metavar='limit')
self.parser.add_argument('--connected_limit', help='Max connected peer per site', default=10, type=int, metavar='connected_limit')
self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit')
self.parser.add_argument('--connected_limit', help='Max number of connected peers per site. Soft limit.', default=10, type=int, metavar='connected_limit')
self.parser.add_argument('--global_connected_limit', help='Max number of connections. Soft limit.', default=512, type=int, metavar='global_connected_limit')
self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers')
self.parser.add_argument('--site_announce_interval_min', help='Site announce interval for the most active sites, in minutes.', default=4, type=int, metavar='site_announce_interval_min')
self.parser.add_argument('--site_announce_interval_max', help='Site announce interval for inactive sites, in minutes.', default=30, type=int, metavar='site_announce_interval_max')
self.parser.add_argument('--site_peer_check_interval_min', help='Connectable peers check interval for the most active sites, in minutes.', default=5, type=int, metavar='site_peer_check_interval_min')
self.parser.add_argument('--site_peer_check_interval_max', help='Connectable peers check interval for inactive sites, in minutes.', default=20, type=int, metavar='site_peer_check_interval_max')
self.parser.add_argument('--site_update_check_interval_min', help='Site update check interval for the most active sites, in minutes.', default=5, type=int, metavar='site_update_check_interval_min')
self.parser.add_argument('--site_update_check_interval_max', help='Site update check interval for inactive sites, in minutes.', default=45, type=int, metavar='site_update_check_interval_max')
self.parser.add_argument('--site_connectable_peer_count_max', help='Search for as many connectable peers for the most active sites', default=10, type=int, metavar='site_connectable_peer_count_max')
self.parser.add_argument('--site_connectable_peer_count_min', help='Search for as many connectable peers for inactive sites', default=2, type=int, metavar='site_connectable_peer_count_min')
self.parser.add_argument('--send_back_lru_size', help='Size of the send back LRU cache', default=5000, type=int, metavar='send_back_lru_size')
self.parser.add_argument('--send_back_limit', help='Send no more than so many files at once back to peer, when we discovered that the peer held older file versions', default=3, type=int, metavar='send_back_limit')

View file

@ -30,6 +30,9 @@ from .SiteAnnouncer import SiteAnnouncer
from . import SiteManager
from . import SiteHelpers
def lerp(val_min, val_max, scale):
return scale * (val_max - val_min) + val_min
class ScaledTimeoutHandler:
def __init__(self, val_min, val_max, handler=None, scaler=None):
self.val_min = val_min
@ -40,7 +43,7 @@ class ScaledTimeoutHandler:
self.log = logging.getLogger("ScaledTimeoutHandler")
def isExpired(self, scale):
interval = scale * (self.val_max - self.val_min) + self.val_min
interval = lerp(self.val_min, self.val_max, scale)
expired_at = self.timestamp + interval
now = time.time()
expired = (now > expired_at)
@ -158,10 +161,19 @@ class Site(object):
self.addEventListeners()
self.periodic_maintenance_handlers = [
ScaledTimeoutHandler(60 * 30, 60 * 2,
ScaledTimeoutHandler(
config.site_announce_interval_max * 60,
config.site_announce_interval_min * 60,
handler=self.periodicMaintenanceHandler_announce,
scaler=self.getAnnounceRating),
ScaledTimeoutHandler(60 * 30, 60 * 5,
ScaledTimeoutHandler(
config.site_peer_check_interval_max * 60,
config.site_peer_check_interval_min * 60,
handler=self.periodicMaintenanceHandler_peer_check,
scaler=self.getAnnounceRating),
ScaledTimeoutHandler(
config.site_update_check_interval_max * 60,
config.site_update_check_interval_min * 60,
handler=self.periodicMaintenanceHandler_general,
scaler=self.getActivityRating)
]
@ -1279,6 +1291,16 @@ class Site(object):
v = [activity_rating, peer_rating, tracker_rating]
return sum(v) / float(len(v))
def getPreferableConnectablePeerCount(self):
if not self.isServing():
return 0
count = lerp(
config.site_connectable_peer_count_min,
config.site_connectable_peer_count_max,
self.getActivityRating(force_safe=True))
return count
# The engine tries to maintain the number of active connections:
# >= getPreferableActiveConnectionCount()
# and
@ -1496,6 +1518,33 @@ class Site(object):
self.log.debug("Connected: %s, Need to close: %s, Closed: %s" % (
len(connected_peers), need_to_close, closed))
def lookForConnectablePeers(self):
num_tries = 2
need_connectable_peers = self.getPreferableConnectablePeerCount()
connectable_peers = 0
reachable_peers = []
for peer in list(self.peers.values()):
if peer.isConnected() or peer.isConnectable():
connectable_peers += 1
elif peer.isReachable():
reachable_peers.append(peer)
if connectable_peers >= need_connectable_peers:
return True
random.shuffle(reachable_peers)
for peer in reachable_peers:
if peer.isConnected() or peer.isConnectable() or peer.removed:
continue
peer.ping()
if peer.isConnected():
peer.pex()
num_tries -= 1
if num_tries < 1:
break
@util.Noparallel(queue=True)
def runPeriodicMaintenance(self, startup=False, force=False):
if not self.isServing():
@ -1519,11 +1568,8 @@ class Site(object):
self.log.debug("periodicMaintenanceHandler_general: startup=%s, force=%s" % (startup, force))
if not startup:
self.cleanupPeers()
self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True)
self.persistent_peer_req.result_connected.wait(timeout=2.0)
#self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True)
#self.persistent_peer_req.result_connected.wait(timeout=2.0)
#self.announcer.announcePex()
@ -1533,6 +1579,22 @@ class Site(object):
return True
def periodicMaintenanceHandler_peer_check(self, startup=False, force=False):
if not self.isServing():
return False
if not self.peers:
return False
self.log.debug("periodicMaintenanceHandler_peer_check: startup=%s, force=%s" % (startup, force))
if not startup:
self.cleanupPeers()
self.lookForConnectablePeers()
return True
def periodicMaintenanceHandler_announce(self, startup=False, force=False):
if not self.isServing():
return False

View file

@ -156,6 +156,7 @@ class PeerConnector(object):
if not peer.isConnected():
peer.connect()
if peer.isConnected():
peer.ping()
self.processReqs2()
def peerConnectorController(self):