diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 01b3d249..799f3ec5 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -135,15 +135,15 @@ class UiRequestPlugin(object): ]) yield "" % site.address for key, peer in site.peers.items(): - if peer.last_found: - last_found = int(time.time()-peer.last_found)/60 + if peer.time_found: + time_found = int(time.time()-peer.time_found)/60 else: - last_found = "--" + time_found = "--" if peer.connection: connection_id = peer.connection.id else: connection_id = None - yield "(#%s, err: %s, found: %s min ago) %22s -
" % (connection_id, peer.connection_error, last_found, key) + yield "(#%s, err: %s, found: %s min ago) %22s -
" % (connection_id, peer.connection_error, time_found, key) yield "
" yield "" diff --git a/src/Config.py b/src/Config.py index 7fbafb8a..6610cbcb 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.2" - self.rev = 505 + self.rev = 536 self.argv = argv self.action = None self.createParser() diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 216cbe86..418e7637 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -1,5 +1,6 @@ # Included modules import os +import time from cStringIO import StringIO # Third party modules @@ -69,6 +70,8 @@ class FileRequest(object): self.actionGetHashfield(params) elif cmd == "findHashIds": self.actionFindHashIds(params) + elif cmd == "setHashfield": + self.actionSetHashfield(params) elif cmd == "ping": self.actionPing() else: @@ -268,9 +271,11 @@ class FileRequest(object): return False # Add peer to site if not added before - connected_peer = site.addPeer(self.connection.ip, self.connection.port) - if connected_peer: # Just added - connected_peer.connect(self.connection) # Assign current connection to peer + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) + if not peer.connection: # Just added + peer.connect(self.connection) # Assign current connection to peer + + peer.time_my_hashfield_sent = time.time() # Don't send again if not changed self.response({"hashfield_raw": site.content_manager.hashfield.tostring()}) @@ -297,6 +302,18 @@ class FileRequest(object): ) self.response({"peers": back}) + def actionSetHashfield(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + return False + + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) # Add or get peer + if not peer.connection: + peer.connect(self.connection) + peer.hashfield.replaceFromString(params["hashfield_raw"]) + self.response({"ok": "Updated"}) + # Send a simple Pong! answer def actionPing(self): self.response("Pong!") diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 473224c9..209c4a48 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -172,7 +172,7 @@ class FileServer(ConnectionServer): import gc first_announce = True # First start while 1: - # Sites healthcare + # Sites healthcare every 20 min if config.trackers_file: config.loadTrackersFile() for address, site in self.sites.items(): @@ -196,6 +196,9 @@ class FileServer(ConnectionServer): if self.port_opened is False: site.needConnections() + if first_announce: # Send my optional files to peers + site.sendMyHashfield() + time.sleep(2) # Prevent too quick request site = None @@ -208,6 +211,7 @@ class FileServer(ConnectionServer): config.loadTrackersFile() for address, site in self.sites.items(): site.announce(num=1, pex=False) + site.sendMyHashfield(num_send=1) time.sleep(2) first_announce = False diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index e169c7a2..61dd26c8 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -17,7 +17,7 @@ if config.use_tempfiles: class Peer(object): __slots__ = ( "ip", "port", "site", "key", "connection", "time_found", "time_response", "time_hashfield", "time_added", - "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time" + "time_my_hashfield_sent", "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time" ) def __init__(self, ip, port, site=None): @@ -28,7 +28,8 @@ class Peer(object): self.connection = None self.hashfield = PeerHashfield() # Got optional files hash_id - self.time_hashfield = None # Last time hashfiled downloaded + self.time_hashfield = None # Last time peer's hashfiled downloaded + self.time_my_hashfield_sent = None # Last time my hashfield sent to peer self.time_found = time.time() # Time of last found in the torrent tracker self.time_response = None # Time of last successful response from peer self.time_added = time.time() @@ -87,7 +88,7 @@ class Peer(object): def found(self): self.time_found = time.time() - # Send a command to peer + # Send a command to peer and return response value def request(self, cmd, params={}, stream_to=None): if not self.connection or self.connection.closed: self.connect() @@ -239,6 +240,7 @@ class Peer(object): return self.hashfield + # Find peers for hashids # Return: {hash1: ["ip:port", "ip:port",...],...} def findHashIds(self, hash_ids): res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids}) @@ -246,6 +248,21 @@ class Peer(object): return False return {key: map(helper.unpackAddress, val) for key, val in res["peers"].iteritems()} + # Send my hashfield to peer + # Return: True if sent + def sendMyHashfield(self): + if self.connection and self.connection.handshake.get("rev", 0) < 510: + return False # Not supported + if self.time_my_hashfield_sent and self.site.content_manager.hashfield.time_changed <= self.time_my_hashfield_sent: + return False # Peer already has the latest hashfield + + res = self.request("setHashfield", {"site": self.site.address, "hashfield_raw": self.site.content_manager.hashfield.tostring()}) + if not res or "error" in res: + return False + else: + self.time_my_hashfield_sent = time.time() + return True + # Stop and remove from site def remove(self): self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) diff --git a/src/Peer/PeerHashfield.py b/src/Peer/PeerHashfield.py index 0a48dd61..0137ff90 100644 --- a/src/Peer/PeerHashfield.py +++ b/src/Peer/PeerHashfield.py @@ -1,9 +1,11 @@ import array +import time class PeerHashfield(): def __init__(self): self.storage = self.createStoreage() + self.time_changed = time.time() def createStoreage(self): storage = array.array("H") @@ -17,23 +19,26 @@ class PeerHashfield(): def appendHash(self, hash): hash_id = int(hash[0:4], 16) - if hash_id not in self: - self.append(hash_id) + if hash_id not in self.storage: + self.storage.append(hash_id) + self.time_changed = time.time() return True else: return False def appendHashId(self, hash_id): - if hash_id not in self: - self.append(hash_id) + if hash_id not in self.storage: + self.storage.append(hash_id) + self.time_changed = time.time() return True else: return False def removeHash(self, hash): hash_id = int(hash[0:4], 16) - if hash_id in self: - self.remove(hash_id) + if hash_id in self.storage: + self.storage.remove(hash_id) + self.time_changed = time.time() return True else: return False @@ -42,8 +47,20 @@ class PeerHashfield(): return int(hash[0:4], 16) def hasHash(self, hash): - return int(hash[0:4], 16) in self + return int(hash[0:4], 16) in self.storage def replaceFromString(self, hashfield_raw): self.storage = self.createStoreage() - self.fromstring(hashfield_raw) + self.storage.fromstring(hashfield_raw) + self.time_changed = time.time() + +if __name__ == "__main__": + field = PeerHashfield() + s = time.time() + for i in range(10000): + field.appendHashId(i) + print time.time()-s + s = time.time() + for i in range(10000): + field.hasHash("AABB") + print time.time()-s \ No newline at end of file diff --git a/src/Site/Site.py b/src/Site/Site.py index 0ed7f621..19c0a8bf 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -210,6 +210,13 @@ class Site: peers_try = [] # Try these peers queried = [] # Successfully queried from these peers + # Wait for peers + if not self.peers: + for wait in range(10): + time.sleep(5+wait) + self.log.debug("Waiting for peers...") + if self.peers: break + peers = self.peers.values() random.shuffle(peers) for peer in peers: # Try to find connected good peers, but we must have at least 5 peers @@ -218,7 +225,7 @@ class Site: elif len(peers_try) < 5: # Backup peers, add to end of the try list peers_try.append(peer) - if since is None: # No since definied, download from last modification time-1day + if since is None: # No since defined, download from last modification time-1day since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24 self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since)) @@ -548,8 +555,8 @@ class Site: try: url = "http://" + address + "?" + urllib.urlencode(params) # Load url - with gevent.Timeout(10, False): # Make sure of timeout - req = urllib2.urlopen(url, timeout=8) + with gevent.Timeout(30, False): # Make sure of timeout + req = urllib2.urlopen(url, timeout=25) response = req.read() req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions req.close() @@ -593,7 +600,10 @@ class Site: return # No reannouncing within 30 secs self.time_announce = time.time() - trackers = config.trackers + if config.disable_udp: + trackers = [tracker for tracker in config.trackers if not tracker.startswith("udp://")] + else: + trackers = config.trackers if num == 1: # Only announce on one tracker, increment the queried tracker id self.last_tracker_id += 1 self.last_tracker_id = self.last_tracker_id % len(trackers) @@ -622,7 +632,7 @@ class Site: if len(threads) > num: # Announce limit break - gevent.joinall(threads) # Wait for announce finish + gevent.joinall(threads, timeout=10) # Wait for announce finish for thread in threads: if thread.value: @@ -630,7 +640,10 @@ class Site: slow.append("%.2fs %s://%s" % (thread.value, thread.protocol, thread.address)) announced += 1 else: - errors.append("%s://%s" % (thread.protocol, thread.address)) + if thread.ready(): + errors.append("%s://%s" % (thread.protocol, thread.address)) + else: # Still running + slow.append("10s+ %s://%s" % (thread.protocol, thread.address)) # Save peers num self.settings["peers"] = len(self.peers) @@ -721,6 +734,19 @@ class Site: if removed: self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers))) + # Send hashfield to peers + def sendMyHashfield(self, num_send=3): + if not self.content_manager.hashfield: # No optional files + return False + num_sent = 0 + connected_peers = self.getConnectedPeers() + for peer in connected_peers: + if peer.sendMyHashfield(): + num_sent += 1 + if num_sent >= num_send: + return True + return False + # - Events - # Add event listeners diff --git a/src/Test/TestPeer.py b/src/Test/TestPeer.py index cfa572c3..1140c02f 100644 --- a/src/Test/TestPeer.py +++ b/src/Test/TestPeer.py @@ -1,9 +1,12 @@ +import time +from cStringIO import StringIO import pytest from File import FileServer +from File import FileRequest from Crypt import CryptHash -from cStringIO import StringIO +import Spy @pytest.mark.usefixtures("resetSettings") @@ -77,27 +80,54 @@ class TestPeer: assert site.content_manager.hashfield.getHashId(new_hash) not in site.content_manager.hashfield def testHashfieldExchange(self, file_server, site, site_temp): - file_server.ip_incoming = {} # Reset flood protection - file_server.sites[site.address] = site - client = FileServer("127.0.0.1", 1545) - client.sites[site_temp.address] = site_temp - site_temp.connection_server = client - connection = client.getConnection("127.0.0.1", 1544) + server1 = file_server + server1.ip_incoming = {} # Reset flood protection + server1.sites[site.address] = site + server2 = FileServer("127.0.0.1", 1545) + server2.sites[site_temp.address] = site_temp + site_temp.connection_server = server2 site.storage.verifyFiles(quick_check=True) # Find what optional files we have # Add file_server as peer to client - peer_file_server = site_temp.addPeer("127.0.0.1", 1544) + server2_peer1 = site_temp.addPeer("127.0.0.1", 1544) # Check if hashfield has any files assert len(site.content_manager.hashfield) > 0 # Testing hashfield sync - assert len(peer_file_server.hashfield) == 0 - assert peer_file_server.updateHashfield() - assert len(peer_file_server.hashfield) > 0 + assert len(server2_peer1.hashfield) == 0 + assert server2_peer1.updateHashfield() # Query hashfield from peer + assert len(server2_peer1.hashfield) > 0 - connection.close() - client.stop() + # Test force push new hashfield + site_temp.content_manager.hashfield.appendHash("AABB") + server1_peer2 = site.addPeer("127.0.0.1", 1545, return_peer=True) + with Spy.Spy(FileRequest, "route") as requests: + assert len(server1_peer2.hashfield) == 0 + server2_peer1.sendMyHashfield() + assert len(server1_peer2.hashfield) == 1 + server2_peer1.sendMyHashfield() # Hashfield not changed, should be ignored + + assert len(requests) == 1 + + time.sleep(0.01) # To make hashfield change date different + + site_temp.content_manager.hashfield.appendHash("AACC") + server2_peer1.sendMyHashfield() # Push hashfield + + assert len(server1_peer2.hashfield) == 2 + assert len(requests) == 2 + + site_temp.content_manager.hashfield.appendHash("AADD") + + assert server1_peer2.updateHashfield(force=True) # Request hashfield + assert len(server1_peer2.hashfield) == 3 + assert len(requests) == 3 + + assert not server2_peer1.sendMyHashfield() # Not changed, should be ignored + assert len(requests) == 3 + + server2.stop() def testFindHash(self, file_server, site, site_temp): file_server.ip_incoming = {} # Reset flood protection diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 148cd997..4d762ccd 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -14,6 +14,7 @@ from Crypt import CryptHash status_texts = { 200: "200 OK", + 206: "206 Partial Content", 400: "400 Bad Request", 403: "403 Forbidden", 404: "404 Not Found", @@ -367,10 +368,29 @@ class UiRequest(object): # TODO: Dont allow external access: extra_headers= # [("Content-Security-Policy", "default-src 'unsafe-inline' data: http://localhost:43110 ws://localhost:43110")] + range = self.env.get("HTTP_RANGE") + range_start = None if send_header: - self.sendHeader(content_type=content_type) + extra_headers = {} + file_size = os.path.getsize(file_path) + extra_headers["Accept-Ranges"] = "bytes" + if range: + range_start = int(re.match(".*?([0-9]+)", range).group(1)) + if re.match(".*?-([0-9]+)", range): + range_end = int(re.match(".*?-([0-9]+)", range).group(1))+1 + else: + range_end = file_size + extra_headers["Content-Length"] = range_end - range_start + extra_headers["Content-Range"] = "bytes %s-%s/%s" % (range_start, range_end-1, file_size) + if range: + status = 206 + else: + status = 200 + self.sendHeader(status, content_type=content_type, extra_headers=extra_headers.items()) if self.env["REQUEST_METHOD"] != "OPTIONS": file = open(file_path, "rb") + if range_start: + file.seek(range_start) while 1: try: block = file.read(block_size)