diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index edbfcbc0..6d6fa40b 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -160,62 +160,56 @@ class Peer(object): return None # Failed after 4 retry # Get a file content from peer - def getFile(self, site, inner_path, file_size=None): - # Use streamFile if client supports it - if config.stream_downloads and self.connection and self.connection.handshake and self.connection.handshake["rev"] > 310: - return self.streamFile(site, inner_path) + def getFile(self, site, inner_path, file_size=None, pos_from=0, pos_to=None, streaming=False): + if file_size and file_size > 5 * 1024 * 1024: + max_read_size = 1024 * 1024 + else: + max_read_size = 512 * 1024 + + if pos_to: + read_bytes = min(max_read_size, pos_to - pos_from) + else: + read_bytes = max_read_size + + location = pos_from - location = 0 if config.use_tempfiles: buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b') else: buff = StringIO() s = time.time() - while True: # Read in 512k parts - res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location, "file_size": file_size}) + while True: # Read in smaller parts + if config.stream_downloads or read_bytes > 256 * 1024 or streaming: + res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size}, stream_to=buff) + if not res or "location" not in res: # Error + return False + else: + self.log("Send: %s" % inner_path) + res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size}) + if not res or "location" not in res: # Error + return False + self.log("Recv: %s" % inner_path) + buff.write(res["body"]) + res["body"] = None # Save memory - if not res or "body" not in res: # Error - return False - - buff.write(res["body"]) - res["body"] = None # Save memory - if res["location"] == res["size"]: # End of file + if res["location"] == res["size"] or res["location"] == pos_to: # End of file break else: location = res["location"] + if pos_to: + read_bytes = min(max_read_size, pos_to - location) - self.download_bytes += res["location"] + if pos_to: + recv = pos_to - pos_from + else: + recv = res["location"] + + self.download_bytes += recv self.download_time += (time.time() - s) if self.site: - self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"] - buff.seek(0) - return buff - - # Download file out of msgpack context to save memory and cpu - def streamFile(self, site, inner_path): - location = 0 - if config.use_tempfiles: - buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b') - else: - buff = StringIO() - - s = time.time() - while True: # Read in 512k parts - res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff) - - if not res or "location" not in res: # Error - self.log("Invalid response: %s" % res) - return False - - if res["location"] == res["size"]: # End of file - break - else: - location = res["location"] - - self.download_bytes += res["location"] - self.download_time += (time.time() - s) - self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"] + self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + recv + self.log("Downloaded: %s, pos: %s, read_bytes: %s" % (inner_path, buff.tell(), read_bytes)) buff.seek(0) return buff