Implement new logic for waiting for connected peers when updating or publishing a site
This commit is contained in:
parent
b4f94e5022
commit
b512c54f75
5 changed files with 331 additions and 135 deletions
|
@ -462,7 +462,7 @@ if "tracker_storage" not in locals():
|
|||
class SiteAnnouncerPlugin(object):
|
||||
def getTrackers(self):
|
||||
tracker_storage.setSiteAnnouncer(self)
|
||||
tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers())
|
||||
tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(onlyFullyConnected=True))
|
||||
trackers = super(SiteAnnouncerPlugin, self).getTrackers()
|
||||
shared_trackers = list(tracker_storage.getTrackers().keys())
|
||||
if shared_trackers:
|
||||
|
|
|
@ -3,6 +3,7 @@ import time
|
|||
import random
|
||||
import socket
|
||||
import sys
|
||||
import weakref
|
||||
|
||||
import gevent
|
||||
import gevent.pool
|
||||
|
@ -42,6 +43,8 @@ class FileServer(ConnectionServer):
|
|||
self.update_start_time = 0
|
||||
self.update_sites_task_next_nr = 1
|
||||
|
||||
self.update_threads = weakref.WeakValueDictionary()
|
||||
|
||||
self.passive_mode = None
|
||||
self.active_mode = None
|
||||
self.active_mode_threads = {}
|
||||
|
@ -317,30 +320,23 @@ class FileServer(ConnectionServer):
|
|||
|
||||
def updateSite(self, site, check_files=False, verify_files=False):
|
||||
if not site:
|
||||
return False
|
||||
return site.update2(check_files=check_files, verify_files=verify_files)
|
||||
return
|
||||
site.update2(check_files=check_files, verify_files=verify_files)
|
||||
|
||||
def spawnUpdateSite(self, site, check_files=False, verify_files=False):
|
||||
thread = self.update_pool.spawn(self.updateSite, site,
|
||||
check_files=check_files, verify_files=verify_files)
|
||||
thread.site_address = site.address
|
||||
self.update_threads[site.address] = thread
|
||||
return thread
|
||||
|
||||
def lookupInUpdatePool(self, site_address):
|
||||
thread = self.update_threads.get(site_address, None)
|
||||
if not thread or thread.ready():
|
||||
return None
|
||||
return thread
|
||||
|
||||
def siteIsInUpdatePool(self, site_address):
|
||||
while True:
|
||||
restart = False
|
||||
for thread in list(iter(self.update_pool)):
|
||||
thread_site_address = getattr(thread, 'site_address', None)
|
||||
if not thread_site_address:
|
||||
# Possible race condition in assigning thread.site_address in spawnUpdateSite()
|
||||
# Trying again.
|
||||
self.sleep(0.1)
|
||||
restart = True
|
||||
break
|
||||
if thread_site_address == site_address:
|
||||
return True
|
||||
if not restart:
|
||||
return False
|
||||
return self.lookupInUpdatePool(site_address) is not None
|
||||
|
||||
def invalidateUpdateTime(self, invalid_interval):
|
||||
for address in self.getSiteAddresses():
|
||||
|
|
|
@ -54,6 +54,8 @@ class Peer(object):
|
|||
self.download_bytes = 0 # Bytes downloaded
|
||||
self.download_time = 0 # Time spent to download
|
||||
|
||||
self.protectedRequests = ["getFile", "streamFile", "update", "listModified"]
|
||||
|
||||
def __getattr__(self, key):
|
||||
if key == "hashfield":
|
||||
self.has_hashfield = True
|
||||
|
@ -78,10 +80,8 @@ class Peer(object):
|
|||
|
||||
logger.log(log_level, "%s:%s %s" % (self.ip, self.port, text))
|
||||
|
||||
# Site marks its Peers protected, if it has not enough peers connected.
|
||||
# This is to be used to prevent disconnecting from peers when doing
|
||||
# a periodic cleanup.
|
||||
def markProtected(self, interval=60*20):
|
||||
# Protect connection from being closed by site.cleanupPeers()
|
||||
def markProtected(self, interval=60*2):
|
||||
self.protected = max(self.protected, time.time() + interval)
|
||||
|
||||
def isProtected(self):
|
||||
|
@ -195,6 +195,8 @@ class Peer(object):
|
|||
|
||||
for retry in range(1, 4): # Retry 3 times
|
||||
try:
|
||||
if cmd in self.protectedRequests:
|
||||
self.markProtected()
|
||||
if not self.connection:
|
||||
raise Exception("No connection found")
|
||||
res = self.connection.request(cmd, params, stream_to)
|
||||
|
|
204
src/Site/Site.py
204
src/Site/Site.py
|
@ -28,6 +28,7 @@ from Plugin import PluginManager
|
|||
from File import FileServer
|
||||
from .SiteAnnouncer import SiteAnnouncer
|
||||
from . import SiteManager
|
||||
from . import SiteHelpers
|
||||
|
||||
class ScaledTimeoutHandler:
|
||||
def __init__(self, val_min, val_max, handler=None, scaler=None):
|
||||
|
@ -145,7 +146,6 @@ class BackgroundPublisher:
|
|||
self.site.log.info("Background publisher: Published %s to %s peers", self.inner_path, len(self.published))
|
||||
|
||||
|
||||
|
||||
@PluginManager.acceptPlugins
|
||||
class Site(object):
|
||||
|
||||
|
@ -209,6 +209,10 @@ class Site(object):
|
|||
|
||||
self.announcer = SiteAnnouncer(self) # Announce and get peer list from other nodes
|
||||
|
||||
self.peer_connector = SiteHelpers.PeerConnector(self) # Connect more peers in background by request
|
||||
self.persistent_peer_req = None # The persistent peer requirement, managed by maintenance handler
|
||||
|
||||
|
||||
if not self.settings.get("wrapper_key"): # To auth websocket permissions
|
||||
self.settings["wrapper_key"] = CryptHash.random()
|
||||
self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"])
|
||||
|
@ -753,7 +757,6 @@ class Site(object):
|
|||
if verify_files:
|
||||
check_files = True
|
||||
|
||||
self.updateWebsocket(updating=True)
|
||||
if verify_files:
|
||||
self.updateWebsocket(verifying=True)
|
||||
elif check_files:
|
||||
|
@ -771,16 +774,32 @@ class Site(object):
|
|||
if verify_files:
|
||||
self.settings["verify_files_timestamp"] = time.time()
|
||||
|
||||
if verify_files:
|
||||
self.updateWebsocket(verified=True)
|
||||
elif check_files:
|
||||
self.updateWebsocket(checked=True)
|
||||
|
||||
if not self.isServing():
|
||||
self.updateWebsocket(updated=True)
|
||||
return False
|
||||
|
||||
if announce:
|
||||
self.updateWebsocket(updating=True)
|
||||
self.announce(mode="update", force=True)
|
||||
|
||||
reqs = [
|
||||
self.peer_connector.newReq(4, 4, 30),
|
||||
self.peer_connector.newReq(2, 2, 60),
|
||||
self.peer_connector.newReq(1, 1, 120)
|
||||
]
|
||||
nr_connected_peers = self.waitForPeers(reqs);
|
||||
if nr_connected_peers < 1:
|
||||
return
|
||||
|
||||
self.updateWebsocket(updating=True)
|
||||
|
||||
# Remove files that no longer in content.json
|
||||
self.checkBadFiles()
|
||||
|
||||
if announce:
|
||||
self.announce(mode="update", force=True)
|
||||
|
||||
# Full update, we can reset bad files
|
||||
if check_files and since == 0:
|
||||
self.bad_files = {}
|
||||
|
@ -810,12 +829,6 @@ class Site(object):
|
|||
# To be called from FileServer
|
||||
@util.Noparallel(queue=True, ignore_args=True)
|
||||
def update2(self, check_files=False, verify_files=False):
|
||||
if len(self.peers) < 50:
|
||||
self.announce(mode="update")
|
||||
self.waitForPeers(5, 5, 30);
|
||||
self.waitForPeers(2, 2, 30);
|
||||
self.waitForPeers(1, 1, 60);
|
||||
|
||||
self.update(check_files=check_files, verify_files=verify_files)
|
||||
|
||||
# Update site by redownload all content.json
|
||||
|
@ -894,41 +907,13 @@ class Site(object):
|
|||
background_publisher.finalize()
|
||||
del self.background_publishers[inner_path]
|
||||
|
||||
def waitForPeers_realJob(self, need_nr_peers, need_nr_connected_peers, time_limit):
|
||||
start_time = time.time()
|
||||
for _ in range(time_limit):
|
||||
nr_connected_peers = len(self.getConnectedPeers())
|
||||
nr_peers = len(self.peers)
|
||||
if nr_peers >= need_nr_peers and nr_connected_peers >= need_nr_connected_peers:
|
||||
return nr_connected_peers
|
||||
self.updateWebsocket(connecting_to_peers=nr_connected_peers)
|
||||
self.announce(mode="more", force=True)
|
||||
if not self.isServing():
|
||||
return nr_connected_peers
|
||||
for wait in range(10):
|
||||
self.needConnections(num=need_nr_connected_peers)
|
||||
time.sleep(2)
|
||||
nr_connected_peers = len(self.getConnectedPeers())
|
||||
nr_peers = len(self.peers)
|
||||
self.updateWebsocket(connecting_to_peers=nr_connected_peers)
|
||||
if not self.isServing():
|
||||
return nr_connected_peers
|
||||
if nr_peers >= need_nr_peers and nr_connected_peers >= need_nr_connected_peers:
|
||||
return nr_connected_peers
|
||||
if time.time() - start_time > time_limit:
|
||||
return nr_connected_peers
|
||||
|
||||
return nr_connected_peers
|
||||
|
||||
def waitForPeers(self, need_nr_peers, need_nr_connected_peers, time_limit):
|
||||
nr_connected_peers = self.waitForPeers_realJob(need_nr_peers, need_nr_connected_peers, time_limit)
|
||||
self.updateWebsocket(connected_to_peers=nr_connected_peers)
|
||||
return nr_connected_peers
|
||||
|
||||
def getPeersForForegroundPublishing(self, limit):
|
||||
# Wait for some peers to appear
|
||||
self.waitForPeers(limit, limit / 2, 10) # some of them...
|
||||
self.waitForPeers(1, 1, 60) # or at least one...
|
||||
reqs = [
|
||||
self.peer_connector.newReq(limit, limit / 2, 10), # some of them...
|
||||
self.peer_connector.newReq(1, 1, 60) # or at least one...
|
||||
]
|
||||
self.waitForPeers(reqs)
|
||||
|
||||
peers = self.getConnectedPeers()
|
||||
random.shuffle(peers)
|
||||
|
@ -1206,6 +1191,10 @@ class Site(object):
|
|||
peer = Peer(ip, port, self)
|
||||
self.peers[key] = peer
|
||||
peer.found(source)
|
||||
|
||||
self.peer_connector.processReqs()
|
||||
self.peer_connector.addPeer(peer)
|
||||
|
||||
return peer
|
||||
|
||||
def announce(self, *args, **kwargs):
|
||||
|
@ -1288,76 +1277,54 @@ class Site(object):
|
|||
limit = min(limit, config.connected_limit)
|
||||
return limit
|
||||
|
||||
def tryConnectingToMorePeers(self, more=1, pex=True, try_harder=False):
|
||||
max_peers = more * 2 + 10
|
||||
if try_harder:
|
||||
max_peers += 10000
|
||||
############################################################################
|
||||
|
||||
connected = 0
|
||||
for peer in self.getRecentPeers(max_peers):
|
||||
if not peer.isConnected():
|
||||
if pex:
|
||||
peer.pex()
|
||||
else:
|
||||
peer.ping(timeout=2.0, tryes=1)
|
||||
# Returns the maximum value of current reqs for connections
|
||||
def waitingForConnections(self):
|
||||
self.peer_connector.processReqs()
|
||||
return self.peer_connector.need_nr_connected_peers
|
||||
|
||||
if peer.isConnected():
|
||||
connected += 1
|
||||
|
||||
if connected >= more:
|
||||
break
|
||||
|
||||
return connected
|
||||
|
||||
def bringConnections(self, need=1, update_site_on_reconnect=False, pex=True, try_harder=False):
|
||||
connected = len(self.getConnectedPeers())
|
||||
connected_before = connected
|
||||
|
||||
self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers)))
|
||||
|
||||
if connected < need:
|
||||
connected += self.tryConnectingToMorePeers(more=(need-connected), pex=pex, try_harder=try_harder)
|
||||
self.log.debug(
|
||||
"Connected before: %s, after: %s. Check site: %s." %
|
||||
(connected_before, connected, update_site_on_reconnect)
|
||||
)
|
||||
|
||||
if update_site_on_reconnect and connected_before == 0 and connected > 0 and self.connection_server.has_internet:
|
||||
self.greenlet_manager.spawn(self.update, check_files=False)
|
||||
|
||||
return connected
|
||||
|
||||
# Keep connections
|
||||
def needConnections(self, num=None, update_site_on_reconnect=False, pex=True):
|
||||
def needConnections(self, num=None, update_site_on_reconnect=False):
|
||||
if not self.connection_server.allowsCreatingConnections():
|
||||
return
|
||||
|
||||
if num is None:
|
||||
num = self.getPreferableActiveConnectionCount()
|
||||
num = min(len(self.peers), num)
|
||||
|
||||
need = min(len(self.peers), num)
|
||||
req = self.peer_connector.newReq(0, num)
|
||||
return req
|
||||
|
||||
connected = self.bringConnections(
|
||||
need=need,
|
||||
update_site_on_reconnect=update_site_on_reconnect,
|
||||
pex=pex,
|
||||
try_harder=False)
|
||||
# Wait for peers to ne known and/or connected and send updates to the UI
|
||||
def waitForPeers(self, reqs):
|
||||
if not reqs:
|
||||
return 0
|
||||
i = 0
|
||||
nr_connected_peers = -1
|
||||
while self.isServing():
|
||||
ready_reqs = list(filter(lambda req: req.ready(), reqs))
|
||||
if len(ready_reqs) == len(reqs):
|
||||
if nr_connected_peers < 0:
|
||||
nr_connected_peers = ready_reqs[0].nr_connected_peers
|
||||
break
|
||||
waiting_reqs = list(filter(lambda req: not req.ready(), reqs))
|
||||
if not waiting_reqs:
|
||||
break
|
||||
waiting_req = waiting_reqs[0]
|
||||
#self.log.debug("waiting_req: %s %s %s", waiting_req.need_nr_connected_peers, waiting_req.nr_connected_peers, waiting_req.expiration_interval)
|
||||
waiting_req.waitHeartbeat(timeout=1.0)
|
||||
if i > 0 and nr_connected_peers != waiting_req.nr_connected_peers:
|
||||
nr_connected_peers = waiting_req.nr_connected_peers
|
||||
self.updateWebsocket(connecting_to_peers=nr_connected_peers)
|
||||
i += 1
|
||||
self.updateWebsocket(connected_to_peers=max(nr_connected_peers, 0))
|
||||
if i > 1:
|
||||
# If we waited some time, pause now for displaying connected_to_peers message in the UI.
|
||||
# This sleep is solely needed for site status updates on ZeroHello to be more cool-looking.
|
||||
gevent.sleep(1)
|
||||
return nr_connected_peers
|
||||
|
||||
if connected < need:
|
||||
self.greenlet_manager.spawnLater(1.0, self.bringConnections,
|
||||
need=need,
|
||||
update_site_on_reconnect=update_site_on_reconnect,
|
||||
pex=pex,
|
||||
try_harder=True)
|
||||
|
||||
if connected < num:
|
||||
self.markConnectedPeersProtected()
|
||||
|
||||
return connected
|
||||
|
||||
def markConnectedPeersProtected(self):
|
||||
for peer in self.getConnectedPeers():
|
||||
peer.markProtected()
|
||||
############################################################################
|
||||
|
||||
# Return: Probably peers verified to be connectable recently
|
||||
def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True):
|
||||
|
@ -1429,15 +1396,26 @@ class Site(object):
|
|||
|
||||
return found[0:need_num]
|
||||
|
||||
def getConnectedPeers(self):
|
||||
# Returns the list of connected peers
|
||||
# By default the result may contain peers chosen optimistically:
|
||||
# If the connection is being established and 20 seconds have not yet passed
|
||||
# since the connection start time, those peers are included in the result.
|
||||
# Set onlyFullyConnected=True for restricting only by fully connected peers.
|
||||
def getConnectedPeers(self, onlyFullyConnected=False):
|
||||
back = []
|
||||
if not self.connection_server:
|
||||
return []
|
||||
|
||||
tor_manager = self.connection_server.tor_manager
|
||||
for connection in self.connection_server.connections:
|
||||
if len(back) >= len(self.peers): # short cut for breaking early; no peers to check left
|
||||
break
|
||||
|
||||
if not connection.connected and time.time() - connection.start_time > 20: # Still not connected after 20s
|
||||
continue
|
||||
if not connection.connected and onlyFullyConnected: # Only fully connected peers
|
||||
continue
|
||||
|
||||
peer = self.peers.get("%s:%s" % (connection.ip, connection.port))
|
||||
if peer:
|
||||
if connection.ip.endswith(".onion") and connection.target_onion and tor_manager.start_onions:
|
||||
|
@ -1479,8 +1457,8 @@ class Site(object):
|
|||
def cleanupPeers(self):
|
||||
self.removeDeadPeers()
|
||||
|
||||
limit = self.getActiveConnectionCountLimit()
|
||||
connected_peers = [peer for peer in self.getConnectedPeers() if peer.isConnected()] # Only fully connected peers
|
||||
limit = max(self.getActiveConnectionCountLimit(), self.waitingForConnections())
|
||||
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
|
||||
need_to_close = len(connected_peers) - limit
|
||||
|
||||
if need_to_close > 0:
|
||||
|
@ -1526,10 +1504,10 @@ class Site(object):
|
|||
if not startup:
|
||||
self.cleanupPeers()
|
||||
|
||||
self.needConnections(update_site_on_reconnect=True)
|
||||
self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True)
|
||||
self.persistent_peer_req.result_connected.wait(timeout=2.0)
|
||||
|
||||
with gevent.Timeout(10, exception=False):
|
||||
self.announcer.announcePex()
|
||||
#self.announcer.announcePex()
|
||||
|
||||
self.processBackgroundPublishers()
|
||||
|
||||
|
@ -1559,7 +1537,7 @@ class Site(object):
|
|||
return False
|
||||
|
||||
sent = 0
|
||||
connected_peers = self.getConnectedPeers()
|
||||
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
|
||||
for peer in connected_peers:
|
||||
if peer.sendMyHashfield():
|
||||
sent += 1
|
||||
|
@ -1581,7 +1559,7 @@ class Site(object):
|
|||
|
||||
s = time.time()
|
||||
queried = 0
|
||||
connected_peers = self.getConnectedPeers()
|
||||
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
|
||||
for peer in connected_peers:
|
||||
if peer.time_hashfield:
|
||||
continue
|
||||
|
|
220
src/Site/SiteHelpers.py
Normal file
220
src/Site/SiteHelpers.py
Normal file
|
@ -0,0 +1,220 @@
|
|||
import time
|
||||
import weakref
|
||||
import gevent
|
||||
|
||||
class ConnectRequirement(object):
|
||||
next_id = 1
|
||||
def __init__(self, need_nr_peers, need_nr_connected_peers, expiration_interval=None):
|
||||
self.need_nr_peers = need_nr_peers # how many total peers we need
|
||||
self.need_nr_connected_peers = need_nr_connected_peers # how many connected peers we need
|
||||
self.result = gevent.event.AsyncResult() # resolves on need_nr_peers condition
|
||||
self.result_connected = gevent.event.AsyncResult() # resolves on need_nr_connected_peers condition
|
||||
|
||||
self.expiration_interval = expiration_interval
|
||||
self.expired = False
|
||||
if expiration_interval:
|
||||
self.expire_at = time.time() + expiration_interval
|
||||
else:
|
||||
self.expire_at = None
|
||||
|
||||
self.nr_peers = -1 # updated PeerConnector()
|
||||
self.nr_connected_peers = -1 # updated PeerConnector()
|
||||
|
||||
self.heartbeat = gevent.event.AsyncResult()
|
||||
|
||||
self.id = type(self).next_id
|
||||
type(self).next_id += 1
|
||||
|
||||
def fulfilled(self):
|
||||
return self.result.ready() and self.result_connected.ready()
|
||||
|
||||
def ready(self):
|
||||
return self.expired or self.fulfilled()
|
||||
|
||||
# Heartbeat send when any of the following happens:
|
||||
# * self.result is set
|
||||
# * self.result_connected is set
|
||||
# * self.nr_peers changed
|
||||
# * self.nr_peers_connected changed
|
||||
# * self.expired is set
|
||||
def waitHeartbeat(self, timeout=None):
|
||||
if self.heartbeat.ready():
|
||||
self.heartbeat = gevent.event.AsyncResult()
|
||||
return self.heartbeat.wait(timeout=timeout)
|
||||
|
||||
def sendHeartbeat(self):
|
||||
self.heartbeat.set_result()
|
||||
if self.heartbeat.ready():
|
||||
self.heartbeat = gevent.event.AsyncResult()
|
||||
|
||||
class PeerConnector(object):
|
||||
|
||||
def __init__(self, site):
|
||||
self.site = site
|
||||
|
||||
self.peer_reqs = weakref.WeakValueDictionary() # How many connected peers we need.
|
||||
# Separate entry for each requirement.
|
||||
# Objects of type ConnectRequirement.
|
||||
self.peer_connector_controller = None # Thread doing the orchestration in background.
|
||||
self.peer_connector_workers = dict() # Threads trying to connect to individual peers.
|
||||
self.peer_connector_worker_limit = 5 # Max nr of workers.
|
||||
self.peer_connector_announcer = None # Thread doing announces in background.
|
||||
|
||||
# Max effective values. Set by processReqs().
|
||||
self.need_nr_peers = 0
|
||||
self.need_nr_connected_peers = 0
|
||||
self.nr_peers = 0 # set by processReqs()
|
||||
self.nr_connected_peers = 0 # set by processReqs2()
|
||||
|
||||
self.peers = list()
|
||||
|
||||
def addReq(self, req):
|
||||
self.peer_reqs[req.id] = req
|
||||
self.processReqs()
|
||||
|
||||
def newReq(self, need_nr_peers, need_nr_connected_peers, expiration_interval=None):
|
||||
req = ConnectRequirement(need_nr_peers, need_nr_connected_peers, expiration_interval=expiration_interval)
|
||||
self.addReq(req)
|
||||
return req
|
||||
|
||||
def processReqs(self, nr_connected_peers=None):
|
||||
nr_peers = len(self.site.peers)
|
||||
self.nr_peers = nr_peers
|
||||
|
||||
need_nr_peers = 0
|
||||
need_nr_connected_peers = 0
|
||||
|
||||
items = list(self.peer_reqs.items())
|
||||
for key, req in items:
|
||||
send_heartbeat = False
|
||||
|
||||
if req.expire_at and req.expire_at < time.time():
|
||||
req.expired = True
|
||||
self.peer_reqs.pop(key, None)
|
||||
send_heartbeat = True
|
||||
elif req.result.ready() and req.result_connected.ready():
|
||||
pass
|
||||
else:
|
||||
if nr_connected_peers is not None:
|
||||
if req.need_nr_peers <= nr_peers and req.need_nr_connected_peers <= nr_connected_peers:
|
||||
req.result.set_result(nr_peers)
|
||||
req.result_connected.set_result(nr_connected_peers)
|
||||
send_heartbeat = True
|
||||
if req.nr_peers != nr_peers or req.nr_connected_peers != nr_connected_peers:
|
||||
req.nr_peers = nr_peers
|
||||
req.nr_connected_peers = nr_connected_peers
|
||||
send_heartbeat = True
|
||||
|
||||
if not (req.result.ready() and req.result_connected.ready()):
|
||||
need_nr_peers = max(need_nr_peers, req.need_nr_peers)
|
||||
need_nr_connected_peers = max(need_nr_connected_peers, req.need_nr_connected_peers)
|
||||
|
||||
if send_heartbeat:
|
||||
req.sendHeartbeat()
|
||||
|
||||
self.need_nr_peers = need_nr_peers
|
||||
self.need_nr_connected_peers = need_nr_connected_peers
|
||||
|
||||
if nr_connected_peers is None:
|
||||
nr_connected_peers = 0
|
||||
if need_nr_peers > nr_peers:
|
||||
self.spawnPeerConnectorAnnouncer();
|
||||
if need_nr_connected_peers > nr_connected_peers:
|
||||
self.spawnPeerConnectorController();
|
||||
|
||||
def processReqs2(self):
|
||||
self.nr_connected_peers = len(self.site.getConnectedPeers(onlyFullyConnected=True))
|
||||
self.processReqs(nr_connected_peers=self.nr_connected_peers)
|
||||
|
||||
# For adding new peers when ConnectorController is working.
|
||||
# While it is iterating over a cached list of peers, there can be a significant lag
|
||||
# for a newly discovered peer to get in sight of the controller.
|
||||
# Suppose most previously known peers are dead and we've just get a few
|
||||
# new peers from a tracker.
|
||||
# So we mix the new peer to the cached list.
|
||||
# When ConnectorController is stopped (self.peers is empty), we just do nothing here.
|
||||
def addPeer(self, peer):
|
||||
if not self.peers:
|
||||
return
|
||||
if peer not in self.peers:
|
||||
self.peers.append(peer)
|
||||
|
||||
def keepGoing(self):
|
||||
return self.site.isServing() and self.site.connection_server.allowsCreatingConnections()
|
||||
|
||||
def peerConnectorWorker(self, peer):
|
||||
if not peer.isConnected():
|
||||
peer.connect()
|
||||
if peer.isConnected():
|
||||
self.processReqs2()
|
||||
|
||||
def peerConnectorController(self):
|
||||
self.peers = list()
|
||||
addendum = 20
|
||||
while self.keepGoing():
|
||||
|
||||
if len(self.site.peers) < 1:
|
||||
# No peers and no way to manage this from this method.
|
||||
# Just give up.
|
||||
break
|
||||
|
||||
self.processReqs2()
|
||||
|
||||
if self.need_nr_connected_peers <= self.nr_connected_peers:
|
||||
# Ok, nobody waits for connected peers.
|
||||
# Done.
|
||||
break
|
||||
|
||||
if len(self.peers) < 1:
|
||||
# refill the peer list
|
||||
self.peers = self.site.getRecentPeers(self.need_nr_connected_peers * 2 + addendum)
|
||||
addendum = addendum * 2 + 50
|
||||
if len(self.peers) <= self.nr_connected_peers:
|
||||
# looks like all known peers are connected
|
||||
# start announcePex() in background and give up
|
||||
self.site.announcer.announcePex()
|
||||
break
|
||||
|
||||
# try connecting to peers
|
||||
while self.keepGoing() and len(self.peer_connector_workers) < self.peer_connector_worker_limit:
|
||||
if len(self.peers) < 1:
|
||||
break
|
||||
|
||||
peer = self.peers.pop(0)
|
||||
|
||||
if peer.isConnected():
|
||||
continue
|
||||
|
||||
thread = self.peer_connector_workers.get(peer, None)
|
||||
if thread:
|
||||
continue
|
||||
|
||||
thread = self.site.spawn(self.peerConnectorWorker, peer)
|
||||
self.peer_connector_workers[peer] = thread
|
||||
thread.link(lambda thread, peer=peer: self.peer_connector_workers.pop(peer, None))
|
||||
|
||||
# wait for more room in self.peer_connector_workers
|
||||
while self.keepGoing() and len(self.peer_connector_workers) >= self.peer_connector_worker_limit:
|
||||
gevent.sleep(2)
|
||||
|
||||
self.peers = list()
|
||||
self.peer_connector_controller = None
|
||||
|
||||
def peerConnectorAnnouncer(self):
|
||||
while self.keepGoing():
|
||||
if self.need_nr_peers <= self.nr_peers:
|
||||
break
|
||||
self.site.announce(mode="more")
|
||||
self.processReqs2()
|
||||
if self.need_nr_peers <= self.nr_peers:
|
||||
break
|
||||
gevent.sleep(10)
|
||||
self.peer_connector_announcer = None
|
||||
|
||||
def spawnPeerConnectorController(self):
|
||||
if self.peer_connector_controller is None or self.peer_connector_controller.ready():
|
||||
self.peer_connector_controller = self.site.spawn(self.peerConnectorController)
|
||||
|
||||
def spawnPeerConnectorAnnouncer(self):
|
||||
if self.peer_connector_announcer is None or self.peer_connector_announcer.ready():
|
||||
self.peer_connector_announcer = self.site.spawn(self.peerConnectorAnnouncer)
|
Loading…
Reference in a new issue