From 8e710beab194152cae188dc3819f5f948a0cc51f Mon Sep 17 00:00:00 2001 From: HelloZeroNet <hello@noloop.me> Date: Fri, 30 Oct 2015 02:08:02 +0100 Subject: [PATCH] Rev536, Fix stats page, Support ranged http requests for better video browser compatibility, setHashfield command, One by one send hashfield to connected peers if changed, Keep count hashfield changetime, PeerHashfield optimalizations, Wait for peers on checkmodification, Give more time to query trackers, Do not count udp trackers as error if udp disabled, Test hashfield push --- plugins/Stats/StatsPlugin.py | 8 +++--- src/Config.py | 2 +- src/File/FileRequest.py | 23 +++++++++++++-- src/File/FileServer.py | 6 +++- src/Peer/Peer.py | 23 +++++++++++++-- src/Peer/PeerHashfield.py | 33 +++++++++++++++------ src/Site/Site.py | 38 ++++++++++++++++++++---- src/Test/TestPeer.py | 56 +++++++++++++++++++++++++++--------- src/Ui/UiRequest.py | 22 +++++++++++++- 9 files changed, 171 insertions(+), 40 deletions(-) diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 01b3d249..799f3ec5 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -135,15 +135,15 @@ class UiRequestPlugin(object): ]) yield "<tr><td id='peers_%s' style='display: none; white-space: pre'>" % site.address for key, peer in site.peers.items(): - if peer.last_found: - last_found = int(time.time()-peer.last_found)/60 + if peer.time_found: + time_found = int(time.time()-peer.time_found)/60 else: - last_found = "--" + time_found = "--" if peer.connection: connection_id = peer.connection.id else: connection_id = None - yield "(#%s, err: %s, found: %s min ago) %22s -<br>" % (connection_id, peer.connection_error, last_found, key) + yield "(#%s, err: %s, found: %s min ago) %22s -<br>" % (connection_id, peer.connection_error, time_found, key) yield "<br></td></tr>" yield "</table>" diff --git a/src/Config.py b/src/Config.py index 7fbafb8a..6610cbcb 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.2" - self.rev = 505 + self.rev = 536 self.argv = argv self.action = None self.createParser() diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 216cbe86..418e7637 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -1,5 +1,6 @@ # Included modules import os +import time from cStringIO import StringIO # Third party modules @@ -69,6 +70,8 @@ class FileRequest(object): self.actionGetHashfield(params) elif cmd == "findHashIds": self.actionFindHashIds(params) + elif cmd == "setHashfield": + self.actionSetHashfield(params) elif cmd == "ping": self.actionPing() else: @@ -268,9 +271,11 @@ class FileRequest(object): return False # Add peer to site if not added before - connected_peer = site.addPeer(self.connection.ip, self.connection.port) - if connected_peer: # Just added - connected_peer.connect(self.connection) # Assign current connection to peer + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) + if not peer.connection: # Just added + peer.connect(self.connection) # Assign current connection to peer + + peer.time_my_hashfield_sent = time.time() # Don't send again if not changed self.response({"hashfield_raw": site.content_manager.hashfield.tostring()}) @@ -297,6 +302,18 @@ class FileRequest(object): ) self.response({"peers": back}) + def actionSetHashfield(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + return False + + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) # Add or get peer + if not peer.connection: + peer.connect(self.connection) + peer.hashfield.replaceFromString(params["hashfield_raw"]) + self.response({"ok": "Updated"}) + # Send a simple Pong! answer def actionPing(self): self.response("Pong!") diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 473224c9..209c4a48 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -172,7 +172,7 @@ class FileServer(ConnectionServer): import gc first_announce = True # First start while 1: - # Sites healthcare + # Sites healthcare every 20 min if config.trackers_file: config.loadTrackersFile() for address, site in self.sites.items(): @@ -196,6 +196,9 @@ class FileServer(ConnectionServer): if self.port_opened is False: site.needConnections() + if first_announce: # Send my optional files to peers + site.sendMyHashfield() + time.sleep(2) # Prevent too quick request site = None @@ -208,6 +211,7 @@ class FileServer(ConnectionServer): config.loadTrackersFile() for address, site in self.sites.items(): site.announce(num=1, pex=False) + site.sendMyHashfield(num_send=1) time.sleep(2) first_announce = False diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index e169c7a2..61dd26c8 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -17,7 +17,7 @@ if config.use_tempfiles: class Peer(object): __slots__ = ( "ip", "port", "site", "key", "connection", "time_found", "time_response", "time_hashfield", "time_added", - "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time" + "time_my_hashfield_sent", "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time" ) def __init__(self, ip, port, site=None): @@ -28,7 +28,8 @@ class Peer(object): self.connection = None self.hashfield = PeerHashfield() # Got optional files hash_id - self.time_hashfield = None # Last time hashfiled downloaded + self.time_hashfield = None # Last time peer's hashfiled downloaded + self.time_my_hashfield_sent = None # Last time my hashfield sent to peer self.time_found = time.time() # Time of last found in the torrent tracker self.time_response = None # Time of last successful response from peer self.time_added = time.time() @@ -87,7 +88,7 @@ class Peer(object): def found(self): self.time_found = time.time() - # Send a command to peer + # Send a command to peer and return response value def request(self, cmd, params={}, stream_to=None): if not self.connection or self.connection.closed: self.connect() @@ -239,6 +240,7 @@ class Peer(object): return self.hashfield + # Find peers for hashids # Return: {hash1: ["ip:port", "ip:port",...],...} def findHashIds(self, hash_ids): res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids}) @@ -246,6 +248,21 @@ class Peer(object): return False return {key: map(helper.unpackAddress, val) for key, val in res["peers"].iteritems()} + # Send my hashfield to peer + # Return: True if sent + def sendMyHashfield(self): + if self.connection and self.connection.handshake.get("rev", 0) < 510: + return False # Not supported + if self.time_my_hashfield_sent and self.site.content_manager.hashfield.time_changed <= self.time_my_hashfield_sent: + return False # Peer already has the latest hashfield + + res = self.request("setHashfield", {"site": self.site.address, "hashfield_raw": self.site.content_manager.hashfield.tostring()}) + if not res or "error" in res: + return False + else: + self.time_my_hashfield_sent = time.time() + return True + # Stop and remove from site def remove(self): self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) diff --git a/src/Peer/PeerHashfield.py b/src/Peer/PeerHashfield.py index 0a48dd61..0137ff90 100644 --- a/src/Peer/PeerHashfield.py +++ b/src/Peer/PeerHashfield.py @@ -1,9 +1,11 @@ import array +import time class PeerHashfield(): def __init__(self): self.storage = self.createStoreage() + self.time_changed = time.time() def createStoreage(self): storage = array.array("H") @@ -17,23 +19,26 @@ class PeerHashfield(): def appendHash(self, hash): hash_id = int(hash[0:4], 16) - if hash_id not in self: - self.append(hash_id) + if hash_id not in self.storage: + self.storage.append(hash_id) + self.time_changed = time.time() return True else: return False def appendHashId(self, hash_id): - if hash_id not in self: - self.append(hash_id) + if hash_id not in self.storage: + self.storage.append(hash_id) + self.time_changed = time.time() return True else: return False def removeHash(self, hash): hash_id = int(hash[0:4], 16) - if hash_id in self: - self.remove(hash_id) + if hash_id in self.storage: + self.storage.remove(hash_id) + self.time_changed = time.time() return True else: return False @@ -42,8 +47,20 @@ class PeerHashfield(): return int(hash[0:4], 16) def hasHash(self, hash): - return int(hash[0:4], 16) in self + return int(hash[0:4], 16) in self.storage def replaceFromString(self, hashfield_raw): self.storage = self.createStoreage() - self.fromstring(hashfield_raw) + self.storage.fromstring(hashfield_raw) + self.time_changed = time.time() + +if __name__ == "__main__": + field = PeerHashfield() + s = time.time() + for i in range(10000): + field.appendHashId(i) + print time.time()-s + s = time.time() + for i in range(10000): + field.hasHash("AABB") + print time.time()-s \ No newline at end of file diff --git a/src/Site/Site.py b/src/Site/Site.py index 0ed7f621..19c0a8bf 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -210,6 +210,13 @@ class Site: peers_try = [] # Try these peers queried = [] # Successfully queried from these peers + # Wait for peers + if not self.peers: + for wait in range(10): + time.sleep(5+wait) + self.log.debug("Waiting for peers...") + if self.peers: break + peers = self.peers.values() random.shuffle(peers) for peer in peers: # Try to find connected good peers, but we must have at least 5 peers @@ -218,7 +225,7 @@ class Site: elif len(peers_try) < 5: # Backup peers, add to end of the try list peers_try.append(peer) - if since is None: # No since definied, download from last modification time-1day + if since is None: # No since defined, download from last modification time-1day since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24 self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since)) @@ -548,8 +555,8 @@ class Site: try: url = "http://" + address + "?" + urllib.urlencode(params) # Load url - with gevent.Timeout(10, False): # Make sure of timeout - req = urllib2.urlopen(url, timeout=8) + with gevent.Timeout(30, False): # Make sure of timeout + req = urllib2.urlopen(url, timeout=25) response = req.read() req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions req.close() @@ -593,7 +600,10 @@ class Site: return # No reannouncing within 30 secs self.time_announce = time.time() - trackers = config.trackers + if config.disable_udp: + trackers = [tracker for tracker in config.trackers if not tracker.startswith("udp://")] + else: + trackers = config.trackers if num == 1: # Only announce on one tracker, increment the queried tracker id self.last_tracker_id += 1 self.last_tracker_id = self.last_tracker_id % len(trackers) @@ -622,7 +632,7 @@ class Site: if len(threads) > num: # Announce limit break - gevent.joinall(threads) # Wait for announce finish + gevent.joinall(threads, timeout=10) # Wait for announce finish for thread in threads: if thread.value: @@ -630,7 +640,10 @@ class Site: slow.append("%.2fs %s://%s" % (thread.value, thread.protocol, thread.address)) announced += 1 else: - errors.append("%s://%s" % (thread.protocol, thread.address)) + if thread.ready(): + errors.append("%s://%s" % (thread.protocol, thread.address)) + else: # Still running + slow.append("10s+ %s://%s" % (thread.protocol, thread.address)) # Save peers num self.settings["peers"] = len(self.peers) @@ -721,6 +734,19 @@ class Site: if removed: self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers))) + # Send hashfield to peers + def sendMyHashfield(self, num_send=3): + if not self.content_manager.hashfield: # No optional files + return False + num_sent = 0 + connected_peers = self.getConnectedPeers() + for peer in connected_peers: + if peer.sendMyHashfield(): + num_sent += 1 + if num_sent >= num_send: + return True + return False + # - Events - # Add event listeners diff --git a/src/Test/TestPeer.py b/src/Test/TestPeer.py index cfa572c3..1140c02f 100644 --- a/src/Test/TestPeer.py +++ b/src/Test/TestPeer.py @@ -1,9 +1,12 @@ +import time +from cStringIO import StringIO import pytest from File import FileServer +from File import FileRequest from Crypt import CryptHash -from cStringIO import StringIO +import Spy @pytest.mark.usefixtures("resetSettings") @@ -77,27 +80,54 @@ class TestPeer: assert site.content_manager.hashfield.getHashId(new_hash) not in site.content_manager.hashfield def testHashfieldExchange(self, file_server, site, site_temp): - file_server.ip_incoming = {} # Reset flood protection - file_server.sites[site.address] = site - client = FileServer("127.0.0.1", 1545) - client.sites[site_temp.address] = site_temp - site_temp.connection_server = client - connection = client.getConnection("127.0.0.1", 1544) + server1 = file_server + server1.ip_incoming = {} # Reset flood protection + server1.sites[site.address] = site + server2 = FileServer("127.0.0.1", 1545) + server2.sites[site_temp.address] = site_temp + site_temp.connection_server = server2 site.storage.verifyFiles(quick_check=True) # Find what optional files we have # Add file_server as peer to client - peer_file_server = site_temp.addPeer("127.0.0.1", 1544) + server2_peer1 = site_temp.addPeer("127.0.0.1", 1544) # Check if hashfield has any files assert len(site.content_manager.hashfield) > 0 # Testing hashfield sync - assert len(peer_file_server.hashfield) == 0 - assert peer_file_server.updateHashfield() - assert len(peer_file_server.hashfield) > 0 + assert len(server2_peer1.hashfield) == 0 + assert server2_peer1.updateHashfield() # Query hashfield from peer + assert len(server2_peer1.hashfield) > 0 - connection.close() - client.stop() + # Test force push new hashfield + site_temp.content_manager.hashfield.appendHash("AABB") + server1_peer2 = site.addPeer("127.0.0.1", 1545, return_peer=True) + with Spy.Spy(FileRequest, "route") as requests: + assert len(server1_peer2.hashfield) == 0 + server2_peer1.sendMyHashfield() + assert len(server1_peer2.hashfield) == 1 + server2_peer1.sendMyHashfield() # Hashfield not changed, should be ignored + + assert len(requests) == 1 + + time.sleep(0.01) # To make hashfield change date different + + site_temp.content_manager.hashfield.appendHash("AACC") + server2_peer1.sendMyHashfield() # Push hashfield + + assert len(server1_peer2.hashfield) == 2 + assert len(requests) == 2 + + site_temp.content_manager.hashfield.appendHash("AADD") + + assert server1_peer2.updateHashfield(force=True) # Request hashfield + assert len(server1_peer2.hashfield) == 3 + assert len(requests) == 3 + + assert not server2_peer1.sendMyHashfield() # Not changed, should be ignored + assert len(requests) == 3 + + server2.stop() def testFindHash(self, file_server, site, site_temp): file_server.ip_incoming = {} # Reset flood protection diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 148cd997..4d762ccd 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -14,6 +14,7 @@ from Crypt import CryptHash status_texts = { 200: "200 OK", + 206: "206 Partial Content", 400: "400 Bad Request", 403: "403 Forbidden", 404: "404 Not Found", @@ -367,10 +368,29 @@ class UiRequest(object): # TODO: Dont allow external access: extra_headers= # [("Content-Security-Policy", "default-src 'unsafe-inline' data: http://localhost:43110 ws://localhost:43110")] + range = self.env.get("HTTP_RANGE") + range_start = None if send_header: - self.sendHeader(content_type=content_type) + extra_headers = {} + file_size = os.path.getsize(file_path) + extra_headers["Accept-Ranges"] = "bytes" + if range: + range_start = int(re.match(".*?([0-9]+)", range).group(1)) + if re.match(".*?-([0-9]+)", range): + range_end = int(re.match(".*?-([0-9]+)", range).group(1))+1 + else: + range_end = file_size + extra_headers["Content-Length"] = range_end - range_start + extra_headers["Content-Range"] = "bytes %s-%s/%s" % (range_start, range_end-1, file_size) + if range: + status = 206 + else: + status = 200 + self.sendHeader(status, content_type=content_type, extra_headers=extra_headers.items()) if self.env["REQUEST_METHOD"] != "OPTIONS": file = open(file_path, "rb") + if range_start: + file.seek(range_start) while 1: try: block = file.read(block_size)