Limit the pex request frequency, interval is 120 secs for each peer
This commit is contained in:
parent
dff52d691a
commit
93a95f511a
3 changed files with 45 additions and 30 deletions
|
@ -44,6 +44,7 @@ class Peer(object):
|
||||||
self.time_response = None # Time of last successful response from peer
|
self.time_response = None # Time of last successful response from peer
|
||||||
self.time_added = time.time()
|
self.time_added = time.time()
|
||||||
self.last_ping = None # Last response time for ping
|
self.last_ping = None # Last response time for ping
|
||||||
|
self.last_pex = 0 # Last query/response time for pex
|
||||||
self.is_tracker_connection = False # Tracker connection instead of normal peer
|
self.is_tracker_connection = False # Tracker connection instead of normal peer
|
||||||
self.reputation = 0 # More likely to connect if larger
|
self.reputation = 0 # More likely to connect if larger
|
||||||
self.last_content_json_update = 0.0 # Modify date of last received content.json
|
self.last_content_json_update = 0.0 # Modify date of last received content.json
|
||||||
|
@ -305,10 +306,15 @@ class Peer(object):
|
||||||
return response_time
|
return response_time
|
||||||
|
|
||||||
# Request peer exchange from peer
|
# Request peer exchange from peer
|
||||||
def pex(self, site=None, need_num=5):
|
def pex(self, site=None, need_num=5, request_interval=60*2):
|
||||||
if not site:
|
if not site:
|
||||||
site = self.site # If no site defined request peers for this site
|
site = self.site # If no site defined request peers for this site
|
||||||
|
|
||||||
|
if self.last_pex + request_interval >= time.time():
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.last_pex = time.time()
|
||||||
|
|
||||||
# give back 5 connectible peers
|
# give back 5 connectible peers
|
||||||
packed_peers = helper.packPeers(self.site.getConnectablePeers(5, allow_private=False))
|
packed_peers = helper.packPeers(self.site.getConnectablePeers(5, allow_private=False))
|
||||||
request = {"site": site.address, "peers": packed_peers["ipv4"], "need": need_num}
|
request = {"site": site.address, "peers": packed_peers["ipv4"], "need": need_num}
|
||||||
|
@ -317,6 +323,7 @@ class Peer(object):
|
||||||
if packed_peers["ipv6"]:
|
if packed_peers["ipv6"]:
|
||||||
request["peers_ipv6"] = packed_peers["ipv6"]
|
request["peers_ipv6"] = packed_peers["ipv6"]
|
||||||
res = self.request("pex", request)
|
res = self.request("pex", request)
|
||||||
|
self.last_pex = time.time()
|
||||||
if not res or "error" in res:
|
if not res or "error" in res:
|
||||||
return False
|
return False
|
||||||
added = 0
|
added = 0
|
||||||
|
|
|
@ -302,39 +302,47 @@ class SiteAnnouncer(object):
|
||||||
self.updateWebsocket(trackers="announced")
|
self.updateWebsocket(trackers="announced")
|
||||||
|
|
||||||
@util.Noparallel(blocking=False)
|
@util.Noparallel(blocking=False)
|
||||||
def announcePex(self, query_num=2, need_num=10):
|
def announcePex(self, query_num=2, need_num=10, establish_connections=True):
|
||||||
if not self.site.isServing():
|
peers = []
|
||||||
return
|
try:
|
||||||
|
peer_count = 20 + query_num * 2
|
||||||
|
|
||||||
self.updateWebsocket(pex="announcing")
|
# Wait for some peers to connect
|
||||||
|
for _ in range(5):
|
||||||
|
if not self.site.isServing():
|
||||||
|
return
|
||||||
|
peers = self.site.getConnectedPeers(onlyFullyConnected=True)
|
||||||
|
if len(peers) > 0:
|
||||||
|
break
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
peers = self.site.getConnectedPeers()
|
if len(peers) < peer_count and establish_connections:
|
||||||
if len(peers) == 0: # Wait 3s for connections
|
# Small number of connected peers for this site, connect to any
|
||||||
time.sleep(3)
|
peers = list(self.site.getRecentPeers(peer_count))
|
||||||
peers = self.site.getConnectedPeers()
|
|
||||||
|
|
||||||
if len(peers) == 0: # Small number of connected peers for this site, connect to any
|
if len(peers) > 0:
|
||||||
peers = list(self.site.getRecentPeers(20))
|
self.updateWebsocket(pex="announcing")
|
||||||
need_num = 10
|
|
||||||
|
|
||||||
random.shuffle(peers)
|
random.shuffle(peers)
|
||||||
done = 0
|
done = 0
|
||||||
total_added = 0
|
total_added = 0
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
num_added = peer.pex(need_num=need_num)
|
if not establish_connections and not peer.isConnected():
|
||||||
if num_added is not False:
|
continue
|
||||||
done += 1
|
num_added = peer.pex(need_num=need_num)
|
||||||
total_added += num_added
|
if num_added is not False:
|
||||||
if num_added:
|
done += 1
|
||||||
self.site.worker_manager.onPeers()
|
total_added += num_added
|
||||||
self.site.updateWebsocket(peers_added=num_added)
|
if num_added:
|
||||||
else:
|
self.site.worker_manager.onPeers()
|
||||||
|
self.site.updateWebsocket(peers_added=num_added)
|
||||||
|
if done == query_num:
|
||||||
|
break
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
if done == query_num:
|
self.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added))
|
||||||
break
|
finally:
|
||||||
self.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added))
|
if len(peers) > 0:
|
||||||
|
self.updateWebsocket(pex="announced")
|
||||||
self.updateWebsocket(pex="announced")
|
|
||||||
|
|
||||||
def updateWebsocket(self, **kwargs):
|
def updateWebsocket(self, **kwargs):
|
||||||
if kwargs:
|
if kwargs:
|
||||||
|
|
|
@ -183,7 +183,7 @@ class PeerConnector(object):
|
||||||
if len(self.peers) <= self.nr_connected_peers:
|
if len(self.peers) <= self.nr_connected_peers:
|
||||||
# Looks like all known peers are connected.
|
# Looks like all known peers are connected.
|
||||||
# Waiting for the announcer to discover some peers.
|
# Waiting for the announcer to discover some peers.
|
||||||
self.site.announcer.announcePex()
|
self.site.announcer.announcePex(establish_connections=False)
|
||||||
self.sleep(10)
|
self.sleep(10)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue