From 645f3ba34ab37b508d4da7d0d222d963d7908a3c Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Tue, 26 Oct 2021 17:38:40 +0700 Subject: [PATCH] Reorganization of Peer class and peer-related Site's methods --- plugins/TrackerShare/TrackerSharePlugin.py | 2 +- src/Connection/Connection.py | 9 +- src/Connection/ConnectionServer.py | 14 +++ src/Peer/Peer.py | 66 +++++++++--- src/Site/Site.py | 118 ++++++++++----------- src/Site/SiteAnnouncer.py | 2 +- src/Site/SiteHelpers.py | 22 +++- 7 files changed, 150 insertions(+), 83 deletions(-) diff --git a/plugins/TrackerShare/TrackerSharePlugin.py b/plugins/TrackerShare/TrackerSharePlugin.py index 4fe888fd..42c79422 100644 --- a/plugins/TrackerShare/TrackerSharePlugin.py +++ b/plugins/TrackerShare/TrackerSharePlugin.py @@ -462,7 +462,7 @@ if "tracker_storage" not in locals(): class SiteAnnouncerPlugin(object): def getTrackers(self): tracker_storage.setSiteAnnouncer(self) - tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(onlyFullyConnected=True)) + tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(only_fully_connected=True)) trackers = super(SiteAnnouncerPlugin, self).getTrackers() shared_trackers = list(tracker_storage.getTrackers().keys()) if shared_trackers: diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 504daf58..40519b7f 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -124,12 +124,13 @@ class Connection(object): self.event_connected = gevent.event.AsyncResult() self.type = "out" + + unreachability = self.server.getIpUnreachability(self.ip) + if unreachability: + raise Exception(unreachability) + if self.ip_type == "onion": - if not self.server.tor_manager or not self.server.tor_manager.enabled: - raise Exception("Can't connect to onion addresses, no Tor controller present") self.sock = self.server.tor_manager.createSocket(self.ip, self.port) - elif config.tor == "always" and helper.isPrivateIp(self.ip) and self.ip not in config.ip_local: - raise Exception("Can't connect to local IPs in Tor: always mode") elif config.trackers_proxy != "disable" and config.tor != "always" and self.is_tracker_connection: if config.trackers_proxy == "tor": self.sock = self.server.tor_manager.createSocket(self.ip, self.port) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 03a93daa..16834ff5 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -519,3 +519,17 @@ class ConnectionServer(object): mid = int(len(corrections) / 2 - 1) median = (corrections[mid - 1] + corrections[mid] + corrections[mid + 1]) / 3 return median + + # Checks if a network address can be reachable in the current configuration + # and returs a string describing why it cannot. + # If the network address can be reachable, returns False. + def getIpUnreachability(self, ip): + ip_type = helper.getIpType(ip) + if ip_type == 'onion' and not self.tor_manager.enabled: + return "Can't connect to onion addresses, no Tor controller present" + if config.tor == "always" and helper.isPrivateIp(ip) and ip not in config.ip_local: + return "Can't connect to local IPs in Tor: always mode" + return False + + def isIpReachable(self, ip): + return self.getIpUnreachability(ip) == False diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index 72e9c47f..28a7220b 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -32,6 +32,10 @@ class Peer(object): self.site = site self.key = "%s:%s" % (ip, port) + self.ip_type = helper.getIpType(ip) + + self.removed = False + self.log_level = logging.DEBUG self.connection_error_log_level = logging.DEBUG @@ -41,7 +45,7 @@ class Peer(object): self.time_hashfield = None # Last time peer's hashfiled downloaded self.time_my_hashfield_sent = None # Last time my hashfield sent to peer self.time_found = time.time() # Time of last found in the torrent tracker - self.time_response = None # Time of last successful response from peer + self.time_response = 0 # Time of last successful response from peer self.time_added = time.time() self.last_ping = None # Last response time for ping self.last_pex = 0 # Last query/response time for pex @@ -49,6 +53,7 @@ class Peer(object): self.reputation = 0 # More likely to connect if larger self.last_content_json_update = 0.0 # Modify date of last received content.json self.protected = 0 + self.reachable = False self.connection_error = 0 # Series of connection error self.hash_failed = 0 # Number of bad files from peer @@ -57,6 +62,8 @@ class Peer(object): self.protectedRequests = ["getFile", "streamFile", "update", "listModified"] + self.updateReachable() + def __getattr__(self, key): if key == "hashfield": self.has_hashfield = True @@ -97,7 +104,36 @@ class Peer(object): return self.connection and self.connection.connected def isTtlExpired(self, ttl): - return (time.time() - self.time_found) > ttl + last_activity = max(self.time_found, self.time_response) + return (time.time() - last_activity) > ttl + + def isReachable(self): + return self.reachable + + def updateReachable(self): + connection_server = self.getConnectionServer() + if not self.port: + self.reachable = False + else: + self.reachable = connection_server.isIpReachable(self.ip) + + # Peer proved to to be connectable recently + def isConnectable(self): + if self.connection_error >= 1: # The last connection attempt failed + return False + if time.time() - self.time_response > 60 * 60 * 2: # Last successful response more than 2 hours ago + return False + return self.isReachable() + + def getConnectionServer(self): + if self.connection_server: + connection_server = self.connection_server + elif self.site: + connection_server = self.site.connection_server + else: + import main + connection_server = main.file_server + return connection_server # Connect to host def connect(self, connection=None): @@ -120,13 +156,7 @@ class Peer(object): self.connection = None try: - if self.connection_server: - connection_server = self.connection_server - elif self.site: - connection_server = self.site.connection_server - else: - import main - connection_server = main.file_server + connection_server = self.getConnectionServer() self.connection = connection_server.getConnection(self.ip, self.port, site=self.site, is_tracker_connection=self.is_tracker_connection) if self.connection and self.connection.connected: self.reputation += 1 @@ -183,6 +213,7 @@ class Peer(object): if source in ("tracker", "local"): self.site.peers_recent.appendleft(self) self.time_found = time.time() + self.updateReachable() # Send a command to peer and return response value def request(self, cmd, params={}, stream_to=None): @@ -355,6 +386,8 @@ class Peer(object): # List modified files since the date # Return: {inner_path: modification date,...} def listModified(self, since): + if self.removed: + return False return self.request("listModified", {"since": since, "site": self.site.address}) def updateHashfield(self, force=False): @@ -430,12 +463,11 @@ class Peer(object): # Stop and remove from site def remove(self, reason="Removing"): - self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) - if self.site and self.key in self.site.peers: - del(self.site.peers[self.key]) - - if self.site and self in self.site.peers_recent: - self.site.peers_recent.remove(self) + self.removed = True + self.log("Removing peer with reason: <%s>. Connection error: %s, Hash failed: %s" % (reason, self.connection_error, self.hash_failed)) + if self.site: + self.site.deregisterPeer(self) + self.site = None self.disconnect(reason) @@ -443,6 +475,8 @@ class Peer(object): # On connection error def onConnectionError(self, reason="Unknown"): + if not self.getConnectionServer().isInternetOnline(): + return self.connection_error += 1 if self.site and len(self.site.peers) > 200: limit = 3 @@ -450,7 +484,7 @@ class Peer(object): limit = 6 self.reputation -= 1 if self.connection_error >= limit: # Dead peer - self.remove("Peer connection: %s" % reason) + self.remove("Connection error limit reached: %s. Provided message: %s" % (limit, reason)) # Done working with peer def onWorkerDone(self): diff --git a/src/Site/Site.py b/src/Site/Site.py index 456d02f1..d4f2ad62 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -913,7 +913,7 @@ class Site(object): self.peer_connector.newReq(limit, limit / 2, 10), # some of them... self.peer_connector.newReq(1, 1, 60) # or at least one... ] - self.waitForPeers(reqs) + self.waitForPeers(reqs, update_websocket=False) peers = self.getConnectedPeers() random.shuffle(peers) @@ -1197,6 +1197,15 @@ class Site(object): return peer + # Called from peer.remove to erase links to peer + def deregisterPeer(self, peer): + self.peers.pop(peer.key, None) + try: + self.peers_recent.remove(peer) + except: + pass + self.peer_connector.deregisterPeer(peer) + def announce(self, *args, **kwargs): if self.isServing(): self.announcer.announce(*args, **kwargs) @@ -1295,8 +1304,9 @@ class Site(object): req = self.peer_connector.newReq(0, num) return req - # Wait for peers to ne known and/or connected and send updates to the UI - def waitForPeers(self, reqs): + # Wait for peers to be discovered and/or connected according to reqs + # and send updates to the UI + def waitForPeers(self, reqs, update_websocket=True): if not reqs: return 0 i = 0 @@ -1315,84 +1325,72 @@ class Site(object): waiting_req.waitHeartbeat(timeout=1.0) if i > 0 and nr_connected_peers != waiting_req.nr_connected_peers: nr_connected_peers = waiting_req.nr_connected_peers - self.updateWebsocket(connecting_to_peers=nr_connected_peers) + if update_websocket: + self.updateWebsocket(connecting_to_peers=nr_connected_peers) i += 1 - self.updateWebsocket(connected_to_peers=max(nr_connected_peers, 0)) - if i > 1: - # If we waited some time, pause now for displaying connected_to_peers message in the UI. - # This sleep is solely needed for site status updates on ZeroHello to be more cool-looking. - gevent.sleep(1) + if update_websocket: + self.updateWebsocket(connected_to_peers=max(nr_connected_peers, 0)) + if i > 1: + # If we waited some time, pause now for displaying connected_to_peers message in the UI. + # This sleep is solely needed for site status updates on ZeroHello to be more cool-looking. + gevent.sleep(1) return nr_connected_peers ############################################################################ - # Return: Probably peers verified to be connectable recently + # Return: Peers verified to be connectable recently, or if not enough, other peers as well def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True): peers = list(self.peers.values()) - found = [] + random.shuffle(peers) + connectable_peers = [] + reachable_peers = [] for peer in peers: - if peer.key.endswith(":0"): - continue # Not connectable - if not peer.connection: - continue # No connection - if peer.ip.endswith(".onion") and not self.connection_server.tor_manager.enabled: - continue # Onion not supported if peer.key in ignore: - continue # The requester has this peer - if time.time() - peer.connection.last_recv_time > 60 * 60 * 2: # Last message more than 2 hours ago - peer.connection = None # Cleanup: Dead connection continue if not allow_private and helper.isPrivateIp(peer.ip): continue - found.append(peer) - if len(found) >= need_num: + if peer.isConnectable(): + connectable_peers.append(peer) + elif peer.isReachable(): + reachable_peers.append(peer) + if len(connectable_peers) >= need_num: break # Found requested number of peers - if len(found) < need_num: # Return not that good peers - found += [ - peer for peer in peers - if not peer.key.endswith(":0") and - peer.key not in ignore and - (allow_private or not helper.isPrivateIp(peer.ip)) - ][0:need_num - len(found)] + if len(connectable_peers) < need_num: # Return not that good peers + connectable_peers += reachable_peers[0:need_num - len(connectable_peers)] - return found + return connectable_peers # Return: Recently found peers + def getReachablePeers(self): + return [peer for peer in self.peers.values() if peer.isReachable()] + + # Return: Recently found peers, sorted by reputation. + # If there not enough recently found peers, adds other known peers with highest reputation def getRecentPeers(self, need_num): need_num = int(need_num) - found = list(set(self.peers_recent)) + found = set(self.peers_recent) self.log.debug( "Recent peers %s of %s (need: %s)" % (len(found), len(self.peers), need_num) ) - if len(found) >= need_num or len(found) >= len(self.peers): - return sorted( - found, + if len(found) < need_num and len(found) < len(self.peers): + # Add random peers + peers = self.getReachablePeers() + peers = sorted( + list(peers), key=lambda peer: peer.reputation, reverse=True - )[0:need_num] + ) + while len(found) < need_num and len(peers) > 0: + found.add(peers.pop()) - # Add random peers - need_more = need_num - len(found) - if not self.connection_server.tor_manager.enabled: - peers = [peer for peer in self.peers.values() if not peer.ip.endswith(".onion")] - else: - peers = list(self.peers.values()) - - self.log.debug("getRecentPeers: peers = %s" % peers) - self.log.debug("getRecentPeers: need_more = %s" % need_more) - - peers = peers[0:need_more * 50] - - found_more = sorted( - peers, + return sorted( + list(found), key=lambda peer: peer.reputation, reverse=True - )[0:need_more * 2] - - found += found_more + )[0:need_num] return found[0:need_num] @@ -1400,8 +1398,8 @@ class Site(object): # By default the result may contain peers chosen optimistically: # If the connection is being established and 20 seconds have not yet passed # since the connection start time, those peers are included in the result. - # Set onlyFullyConnected=True for restricting only by fully connected peers. - def getConnectedPeers(self, onlyFullyConnected=False): + # Set only_fully_connected=True for restricting only by fully connected peers. + def getConnectedPeers(self, only_fully_connected=False): back = [] if not self.connection_server: return [] @@ -1413,7 +1411,7 @@ class Site(object): if not connection.connected and time.time() - connection.start_time > 20: # Still not connected after 20s continue - if not connection.connected and onlyFullyConnected: # Only fully connected peers + if not connection.connected and only_fully_connected: # Only fully connected peers continue peer = self.peers.get("%s:%s" % (connection.ip, connection.port)) @@ -1434,7 +1432,9 @@ class Site(object): return removed = 0 - if len(peers) > 1000: + if len(peers) > 10000: + ttl = 60 * 2 + elif len(peers) > 1000: ttl = 60 * 60 * 1 elif len(peers) > 100: ttl = 60 * 60 * 4 @@ -1458,7 +1458,7 @@ class Site(object): self.removeDeadPeers() limit = max(self.getActiveConnectionCountLimit(), self.waitingForConnections()) - connected_peers = self.getConnectedPeers(onlyFullyConnected=True) + connected_peers = self.getConnectedPeers(only_fully_connected=True) need_to_close = len(connected_peers) - limit if need_to_close > 0: @@ -1537,7 +1537,7 @@ class Site(object): return False sent = 0 - connected_peers = self.getConnectedPeers(onlyFullyConnected=True) + connected_peers = self.getConnectedPeers(only_fully_connected=True) for peer in connected_peers: if peer.sendMyHashfield(): sent += 1 @@ -1559,7 +1559,7 @@ class Site(object): s = time.time() queried = 0 - connected_peers = self.getConnectedPeers(onlyFullyConnected=True) + connected_peers = self.getConnectedPeers(only_fully_connected=True) for peer in connected_peers: if peer.time_hashfield: continue diff --git a/src/Site/SiteAnnouncer.py b/src/Site/SiteAnnouncer.py index 31004d02..1cb0a445 100644 --- a/src/Site/SiteAnnouncer.py +++ b/src/Site/SiteAnnouncer.py @@ -311,7 +311,7 @@ class SiteAnnouncer(object): for _ in range(5): if not self.site.isServing(): return - peers = self.site.getConnectedPeers(onlyFullyConnected=True) + peers = self.site.getConnectedPeers(only_fully_connected=True) if len(peers) > 0: break time.sleep(2) diff --git a/src/Site/SiteHelpers.py b/src/Site/SiteHelpers.py index c095ad66..53105f65 100644 --- a/src/Site/SiteHelpers.py +++ b/src/Site/SiteHelpers.py @@ -124,7 +124,7 @@ class PeerConnector(object): self.spawnPeerConnectorController(); def processReqs2(self): - self.nr_connected_peers = len(self.site.getConnectedPeers(onlyFullyConnected=True)) + self.nr_connected_peers = len(self.site.getConnectedPeers(only_fully_connected=True)) self.processReqs(nr_connected_peers=self.nr_connected_peers) # For adding new peers when ConnectorController is working. @@ -140,6 +140,12 @@ class PeerConnector(object): if peer not in self.peers: self.peers.append(peer) + def deregisterPeer(self, peer): + try: + self.peers.remove(peer) + except: + pass + def sleep(self, t): self.site.connection_server.sleep(t) @@ -187,6 +193,8 @@ class PeerConnector(object): self.sleep(10) continue + added = 0 + # try connecting to peers while self.keepGoing() and len(self.peer_connector_workers) < self.peer_connector_worker_limit: if len(self.peers) < 1: @@ -204,13 +212,23 @@ class PeerConnector(object): thread = self.site.spawn(self.peerConnectorWorker, peer) self.peer_connector_workers[peer] = thread thread.link(lambda thread, peer=peer: self.peer_connector_workers.pop(peer, None)) + added += 1 + + if not self.keepGoing(): + break + + if not added: + # Looks like all known peers are either connected or being connected, + # so we weren't able to start connecting any peer in this iteration. + # Waiting for the announcer to discover some peers. + self.sleep(20) # wait for more room in self.peer_connector_workers while self.keepGoing() and len(self.peer_connector_workers) >= self.peer_connector_worker_limit: self.sleep(2) if not self.site.connection_server.isInternetOnline(): - self.sleep(20) + self.sleep(30) self.peers = list() self.peer_connector_controller = None