diff --git a/plugins/BigFile/BigfilePiecefield.py b/plugins/BigFile/BigfilePiecefield.py new file mode 100644 index 00000000..c7690279 --- /dev/null +++ b/plugins/BigFile/BigfilePiecefield.py @@ -0,0 +1,158 @@ +import array + + +def packPiecefield(data): + res = [] + if not data: + return array.array("H", "") + + if data[0] == "0": + res.append(0) + find = "1" + else: + find = "0" + last_pos = 0 + pos = 0 + while 1: + pos = data.find(find, pos) + if find == "0": + find = "1" + else: + find = "0" + if pos == -1: + res.append(len(data) - last_pos) + break + res.append(pos - last_pos) + last_pos = pos + return array.array("H", res) + + +def unpackPiecefield(data): + if not data: + return "" + + res = [] + char = "1" + for times in data: + if times > 10000: + return "" + res.append(char * times) + if char == "1": + char = "0" + else: + char = "1" + return "".join(res) + + +class BigfilePiecefield(object): + __slots__ = ["data"] + + def __init__(self): + self.data = "" + + def fromstring(self, s): + self.data = s + + def tostring(self): + return self.data + + def pack(self): + return packPiecefield(self.data).tostring() + + def unpack(self, s): + self.data = unpackPiecefield(array.array("H", s)) + + def __getitem__(self, key): + try: + return int(self.data[key]) + except IndexError: + return False + + def __setitem__(self, key, value): + data = self.data + if len(data) < key: + data = data.ljust(key+1, "0") + data = data[:key] + str(int(value)) + data[key + 1:] + self.data = data + + +class BigfilePiecefieldPacked(object): + __slots__ = ["data"] + + def __init__(self): + self.data = "" + + def fromstring(self, data): + self.data = packPiecefield(data).tostring() + + def tostring(self): + return unpackPiecefield(array.array("H", self.data)) + + def pack(self): + return array.array("H", self.data).tostring() + + def unpack(self, data): + self.data = data + + def __getitem__(self, key): + try: + return int(self.tostring()[key]) + except IndexError: + return False + + def __setitem__(self, key, value): + data = self.tostring() + if len(data) < key: + data = data.ljust(key+1, "0") + data = data[:key] + str(int(value)) + data[key + 1:] + self.fromstring(data) + + +if __name__ == "__main__": + import os + import psutil + import time + testdata = "1" * 100 + "0" * 900 + "1" * 4000 + "0" * 4999 + "1" + meminfo = psutil.Process(os.getpid()).memory_info + + for storage in [BigfilePiecefieldPacked, BigfilePiecefield]: + print "-- Testing storage: %s --" % storage + m = meminfo()[0] + s = time.time() + piecefields = {} + for i in range(10000): + piecefield = storage() + piecefield.fromstring(testdata[:i] + "0" + testdata[i + 1:]) + piecefields[i] = piecefield + + print "Create x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(piecefields[0].data)) + + m = meminfo()[0] + s = time.time() + for piecefield in piecefields.values(): + val = piecefield[1000] + + print "Query one x10000: +%sKB in %.3fs" % ((meminfo()[0] - m) / 1024, time.time() - s) + + m = meminfo()[0] + s = time.time() + for piecefield in piecefields.values(): + piecefield[1000] = True + + print "Change one x10000: +%sKB in %.3fs" % ((meminfo()[0] - m) / 1024, time.time() - s) + + m = meminfo()[0] + s = time.time() + for piecefield in piecefields.values(): + packed = piecefield.pack() + + print "Pack x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(packed)) + + m = meminfo()[0] + s = time.time() + for piecefield in piecefields.values(): + piecefield.unpack(packed) + + print 
"Unpack x10000: +%sKB in %.3fs (len: %s)" % ((meminfo()[0] - m) / 1024, time.time() - s, len(piecefields[0].data)) + + piecefields = {} diff --git a/plugins/BigFile/BigfilePlugin.py b/plugins/BigFile/BigfilePlugin.py new file mode 100644 index 00000000..94e1ec7d --- /dev/null +++ b/plugins/BigFile/BigfilePlugin.py @@ -0,0 +1,678 @@ +import time +import os +import subprocess +import shutil +import collections +import gevent +import math + +import msgpack + +from Plugin import PluginManager +from Crypt import CryptHash +from lib import merkletools +from util import helper +import util +from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked + + +# We can only import plugin host clases after the plugins are loaded +@PluginManager.afterLoad +def importPluginnedClasses(): + global VerifyError, config + from Content.ContentManager import VerifyError + from Config import config + +if "upload_nonces" not in locals(): + upload_nonces = {} + + +@PluginManager.registerTo("UiRequest") +class UiRequestPlugin(object): + def isCorsAllowed(self, path): + if path == "/ZeroNet-Internal/BigfileUpload": + return True + else: + return super(UiRequestPlugin, self).isCorsAllowed(path) + + def actionBigfileUpload(self): + nonce = self.get.get("upload_nonce") + if nonce not in upload_nonces: + return self.error403("Upload nonce error.") + + upload_info = upload_nonces[nonce] + del upload_nonces[nonce] + + self.sendHeader(200, "text/html", noscript=True, extra_headers=[ + ("Access-Control-Allow-Origin", "null"), + ("Access-Control-Allow-Credentials", "true") + ]) + + self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers + + site = upload_info["site"] + inner_path = upload_info["inner_path"] + + with site.storage.open(inner_path, "wb", create_dirs=True) as out_file: + merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile( + self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file + ) + + if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split + hash = piecemap_info["sha512_pieces"][0].encode("hex") + site.content_manager.optionalDownloaded(inner_path, hash, upload_info["size"], own=True) + + else: # Big file + file_name = helper.getFilename(inner_path) + msgpack.pack({file_name: piecemap_info}, site.storage.open(upload_info["piecemap"], "wb")) + + # Find piecemap and file relative path to content.json + file_info = site.content_manager.getFileInfo(inner_path) + content_inner_path_dir = helper.getDirname(file_info["content_inner_path"]) + piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):] + file_relative_path = inner_path[len(content_inner_path_dir):] + + # Add file to content.json + if site.storage.isFile(file_info["content_inner_path"]): + content = site.storage.loadJson(file_info["content_inner_path"]) + else: + content = {} + if "files_optional" not in content: + content["files_optional"] = {} + + content["files_optional"][file_relative_path] = { + "sha512": merkle_root, + "size": upload_info["size"], + "piecemap": piecemap_relative_path, + "piece_size": piece_size + } + + site.content_manager.optionalDownloaded(inner_path, merkle_root, upload_info["size"], own=True) + site.storage.writeJson(file_info["content_inner_path"], content) + + return { + "merkle_root": merkle_root, + "piece_num": len(piecemap_info["sha512_pieces"]), + "piece_size": piece_size, + "inner_path": inner_path + } + + def readMultipartHeaders(self, wsgi_input): + for i in range(100): + line = wsgi_input.readline() + if line 
== "\r\n": + break + return i + + def actionFile(self, file_path, *args, **kwargs): + if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"): # Only check files larger than 1MB + path_parts = kwargs["path_parts"] + site = self.server.site_manager.get(path_parts["address"]) + kwargs["file_obj"] = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024) + + return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs) + + +@PluginManager.registerTo("UiWebsocket") +class UiWebsocketPlugin(object): + def actionBigfileUploadInit(self, to, inner_path, size): + valid_signers = self.site.content_manager.getValidSigners(inner_path) + auth_address = self.user.getAuthAddress(self.site.address) + if not self.site.settings["own"] and auth_address not in valid_signers: + self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers)) + return self.response(to, {"error": "Forbidden, you can only modify your own files"}) + + nonce = CryptHash.random() + piece_size = 1024 * 1024 + inner_path = self.site.content_manager.sanitizePath(inner_path) + file_info = self.site.content_manager.getFileInfo(inner_path) + + content_inner_path_dir = helper.getDirname(file_info["content_inner_path"]) + file_relative_path = inner_path[len(content_inner_path_dir):] + + upload_nonces[nonce] = { + "added": time.time(), + "site": self.site, + "inner_path": inner_path, + "websocket_client": self, + "size": size, + "piece_size": piece_size, + "piecemap": inner_path + ".piecemap.msgpack" + } + self.response(to, { + "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce, + "pice_size": piece_size, + "inner_path": inner_path, + "file_relative_path": file_relative_path + }) + + +@PluginManager.registerTo("ContentManager") +class ContentManagerPlugin(object): + def getFileInfo(self, inner_path): + if "|" not in inner_path: + return super(ContentManagerPlugin, self).getFileInfo(inner_path) + + inner_path, file_range = inner_path.split("|") + pos_from, pos_to = map(int, file_range.split("-")) + file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path) + return file_info + + def readFile(self, file_in, size, buff_size=1024 * 64): + part_num = 0 + recv_left = size + + while 1: + part_num += 1 + read_size = min(buff_size, recv_left) + part = file_in.read(read_size) + + if not part: + break + yield part + + if part_num % 100 == 0: # Avoid blocking ZeroNet execution during upload + time.sleep(0.001) + + recv_left -= read_size + if recv_left <= 0: + break + + def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None): + self.site.settings["has_bigfile"] = True + + recv = 0 + try: + piece_hash = CryptHash.sha512t() + piece_hashes = [] + piece_recv = 0 + + mt = merkletools.MerkleTools() + mt.hash_function = CryptHash.sha512t + + part = "" + for part in self.readFile(file_in, size): + if file_out: + file_out.write(part) + + recv += len(part) + piece_recv += len(part) + piece_hash.update(part) + if piece_recv >= piece_size: + piece_digest = piece_hash.digest() + piece_hashes.append(piece_digest) + mt.leaves.append(piece_digest) + piece_hash = CryptHash.sha512t() + piece_recv = 0 + + if len(piece_hashes) % 100 == 0 or recv == size: + self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % ( + float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024 + )) + part = "" + if len(part) > 0: + piece_digest = piece_hash.digest() + piece_hashes.append(piece_digest) + mt.leaves.append(piece_digest) + 
except Exception as err: + raise err + finally: + if file_out: + file_out.close() + + mt.make_tree() + return mt.get_merkle_root(), piece_size, { + "sha512_pieces": piece_hashes + } + + def hashFile(self, dir_inner_path, file_relative_path, optional=False): + inner_path = dir_inner_path + file_relative_path + + file_size = self.site.storage.getSize(inner_path) + # Only care about optional files >1MB + if not optional or file_size < 1 * 1024 * 1024: + return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional) + + back = {} + content = self.contents.get(dir_inner_path + "content.json") + + hash = None + piecemap_relative_path = None + piece_size = None + + # Don't re-hash if it's already in content.json + if content and file_relative_path in content.get("files_optional", {}): + file_node = content["files_optional"][file_relative_path] + if file_node["size"] == file_size: + self.log.info("- [SAME SIZE] %s" % file_relative_path) + hash = file_node.get("sha512") + piecemap_relative_path = file_node.get("piecemap") + piece_size = file_node.get("piece_size") + + if not hash or not piecemap_relative_path: # Not in content.json yet + if file_size < 5 * 1024 * 1024: # Don't create piecemap automatically for files smaller than 5MB + return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional) + + self.log.info("- [HASHING] %s" % file_relative_path) + merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size) + if not hash: + hash = merkle_root + + if not piecemap_relative_path: + file_name = helper.getFilename(file_relative_path) + piecemap_relative_path = file_relative_path + ".piecemap.msgpack" + piecemap_inner_path = inner_path + ".piecemap.msgpack" + + msgpack.pack({file_name: piecemap_info}, self.site.storage.open(piecemap_inner_path, "wb")) + back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, True)) + + piece_num = int(math.ceil(float(file_size) / piece_size)) + + # Add the merkle root to hashfield + self.optionalDownloaded(inner_path, hash, file_size, own=True) + self.site.storage.piecefields[hash].fromstring("1" * piece_num) + + back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size} + return back + + def getPiecemap(self, inner_path): + file_info = self.site.content_manager.getFileInfo(inner_path) + piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"] + self.site.needFile(piecemap_inner_path, priority=20) + piecemap = msgpack.unpack(self.site.storage.open(piecemap_inner_path))[helper.getFilename(inner_path)] + piecemap["piece_size"] = file_info["piece_size"] + return piecemap + + def verifyPiece(self, inner_path, pos, piece): + piecemap = self.getPiecemap(inner_path) + piece_i = pos / piecemap["piece_size"] + if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]: + raise VerifyError("Invalid hash") + return True + + def verifyFile(self, inner_path, file, ignore_same=True): + if "|" not in inner_path: + return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same) + + inner_path, file_range = inner_path.split("|") + pos_from, pos_to = map(int, file_range.split("-")) + + return self.verifyPiece(inner_path, pos_from, file) + + def optionalDownloaded(self, inner_path, hash, size=None, own=False): + if "|" in inner_path: + inner_path, file_range = inner_path.split("|") +
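# Piece addressing: "<inner_path>|<from>-<to>" selects one piece by byte range, e.g. "data/file.iso|0-1048576" for the first 1MB piece (illustrative path) +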
pos_from, pos_to = map(int, file_range.split("-")) + file_info = self.getFileInfo(inner_path) + + # Mark piece downloaded + piece_i = pos_from / file_info["piece_size"] + self.site.storage.piecefields[file_info["sha512"]][piece_i] = True + + # Only add to site size on first request + if hash in self.hashfield: + size = 0 + + return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash, size, own) + + def optionalRemove(self, inner_path, hash, size=None): + if size and size > 1024 * 1024: + file_info = self.getFileInfo(inner_path) + sha512 = file_info["sha512"] + if sha512 in self.site.storage.piecefields: + del self.site.storage.piecefields[sha512] + return super(ContentManagerPlugin, self).optionalRemove(inner_path, hash, size) + + +@PluginManager.registerTo("SiteStorage") +class SiteStoragePlugin(object): + def __init__(self, *args, **kwargs): + super(SiteStoragePlugin, self).__init__(*args, **kwargs) + self.piecefields = collections.defaultdict(BigfilePiecefield) + if "piecefields" in self.site.settings.get("cache", {}): + for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems(): + self.piecefields[sha512].unpack(piecefield_packed.decode("base64")) + self.site.settings["cache"]["piecefields"] = {} + + def createSparseFile(self, inner_path, size, sha512=None): + file_path = self.getPath(inner_path) + f = open(file_path, 'wb') + f.truncate(size) + f.close() + if os.name == "nt": + subprocess.call(["fsutil", "sparse", "setflag", file_path]) + + if sha512 and sha512 in self.piecefields: + self.log.debug("%s: File does not exist, but has a piecefield. Deleting piecefield." % inner_path) + del self.piecefields[sha512] + + def write(self, inner_path, content): + if "|" not in inner_path: + return super(SiteStoragePlugin, self).write(inner_path, content) + + # Write to specific position by passing |{pos} after the filename + inner_path, file_range = inner_path.split("|") + pos_from, pos_to = map(int, file_range.split("-")) + file_path = self.getPath(inner_path) + + # Create dir if not exist + file_dir = os.path.dirname(file_path) + if not os.path.isdir(file_dir): + os.makedirs(file_dir) + + if not os.path.isfile(file_path): + file_info = self.site.content_manager.getFileInfo(inner_path) + self.createSparseFile(inner_path, file_info["size"]) + + # Write file + with open(file_path, "rb+") as file: + file.seek(pos_from) + if hasattr(content, 'read'): # File-like object + shutil.copyfileobj(content, file) # Write buff to disk + else: # Simple string + file.write(content) + del content + self.onUpdated(inner_path) + + def openBigfile(self, inner_path, prebuffer=0): + file_info = self.site.content_manager.getFileInfo(inner_path) + if "piecemap" not in file_info: # It's not a big file + return False + + self.site.needFile(inner_path, blocking=False) # Download piecemap + file_path = self.getPath(inner_path) + sha512 = file_info["sha512"] + piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"])) + if os.path.isfile(file_path): + if sha512 not in self.piecefields: + if open(file_path).read(128) == "\0" * 128: + piece_data = "0" + else: + piece_data = "1" + self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data)) + self.piecefields[sha512].fromstring(piece_data * piece_num) + else: + self.log.debug("Creating bigfile: %s" % inner_path) + self.createSparseFile(inner_path, file_info["size"], sha512) + self.piecefields[sha512].fromstring("0" * piece_num) + return BigFile(self.site, inner_path, prebuffer=prebuffer) + + +class BigFile(object): + def __init__(self, site, inner_path, prebuffer=0): + self.site = site + self.inner_path = inner_path + file_path = site.storage.getPath(inner_path) + file_info = self.site.content_manager.getFileInfo(inner_path) + self.piece_size = file_info["piece_size"] + self.sha512 = file_info["sha512"] + self.size = file_info["size"] + self.prebuffer = prebuffer + self.read_bytes = 0 + + self.piecefield = self.site.storage.piecefields[self.sha512] + self.f = open(file_path, "rb+") + + def read(self, buff=64 * 1024): + pos = self.f.tell() + read_until = pos + buff + requests = [] + # Request all required blocks + while 1: + piece_i = pos / self.piece_size + if piece_i * self.piece_size >= read_until: + break + pos_from = piece_i * self.piece_size + pos_to = pos_from + self.piece_size + if not self.piecefield[piece_i]: + requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10)) + pos += self.piece_size + + if not all(requests): + return None + + # Request prebuffer + if self.prebuffer: + prebuffer_until = min(self.size, read_until + self.prebuffer) + priority = 3 + while 1: + piece_i = pos / self.piece_size + if piece_i * self.piece_size >= prebuffer_until: + break + pos_from = piece_i * self.piece_size + pos_to = pos_from + self.piece_size + if not self.piecefield[piece_i]: + self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority)) + priority -= 1 + pos += self.piece_size + + gevent.joinall(requests) + self.read_bytes += buff +
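# read_bytes counts sequentially consumed bytes; after ~7MB the prebuffer below grows to 5MB, a simple heuristic that favors long streaming reads +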
# Increase buffer for long reads + if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024: + self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path) + self.prebuffer = 5 * 1024 * 1024 + + return self.f.read(buff) + + def seek(self, pos): + return self.f.seek(pos) + + def tell(self): + return self.f.tell() + + def close(self): + self.f.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +@PluginManager.registerTo("WorkerManager") +class WorkerManagerPlugin(object): + def addTask(self, inner_path, *args, **kwargs): + file_info = kwargs.get("file_info") + if file_info and "piecemap" in file_info: # Bigfile + self.site.settings["has_bigfile"] = True + + piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"] + piecemap_task = None + if not self.site.storage.isFile(piecemap_inner_path): + # Start download piecemap + piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30) + if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= config.autodownload_bigfile_size_limit: + gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all") # Download all pieces + + if "|" in inner_path: + # Start download piece + task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs) + + inner_path, file_range = inner_path.split("|") + pos_from, pos_to = map(int, file_range.split("-")) + task["piece_i"] = pos_from / file_info["piece_size"] + task["sha512"] = file_info["sha512"] + else: + if inner_path in self.site.bad_files: + del self.site.bad_files[inner_path] + if piecemap_task: + task = piecemap_task + else: + fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified + fake_evt.set(True) + task = {"evt": fake_evt} + + if not self.site.storage.isFile(inner_path): + self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"]) + piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"])) + self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num) + else: + task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs) + return task + + def taskAddPeer(self, task, peer): + if "piece_i" in task: + if not peer.piecefields[task["sha512"]][task["piece_i"]]: + if task["sha512"] not in peer.piecefields: + gevent.spawn(peer.updatePiecefields, force=True) + elif not task["peers"]: + gevent.spawn(peer.updatePiecefields) + + return False # Deny adding peers to the task if the piece is not in their piecefield + return super(WorkerManagerPlugin, self).taskAddPeer(task, peer) + + +@PluginManager.registerTo("FileRequest") +class FileRequestPlugin(object): + def isReadable(self, site, inner_path, file, pos): + # Peek into file + if file.read(10) == "\0" * 10: + # Looks empty, but makes sure we don't have that piece + file_info = site.content_manager.getFileInfo(inner_path) + piece_i = pos / file_info["piece_size"] + if not site.storage.piecefields[file_info["sha512"]][piece_i]: + return False + # Seek back to position we want to read + file.seek(pos) + return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos) + + def actionGetPiecefields(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + return False + + # Add peer to site if not added before + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) + if not peer.connection: # Just added + peer.connect(self.connection) # Assign current connection to
peer + + piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()} + self.response({"piecefields_packed": piecefields_packed}) + + def actionSetPiecefields(self, params): + site = self.sites.get(params["site"]) + if not site or not site.settings["serving"]: # Site unknown or not serving + self.response({"error": "Unknown site"}) + self.connection.badAction(5) + return False + + # Add or get peer + peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection) + if not peer.connection: + peer.connect(self.connection) + + peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked) + for sha512, piecefield_packed in params["piecefields_packed"].iteritems(): + peer.piecefields[sha512].unpack(piecefield_packed) + site.settings["has_bigfile"] = True + + self.response({"ok": "Updated"}) + + +@PluginManager.registerTo("Peer") +class PeerPlugin(object): + def __getattr__(self, key): + if key == "piecefields": + self.piecefields = collections.defaultdict(BigfilePiecefieldPacked) + return self.piecefields + elif key == "time_piecefields_updated": + self.time_piecefields_updated = None + return self.time_piecefields_updated + else: + return super(PeerPlugin, self).__getattr__(key) + + @util.Noparallel(ignore_args=True) + def updatePiecefields(self, force=False): + if self.connection and self.connection.handshake.get("rev", 0) < 2190: + return False # Not supported + + # Don't update piecefield again in 1 min + if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force: + return False + + self.time_piecefields_updated = time.time() + res = self.request("getPiecefields", {"site": self.site.address}) + if not res or "error" in res: + return False + + self.piecefields = collections.defaultdict(BigfilePiecefieldPacked) + for sha512, piecefield_packed in res["piecefields_packed"].iteritems(): + self.piecefields[sha512].unpack(piecefield_packed) + + return self.piecefields + + def sendMyHashfield(self, *args, **kwargs): + return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs) + + def updateHashfield(self, *args, **kwargs): + if self.site.settings.get("has_bigfile"): + thread = gevent.spawn(self.updatePiecefields, *args, **kwargs) + back = super(PeerPlugin, self).updateHashfield(*args, **kwargs) + thread.join() + return back + else: + return super(PeerPlugin, self).updateHashfield(*args, **kwargs) + + def getFile(self, site, inner_path, *args, **kwargs): + if "|" in inner_path: + inner_path, file_range = inner_path.split("|") + pos_from, pos_to = map(int, file_range.split("-")) + kwargs["pos_from"] = pos_from + kwargs["pos_to"] = pos_to + return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs) + + +@PluginManager.registerTo("Site") +class SitePlugin(object): + def isFileDownloadAllowed(self, inner_path, file_info): + if "piecemap" in file_info: + file_info = file_info.copy() + file_info["size"] = file_info["piece_size"] + return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info) + + def getSettingsCache(self): + back = super(SitePlugin, self).getSettingsCache() + if self.storage.piecefields: + back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in self.storage.piecefields.iteritems()} + return back + + def needFile(self, inner_path, *args, **kwargs): + if inner_path.endswith("|all"): + @util.Pooled(20) + def pooledNeedBigfile(*args, **kwargs): + return self.needFile(*args, **kwargs) 
+ + inner_path = inner_path.replace("|all", "") + file_info = self.needFileInfo(inner_path) + file_size = file_info["size"] + piece_size = file_info["piece_size"] + + piece_num = int(math.ceil(float(file_size) / piece_size)) + + file_threads = [] + + piecefield = self.storage.piecefields.get(file_info["sha512"]) + + for piece_i in range(piece_num): + piece_from = piece_i * piece_size + piece_to = min(file_size, piece_from + piece_size) + if not piecefield or not piecefield[piece_i]: + res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False) + if res is not True and res is not False: + file_threads.append(res) + gevent.joinall(file_threads) + else: + return super(SitePlugin, self).needFile(inner_path, *args, **kwargs) + + +@PluginManager.registerTo("ConfigPlugin") +class ConfigPlugin(object): + def createArguments(self): + group = self.parser.add_argument_group("Bigfile plugin") + group.add_argument('--autodownload_bigfile_size_limit', help='Also download big files up to this size limit if the help distribute option is checked', default=1, metavar="MB", type=int) + + return super(ConfigPlugin, self).createArguments() diff --git a/plugins/BigFile/Test/TestBigfile.py b/plugins/BigFile/Test/TestBigfile.py new file mode 100644 index 00000000..a934a67c --- /dev/null +++ b/plugins/BigFile/Test/TestBigfile.py @@ -0,0 +1,467 @@ +import time +import os +from cStringIO import StringIO + +import pytest +import msgpack +import mock +from lib import merkletools + +from Connection import ConnectionServer +from Content.ContentManager import VerifyError +from File import FileServer +from File import FileRequest +from Worker import WorkerManager +from Peer import Peer +from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked +from Test import Spy + + +@pytest.mark.usefixtures("resetSettings") +@pytest.mark.usefixtures("resetTempSettings") +class TestBigfile: + privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv" + + def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10): + f = site.storage.open(inner_path, "w") + for i in range(pieces * 100): + f.write(("Test%s" % i).ljust(10, "-") * 1000) + f.close() + assert site.content_manager.sign("content.json", self.privatekey) + return inner_path + + def testPiecemapCreate(self, site): + inner_path = self.createBigfile(site) + content = site.storage.loadJson("content.json") + assert "data/optional.any.iso" in content["files_optional"] + file_node = content["files_optional"][inner_path] + assert file_node["size"] == 10 * 1000 * 1000 + assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6" + assert file_node["piecemap"] == inner_path + ".piecemap.msgpack" + + piecemap = msgpack.unpack(site.storage.open(file_node["piecemap"], "rb"))["optional.any.iso"] + assert len(piecemap["sha512_pieces"]) == 10 + assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1] + assert piecemap["sha512_pieces"][0].encode("hex") == "a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3" + + def testVerifyPiece(self, site): + inner_path = self.createBigfile(site) + + # Verify all 10 pieces + f = site.storage.open(inner_path, "rb") + for i in range(10): + piece = StringIO(f.read(1024 * 1024)) + piece.seek(0) + site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece) + f.close() + + # Try to verify piece 0 with piece 1 hash + with pytest.raises(VerifyError) as err: + i = 1 + f = site.storage.open(inner_path, "rb") + piece = StringIO(f.read(1024 * 1024)) +
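# The bytes read above belong to piece 0, but they are verified against piece 1's offset below, so the hash check must fail +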
f.close() + site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece) + assert "Invalid hash" in str(err) + + def testSparseFile(self, site): + inner_path = "sparsefile" + + # Create a 100MB sparse file + site.storage.createSparseFile(inner_path, 100 * 1024 * 1024) + + # Write to file beginning + s = time.time() + f = site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), "hellostart" * 1024) + time_write_start = time.time() - s + + # Write to file end + s = time.time() + f = site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), "helloend" * 1024) + time_write_end = time.time() - s + + # Verify writes + f = site.storage.open(inner_path) + assert f.read(10) == "hellostart" + f.seek(99 * 1024 * 1024) + assert f.read(8) == "helloend" + f.close() + + site.storage.delete(inner_path) + + # Writing to the end should not take much longer than writing to the start + assert time_write_end <= max(0.1, time_write_start * 1.1) + + def testRangedFileRequest(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + file_server.sites[site.address] = site + client = FileServer("127.0.0.1", 1545) + client.sites[site_temp.address] = site_temp + site_temp.connection_server = client + connection = client.getConnection("127.0.0.1", 1544) + + # Add file_server as peer to client + peer_file_server = site_temp.addPeer("127.0.0.1", 1544) + + buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024)) + + assert len(buff.getvalue()) == 1 * 1024 * 1024 # Correct block size + assert buff.getvalue().startswith("Test524") # Correct data + buff.seek(0) + assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff) # Correct hash + + connection.close() + client.stop() + + def testRangedFileDownload(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Make sure the file and the piecemap are in the optional hashfield + file_info = site.content_manager.getFileInfo(inner_path) + assert site.content_manager.hashfield.hasHash(file_info["sha512"]) + + piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"] + assert site.content_manager.hashfield.hasHash(piecemap_hash) + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + peer_client = site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + bad_files = site_temp.storage.verifyFiles(quick_check=True) + assert not bad_files + + # client_piecefield = peer_client.piecefields[file_info["sha512"]].tostring() + # assert client_piecefield == "1" * 10 + + # Download 5. and 10. block + + site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024)) + site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024)) + + # Verify 0. block not downloaded + f = site_temp.storage.open(inner_path) + assert f.read(10) == "\0" * 10 +
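# Unwritten regions of the sparse file read back as null bytes, so "\0" here proves the piece was never downloaded +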
# Verify 5. and 10. blocks downloaded + f.seek(5 * 1024 * 1024) + assert f.read(7) == "Test524" + f.seek(9 * 1024 * 1024) + assert f.read(7) == "943---T" + + # Verify hashfield + assert set(site_temp.content_manager.hashfield) == set([18343, 30970]) # 18343: data/optional.any.iso, 30970: data/optional.any.iso.piecemap.msgpack + + def testOpenBigfile(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Open virtual file + assert not site_temp.storage.isFile(inner_path) + + with site_temp.storage.openBigfile(inner_path) as f: + with Spy.Spy(FileRequest, "route") as requests: + f.seek(5 * 1024 * 1024) + assert f.read(7) == "Test524" + + f.seek(9 * 1024 * 1024) + assert f.read(7) == "943---T" + + assert len(requests) == 4 # 1x piecemap + 1x getpiecefield + 2x for pieces + + assert set(site_temp.content_manager.hashfield) == set([18343, 30970]) + + assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001" + assert f.sha512 in site_temp.getSettingsCache()["piecefields"] + + # Test requesting already downloaded + with Spy.Spy(FileRequest, "route") as requests: + f.seek(5 * 1024 * 1024) + assert f.read(7) == "Test524" + + assert len(requests) == 0 + + # Test requesting multi-block overflow reads + with Spy.Spy(FileRequest, "route") as requests: + f.seek(5 * 1024 * 1024) # We already have this block + data = f.read(1024 * 1024 * 3) # Our read overflows to the 6. and 7. block + assert data.startswith("Test524") + assert data.endswith("Test838-") + assert "\0" not in data # No null bytes allowed + + assert len(requests) == 2 # Two block download + + + @pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked]) + def testPiecefield(self, piecefield_obj, site): + testdatas = [ + "1" * 100 + "0" * 900 + "1" * 4000 + "0" * 4999 + "1", + "010101" * 10 + "01" * 90 + "10" * 400 + "0" * 4999, + "1" * 10000, + "0" * 10000 + ] + for testdata in testdatas: + piecefield = piecefield_obj() + + piecefield.fromstring(testdata) + assert piecefield.tostring() == testdata + assert piecefield[0] == int(testdata[0]) + assert piecefield[100] == int(testdata[100]) + assert piecefield[1000] == int(testdata[1000]) + assert piecefield[len(testdata)-1] == int(testdata[len(testdata)-1]) + + packed = piecefield.pack() + piecefield_new = piecefield_obj() + piecefield_new.unpack(packed) + assert piecefield.tostring() == piecefield_new.tostring() + assert piecefield_new.tostring() == testdata + + + def testFileGet(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + site_temp.connection_server = FileServer("127.0.0.1", 1545) + site_temp.connection_server.sites[site_temp.address] = site_temp + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Download second block + with site_temp.storage.openBigfile(inner_path) as f: + f.seek(1024 * 1024) + assert f.read(1024)[0] != "\0" + + # Make sure the first block is not downloaded + with site_temp.storage.open(inner_path) as f: + assert f.read(1024)[0] == "\0" +
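# Reverse roles: the source site adds the client as a peer, so the getFile calls below go from the original server to the client +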
peer2 = site.addPeer("127.0.0.1", 1545, return_peer=True) + + # Should error on the first block request + assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1)) + + # Should not error on the second block request + assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2)) + + + def benchmarkPeerMemory(self, site, file_server): + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + import psutil, os + meminfo = psutil.Process(os.getpid()).memory_info + + mem_s = meminfo()[0] + s = time.time() + for i in range(25000): + site.addPeer("127.0.0.1", i) + print "%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024) # 0.082s MEM: + 6800KB + print site.peers.values()[0].piecefields + + + def testUpdatePiecefield(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + server1 = file_server + server1.sites[site.address] = site + server2 = FileServer("127.0.0.1", 1545) + server2.sites[site_temp.address] = site_temp + site_temp.connection_server = server2 + + # Add file_server as peer to client + server2_peer1 = site_temp.addPeer("127.0.0.1", 1544) + + # Testing piecefield sync + assert len(server2_peer1.piecefields) == 0 + assert server2_peer1.updatePiecefields() # Query piecefields from peer + assert len(server2_peer1.piecefields) > 0 + + def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + server1 = file_server + server1.sites[site.address] = site + server2 = FileServer("127.0.0.1", 1545) + server2.sites[site_temp.address] = site_temp + site_temp.connection_server = server2 + + # Add file_server as peer to client + server2_peer1 = site_temp.addPeer("127.0.0.1", 1544) # Working + + site_temp.downloadContent("content.json", download_files=False) + site_temp.needFile("data/optional.any.iso.piecemap.msgpack") + + # Add fake peers with optional files downloaded + for i in range(5): + fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544) + fake_peer.hashfield = site.content_manager.hashfield + fake_peer.has_hashfield = True + + with Spy.Spy(WorkerManager, "addWorker") as requests: + site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024)) + site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024)) + + # It should only request parts from peer1, as the other peers do not have the requested parts in their piecefields + assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0 + + + def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + server1 = file_server + server1.sites[site.address] = site + server2 = FileServer("127.0.0.1", 1545) + server2.sites[site_temp.address] = site_temp + site_temp.connection_server = server2 + sha512 = site.content_manager.getFileInfo(inner_path)["sha512"] + + # Create 10 fake peers, one for each piece + for i in range(10): + peer = Peer("127.0.0.1", 1544, site_temp, server2) + peer.piecefields[sha512][i] = "1" + peer.updateHashfield = mock.MagicMock(return_value=False) + peer.updatePiecefields = mock.MagicMock(return_value=False) + peer.findHashIds = mock.MagicMock(return_value={"nope": []}) + peer.hashfield = site.content_manager.hashfield + peer.has_hashfield = True + peer.key = "Peer:%s" % i + site_temp.peers["Peer:%s" % i] = peer + + site_temp.downloadContent("content.json", download_files=False) + site_temp.needFile("data/optional.any.iso.piecemap.msgpack") + + with 
Spy.Spy(Peer, "getFile") as requests: + for i in range(10): + site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024)) + + assert len(requests) == 10 + for i in range(10): + assert requests[i][0] == site_temp.peers["Peer:%s" % i] # Every part should be requested from piece owner peer + + def testDownloadStats(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Open virtual file + assert not site_temp.storage.isFile(inner_path) + + # Check size before downloads + assert site_temp.settings["size"] < 10 * 1024 * 1024 + assert site_temp.settings["optional_downloaded"] == 0 + size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"] + size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"] + + with site_temp.storage.openBigfile(inner_path) as f: + assert not "\0" in f.read(1024) + assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile + + with site_temp.storage.openBigfile(inner_path) as f: + # Don't count twice + assert not "\0" in f.read(1024) + assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile + + # Add second block + assert not "\0" in f.read(1024 * 1024) + assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile + + + def testPrebuffer(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Open virtual file + assert not site_temp.storage.isFile(inner_path) + + with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f: + with Spy.Spy(FileRequest, "route") as requests: + f.seek(5 * 1024 * 1024) + assert f.read(7) == "Test524" + # assert len(requests) == 3 # 1x piecemap + 1x getpiecefield + 1x for pieces + assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2 + + time.sleep(0.5) # Wait prebuffer download + + sha512 = site.content_manager.getFileInfo(inner_path)["sha512"] + assert site_temp.storage.piecefields[sha512].tostring() == "0000011100" + + # No prebuffer beyond end of the file + f.seek(9 * 1024 * 1024) + assert "\0" not in f.read(7) + + assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0 + + def testDownloadAllPieces(self, file_server, site, site_temp): + inner_path = self.createBigfile(site) + + # Init source server + site.connection_server = file_server + file_server.sites[site.address] = site + + # Init client server + client = ConnectionServer("127.0.0.1", 1545) + site_temp.connection_server = client + site_temp.addPeer("127.0.0.1", 1544) + + # Download site + site_temp.download(blind_includes=True).join(timeout=5) + + # Open virtual file + assert not site_temp.storage.isFile(inner_path) + + with Spy.Spy(FileRequest, "route") as requests: + site_temp.needFile("%s|all" % 
inner_path) + + assert len(requests) == 12 # piecemap.msgpack, getPiecefields, 10 x piece + + # Don't re-download already got pieces + with Spy.Spy(FileRequest, "route") as requests: + site_temp.needFile("%s|all" % inner_path) + + assert len(requests) == 0 diff --git a/plugins/BigFile/Test/conftest.py b/plugins/BigFile/Test/conftest.py new file mode 100644 index 00000000..8c1df5b2 --- /dev/null +++ b/plugins/BigFile/Test/conftest.py @@ -0,0 +1 @@ +from src.Test.conftest import * \ No newline at end of file diff --git a/plugins/BigFile/Test/pytest.ini b/plugins/BigFile/Test/pytest.ini new file mode 100644 index 00000000..d09210d1 --- /dev/null +++ b/plugins/BigFile/Test/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +python_files = Test*.py +addopts = -rsxX -v --durations=6 +markers = + webtest: mark a test as a webtest. \ No newline at end of file diff --git a/plugins/BigFile/__init__.py b/plugins/BigFile/__init__.py new file mode 100644 index 00000000..005d6661 --- /dev/null +++ b/plugins/BigFile/__init__.py @@ -0,0 +1,2 @@ +import BigfilePlugin +from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked \ No newline at end of file