From 4222c31b3e515538fdd48274f8041e0ffea21ba5 Mon Sep 17 00:00:00 2001
From: shortcutme
Date: Thu, 30 May 2019 04:26:41 +0200
Subject: [PATCH] Don't push body of content.json with updates if larger than
 10kb

---
 src/File/FileRequest.py      | 56 +++++++++++++++++++++++-------------
 src/Peer/Peer.py             | 18 ++++++++++--
 src/Site/Site.py             |  7 +----
 src/Test/TestSiteDownload.py | 51 +++++++++++++++++++++++++++++++-
 4 files changed, 103 insertions(+), 29 deletions(-)

diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py
index 8fdfaf43..d90b56c9 100644
--- a/src/File/FileRequest.py
+++ b/src/File/FileRequest.py
@@ -4,7 +4,6 @@ import time
 import json
 import collections
 import itertools
-import socket
 
 # Third party modules
 import gevent
@@ -110,36 +109,54 @@ class FileRequest(object):
             return False
 
         inner_path = params.get("inner_path", "")
+        current_content_modified = site.content_manager.contents.get(inner_path, {}).get("modified", 0)
+        body = params["body"]
 
         if not inner_path.endswith("content.json"):
             self.response({"error": "Only content.json update allowed"})
             self.connection.badAction(5)
             return
 
-        try:
-            content = json.loads(params["body"].decode())
-        except Exception as err:
-            self.log.debug("Update for %s is invalid JSON: %s" % (inner_path, err))
-            self.response({"error": "File invalid JSON"})
-            self.connection.badAction(5)
-            return
-
-        file_uri = "%s/%s:%s" % (site.address, inner_path, content["modified"])
-
-        if self.server.files_parsing.get(file_uri):  # Check if we already working on it
-            valid = None  # Same file
-        else:
+        should_validate_content = True
+        if "modified" in params and params["modified"] <= current_content_modified:
+            should_validate_content = False
+            valid = None  # Same or earlier content as we have
+        elif not body:  # No body sent, we have to download it first
+            self.log.debug("Missing body from update, downloading...")
+            peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
             try:
-                valid = site.content_manager.verifyFile(inner_path, content)
+                body = peer.getFile(site.address, inner_path).read()
             except Exception as err:
-                self.log.debug("Update for %s is invalid: %s" % (inner_path, err))
-                error = err
-                valid = False
+                self.log.debug("Can't download updated file %s: %s" % (inner_path, err))
+                self.response({"error": "File invalid update: Can't download updated file"})
+                self.connection.badAction(5)
+                return
+
+        if should_validate_content:
+            try:
+                content = json.loads(body.decode())
+            except Exception as err:
+                self.log.debug("Update for %s is invalid JSON: %s" % (inner_path, err))
+                self.response({"error": "File invalid JSON"})
+                self.connection.badAction(5)
+                return
+
+            file_uri = "%s/%s:%s" % (site.address, inner_path, content["modified"])
+
+            if self.server.files_parsing.get(file_uri):  # Check if we're already working on it
+                valid = None  # Same file
+            else:
+                try:
+                    valid = site.content_manager.verifyFile(inner_path, content)
+                except Exception as err:
+                    self.log.debug("Update for %s is invalid: %s" % (inner_path, err))
+                    error = err
+                    valid = False
 
         if valid is True:  # Valid and changed
             site.log.info("Update for %s looks valid, saving..." % inner_path)
             self.server.files_parsing[file_uri] = True
-            site.storage.write(inner_path, params["body"])
+            site.storage.write(inner_path, body)
             del params["body"]
 
             site.onFileDone(inner_path)  # Trigger filedone
@@ -219,7 +236,6 @@ class FileRequest(object):
             if not streaming:
                 file.read_bytes = read_bytes
 
-
             if params["location"] > file_size:
                 self.connection.badAction(5)
                 raise RequestError("Bad file location")
diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py
index c1d3652f..b5b22436 100644
--- a/src/Peer/Peer.py
+++ b/src/Peer/Peer.py
@@ -21,8 +21,9 @@ if config.use_tempfiles:
 @PluginManager.acceptPlugins
 class Peer(object):
     __slots__ = (
-        "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield", "time_added", "has_hashfield", "is_tracker_connection",
-        "time_my_hashfield_sent", "last_ping", "reputation", "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
+        "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield",
+        "time_added", "has_hashfield", "is_tracker_connection", "time_my_hashfield_sent", "last_ping", "reputation",
+        "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
     )
 
     def __init__(self, ip, port, site=None, connection_server=None):
@@ -357,6 +358,19 @@ class Peer(object):
             self.time_my_hashfield_sent = time.time()
         return True
 
+    def publish(self, address, inner_path, body, modified, diffs=[]):
+        if len(body) > 10 * 1024 and self.connection and self.connection.handshake.get("rev", 0) >= 4095:
+            # To save bw we don't push big content.json to peers
+            body = b""
+
+        return self.request("update", {
+            "site": address,
+            "inner_path": inner_path,
+            "body": body,
+            "modified": modified,
+            "diffs": diffs
+        })
+
     # Stop and remove from site
     def remove(self, reason="Removing"):
         self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
diff --git a/src/Site/Site.py b/src/Site/Site.py
index e118724e..34bea97d 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -527,12 +527,7 @@ class Site(object):
         for retry in range(2):
             try:
                 with gevent.Timeout(timeout, False):
-                    result = peer.request("update", {
-                        "site": self.address,
-                        "inner_path": inner_path,
-                        "body": body,
-                        "diffs": diffs
-                    })
+                    result = peer.publish(self.address, inner_path, body, content_json_modified, diffs)
                 if result:
                     break
             except Exception as err:
diff --git a/src/Test/TestSiteDownload.py b/src/Test/TestSiteDownload.py
index 2446da64..1d4ba4c1 100644
--- a/src/Test/TestSiteDownload.py
+++ b/src/Test/TestSiteDownload.py
@@ -405,9 +405,58 @@ class TestSiteDownload:
             site.content_manager.sign("content.json", privatekey="5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv")
             site.publish(diffs=diffs)
             site_temp.download(blind_includes=True).join(timeout=5)
-            assert len([request for request in requests if request[0] in ("getFile", "streamFile")]) == 0
+            assert len([request for request in requests if request[1] in ("getFile", "streamFile")]) == 0
 
         assert site_temp.storage.open("data/data.json").read() == data_new
 
         assert site_temp.storage.deleteFiles()
         [connection.close() for connection in file_server.connections]
+
+    def testBigUpdate(self, file_server, site, site_temp):
+        # Init source server
+        site.connection_server = file_server
+        file_server.sites[site.address] = site
+
+        # Init client server
+        client = FileServer(file_server.ip, 1545)
+        client.sites[site_temp.address] = site_temp
+        site_temp.connection_server = client
+
+        # Connect peers
+        site_temp.addPeer(file_server.ip, 1544)
+
+        # Download site from site to site_temp
+        site_temp.download(blind_includes=True).join(timeout=5)
+
+        # Update file
+        data_original = site.storage.open("data/data.json").read()
+        data_new = data_original.replace(b'"ZeroBlog"', b'"PatchedZeroBlog"')
+        assert data_original != data_new
+
+        site.storage.open("data/data.json-new", "wb").write(data_new)
+
+        assert site.storage.open("data/data.json-new").read() == data_new
+        assert site_temp.storage.open("data/data.json").read() != data_new
+
+        # Generate diff
+        diffs = site.content_manager.getDiffs("content.json")
+        assert not site.storage.isFile("data/data.json-new")  # New data file removed
+        assert site.storage.open("data/data.json").read() == data_new  # -new postfix removed
+        assert "data/data.json" in diffs
+
+        content_json = site.storage.loadJson("content.json")
+        content_json["title"] = "BigZeroBlog" * 1024 * 10
+        site.storage.writeJson("content.json", content_json)
+        site.content_manager.loadContent("content.json", force=True)
+
+        # Publish with patch
+        site.log.info("Publish new data.json with patch")
+        with Spy.Spy(FileRequest, "route") as requests:
+            site.content_manager.sign("content.json", privatekey="5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv")
+            assert site.storage.getSize("content.json") > 10 * 1024  # Make it a big content.json
+            site.publish(diffs=diffs)
+            site_temp.download(blind_includes=True).join(timeout=5)
+            file_requests = [request for request in requests if request[1] in ("getFile", "streamFile")]
+            assert len(file_requests) == 1
+
+        assert site_temp.storage.open("data/data.json").read() == data_new
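
Below is a minimal, self-contained sketch of the update flow this patch introduces, for readers who want the logic outside the diff context. It is illustrative only, not ZeroNet code: the helper names (build_update_request, handle_update, fetch_content_json) and the sample values are hypothetical. Only the 10 * 1024 byte threshold, the rev >= 4095 handshake gate, the modified-timestamp short-circuit, and the download-on-missing-body fallback are taken from the patch itself.

# Sketch of the bodyless-update flow (hypothetical names, simplified logic).

BODY_PUSH_LIMIT = 10 * 1024  # bytes; the patch's threshold in Peer.publish
MIN_REV_BODYLESS = 4095      # older peers always receive the full body


def build_update_request(address, inner_path, body, modified, peer_rev, diffs=None):
    # Sender side, mirroring Peer.publish: drop big bodies for peers that
    # are new enough to download content.json back on demand.
    if len(body) > BODY_PUSH_LIMIT and peer_rev >= MIN_REV_BODYLESS:
        body = b""
    return {
        "site": address,
        "inner_path": inner_path,
        "body": body,
        "modified": modified,
        "diffs": diffs or [],
    }


def handle_update(params, local_modified, fetch_content_json):
    # Receiver side, mirroring the new FileRequest.actionUpdate branches:
    # skip content we already have, download the body if it was omitted.
    if params["modified"] <= local_modified:
        return None  # same or earlier content than ours; nothing to validate
    body = params["body"]
    if not body:  # no body sent, we have to download it first
        body = fetch_content_json(params["site"], params["inner_path"])
    return body  # the real code JSON-decodes and signature-verifies this


# A 20kb content.json published to an up-to-date peer travels without a body;
# the receiver fetches it back with one extra file request.
big_body = b"x" * (20 * 1024)
req = build_update_request("1TestSiteAddress", "content.json", big_body,
                           modified=1559182001, peer_rev=4555)
assert req["body"] == b""
received = handle_update(req, local_modified=0,
                         fetch_content_json=lambda site, path: big_body)
assert received == big_body

The rev gate is what keeps the change backward compatible: a peer that handshakes with rev < 4095 still receives the full body, while a newer peer gets a bodyless update and pulls content.json back itself. That extra round trip is why testBigUpdate expects exactly one getFile/streamFile request after publishing an oversized content.json.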