diff --git a/src/Config.py b/src/Config.py index 67a1761b..15ed822b 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.2" - self.rev = 473 + self.rev = 477 self.argv = argv self.action = None self.createParser() diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 23d90334..8f1c79b4 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -67,6 +67,8 @@ class FileRequest(object): self.actionListModified(params) elif cmd == "getHashfield": self.actionGetHashfield(params) + elif cmd == "findHashIds": + self.actionFindHashIds(params) elif cmd == "ping": self.actionPing() else: @@ -272,6 +274,17 @@ class FileRequest(object): self.response({"hashfield_raw": site.content_manager.hashfield.tostring()}) + def actionFindHashIds(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + return False + + found = site.worker_manager.findOptionalHashIds(params["hash_ids"]) + back = {} + for hash_id, peers in found.iteritems(): + back[hash_id] = [helper.packAddress(peer.ip, peer.port) for peer in peers] + self.response({"peers": back}) # Send a simple Pong! 
answer def actionPing(self): diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index a0f65005..e169c7a2 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -1,6 +1,5 @@ import logging import time -import array import gevent @@ -17,8 +16,8 @@ if config.use_tempfiles: # Communicate remote peers class Peer(object): __slots__ = ( - "ip", "port", "site", "key", "connection", "last_found", "last_response", "last_ping", "last_hashfield", - "hashfield", "added", "connection_error", "hash_failed", "download_bytes", "download_time" + "ip", "port", "site", "key", "connection", "time_found", "time_response", "time_hashfield", "time_added", + "last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time" ) def __init__(self, ip, port, site=None): @@ -29,11 +28,11 @@ class Peer(object): self.connection = None self.hashfield = PeerHashfield() # Got optional files hash_id - self.last_hashfield = 0 # Last time hashfiled downloaded - self.last_found = time.time() # Time of last found in the torrent tracker - self.last_response = None # Time of last successful response from peer + self.time_hashfield = None # Last time hashfiled downloaded + self.time_found = time.time() # Time of last found in the torrent tracker + self.time_response = None # Time of last successful response from peer + self.time_added = time.time() self.last_ping = None # Last response time for ping - self.added = time.time() self.connection_error = 0 # Series of connection error self.hash_failed = 0 # Number of bad files from peer @@ -86,7 +85,7 @@ class Peer(object): # Found a peer on tracker def found(self): - self.last_found = time.time() + self.time_found = time.time() # Send a command to peer def request(self, cmd, params={}, stream_to=None): @@ -98,16 +97,16 @@ class Peer(object): for retry in range(1, 3): # Retry 3 times try: - response = self.connection.request(cmd, params, stream_to) - if not response: + res = self.connection.request(cmd, params, stream_to) + if not res: 
raise Exception("Send error") - if "error" in response: - self.log("%s error: %s" % (cmd, response["error"])) + if "error" in res: + self.log("%s error: %s" % (cmd, res["error"])) self.onConnectionError() else: # Successful request, reset connection error num self.connection_error = 0 - self.last_response = time.time() - return response + self.time_response = time.time() + return res except Exception, err: if type(err).__name__ == "Notify": # Greenlet killed by worker self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd)) @@ -136,21 +135,21 @@ class Peer(object): s = time.time() while True: # Read in 512k parts - back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location}) + res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location}) - if not back or "body" not in back: # Error + if not res or "body" not in res: # Error return False - buff.write(back["body"]) - back["body"] = None # Save memory - if back["location"] == back["size"]: # End of file + buff.write(res["body"]) + res["body"] = None # Save memory + if res["location"] == res["size"]: # End of file break else: - location = back["location"] + location = res["location"] - self.download_bytes += back["location"] + self.download_bytes += res["location"] self.download_time += (time.time() - s) - self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"] + self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"] buff.seek(0) return buff @@ -164,20 +163,20 @@ class Peer(object): s = time.time() while True: # Read in 512k parts - back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff) + res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff) - if not back: # Error - self.log("Invalid response: %s" % back) + if not res: # Error + 
self.log("Invalid response: %s" % res) return False - if back["location"] == back["size"]: # End of file + if res["location"] == res["size"]: # End of file break else: - location = back["location"] + location = res["location"] - self.download_bytes += back["location"] + self.download_bytes += res["location"] self.download_time += (time.time() - s) - self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"] + self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"] buff.seek(0) return buff @@ -187,9 +186,9 @@ class Peer(object): for retry in range(1, 3): # Retry 3 times s = time.time() with gevent.Timeout(10.0, False): # 10 sec timeout, don't raise exception - response = self.request("ping") + res = self.request("ping") - if response and "body" in response and response["body"] == "Pong!": + if res and "body" in res and res["body"] == "Pong!": response_time = time.time() - s break # All fine, exit from for loop # Timeout reached or bad response @@ -210,11 +209,11 @@ class Peer(object): site = self.site # If no site defined request peers for this site # give him/her 5 connectible peers packed_peers = [peer.packMyAddress() for peer in self.site.getConnectablePeers(5)] - response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num}) - if not response or "error" in response: + res = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num}) + if not res or "error" in res: return False added = 0 - for peer in response.get("peers", []): + for peer in res.get("peers", []): address = helper.unpackAddress(peer) if site.addPeer(*address): added += 1 @@ -222,21 +221,31 @@ class Peer(object): self.log("Added peers using pex: %s" % added) return added - # Request optional files hashfield from peer - def getHashfield(self): - self.last_hashfield = time.time() - res = self.request("getHashfield", {"site": self.site.address}) - if res and "error" not in 
res: - self.hashfield.replaceFromString(res["hashfield_raw"]) - return self.hashfield - else: - return False - # List modified files since the date # Return: {inner_path: modification date,...} def listModified(self, since): return self.request("listModified", {"since": since, "site": self.site.address}) + def updateHashfield(self, force=False): + # Don't update hashfield again in 15 min + if self.time_hashfield and time.time() - self.time_hashfield < 60 * 15 and not force: + return False + + self.time_hashfield = time.time() + res = self.request("getHashfield", {"site": self.site.address}) + if not res or "error" in res: + return False + self.hashfield.replaceFromString(res["hashfield_raw"]) + + return self.hashfield + + # Return: {hash1: ["ip:port", "ip:port",...],...} + def findHashIds(self, hash_ids): + res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids}) + if not res or "error" in res: + return False + return {key: map(helper.unpackAddress, val) for key, val in res["peers"].iteritems()} + # Stop and remove from site def remove(self): self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) @@ -245,34 +254,6 @@ class Peer(object): if self.connection: self.connection.close() - # - HASHFIELD - - - def updateHashfield(self, force=False): - # Don't update hashfield again in 15 min - if self.last_hashfield and time.time() - self.last_hashfield > 60 * 15 and not force: - return False - - response = self.request("getHashfield", {"site": self.site.address}) - if not response or "error" in response: - return False - self.last_hashfield = time.time() - self.hashfield = response["hashfield"] - - return self.hashfield - - def setHashfield(self, hashfield_dump): - self.hashfield.fromstring(hashfield_dump) - - def hasHash(self, hash_id): - return hash_id in self.hashfield - - # Return: ["ip:port", "ip:port",...] 
- def findHash(self, hash_id): - response = self.request("findHash", {"site": self.site.address, "hash_id": hash_id}) - if not response or "error" in response: - return False - return [helper.unpackAddress(peer) for peer in response["peers"]] - # - EVENTS - # On connection error diff --git a/src/Peer/PeerHashfield.py b/src/Peer/PeerHashfield.py index 593932d3..0a48dd61 100644 --- a/src/Peer/PeerHashfield.py +++ b/src/Peer/PeerHashfield.py @@ -23,6 +23,13 @@ class PeerHashfield(): else: return False + def appendHashId(self, hash_id): + if hash_id not in self: + self.append(hash_id) + return True + else: + return False + def removeHash(self, hash): hash_id = int(hash[0:4], 16) if hash_id in self: @@ -39,4 +46,4 @@ class PeerHashfield(): def replaceFromString(self, hashfield_raw): self.storage = self.createStoreage() - self.fromstring(hashfield_raw) \ No newline at end of file + self.fromstring(hashfield_raw) diff --git a/src/Site/Site.py b/src/Site/Site.py index f236421e..0ed7f621 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -39,7 +39,7 @@ class Site: self.content = None # Load content.json self.peers = {} # Key: ip:port, Value: Peer.Peer self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. 
myself) - self.last_announce = 0 # Last announce time to tracker + self.time_announce = 0 # Last announce time to tracker self.last_tracker_id = random.randint(0, 10) # Last announced tracker id self.worker_manager = WorkerManager(self) # Handle site download from other peers self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) @@ -589,9 +589,9 @@ class Site: # Add myself and get other peers from tracker def announce(self, force=False, num=5, pex=True): - if time.time() < self.last_announce + 30 and not force: + if time.time() < self.time_announce + 30 and not force: return # No reannouncing within 30 secs - self.last_announce = time.time() + self.time_announce = time.time() trackers = config.trackers if num == 1: # Only announce on one tracker, increment the queried tracker id @@ -712,7 +712,7 @@ class Site: continue if peer.connection and not peer.connection.connected: peer.connection = None # Dead connection - if time.time() - peer.last_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour + if time.time() - peer.time_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour peer.remove() removed += 1 if removed > 5: # Don't remove too much at once diff --git a/src/Test/TestPeer.py b/src/Test/TestPeer.py index ac6521d7..cfa572c3 100644 --- a/src/Test/TestPeer.py +++ b/src/Test/TestPeer.py @@ -8,7 +8,7 @@ from cStringIO import StringIO @pytest.mark.usefixtures("resetSettings") @pytest.mark.usefixtures("resetTempSettings") -class TestFileRequest: +class TestPeer: def testPing(self, file_server, site, site_temp): file_server.ip_incoming = {} # Reset flood protection file_server.sites[site.address] = site @@ -53,9 +53,13 @@ class TestFileRequest: def testHashfield(self, site): sample_hash = site.content_manager.contents["content.json"]["files_optional"].values()[0]["sha512"] + + assert not site.content_manager.hashfield + site.storage.verifyFiles(quick_check=True) # Find 
what optional files we have # Check if hashfield has any files + assert site.content_manager.hashfield assert len(site.content_manager.hashfield) > 0 # Check exsist hash @@ -89,8 +93,35 @@ class TestFileRequest: # Testing hashfield sync assert len(peer_file_server.hashfield) == 0 - assert peer_file_server.getHashfield() + assert peer_file_server.updateHashfield() assert len(peer_file_server.hashfield) > 0 connection.close() client.stop() + + def testFindHash(self, file_server, site, site_temp): + file_server.ip_incoming = {} # Reset flood protection + file_server.sites[site.address] = site + client = FileServer("127.0.0.1", 1545) + client.sites[site_temp.address] = site_temp + site_temp.connection_server = client + + # Add file_server as peer to client + peer_file_server = site_temp.addPeer("127.0.0.1", 1544) + + assert peer_file_server.findHashIds([1234]) == {} + + # Add fake peer with requred hash + fake_peer_1 = site.addPeer("1.2.3.4", 1544) + fake_peer_1.hashfield.append(1234) + fake_peer_2 = site.addPeer("1.2.3.5", 1545) + fake_peer_2.hashfield.append(1234) + fake_peer_2.hashfield.append(1235) + fake_peer_3 = site.addPeer("1.2.3.6", 1546) + fake_peer_3.hashfield.append(1235) + fake_peer_3.hashfield.append(1236) + + assert peer_file_server.findHashIds([1234, 1235]) == { + 1234: [('1.2.3.4', 1544), ('1.2.3.5', 1545)], + 1235: [('1.2.3.5', 1545), ('1.2.3.6', 1546)] + } diff --git a/src/Test/TestSiteDownload.py b/src/Test/TestSiteDownload.py index 7630f847..70d5bfec 100644 --- a/src/Test/TestSiteDownload.py +++ b/src/Test/TestSiteDownload.py @@ -1,9 +1,14 @@ +import time + import pytest import mock +import gevent from Connection import ConnectionServer from Config import config from File import FileRequest +from File import FileServer +from Site import Site import Spy @@ -11,7 +16,8 @@ import Spy @pytest.mark.usefixtures("resetSettings") class TestSiteDownload: def testDownload(self, file_server, site, site_temp): - client = ConnectionServer("127.0.0.1", 1545) + 
file_server.ip_incoming = {} # Reset flood protection + assert site.storage.directory == config.data_dir + "/" + site.address assert site_temp.storage.directory == config.data_dir + "-temp/" + site.address @@ -20,6 +26,7 @@ class TestSiteDownload: file_server.sites[site.address] = site # Init client server + client = ConnectionServer("127.0.0.1", 1545) site_temp.connection_server = client site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net @@ -48,7 +55,33 @@ class TestSiteDownload: assert len(site_temp.content_manager.contents) == len(site.content_manager.contents) - 1 assert not bad_files - # Optional file + [connection.close() for connection in file_server.connections] + + + # Test when connected peer has the optional file + def testOptionalDownload(self, file_server, site, site_temp): + file_server.ip_incoming = {} # Reset flood protection + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net + + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Download optional data/optional.txt + site.storage.verifyFiles(quick_check=True) # Find what optional files we have + optional_file_info = site_temp.content_manager.getFileInfo("data/optional.txt") + assert site.content_manager.hashfield.hasHash(optional_file_info["sha512"]) + assert not site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"]) + assert not site_temp.storage.isFile("data/optional.txt") assert site.storage.isFile("data/optional.txt") site_temp.needFile("data/optional.txt") @@ -59,6 +92,7 @@ class TestSiteDownload: optional_file_info = site_temp.content_manager.getFileInfo( 
"data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/peanut-butter-jelly-time.gif" ) + assert site.content_manager.hashfield.hasHash(optional_file_info["sha512"]) assert not site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"]) site_temp.needFile("data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/peanut-butter-jelly-time.gif") @@ -66,3 +100,46 @@ class TestSiteDownload: assert site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"]) assert site_temp.storage.deleteFiles() + [connection.close() for connection in file_server.connections] + + # Test when connected peer does not has the file, so ask him if he know someone who has it + def testFindOptional(self, file_server, site, site_temp): + file_server.ip_incoming = {} # Reset flood protection + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init full source server (has optional files) + site_full = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") + file_server_full = FileServer("127.0.0.1", 1546) + site_full.connection_server = file_server_full + gevent.spawn(lambda: ConnectionServer.start(file_server_full)) + time.sleep(0) # Port opening + file_server_full.sites[site_full.address] = site_full # Add site + site_full.storage.verifyFiles(quick_check=True) # Check optional files + site_full_peer = site.addPeer("127.0.0.1", 1546) # Add it to source server + assert site_full_peer.updateHashfield() # Update hashfield + + # Init client server + site_temp.connection_server = ConnectionServer("127.0.0.1", 1545) + site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net + site_temp.addPeer("127.0.0.1", 1544) # Add source server + + # Download normal files + site_temp.download(blind_includes=True).join(timeout=5) + + # Download optional data/optional.txt + optional_file_info = site_temp.content_manager.getFileInfo("data/optional.txt") + assert not site_temp.storage.isFile("data/optional.txt") + assert not 
site.content_manager.hashfield.hasHash(optional_file_info["sha512"]) # Source server don't know he has the file + assert site_full_peer.hashfield.hasHash(optional_file_info["sha512"]) # Source full peer on source server has the file + assert site_full.content_manager.hashfield.hasHash(optional_file_info["sha512"]) # Source full server he has the file + + with Spy.Spy(FileRequest, "route") as requests: + site_temp.needFile("data/optional.txt") + print requests + + assert site_temp.storage.deleteFiles() + file_server_full.stop() + [connection.close() for connection in file_server.connections] diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index f4ee0e51..41ff5b53 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -1,10 +1,13 @@ import time import logging import random +import collections import gevent from Worker import Worker +from util import helper +import util MAX_WORKERS = 10 # Max concurent workers @@ -15,7 +18,7 @@ class WorkerManager: self.site = site self.workers = {} # Key: ip:port, Value: Worker.Worker self.tasks = [] - # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, + # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None, # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids} self.started_task_num = 0 # Last added task num self.running = True @@ -60,12 +63,19 @@ class WorkerManager: elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers: # Task started more than 15 sec ago or no workers - self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"]) + workers = self.findWorkers(task) + self.log.debug( + "Task taking more than 15 secs, workers: %s find more peers: %s" % + (len(workers), task["inner_path"]) + ) task["site"].announce(num=1) # Find more peers - if task["peers"]: # Release 
the peer lock - self.log.debug("Task peer lock release: %s" % task["inner_path"]) - task["peers"] = [] - self.startWorkers() + if task["optional_hash_id"]: + self.startFindOptional() + else: + if task["peers"]: # Release the peer lock + self.log.debug("Task peer lock release: %s" % task["inner_path"]) + task["peers"] = [] + self.startWorkers() break # One reannounce per loop self.log.debug("checkTasks stopped running") @@ -90,7 +100,7 @@ class WorkerManager: priority += 1 # boost included content.json files priority a bit elif inner_path.endswith(".json"): priority += 2 # boost data json files priority more - return priority - task["workers_num"]*5 # Prefer more priority and less workers + return priority - task["workers_num"] * 5 # Prefer more priority and less workers # Returns the next free or less worked task def getTask(self, peer): @@ -100,6 +110,8 @@ class WorkerManager: continue # This peer not allowed to pick this task if peer in task["failed"]: continue # Peer already tried to solve this, but failed + if task["optional_hash_id"] and task["peers"] is None: + continue # No peers found yet for the optional task return task # New peers added to site @@ -135,6 +147,114 @@ class WorkerManager: if worker: self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS)) + # Find peers for optional hash in local hash tables and add to task peers + def findOptionalTasks(self, optional_tasks): + found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...} + + for peer in self.site.peers.values(): + if not peer.hashfield: + continue + + for task in optional_tasks: + optional_hash_id = task["optional_hash_id"] + if optional_hash_id in peer.hashfield: + found[optional_hash_id].append(peer) + if task["peers"] is None: + task["peers"] = [] + if peer not in task["peers"]: + task["peers"].append(peer) + + return found + + # Find peers for optional hash ids in local hash tables + def findOptionalHashIds(self, 
optional_hash_ids): + found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...} + + for peer in self.site.peers.values(): + if not peer.hashfield: + continue + for optional_hash_id in optional_hash_ids: + if optional_hash_id in peer.hashfield: + found[optional_hash_id].append(peer) + + return found + + # Add peers to tasks from found result + def addOptionalPeers(self, found_ips): + found = collections.defaultdict(list) + for hash_id, peer_ips in found_ips.iteritems(): + task = [task for task in self.tasks if task["optional_hash_id"] == hash_id] + if task: # Found task, lets take the first + task = task[0] + for peer_ip in peer_ips: + peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True) + if not peer: + continue + if task["peers"] is None: + task["peers"] = [] + if peer not in task["peers"]: + task["peers"].append(peer) + peer.hashfield.appendHashId(hash_id) # Peer has this file + found[hash_id].append(peer) + + return found + + # Start find peers for optional files + @util.Noparallel(blocking=False) + def startFindOptional(self): + time.sleep(0.01) # Wait for more file requests + optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] + optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) + self.log.debug("Finding peers for optional files: %s" % optional_hash_ids) + found = self.findOptionalTasks(optional_tasks) + + if found: + found_peers = set([peer for peers in found.values() for peer in peers]) + self.startWorkers(found_peers) + + if len(found) < len(optional_hash_ids): + self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found))) + + # Query hashfield from connected peers + threads = [] + peers = self.site.getConnectedPeers() + if not peers: + peers = self.site.getConnectablePeers() + for peer in peers: + if not peer.time_hashfield: + threads.append(gevent.spawn(peer.updateHashfield)) + gevent.joinall(threads, timeout=5) + + found = 
self.findOptionalTasks(optional_tasks) + + if found: + found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers]) + self.startWorkers(found_peers) + + if len(found) < len(optional_hash_ids): + self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found))) + + # Try to query connected peers + threads = [] + peers = self.site.getConnectedPeers() + if not peers: + peers = self.site.getConnectablePeers() + + for peer in peers: + threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids))) + + gevent.joinall(threads, timeout=5) + + found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value]) + found = self.addOptionalPeers(found_ips) + + if found: + found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers]) + self.startWorkers(found_peers) + + if len(found) < len(optional_hash_ids): + self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found))) + # Stop all worker def stopWorkers(self): for worker in self.workers.values(): @@ -181,16 +301,27 @@ class WorkerManager: peers = [peer] # Only download from this peer else: peers = None + file_info = self.site.content_manager.getFileInfo(inner_path) + if file_info and file_info["optional"]: + optional_hash_id = helper.toHashId(file_info["sha512"]) + else: + optional_hash_id = None task = { - "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, - "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": [] + "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": optional_hash_id, + "time_added": time.time(), "time_started": None, "time_action": None, "peers": peers, "priority": priority, "failed": [] } + self.tasks.append(task) + self.started_task_num += 1 self.log.debug( - "New task: %s, peer lock: %s, priority: %s, tasks: %s" % - 
(task["inner_path"], peers, priority, self.started_task_num) + "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks: %s" % + (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num) ) + + if optional_hash_id: + self.startFindOptional() + self.startWorkers(peers) return evt diff --git a/src/util/helper.py b/src/util/helper.py index 8b46be77..ad9b9872 100644 --- a/src/util/helper.py +++ b/src/util/helper.py @@ -2,6 +2,7 @@ import os import socket import struct import re +import collections def atomicWrite(dest, content, mode="w"): @@ -44,3 +45,17 @@ def getDirname(path): # Return: data/site/content.json -> content.json def getFilename(path): return re.sub("^.*/", "", path) + + +# Convert hash to hashid for hashfield +def toHashId(hash): + return int(hash[0:4], 16) + + +# Merge dict values +def mergeDicts(dicts): + back = collections.defaultdict(set) + for d in dicts: + for key, val in d.iteritems(): + back[key].update(val) + return dict(back)