Don't push body of content.json with updates if larger than 10kb
parent e5d3b0e7b8
commit 4222c31b3e

4 changed files with 103 additions and 29 deletions
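The change in short: pushing a full content.json body to every peer on each publish wastes bandwidth once the file grows large, because an up-to-date peer can fetch it on demand instead. From this commit on, the sender drops bodies larger than 10 KB for peers recent enough to cope (handshake rev 4095 or later), and such peers download the file themselves. A minimal sketch of the sender-side rule; handshake_rev is a stand-in for the real self.connection.handshake.get("rev", 0):

    # Sketch of the sender-side rule this commit introduces (see Peer.publish below).
    def body_to_push(body, handshake_rev):
        if len(body) > 10 * 1024 and handshake_rev >= 4095:
            # Peer can re-download content.json itself; don't push the big body.
            return b""
        return body
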
src/File/FileRequest.py

@@ -4,7 +4,6 @@ import time
 import json
 import collections
 import itertools
-import socket
 
 # Third party modules
 import gevent

@@ -110,14 +109,32 @@ class FileRequest(object):
             return False
 
         inner_path = params.get("inner_path", "")
+        current_content_modified = site.content_manager.contents.get(inner_path, {}).get("modified", 0)
+        body = params["body"]
 
         if not inner_path.endswith("content.json"):
             self.response({"error": "Only content.json update allowed"})
             self.connection.badAction(5)
             return
 
+        should_validate_content = True
+        if "modified" in params and params["modified"] <= current_content_modified:
+            should_validate_content = False
+            valid = None  # Same or earlier content as we have
+        elif not body:  # No body sent, we have to download it first
+            self.log.debug("Missing body from update, downloading...")
+            peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
             try:
-                content = json.loads(params["body"].decode())
+                body = peer.getFile(site.address, inner_path).read()
             except Exception as err:
+                self.log.debug("Can't download updated file %s: %s" % (inner_path, err))
+                self.response({"error": "File invalid update: Can't download updated file"})
+                self.connection.badAction(5)
+                return
+
+        if should_validate_content:
+            try:
+                content = json.loads(body.decode())
+            except Exception as err:
                 self.log.debug("Update for %s is invalid JSON: %s" % (inner_path, err))
                 self.response({"error": "File invalid JSON"})

@@ -139,7 +156,7 @@ class FileRequest(object):
         if valid is True:  # Valid and changed
             site.log.info("Update for %s looks valid, saving..." % inner_path)
             self.server.files_parsing[file_uri] = True
-            site.storage.write(inner_path, params["body"])
+            site.storage.write(inner_path, body)
             del params["body"]
 
             site.onFileDone(inner_path)  # Trigger filedone

@@ -219,7 +236,6 @@ class FileRequest(object):
         if not streaming:
             file.read_bytes = read_bytes
 
-
         if params["location"] > file_size:
             self.connection.badAction(5)
             raise RequestError("Bad file location")

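The receiver side of the protocol change: actionUpdate now distinguishes three cases. If the announced modified timestamp is not newer than the local copy, validation is skipped entirely; if the body was withheld by the sender, it is downloaded back from that peer before validation; otherwise the inline body is validated as before. Condensed from the hunk above, error handling elided:

    should_validate_content = True
    if "modified" in params and params["modified"] <= current_content_modified:
        should_validate_content = False  # same or older than what we already have
        valid = None
    elif not body:  # sender withheld a big content.json; pull it instead
        peer = site.addPeer(self.connection.ip, self.connection.port,
                            return_peer=True, source="update")
        body = peer.getFile(site.address, inner_path).read()

    if should_validate_content:
        content = json.loads(body.decode())  # must still parse as JSON
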
src/Peer/Peer.py

@@ -21,8 +21,9 @@ if config.use_tempfiles:
 @PluginManager.acceptPlugins
 class Peer(object):
     __slots__ = (
-        "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield", "time_added", "has_hashfield", "is_tracker_connection",
-        "time_my_hashfield_sent", "last_ping", "reputation", "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
+        "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield",
+        "time_added", "has_hashfield", "is_tracker_connection", "time_my_hashfield_sent", "last_ping", "reputation",
+        "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
     )
 
     def __init__(self, ip, port, site=None, connection_server=None):

@@ -357,6 +358,19 @@ class Peer(object):
             self.time_my_hashfield_sent = time.time()
         return True
 
+    def publish(self, address, inner_path, body, modified, diffs=[]):
+        if len(body) > 10 * 1024 and self.connection and self.connection.handshake.get("rev", 0) >= 4095:
+            # To save bw we don't push big content.json to peers
+            body = b""
+
+        return self.request("update", {
+            "site": address,
+            "inner_path": inner_path,
+            "body": body,
+            "modified": modified,
+            "diffs": diffs
+        })
+
     # Stop and remove from site
     def remove(self, reason="Removing"):
         self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))

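A side note on the new method's signature: diffs=[] is a mutable default argument. It is only read here, never mutated, so it behaves correctly, but the conventional pattern avoids the shared-list pitfall altogether:

    # Safer signature for Peer.publish; behavior is otherwise unchanged.
    def publish(self, address, inner_path, body, modified, diffs=None):
        if diffs is None:
            diffs = []
        # ... same size check and "update" request as in the hunk above ...
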
src/Site/Site.py

@@ -527,12 +527,7 @@ class Site(object):
         for retry in range(2):
             try:
                 with gevent.Timeout(timeout, False):
-                    result = peer.request("update", {
-                        "site": self.address,
-                        "inner_path": inner_path,
-                        "body": body,
-                        "diffs": diffs
-                    })
+                    result = peer.publish(self.address, inner_path, body, content_json_modified, diffs)
                 if result:
                     break
             except Exception as err:

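Passing modified through to peer.publish is what lets receivers short-circuit updates they already have (the current_content_modified check in FileRequest.py above). The derivation of content_json_modified falls outside this hunk; presumably it is the modified timestamp of the freshly signed content.json, along the lines of:

    # Assumed derivation, not shown in this hunk:
    content_json_modified = self.content_manager.contents[inner_path]["modified"]
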
src/Test/TestSiteDownload.py

@@ -405,9 +405,58 @@ class TestSiteDownload:
             site.content_manager.sign("content.json", privatekey="5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv")
             site.publish(diffs=diffs)
             site_temp.download(blind_includes=True).join(timeout=5)
-            assert len([request for request in requests if request[0] in ("getFile", "streamFile")]) == 0
+            assert len([request for request in requests if request[1] in ("getFile", "streamFile")]) == 0
 
         assert site_temp.storage.open("data/data.json").read() == data_new
 
         assert site_temp.storage.deleteFiles()
         [connection.close() for connection in file_server.connections]
+
+    def testBigUpdate(self, file_server, site, site_temp):
+        # Init source server
+        site.connection_server = file_server
+        file_server.sites[site.address] = site
+
+        # Init client server
+        client = FileServer(file_server.ip, 1545)
+        client.sites[site_temp.address] = site_temp
+        site_temp.connection_server = client
+
+        # Connect peers
+        site_temp.addPeer(file_server.ip, 1544)
+
+        # Download site from site to site_temp
+        site_temp.download(blind_includes=True).join(timeout=5)
+
+        # Update file
+        data_original = site.storage.open("data/data.json").read()
+        data_new = data_original.replace(b'"ZeroBlog"', b'"PatchedZeroBlog"')
+        assert data_original != data_new
+
+        site.storage.open("data/data.json-new", "wb").write(data_new)
+
+        assert site.storage.open("data/data.json-new").read() == data_new
+        assert site_temp.storage.open("data/data.json").read() != data_new
+
+        # Generate diff
+        diffs = site.content_manager.getDiffs("content.json")
+        assert not site.storage.isFile("data/data.json-new")  # New data file removed
+        assert site.storage.open("data/data.json").read() == data_new  # -new postfix removed
+        assert "data/data.json" in diffs
+
+        content_json = site.storage.loadJson("content.json")
+        content_json["title"] = "BigZeroBlog" * 1024 * 10
+        site.storage.writeJson("content.json", content_json)
+        site.content_manager.loadContent("content.json", force=True)
+
+        # Publish with patch
+        site.log.info("Publish new data.json with patch")
+        with Spy.Spy(FileRequest, "route") as requests:
+            site.content_manager.sign("content.json", privatekey="5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv")
+            assert site.storage.getSize("content.json") > 10 * 1024  # Make it a big content.json
+            site.publish(diffs=diffs)
+            site_temp.download(blind_includes=True).join(timeout=5)
+            file_requests = [request for request in requests if request[1] in ("getFile", "streamFile")]
+            assert len(file_requests) == 1
+
+        assert site_temp.storage.open("data/data.json").read() == data_new

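The closing assertions are the point of the test: with content.json inflated past 10 KB, a publish results in exactly one getFile/streamFile request, meaning the client pulled the withheld content.json once rather than receiving it in the update body, while the data.json change itself still arrived via the diff.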