From ef69dcd331b4123c3bec267b23c26b0b4807afb1 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Wed, 27 Oct 2021 01:06:12 +0700 Subject: [PATCH] Implement Send Back LRU cache to reduce useless network transfers --- src/Config.py | 3 +++ src/Site/Site.py | 28 +++++++++++++++++++++++----- src/Site/SiteManager.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/Config.py b/src/Config.py index 67f65f7b..26fbafb2 100644 --- a/src/Config.py +++ b/src/Config.py @@ -268,6 +268,9 @@ class Config(object): self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit') self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers') + self.parser.add_argument('--send_back_lru_size', help='Size of the send back LRU cache', default=5000, type=int, metavar='send_back_lru_size') + self.parser.add_argument('--send_back_limit', help='Send no more than so many files at once back to peer, when we discovered that the peer held older file versions', default=3, type=int, metavar='send_back_limit') + self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet tries checking updates for own sites more frequently. This can be used by a third party for revealing the network addresses of a site owner. 
If this option is enabled, ZeroNet performs the checks in the same way for any sites.', type='bool', choices=[True, False], default=False) self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip') diff --git a/src/Site/Site.py b/src/Site/Site.py index d4f2ad62..73bb01dc 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -212,7 +212,6 @@ class Site(object): self.peer_connector = SiteHelpers.PeerConnector(self) # Connect more peers in background by request self.persistent_peer_req = None # The persistent peer requirement, managed by maintenance handler - if not self.settings.get("wrapper_key"): # To auth websocket permissions self.settings["wrapper_key"] = CryptHash.random() self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"]) @@ -308,6 +307,12 @@ class Site(object): thread = self.greenlet_manager.spawnLater(*args, **kwargs) return thread + def checkSendBackLRU(self, peer, inner_path, remote_modified): + return SiteManager.site_manager.checkSendBackLRU(self, peer, inner_path, remote_modified) + + def addToSendBackLRU(self, peer, inner_path, modified): + return SiteManager.site_manager.addToSendBackLRU(self, peer, inner_path, modified) + def getSettingsCache(self): back = {} back["bad_files"] = self.bad_files @@ -619,7 +624,8 @@ class Site(object): queried.append(peer) modified_contents = [] send_back = [] - send_back_limit = 5 + send_back_limit = config.send_back_limit + send_back_skipped = 0 my_modified = self.content_manager.listModified(since) num_old_files = 0 for inner_path, modified in res["modified_files"].items(): # Check if the peer has newer files than we @@ -631,7 +637,10 @@ class Site(object): modified_contents.append(inner_path) self.bad_files[inner_path] = self.bad_files.get(inner_path, 1) if has_older: - send_back.append(inner_path) + if self.checkSendBackLRU(peer, inner_path, modified): + send_back_skipped += 1 + else: + send_back.append(inner_path) if modified_contents: 
self.log.info("CheckModifications: %s new modified files from %s" % (len(modified_contents), peer)) @@ -653,7 +662,10 @@ class Site(object): self.log.info("CheckModifications: %s: %s < %s" % ( inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0) )) - self.spawn(self.publisher, inner_path, [peer], [], 1) + self.spawn(self.publisher, inner_path, [peer], [], 1, save_to_send_back_lru=True) + + if send_back_skipped: + self.log.info("CheckModifications: %s has older versions of %s files, skipped according to send back LRU" % (peer, send_back_skipped)) self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads)) gevent.joinall(threads) @@ -842,7 +854,7 @@ class Site(object): gevent.joinall(content_threads) # Publish worker - def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2): + def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2, save_to_send_back_lru=False): file_size = self.storage.getSize(inner_path) content_json_modified = self.content_manager.contents[inner_path]["modified"] body = self.storage.read(inner_path) @@ -877,6 +889,12 @@ class Site(object): self.log.error("Publish error: %s" % Debug.formatException(err)) result = {"exception": Debug.formatException(err)} + # We add to the send back lru not only on success, but also on errors. + # Some peers returns None. (Why?) + # Anyway, we tried our best in delivering possibly lost updates. 
+ if save_to_send_back_lru: + self.addToSendBackLRU(peer, inner_path, content_json_modified) + if result and "ok" in result: published.append(peer) if cb_progress and len(published) <= limit: diff --git a/src/Site/SiteManager.py b/src/Site/SiteManager.py index 1b065e93..035e9279 100644 --- a/src/Site/SiteManager.py +++ b/src/Site/SiteManager.py @@ -4,6 +4,7 @@ import re import os import time import atexit +import collections import gevent @@ -27,6 +28,21 @@ class SiteManager(object): gevent.spawn(self.saveTimer) atexit.register(lambda: self.save(recalculate_size=True)) + # ZeroNet has a bug of desyncing between: + # * time sent in a response of listModified + # and + # * time checked on receiving a file. + # This leads to the following scenario: + # * Request listModified. + # * Detect that the remote peer is missing an update. + # * Send a newer version of the file back to the peer. + # * The peer responds "ok: File not changed" + # ..... + # * Request listModified the next time and do all the same again. + # So we keep the list of sent back entries to prevent sending multiple useless updates: + # "{site.address} - {peer.key} - {inner_path}" -> mtime + self.send_back_lru = collections.OrderedDict() + # Load all sites from data/sites.json @util.Noparallel() def load(self, cleanup=True, startup=False): @@ -220,6 +236,23 @@ class SiteManager(object): self.load(startup=True) return self.sites + # Return True if inner_path was already sent to peer + # with an mtime newer than remote_modified, + # so that send back logic is suppressed for inner_path. + # Return False if inner_path can be sent back to peer. 
+    def checkSendBackLRU(self, site, peer, inner_path, remote_modified):
+        key = site.address + ' - ' + peer.key + ' - ' + inner_path
+        sent_modified = self.send_back_lru.get(key, 0)  # 0: nothing sent yet
+        return remote_modified < sent_modified  # True: already sent newer, skip
+
+    def addToSendBackLRU(self, site, peer, inner_path, modified):
+        key = site.address + ' - ' + peer.key + ' - ' + inner_path
+        # Always store the mtime just sent; keeping a stale value for a known
+        # key lets checkSendBackLRU() approve the same re-send again and again.
+        self.send_back_lru[key] = modified
+        self.send_back_lru.move_to_end(key, last=True)  # mark as most recent
+        while len(self.send_back_lru) > config.send_back_lru_size:
+            self.send_back_lru.popitem(last=False)  # evict least recent
 site_manager = SiteManager() # Singletone