diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py
index 9aee507d..11f2758e 100644
--- a/plugins/Stats/StatsPlugin.py
+++ b/plugins/Stats/StatsPlugin.py
@@ -97,13 +97,12 @@ class UiRequestPlugin(object):
# Sites
yield "
Sites:"
yield "
"
- yield "address | peers | connected | content.json |
"
+ yield "address | connected | peers | content.json |
"
for site in self.server.sites.values():
yield self.formatTableRow([
("%s", site.address),
- ("%s", len(site.peers)),
- ("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ),
("%s", [peer.connection.id for peer in site.peers.values() if peer.connection and peer.connection.connected]),
+ ("%s/%s", ( len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected]), len(site.peers) ) ),
("%s", len(site.content_manager.contents)),
])
yield "
"
diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py
index 8d4fdacf..a6224df7 100644
--- a/src/File/FileRequest.py
+++ b/src/File/FileRequest.py
@@ -1,4 +1,4 @@
-import os, msgpack, shutil, gevent
+import os, msgpack, shutil, gevent, socket, struct, random
from cStringIO import StringIO
from Debug import Debug
from Config import config
@@ -16,6 +16,10 @@ class FileRequest:
self.log = server.log
+ def unpackAddress(self, packed):
+ return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0])
+
+
def send(self, msg):
self.connection.send(msg)
@@ -35,6 +39,8 @@ class FileRequest:
self.actionGetFile(params)
elif cmd == "update":
self.actionUpdate(params)
+ elif cmd == "pex":
+ self.actionPex(params)
elif cmd == "ping":
self.actionPing()
else:
@@ -104,13 +110,36 @@ class FileRequest:
if config.debug_socket: self.log.debug("File %s sent" % file_path)
# Add peer to site if not added before
- # site.addPeer(self.connection.ip, self.connection.port)
+ site.addPeer(self.connection.ip, self.connection.port)
except Exception, err:
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
self.response({"error": "File read error: %s" % Debug.formatException(err)})
return False
+ # Peer exchange request
+ def actionPex(self, params):
+ site = self.sites.get(params["site"])
+ if not site or not site.settings["serving"]: # Site unknown or not serving
+ self.response({"error": "Unknown site"})
+ return False
+
+ got_peer_keys = []
+ added = 0
+ site.addPeer(self.connection.ip, self.connection.port) # Add requester peer to site
+ for peer in params["peers"]: # Add sent peers to site
+ address = self.unpackAddress(peer)
+ got_peer_keys.append("%s:%s" % address)
+ if (site.addPeer(*address)): added += 1
+ # Send back peers that is not in the sent list
+ peers = site.peers.values()
+ random.shuffle(peers)
+ packed_peers = [peer.packAddress() for peer in peers if peer.key not in got_peer_keys][0:params["need"]]
+ if added:
+ self.log.debug("Added %s peers to %s using PEX, sending back %s" % (added, site, len(packed_peers)))
+ self.response({"peers": packed_peers})
+
+
# Send a simple Pong! answer
def actionPing(self):
self.response("Pong!")
diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py
index eb7ec3a5..755eddbc 100644
--- a/src/Peer/Peer.py
+++ b/src/Peer/Peer.py
@@ -1,4 +1,4 @@
-import os, logging, gevent, time, msgpack, sys
+import os, logging, gevent, time, msgpack, sys, random, socket, struct
from cStringIO import StringIO
from Config import config
from Debug import Debug
@@ -49,6 +49,16 @@ class Peer:
def __repr__(self):
return "<%s>" % self.__str__()
+
+ # Peer ip:port to packed 6byte format
+ def packAddress(self):
+ return socket.inet_aton(self.ip)+struct.pack("H", self.port)
+
+
+ def unpackAddress(self, packed):
+ return (socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0])
+
+
# Found a peer on tracker
def found(self):
self.last_found = time.time()
@@ -135,6 +145,24 @@ class Peer:
return response_time
+ # Request peer exchange from peer
+ def pex(self, site=None, need_num=5):
+ if not site: site = self.site # If no site definied request peers for this site
+ peers = self.site.peers.values()
+ random.shuffle(peers)
+ packed_peers = [peer.packAddress() for peer in peers][0:need_num]
+ response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
+ if not response or "error" in response:
+ return False
+ added = 0
+ for peer in response.get("peers", []):
+ address = self.unpackAddress(peer)
+ if (site.addPeer(*address)): added += 1
+ if added:
+ self.log.debug("Added peers using PEX: %s" % added)
+ return added
+
+
# Stop and remove from site
def remove(self):
self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 62cf7391..567d2528 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -272,7 +272,7 @@ class Site:
if not ip: return False
key = "%s:%s" % (ip, port)
if key in self.peers: # Already has this ip
- self.peers[key].found()
+ #self.peers[key].found()
if return_peer: # Always return peer
return self.peers[key]
else:
@@ -283,6 +283,28 @@ class Site:
return peer
+ # Gather peer from connected peers
+ @util.Noparallel(blocking=False)
+ def announcePex(self, query_num=3, need_num=5):
+ peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected] # Connected peers
+ if len(peers) == 0: # Small number of connected peers for this site, connect to any
+ peers = self.peers.values()
+ need_num = 10
+
+ random.shuffle(peers)
+ done = 0
+ added = 0
+ for peer in peers:
+ res = peer.pex(need_num=need_num)
+ if res != False:
+ done += 1
+ added += res
+ if added:
+ self.worker_manager.onPeers()
+ self.updateWebsocket(peers_added=added)
+ if done == query_num: break
+
+
# Add myself and get other peers from tracker
def announce(self, force=False):
if time.time() < self.last_announce+60 and not force: return # No reannouncing within 60 secs
@@ -364,6 +386,12 @@ class Site:
else:
self.log.error("Announced to %s trackers in %.3fs, failed" % (announced, time.time()-s))
+ if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]: # If no connected peer yet then wait for connections
+ gevent.spawn_later(3, self.announcePex, need_num=10) # Spawn 3 secs later
+ # self.onFileDone.once(lambda inner_path: self.announcePex(need_num=10), "announcePex_%s" % self.address) # After first file downloaded try to find more peers using pex
+ else: # Else announce immediately
+ self.announcePex()
+
# Need open connections
def needConnections(self):
diff --git a/src/util/Event.py b/src/util/Event.py
index ae7d0a53..850e920d 100644
--- a/src/util/Event.py
+++ b/src/util/Event.py
@@ -16,7 +16,7 @@ class Event(list):
func.once = True
func.name = None
if name: # Dont function with same name twice
- names = [f.name for f in self]
+ names = [f.name for f in self if "once" in dir(f)]
if name not in names:
func.name = name
self.append(func)