Reorganization of Peer class and peer-related Site's methods

This commit is contained in:
Vadim Ushakov 2021-10-26 17:38:40 +07:00
parent 93a95f511a
commit 645f3ba34a
7 changed files with 150 additions and 83 deletions

View file

@ -462,7 +462,7 @@ if "tracker_storage" not in locals():
class SiteAnnouncerPlugin(object):
def getTrackers(self):
tracker_storage.setSiteAnnouncer(self)
tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(onlyFullyConnected=True))
tracker_storage.checkDiscoveringTrackers(self.site.getConnectedPeers(only_fully_connected=True))
trackers = super(SiteAnnouncerPlugin, self).getTrackers()
shared_trackers = list(tracker_storage.getTrackers().keys())
if shared_trackers:

View file

@ -124,12 +124,13 @@ class Connection(object):
self.event_connected = gevent.event.AsyncResult()
self.type = "out"
unreachability = self.server.getIpUnreachability(self.ip)
if unreachability:
raise Exception(unreachability)
if self.ip_type == "onion":
if not self.server.tor_manager or not self.server.tor_manager.enabled:
raise Exception("Can't connect to onion addresses, no Tor controller present")
self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
elif config.tor == "always" and helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
raise Exception("Can't connect to local IPs in Tor: always mode")
elif config.trackers_proxy != "disable" and config.tor != "always" and self.is_tracker_connection:
if config.trackers_proxy == "tor":
self.sock = self.server.tor_manager.createSocket(self.ip, self.port)

View file

@ -519,3 +519,17 @@ class ConnectionServer(object):
mid = int(len(corrections) / 2 - 1)
median = (corrections[mid - 1] + corrections[mid] + corrections[mid + 1]) / 3
return median
# Checks if a network address can be reachable in the current configuration
# and returs a string describing why it cannot.
# If the network address can be reachable, returns False.
def getIpUnreachability(self, ip):
ip_type = helper.getIpType(ip)
if ip_type == 'onion' and not self.tor_manager.enabled:
return "Can't connect to onion addresses, no Tor controller present"
if config.tor == "always" and helper.isPrivateIp(ip) and ip not in config.ip_local:
return "Can't connect to local IPs in Tor: always mode"
return False
def isIpReachable(self, ip):
return self.getIpUnreachability(ip) == False

View file

@ -32,6 +32,10 @@ class Peer(object):
self.site = site
self.key = "%s:%s" % (ip, port)
self.ip_type = helper.getIpType(ip)
self.removed = False
self.log_level = logging.DEBUG
self.connection_error_log_level = logging.DEBUG
@ -41,7 +45,7 @@ class Peer(object):
self.time_hashfield = None # Last time peer's hashfiled downloaded
self.time_my_hashfield_sent = None # Last time my hashfield sent to peer
self.time_found = time.time() # Time of last found in the torrent tracker
self.time_response = None # Time of last successful response from peer
self.time_response = 0 # Time of last successful response from peer
self.time_added = time.time()
self.last_ping = None # Last response time for ping
self.last_pex = 0 # Last query/response time for pex
@ -49,6 +53,7 @@ class Peer(object):
self.reputation = 0 # More likely to connect if larger
self.last_content_json_update = 0.0 # Modify date of last received content.json
self.protected = 0
self.reachable = False
self.connection_error = 0 # Series of connection error
self.hash_failed = 0 # Number of bad files from peer
@ -57,6 +62,8 @@ class Peer(object):
self.protectedRequests = ["getFile", "streamFile", "update", "listModified"]
self.updateReachable()
def __getattr__(self, key):
if key == "hashfield":
self.has_hashfield = True
@ -97,7 +104,36 @@ class Peer(object):
return self.connection and self.connection.connected
def isTtlExpired(self, ttl):
return (time.time() - self.time_found) > ttl
last_activity = max(self.time_found, self.time_response)
return (time.time() - last_activity) > ttl
def isReachable(self):
return self.reachable
def updateReachable(self):
connection_server = self.getConnectionServer()
if not self.port:
self.reachable = False
else:
self.reachable = connection_server.isIpReachable(self.ip)
# Peer proved to to be connectable recently
def isConnectable(self):
if self.connection_error >= 1: # The last connection attempt failed
return False
if time.time() - self.time_response > 60 * 60 * 2: # Last successful response more than 2 hours ago
return False
return self.isReachable()
def getConnectionServer(self):
if self.connection_server:
connection_server = self.connection_server
elif self.site:
connection_server = self.site.connection_server
else:
import main
connection_server = main.file_server
return connection_server
# Connect to host
def connect(self, connection=None):
@ -120,13 +156,7 @@ class Peer(object):
self.connection = None
try:
if self.connection_server:
connection_server = self.connection_server
elif self.site:
connection_server = self.site.connection_server
else:
import main
connection_server = main.file_server
connection_server = self.getConnectionServer()
self.connection = connection_server.getConnection(self.ip, self.port, site=self.site, is_tracker_connection=self.is_tracker_connection)
if self.connection and self.connection.connected:
self.reputation += 1
@ -183,6 +213,7 @@ class Peer(object):
if source in ("tracker", "local"):
self.site.peers_recent.appendleft(self)
self.time_found = time.time()
self.updateReachable()
# Send a command to peer and return response value
def request(self, cmd, params={}, stream_to=None):
@ -355,6 +386,8 @@ class Peer(object):
# List modified files since the date
# Return: {inner_path: modification date,...}
def listModified(self, since):
if self.removed:
return False
return self.request("listModified", {"since": since, "site": self.site.address})
def updateHashfield(self, force=False):
@ -430,12 +463,11 @@ class Peer(object):
# Stop and remove from site
def remove(self, reason="Removing"):
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
if self.site and self.key in self.site.peers:
del(self.site.peers[self.key])
if self.site and self in self.site.peers_recent:
self.site.peers_recent.remove(self)
self.removed = True
self.log("Removing peer with reason: <%s>. Connection error: %s, Hash failed: %s" % (reason, self.connection_error, self.hash_failed))
if self.site:
self.site.deregisterPeer(self)
self.site = None
self.disconnect(reason)
@ -443,6 +475,8 @@ class Peer(object):
# On connection error
def onConnectionError(self, reason="Unknown"):
if not self.getConnectionServer().isInternetOnline():
return
self.connection_error += 1
if self.site and len(self.site.peers) > 200:
limit = 3
@ -450,7 +484,7 @@ class Peer(object):
limit = 6
self.reputation -= 1
if self.connection_error >= limit: # Dead peer
self.remove("Peer connection: %s" % reason)
self.remove("Connection error limit reached: %s. Provided message: %s" % (limit, reason))
# Done working with peer
def onWorkerDone(self):

View file

@ -913,7 +913,7 @@ class Site(object):
self.peer_connector.newReq(limit, limit / 2, 10), # some of them...
self.peer_connector.newReq(1, 1, 60) # or at least one...
]
self.waitForPeers(reqs)
self.waitForPeers(reqs, update_websocket=False)
peers = self.getConnectedPeers()
random.shuffle(peers)
@ -1197,6 +1197,15 @@ class Site(object):
return peer
# Called from peer.remove to erase links to peer
def deregisterPeer(self, peer):
self.peers.pop(peer.key, None)
try:
self.peers_recent.remove(peer)
except:
pass
self.peer_connector.deregisterPeer(peer)
def announce(self, *args, **kwargs):
if self.isServing():
self.announcer.announce(*args, **kwargs)
@ -1295,8 +1304,9 @@ class Site(object):
req = self.peer_connector.newReq(0, num)
return req
# Wait for peers to ne known and/or connected and send updates to the UI
def waitForPeers(self, reqs):
# Wait for peers to be discovered and/or connected according to reqs
# and send updates to the UI
def waitForPeers(self, reqs, update_websocket=True):
if not reqs:
return 0
i = 0
@ -1315,8 +1325,10 @@ class Site(object):
waiting_req.waitHeartbeat(timeout=1.0)
if i > 0 and nr_connected_peers != waiting_req.nr_connected_peers:
nr_connected_peers = waiting_req.nr_connected_peers
if update_websocket:
self.updateWebsocket(connecting_to_peers=nr_connected_peers)
i += 1
if update_websocket:
self.updateWebsocket(connected_to_peers=max(nr_connected_peers, 0))
if i > 1:
# If we waited some time, pause now for displaying connected_to_peers message in the UI.
@ -1326,82 +1338,68 @@ class Site(object):
############################################################################
# Return: Probably peers verified to be connectable recently
# Return: Peers verified to be connectable recently, or if not enough, other peers as well
def getConnectablePeers(self, need_num=5, ignore=[], allow_private=True):
peers = list(self.peers.values())
found = []
random.shuffle(peers)
connectable_peers = []
reachable_peers = []
for peer in peers:
if peer.key.endswith(":0"):
continue # Not connectable
if not peer.connection:
continue # No connection
if peer.ip.endswith(".onion") and not self.connection_server.tor_manager.enabled:
continue # Onion not supported
if peer.key in ignore:
continue # The requester has this peer
if time.time() - peer.connection.last_recv_time > 60 * 60 * 2: # Last message more than 2 hours ago
peer.connection = None # Cleanup: Dead connection
continue
if not allow_private and helper.isPrivateIp(peer.ip):
continue
found.append(peer)
if len(found) >= need_num:
if peer.isConnectable():
connectable_peers.append(peer)
elif peer.isReachable():
reachable_peers.append(peer)
if len(connectable_peers) >= need_num:
break # Found requested number of peers
if len(found) < need_num: # Return not that good peers
found += [
peer for peer in peers
if not peer.key.endswith(":0") and
peer.key not in ignore and
(allow_private or not helper.isPrivateIp(peer.ip))
][0:need_num - len(found)]
if len(connectable_peers) < need_num: # Return not that good peers
connectable_peers += reachable_peers[0:need_num - len(connectable_peers)]
return found
return connectable_peers
# Return: Recently found peers
def getReachablePeers(self):
return [peer for peer in self.peers.values() if peer.isReachable()]
# Return: Recently found peers, sorted by reputation.
# If there not enough recently found peers, adds other known peers with highest reputation
def getRecentPeers(self, need_num):
need_num = int(need_num)
found = list(set(self.peers_recent))
found = set(self.peers_recent)
self.log.debug(
"Recent peers %s of %s (need: %s)" %
(len(found), len(self.peers), need_num)
)
if len(found) >= need_num or len(found) >= len(self.peers):
if len(found) < need_num and len(found) < len(self.peers):
# Add random peers
peers = self.getReachablePeers()
peers = sorted(
list(peers),
key=lambda peer: peer.reputation,
reverse=True
)
while len(found) < need_num and len(peers) > 0:
found.add(peers.pop())
return sorted(
found,
list(found),
key=lambda peer: peer.reputation,
reverse=True
)[0:need_num]
# Add random peers
need_more = need_num - len(found)
if not self.connection_server.tor_manager.enabled:
peers = [peer for peer in self.peers.values() if not peer.ip.endswith(".onion")]
else:
peers = list(self.peers.values())
self.log.debug("getRecentPeers: peers = %s" % peers)
self.log.debug("getRecentPeers: need_more = %s" % need_more)
peers = peers[0:need_more * 50]
found_more = sorted(
peers,
key=lambda peer: peer.reputation,
reverse=True
)[0:need_more * 2]
found += found_more
return found[0:need_num]
# Returns the list of connected peers
# By default the result may contain peers chosen optimistically:
# If the connection is being established and 20 seconds have not yet passed
# since the connection start time, those peers are included in the result.
# Set onlyFullyConnected=True for restricting only by fully connected peers.
def getConnectedPeers(self, onlyFullyConnected=False):
# Set only_fully_connected=True for restricting only by fully connected peers.
def getConnectedPeers(self, only_fully_connected=False):
back = []
if not self.connection_server:
return []
@ -1413,7 +1411,7 @@ class Site(object):
if not connection.connected and time.time() - connection.start_time > 20: # Still not connected after 20s
continue
if not connection.connected and onlyFullyConnected: # Only fully connected peers
if not connection.connected and only_fully_connected: # Only fully connected peers
continue
peer = self.peers.get("%s:%s" % (connection.ip, connection.port))
@ -1434,7 +1432,9 @@ class Site(object):
return
removed = 0
if len(peers) > 1000:
if len(peers) > 10000:
ttl = 60 * 2
elif len(peers) > 1000:
ttl = 60 * 60 * 1
elif len(peers) > 100:
ttl = 60 * 60 * 4
@ -1458,7 +1458,7 @@ class Site(object):
self.removeDeadPeers()
limit = max(self.getActiveConnectionCountLimit(), self.waitingForConnections())
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
connected_peers = self.getConnectedPeers(only_fully_connected=True)
need_to_close = len(connected_peers) - limit
if need_to_close > 0:
@ -1537,7 +1537,7 @@ class Site(object):
return False
sent = 0
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
connected_peers = self.getConnectedPeers(only_fully_connected=True)
for peer in connected_peers:
if peer.sendMyHashfield():
sent += 1
@ -1559,7 +1559,7 @@ class Site(object):
s = time.time()
queried = 0
connected_peers = self.getConnectedPeers(onlyFullyConnected=True)
connected_peers = self.getConnectedPeers(only_fully_connected=True)
for peer in connected_peers:
if peer.time_hashfield:
continue

View file

@ -311,7 +311,7 @@ class SiteAnnouncer(object):
for _ in range(5):
if not self.site.isServing():
return
peers = self.site.getConnectedPeers(onlyFullyConnected=True)
peers = self.site.getConnectedPeers(only_fully_connected=True)
if len(peers) > 0:
break
time.sleep(2)

View file

@ -124,7 +124,7 @@ class PeerConnector(object):
self.spawnPeerConnectorController();
def processReqs2(self):
self.nr_connected_peers = len(self.site.getConnectedPeers(onlyFullyConnected=True))
self.nr_connected_peers = len(self.site.getConnectedPeers(only_fully_connected=True))
self.processReqs(nr_connected_peers=self.nr_connected_peers)
# For adding new peers when ConnectorController is working.
@ -140,6 +140,12 @@ class PeerConnector(object):
if peer not in self.peers:
self.peers.append(peer)
def deregisterPeer(self, peer):
try:
self.peers.remove(peer)
except:
pass
def sleep(self, t):
self.site.connection_server.sleep(t)
@ -187,6 +193,8 @@ class PeerConnector(object):
self.sleep(10)
continue
added = 0
# try connecting to peers
while self.keepGoing() and len(self.peer_connector_workers) < self.peer_connector_worker_limit:
if len(self.peers) < 1:
@ -204,13 +212,23 @@ class PeerConnector(object):
thread = self.site.spawn(self.peerConnectorWorker, peer)
self.peer_connector_workers[peer] = thread
thread.link(lambda thread, peer=peer: self.peer_connector_workers.pop(peer, None))
added += 1
if not self.keepGoing():
break
if not added:
# Looks like all known peers are either connected or being connected,
# so we weren't able to start connecting any peer in this iteration.
# Waiting for the announcer to discover some peers.
self.sleep(20)
# wait for more room in self.peer_connector_workers
while self.keepGoing() and len(self.peer_connector_workers) >= self.peer_connector_worker_limit:
self.sleep(2)
if not self.site.connection_server.isInternetOnline():
self.sleep(20)
self.sleep(30)
self.peers = list()
self.peer_connector_controller = None