diff --git a/plugins/TrackerShare/TrackerSharePlugin.py b/plugins/TrackerShare/TrackerSharePlugin.py index 99ebf692..4fe888fd 100644 --- a/plugins/TrackerShare/TrackerSharePlugin.py +++ b/plugins/TrackerShare/TrackerSharePlugin.py @@ -462,7 +462,7 @@ if "tracker_storage" not in locals(): class SiteAnnouncerPlugin(object): def getTrackers(self): tracker_storage.setSiteAnnouncer(self) - tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers()) + tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(onlyFullyConnected=True)) trackers = super(SiteAnnouncerPlugin, self).getTrackers() shared_trackers = list(tracker_storage.getTrackers().keys()) if shared_trackers: diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 16afab24..5487d0e9 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -3,6 +3,7 @@ import time import random import socket import sys +import weakref import gevent import gevent.pool @@ -42,6 +43,8 @@ class FileServer(ConnectionServer): self.update_start_time = 0 self.update_sites_task_next_nr = 1 + self.update_threads = weakref.WeakValueDictionary() + self.passive_mode = None self.active_mode = None self.active_mode_threads = {} @@ -317,30 +320,23 @@ class FileServer(ConnectionServer): def updateSite(self, site, check_files=False, verify_files=False): if not site: - return False - return site.update2(check_files=check_files, verify_files=verify_files) + return + site.update2(check_files=check_files, verify_files=verify_files) def spawnUpdateSite(self, site, check_files=False, verify_files=False): thread = self.update_pool.spawn(self.updateSite, site, check_files=check_files, verify_files=verify_files) - thread.site_address = site.address + self.update_threads[site.address] = thread return thread + def lookupInUpdatePool(self, site_address): + thread = self.update_threads.get(site_address, None) + if not thread or thread.ready(): + return None + return thread + def siteIsInUpdatePool(self, 
site_address): - while True: - restart = False - for thread in list(iter(self.update_pool)): - thread_site_address = getattr(thread, 'site_address', None) - if not thread_site_address: - # Possible race condition in assigning thread.site_address in spawnUpdateSite() - # Trying again. - self.sleep(0.1) - restart = True - break - if thread_site_address == site_address: - return True - if not restart: - return False + return self.lookupInUpdatePool(site_address) is not None def invalidateUpdateTime(self, invalid_interval): for address in self.getSiteAddresses(): diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index 6e0d57af..3637c822 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -54,6 +54,8 @@ class Peer(object): self.download_bytes = 0 # Bytes downloaded self.download_time = 0 # Time spent to download + self.protectedRequests = ["getFile", "streamFile", "update", "listModified"] + def __getattr__(self, key): if key == "hashfield": self.has_hashfield = True @@ -78,10 +80,8 @@ class Peer(object): logger.log(log_level, "%s:%s %s" % (self.ip, self.port, text)) - # Site marks its Peers protected, if it has not enough peers connected. - # This is to be used to prevent disconnecting from peers when doing - # a periodic cleanup. - def markProtected(self, interval=60*20): + # Protect connection from being closed by site.cleanupPeers() + def markProtected(self, interval=60*2): self.protected = max(self.protected, time.time() + interval) def isProtected(self): @@ -195,6 +195,8 @@ class Peer(object): for retry in range(1, 4): # Retry 3 times try: + if cmd in self.protectedRequests: + self.markProtected() if not self.connection: raise Exception("No connection found") res = self.connection.request(cmd, params, stream_to) diff --git a/src/Site/Site.py b/src/Site/Site.py index 881f7d7c..5929617c 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -28,6 +28,7 @@ from Plugin import PluginManager from File import FileServer from .SiteAnnouncer import SiteAnnouncer from . 
import SiteManager +from . import SiteHelpers class ScaledTimeoutHandler: def __init__(self, val_min, val_max, handler=None, scaler=None): @@ -145,7 +146,6 @@ class BackgroundPublisher: self.site.log.info("Background publisher: Published %s to %s peers", self.inner_path, len(self.published)) - @PluginManager.acceptPlugins class Site(object): @@ -209,6 +209,10 @@ class Site(object): self.announcer = SiteAnnouncer(self) # Announce and get peer list from other nodes + self.peer_connector = SiteHelpers.PeerConnector(self) # Connect more peers in background by request + self.persistent_peer_req = None # The persistent peer requirement, managed by maintenance handler + + if not self.settings.get("wrapper_key"): # To auth websocket permissions self.settings["wrapper_key"] = CryptHash.random() self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"]) @@ -753,7 +757,6 @@ class Site(object): if verify_files: check_files = True - self.updateWebsocket(updating=True) if verify_files: self.updateWebsocket(verifying=True) elif check_files: @@ -771,16 +774,32 @@ class Site(object): if verify_files: self.settings["verify_files_timestamp"] = time.time() + if verify_files: + self.updateWebsocket(verified=True) + elif check_files: + self.updateWebsocket(checked=True) + if not self.isServing(): - self.updateWebsocket(updated=True) return False + if announce: + self.updateWebsocket(updating=True) + self.announce(mode="update", force=True) + + reqs = [ + self.peer_connector.newReq(4, 4, 30), + self.peer_connector.newReq(2, 2, 60), + self.peer_connector.newReq(1, 1, 120) + ] + nr_connected_peers = self.waitForPeers(reqs); + if nr_connected_peers < 1: + return + + self.updateWebsocket(updating=True) + # Remove files that no longer in content.json self.checkBadFiles() - if announce: - self.announce(mode="update", force=True) - # Full update, we can reset bad files if check_files and since == 0: self.bad_files = {} @@ -810,12 +829,6 @@ class Site(object): # To be called from 
FileServer @util.Noparallel(queue=True, ignore_args=True) def update2(self, check_files=False, verify_files=False): - if len(self.peers) < 50: - self.announce(mode="update") - self.waitForPeers(5, 5, 30); - self.waitForPeers(2, 2, 30); - self.waitForPeers(1, 1, 60); - self.update(check_files=check_files, verify_files=verify_files) # Update site by redownload all content.json @@ -894,41 +907,13 @@ class Site(object): background_publisher.finalize() del self.background_publishers[inner_path] - def waitForPeers_realJob(self, need_nr_peers, need_nr_connected_peers, time_limit): - start_time = time.time() - for _ in range(time_limit): - nr_connected_peers = len(self.getConnectedPeers()) - nr_peers = len(self.peers) - if nr_peers >= need_nr_peers and nr_connected_peers >= need_nr_connected_peers: - return nr_connected_peers - self.updateWebsocket(connecting_to_peers=nr_connected_peers) - self.announce(mode="more", force=True) - if not self.isServing(): - return nr_connected_peers - for wait in range(10): - self.needConnections(num=need_nr_connected_peers) - time.sleep(2) - nr_connected_peers = len(self.getConnectedPeers()) - nr_peers = len(self.peers) - self.updateWebsocket(connecting_to_peers=nr_connected_peers) - if not self.isServing(): - return nr_connected_peers - if nr_peers >= need_nr_peers and nr_connected_peers >= need_nr_connected_peers: - return nr_connected_peers - if time.time() - start_time > time_limit: - return nr_connected_peers - - return nr_connected_peers - - def waitForPeers(self, need_nr_peers, need_nr_connected_peers, time_limit): - nr_connected_peers = self.waitForPeers_realJob(need_nr_peers, need_nr_connected_peers, time_limit) - self.updateWebsocket(connected_to_peers=nr_connected_peers) - return nr_connected_peers - def getPeersForForegroundPublishing(self, limit): # Wait for some peers to appear - self.waitForPeers(limit, limit / 2, 10) # some of them... - self.waitForPeers(1, 1, 60) # or at least one... 
+ reqs = [ + self.peer_connector.newReq(limit, limit / 2, 10), # some of them... + self.peer_connector.newReq(1, 1, 60) # or at least one... + ] + self.waitForPeers(reqs) peers = self.getConnectedPeers() random.shuffle(peers) @@ -1206,6 +1191,10 @@ class Site(object): peer = Peer(ip, port, self) self.peers[key] = peer peer.found(source) + + self.peer_connector.processReqs() + self.peer_connector.addPeer(peer) + return peer def announce(self, *args, **kwargs): @@ -1288,76 +1277,54 @@ class Site(object): limit = min(limit, config.connected_limit) return limit - def tryConnectingToMorePeers(self, more=1, pex=True, try_harder=False): - max_peers = more * 2 + 10 - if try_harder: - max_peers += 10000 + ############################################################################ - connected = 0 - for peer in self.getRecentPeers(max_peers): - if not peer.isConnected(): - if pex: - peer.pex() - else: - peer.ping(timeout=2.0, tryes=1) + # Returns the maximum value of current reqs for connections + def waitingForConnections(self): + self.peer_connector.processReqs() + return self.peer_connector.need_nr_connected_peers - if peer.isConnected(): - connected += 1 - - if connected >= more: - break - - return connected - - def bringConnections(self, need=1, update_site_on_reconnect=False, pex=True, try_harder=False): - connected = len(self.getConnectedPeers()) - connected_before = connected - - self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers))) - - if connected < need: - connected += self.tryConnectingToMorePeers(more=(need-connected), pex=pex, try_harder=try_harder) - self.log.debug( - "Connected before: %s, after: %s. Check site: %s." 
% - (connected_before, connected, update_site_on_reconnect) - ) - - if update_site_on_reconnect and connected_before == 0 and connected > 0 and self.connection_server.has_internet: - self.greenlet_manager.spawn(self.update, check_files=False) - - return connected - - # Keep connections - def needConnections(self, num=None, update_site_on_reconnect=False, pex=True): + def needConnections(self, num=None, update_site_on_reconnect=False): if not self.connection_server.allowsCreatingConnections(): return if num is None: num = self.getPreferableActiveConnectionCount() + num = min(len(self.peers), num) - need = min(len(self.peers), num) + req = self.peer_connector.newReq(0, num) + return req - connected = self.bringConnections( - need=need, - update_site_on_reconnect=update_site_on_reconnect, - pex=pex, - try_harder=False) + # Wait for peers to be known and/or connected and send updates to the UI + def waitForPeers(self, reqs): + if not reqs: + return 0 + i = 0 + nr_connected_peers = -1 + while self.isServing(): + ready_reqs = list(filter(lambda req: req.ready(), reqs)) + if len(ready_reqs) == len(reqs): + if nr_connected_peers < 0: + nr_connected_peers = ready_reqs[0].nr_connected_peers + break + waiting_reqs = list(filter(lambda req: not req.ready(), reqs)) + if not waiting_reqs: + break + waiting_req = waiting_reqs[0] + #self.log.debug("waiting_req: %s %s %s", waiting_req.need_nr_connected_peers, waiting_req.nr_connected_peers, waiting_req.expiration_interval) + waiting_req.waitHeartbeat(timeout=1.0) + if i > 0 and nr_connected_peers != waiting_req.nr_connected_peers: + nr_connected_peers = waiting_req.nr_connected_peers + self.updateWebsocket(connecting_to_peers=nr_connected_peers) + i += 1 + self.updateWebsocket(connected_to_peers=max(nr_connected_peers, 0)) + if i > 1: + # If we waited some time, pause now for displaying connected_to_peers message in the UI. + # This sleep is solely needed for site status updates on ZeroHello to be more cool-looking. 
+ gevent.sleep(1) + return nr_connected_peers - if connected < need: - self.greenlet_manager.spawnLater(1.0, self.bringConnections, - need=need, - update_site_on_reconnect=update_site_on_reconnect, - pex=pex, - try_harder=True) - - if connected < num: - self.markConnectedPeersProtected() - - return connected - - def markConnectedPeersProtected(self): - for peer in self.getConnectedPeers(): - peer.markProtected() + ############################################################################ # Return: Probably peers verified to be connectable recently def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True): @@ -1429,15 +1396,26 @@ class Site(object): return found[0:need_num] - def getConnectedPeers(self): + # Returns the list of connected peers + # By default the result may contain peers chosen optimistically: + # If the connection is being established and 20 seconds have not yet passed + # since the connection start time, those peers are included in the result. + # Set onlyFullyConnected=True for restricting only by fully connected peers. 
+ def getConnectedPeers(self, onlyFullyConnected=False): back = [] if not self.connection_server: return [] tor_manager = self.connection_server.tor_manager for connection in self.connection_server.connections: + if len(back) >= len(self.peers): # short cut for breaking early; no peers to check left + break + if not connection.connected and time.time() - connection.start_time > 20: # Still not connected after 20s continue + if not connection.connected and onlyFullyConnected: # Only fully connected peers + continue + peer = self.peers.get("%s:%s" % (connection.ip, connection.port)) if peer: if connection.ip.endswith(".onion") and connection.target_onion and tor_manager.start_onions: @@ -1479,8 +1457,8 @@ class Site(object): def cleanupPeers(self): self.removeDeadPeers() - limit = self.getActiveConnectionCountLimit() - connected_peers = [peer for peer in self.getConnectedPeers() if peer.isConnected()] # Only fully connected peers + limit = max(self.getActiveConnectionCountLimit(), self.waitingForConnections()) + connected_peers = self.getConnectedPeers(onlyFullyConnected=True) need_to_close = len(connected_peers) - limit if need_to_close > 0: @@ -1526,10 +1504,10 @@ class Site(object): if not startup: self.cleanupPeers() - self.needConnections(update_site_on_reconnect=True) + self.persistent_peer_req = self.needConnections(update_site_on_reconnect=True) + self.persistent_peer_req.result_connected.wait(timeout=2.0) - with gevent.Timeout(10, exception=False): - self.announcer.announcePex() + #self.announcer.announcePex() self.processBackgroundPublishers() @@ -1559,7 +1537,7 @@ class Site(object): return False sent = 0 - connected_peers = self.getConnectedPeers() + connected_peers = self.getConnectedPeers(onlyFullyConnected=True) for peer in connected_peers: if peer.sendMyHashfield(): sent += 1 @@ -1581,7 +1559,7 @@ class Site(object): s = time.time() queried = 0 - connected_peers = self.getConnectedPeers() + connected_peers = 
self.getConnectedPeers(onlyFullyConnected=True) for peer in connected_peers: if peer.time_hashfield: continue diff --git a/src/Site/SiteHelpers.py b/src/Site/SiteHelpers.py new file mode 100644 index 00000000..65f0530b --- /dev/null +++ b/src/Site/SiteHelpers.py @@ -0,0 +1,220 @@ +import time +import weakref +import gevent + +class ConnectRequirement(object): + next_id = 1 + def __init__(self, need_nr_peers, need_nr_connected_peers, expiration_interval=None): + self.need_nr_peers = need_nr_peers # how many total peers we need + self.need_nr_connected_peers = need_nr_connected_peers # how many connected peers we need + self.result = gevent.event.AsyncResult() # resolves on need_nr_peers condition + self.result_connected = gevent.event.AsyncResult() # resolves on need_nr_connected_peers condition + + self.expiration_interval = expiration_interval + self.expired = False + if expiration_interval: + self.expire_at = time.time() + expiration_interval + else: + self.expire_at = None + + self.nr_peers = -1 # updated PeerConnector() + self.nr_connected_peers = -1 # updated PeerConnector() + + self.heartbeat = gevent.event.AsyncResult() + + self.id = type(self).next_id + type(self).next_id += 1 + + def fulfilled(self): + return self.result.ready() and self.result_connected.ready() + + def ready(self): + return self.expired or self.fulfilled() + + # Heartbeat send when any of the following happens: + # * self.result is set + # * self.result_connected is set + # * self.nr_peers changed + # * self.nr_peers_connected changed + # * self.expired is set + def waitHeartbeat(self, timeout=None): + if self.heartbeat.ready(): + self.heartbeat = gevent.event.AsyncResult() + return self.heartbeat.wait(timeout=timeout) + + def sendHeartbeat(self): + self.heartbeat.set_result() + if self.heartbeat.ready(): + self.heartbeat = gevent.event.AsyncResult() + +class PeerConnector(object): + + def __init__(self, site): + self.site = site + + self.peer_reqs = weakref.WeakValueDictionary() # How 
many connected peers we need. + # Separate entry for each requirement. + # Objects of type ConnectRequirement. + self.peer_connector_controller = None # Thread doing the orchestration in background. + self.peer_connector_workers = dict() # Threads trying to connect to individual peers. + self.peer_connector_worker_limit = 5 # Max nr of workers. + self.peer_connector_announcer = None # Thread doing announces in background. + + # Max effective values. Set by processReqs(). + self.need_nr_peers = 0 + self.need_nr_connected_peers = 0 + self.nr_peers = 0 # set by processReqs() + self.nr_connected_peers = 0 # set by processReqs2() + + self.peers = list() + + def addReq(self, req): + self.peer_reqs[req.id] = req + self.processReqs() + + def newReq(self, need_nr_peers, need_nr_connected_peers, expiration_interval=None): + req = ConnectRequirement(need_nr_peers, need_nr_connected_peers, expiration_interval=expiration_interval) + self.addReq(req) + return req + + def processReqs(self, nr_connected_peers=None): + nr_peers = len(self.site.peers) + self.nr_peers = nr_peers + + need_nr_peers = 0 + need_nr_connected_peers = 0 + + items = list(self.peer_reqs.items()) + for key, req in items: + send_heartbeat = False + + if req.expire_at and req.expire_at < time.time(): + req.expired = True + self.peer_reqs.pop(key, None) + send_heartbeat = True + elif req.result.ready() and req.result_connected.ready(): + pass + else: + if nr_connected_peers is not None: + if req.need_nr_peers <= nr_peers and req.need_nr_connected_peers <= nr_connected_peers: + req.result.set_result(nr_peers) + req.result_connected.set_result(nr_connected_peers) + send_heartbeat = True + if req.nr_peers != nr_peers or req.nr_connected_peers != nr_connected_peers: + req.nr_peers = nr_peers + req.nr_connected_peers = nr_connected_peers + send_heartbeat = True + + if not (req.result.ready() and req.result_connected.ready()): + need_nr_peers = max(need_nr_peers, req.need_nr_peers) + need_nr_connected_peers = 
max(need_nr_connected_peers, req.need_nr_connected_peers) + + if send_heartbeat: + req.sendHeartbeat() + + self.need_nr_peers = need_nr_peers + self.need_nr_connected_peers = need_nr_connected_peers + + if nr_connected_peers is None: + nr_connected_peers = 0 + if need_nr_peers > nr_peers: + self.spawnPeerConnectorAnnouncer(); + if need_nr_connected_peers > nr_connected_peers: + self.spawnPeerConnectorController(); + + def processReqs2(self): + self.nr_connected_peers = len(self.site.getConnectedPeers(onlyFullyConnected=True)) + self.processReqs(nr_connected_peers=self.nr_connected_peers) + + # For adding new peers when ConnectorController is working. + # While it is iterating over a cached list of peers, there can be a significant lag + # for a newly discovered peer to get in sight of the controller. + # Suppose most previously known peers are dead and we've just get a few + # new peers from a tracker. + # So we mix the new peer to the cached list. + # When ConnectorController is stopped (self.peers is empty), we just do nothing here. + def addPeer(self, peer): + if not self.peers: + return + if peer not in self.peers: + self.peers.append(peer) + + def keepGoing(self): + return self.site.isServing() and self.site.connection_server.allowsCreatingConnections() + + def peerConnectorWorker(self, peer): + if not peer.isConnected(): + peer.connect() + if peer.isConnected(): + self.processReqs2() + + def peerConnectorController(self): + self.peers = list() + addendum = 20 + while self.keepGoing(): + + if len(self.site.peers) < 1: + # No peers and no way to manage this from this method. + # Just give up. + break + + self.processReqs2() + + if self.need_nr_connected_peers <= self.nr_connected_peers: + # Ok, nobody waits for connected peers. + # Done. 
+ break + + if len(self.peers) < 1: + # refill the peer list + self.peers = self.site.getRecentPeers(self.need_nr_connected_peers * 2 + addendum) + addendum = addendum * 2 + 50 + if len(self.peers) <= self.nr_connected_peers: + # looks like all known peers are connected + # start announcePex() in background and give up + self.site.announcer.announcePex() + break + + # try connecting to peers + while self.keepGoing() and len(self.peer_connector_workers) < self.peer_connector_worker_limit: + if len(self.peers) < 1: + break + + peer = self.peers.pop(0) + + if peer.isConnected(): + continue + + thread = self.peer_connector_workers.get(peer, None) + if thread: + continue + + thread = self.site.spawn(self.peerConnectorWorker, peer) + self.peer_connector_workers[peer] = thread + thread.link(lambda thread, peer=peer: self.peer_connector_workers.pop(peer, None)) + + # wait for more room in self.peer_connector_workers + while self.keepGoing() and len(self.peer_connector_workers) >= self.peer_connector_worker_limit: + gevent.sleep(2) + + self.peers = list() + self.peer_connector_controller = None + + def peerConnectorAnnouncer(self): + while self.keepGoing(): + if self.need_nr_peers <= self.nr_peers: + break + self.site.announce(mode="more") + self.processReqs2() + if self.need_nr_peers <= self.nr_peers: + break + gevent.sleep(10) + self.peer_connector_announcer = None + + def spawnPeerConnectorController(self): + if self.peer_connector_controller is None or self.peer_connector_controller.ready(): + self.peer_connector_controller = self.site.spawn(self.peerConnectorController) + + def spawnPeerConnectorAnnouncer(self): + if self.peer_connector_announcer is None or self.peer_connector_announcer.ready(): + self.peer_connector_announcer = self.site.spawn(self.peerConnectorAnnouncer)