Trackerless peer exchange between peers, fix event once bug
This commit is contained in:
parent
4f7e048413
commit
44d5ac784d
5 changed files with 92 additions and 8 deletions
|
@ -97,13 +97,12 @@ class UiRequestPlugin(object):
|
|||
# Sites
|
||||
yield "<br><br><b>Sites</b>:"
|
||||
yield "<table>"
|
||||
yield "<tr><th>address</th> <th>peers</th> <th colspan=2>connected</th> <th>content.json</th> </tr>"
|
||||
yield "<tr><th>address</th> <th>connected</th> <th>peers</th> <th>content.json</th> </tr>"
|
||||
for site in self.server.sites.values():
|
||||
yield self.formatTableRow([
|
||||
("%s", site.address),
|
||||
("%s", len(site.peers)),
|
||||
("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ),
|
||||
("%s", [peer.connection.id for peer in site.peers.values() if peer.connection and peer.connection.connected]),
|
||||
("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ),
|
||||
("%s", len(site.content_manager.contents)),
|
||||
])
|
||||
yield "</table>"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import os, msgpack, shutil, gevent
|
||||
import os, msgpack, shutil, gevent, socket, struct, random
|
||||
from cStringIO import StringIO
|
||||
from Debug import Debug
|
||||
from Config import config
|
||||
|
@ -16,6 +16,10 @@ class FileRequest:
|
|||
self.log = server.log
|
||||
|
||||
|
||||
def unpackAddress(self, packed):
|
||||
return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0])
|
||||
|
||||
|
||||
def send(self, msg):
|
||||
self.connection.send(msg)
|
||||
|
||||
|
@ -35,6 +39,8 @@ class FileRequest:
|
|||
self.actionGetFile(params)
|
||||
elif cmd == "update":
|
||||
self.actionUpdate(params)
|
||||
elif cmd == "pex":
|
||||
self.actionPex(params)
|
||||
elif cmd == "ping":
|
||||
self.actionPing()
|
||||
else:
|
||||
|
@ -104,13 +110,36 @@ class FileRequest:
|
|||
if config.debug_socket: self.log.debug("File %s sent" % file_path)
|
||||
|
||||
# Add peer to site if not added before
|
||||
# site.addPeer(self.connection.ip, self.connection.port)
|
||||
site.addPeer(self.connection.ip, self.connection.port)
|
||||
except Exception, err:
|
||||
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
|
||||
self.response({"error": "File read error: %s" % Debug.formatException(err)})
|
||||
return False
|
||||
|
||||
|
||||
# Peer exchange request
|
||||
def actionPex(self, params):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
|
||||
got_peer_keys = []
|
||||
added = 0
|
||||
site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site
|
||||
for peer in params["peers"]: # Add sent peers to site
|
||||
address = self.unpackAddress(peer)
|
||||
got_peer_keys.append("%s:%s" % address)
|
||||
if (site.addPeer(*address)): added += 1
|
||||
# Send back peers that is not in the sent list
|
||||
peers = site.peers.values()
|
||||
random.shuffle(peers)
|
||||
packed_peers = [peer.packAddress() for peer in peers if peer.key not in got_peer_keys][0:params["need"]]
|
||||
if added:
|
||||
self.log.debug("Added %s peers to %s using PEX, sending back %s" % (added, site, len(packed_peers)))
|
||||
self.response({"peers": packed_peers})
|
||||
|
||||
|
||||
# Send a simple Pong! answer
|
||||
def actionPing(self):
|
||||
self.response("Pong!")
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import os, logging, gevent, time, msgpack, sys
|
||||
import os, logging, gevent, time, msgpack, sys, random, socket, struct
|
||||
from cStringIO import StringIO
|
||||
from Config import config
|
||||
from Debug import Debug
|
||||
|
@ -49,6 +49,16 @@ class Peer:
|
|||
def __repr__(self):
|
||||
return "<%s>" % self.__str__()
|
||||
|
||||
|
||||
# Peer ip:port to packed 6byte format
|
||||
def packAddress(self):
|
||||
return socket.inet_aton(self.ip)+struct.pack("H", self.port)
|
||||
|
||||
|
||||
def unpackAddress(self, packed):
|
||||
return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0])
|
||||
|
||||
|
||||
# Found a peer on tracker
|
||||
def found(self):
|
||||
self.last_found = time.time()
|
||||
|
@ -135,6 +145,24 @@ class Peer:
|
|||
return response_time
|
||||
|
||||
|
||||
# Request peer exchange from peer
|
||||
def pex(self, site=None, need_num=5):
|
||||
if not site: site = self.site # If no site definied request peers for this site
|
||||
peers = self.site.peers.values()
|
||||
random.shuffle(peers)
|
||||
packed_peers = [peer.packAddress() for peer in peers][0:need_num]
|
||||
response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
|
||||
if not response or "error" in response:
|
||||
return False
|
||||
added = 0
|
||||
for peer in response.get("peers", []):
|
||||
address = self.unpackAddress(peer)
|
||||
if (site.addPeer(*address)): added += 1
|
||||
if added:
|
||||
self.log.debug("Added peers using PEX: %s" % added)
|
||||
return added
|
||||
|
||||
|
||||
# Stop and remove from site
|
||||
def remove(self):
|
||||
self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
||||
|
|
|
@ -272,7 +272,7 @@ class Site:
|
|||
if not ip: return False
|
||||
key = "%s:%s" % (ip, port)
|
||||
if key in self.peers: # Already has this ip
|
||||
self.peers[key].found()
|
||||
#self.peers[key].found()
|
||||
if return_peer: # Always return peer
|
||||
return self.peers[key]
|
||||
else:
|
||||
|
@ -283,6 +283,28 @@ class Site:
|
|||
return peer
|
||||
|
||||
|
||||
# Gather peer from connected peers
|
||||
@util.Noparallel(blocking=False)
|
||||
def announcePex(self, query_num=3, need_num=5):
|
||||
peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected] # Connected peers
|
||||
if len(peers) == 0: # Small number of connected peers for this site, connect to any
|
||||
peers = self.peers.values()
|
||||
need_num = 10
|
||||
|
||||
random.shuffle(peers)
|
||||
done = 0
|
||||
added = 0
|
||||
for peer in peers:
|
||||
res = peer.pex(need_num=need_num)
|
||||
if res != False:
|
||||
done += 1
|
||||
added += res
|
||||
if added:
|
||||
self.worker_manager.onPeers()
|
||||
self.updateWebsocket(peers_added=added)
|
||||
if done == query_num: break
|
||||
|
||||
|
||||
# Add myself and get other peers from tracker
|
||||
def announce(self, force=False):
|
||||
if time.time() < self.last_announce+60 and not force: return # No reannouncing within 60 secs
|
||||
|
@ -364,6 +386,12 @@ class Site:
|
|||
else:
|
||||
self.log.error("Announced to %s trackers in %.3fs, failed" % (announced, time.time()-s))
|
||||
|
||||
if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]: # If no connected peer yet then wait for connections
|
||||
gevent.spawn_later(3, self.announcePex, need_num=10) # Spawn 3 secs later
|
||||
# self.onFileDone.once(lambda inner_path: self.announcePex(need_num=10), "announcePex_%s" % self.address) # After first file downloaded try to find more peers using pex
|
||||
else: # Else announce immediately
|
||||
self.announcePex()
|
||||
|
||||
|
||||
# Need open connections
|
||||
def needConnections(self):
|
||||
|
|
|
@ -16,7 +16,7 @@ class Event(list):
|
|||
func.once = True
|
||||
func.name = None
|
||||
if name: # Dont function with same name twice
|
||||
names = [f.name for f in self]
|
||||
names = [f.name for f in self if "once" in dir(f)]
|
||||
if name not in names:
|
||||
func.name = name
|
||||
self.append(func)
|
||||
|
|
Loading…
Reference in a new issue