Implement Send Back LRU cache to reduce useless network transfers
This commit is contained in:
parent 32eb47c482
commit ef69dcd331
3 changed files with 59 additions and 5 deletions
@@ -268,6 +268,9 @@ class Config(object):
         self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit')
         self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers')
 
+        self.parser.add_argument('--send_back_lru_size', help='Size of the send back LRU cache', default=5000, type=int, metavar='send_back_lru_size')
+        self.parser.add_argument('--send_back_limit', help='Send back no more than this many files at once to a peer when we discover that it holds older file versions', default=3, type=int, metavar='send_back_limit')
+
         self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet checks for updates to its own sites more frequently. This can be used by a third party to reveal the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for all sites.', type='bool', choices=[True, False], default=False)
 
         self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip')
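As a quick illustration of how the two new options behave, here is a minimal standalone sketch (plain argparse, not ZeroNet's Config class); only the option names and defaults mirror the diff above:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--send_back_lru_size', default=5000, type=int)
parser.add_argument('--send_back_limit', default=3, type=int)

config = parser.parse_args([])  # defaults when nothing is passed
assert (config.send_back_lru_size, config.send_back_limit) == (5000, 3)

config = parser.parse_args(['--send_back_limit', '5'])  # command line override
assert config.send_back_limit == 5
```

The cache size bounds the memory used by the send back LRU, while the limit caps how many stale files are pushed back to a single peer per check.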
@@ -212,7 +212,6 @@ class Site(object):
         self.peer_connector = SiteHelpers.PeerConnector(self)  # Connect more peers in background by request
         self.persistent_peer_req = None  # The persistent peer requirement, managed by maintenance handler
 
-
         if not self.settings.get("wrapper_key"):  # To auth websocket permissions
             self.settings["wrapper_key"] = CryptHash.random()
             self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"])
@@ -308,6 +307,12 @@ class Site(object):
         thread = self.greenlet_manager.spawnLater(*args, **kwargs)
         return thread
 
+    def checkSendBackLRU(self, peer, inner_path, remote_modified):
+        return SiteManager.site_manager.checkSendBackLRU(self, peer, inner_path, remote_modified)
+
+    def addToSendBackLRU(self, peer, inner_path, modified):
+        return SiteManager.site_manager.addToSendBackLRU(self, peer, inner_path, modified)
+
     def getSettingsCache(self):
         back = {}
         back["bad_files"] = self.bad_files
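The two new Site methods are deliberately thin wrappers: the cache itself lives on the SiteManager singleton (see the SiteManager diff below), where the keys already embed site.address, so one process-wide cache can serve every site and be bounded by a single send_back_lru_size limit.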
@@ -619,7 +624,8 @@ class Site(object):
             queried.append(peer)
             modified_contents = []
             send_back = []
-            send_back_limit = 5
+            send_back_limit = config.send_back_limit
+            send_back_skipped = 0
             my_modified = self.content_manager.listModified(since)
             num_old_files = 0
             for inner_path, modified in res["modified_files"].items():  # Check if the peer has newer files than we do
@@ -631,7 +637,10 @@ class Site(object):
                     modified_contents.append(inner_path)
                     self.bad_files[inner_path] = self.bad_files.get(inner_path, 1)
                 if has_older:
-                    send_back.append(inner_path)
+                    if self.checkSendBackLRU(peer, inner_path, modified):
+                        send_back_skipped += 1
+                    else:
+                        send_back.append(inner_path)
 
             if modified_contents:
                 self.log.info("CheckModifications: %s new modified files from %s" % (len(modified_contents), peer))
@@ -653,7 +662,10 @@ class Site(object):
                 self.log.info("CheckModifications: %s: %s < %s" % (
                     inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0)
                 ))
-                self.spawn(self.publisher, inner_path, [peer], [], 1)
+                self.spawn(self.publisher, inner_path, [peer], [], 1, save_to_send_back_lru=True)
 
+            if send_back_skipped:
+                self.log.info("CheckModifications: %s has older versions of %s files, skipped according to send back LRU" % (peer, send_back_skipped))
+
             self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads))
             gevent.joinall(threads)
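Condensing the Site.py hunks above, the per-peer send back decision looks roughly like the following sketch. `pick_send_back` is a hypothetical helper, not ZeroNet code; the real checkModifications interleaves this with scheduling downloads of the files where the peer is ahead:

```python
def pick_send_back(site, peer, remote_modified_files, my_modified, send_back_limit):
    """Split the peer's reported files into ones to send back and ones to skip."""
    send_back = []
    send_back_skipped = 0
    for inner_path, remote_modified in remote_modified_files.items():
        # has_older: our copy is newer than what the peer reports holding
        has_older = my_modified.get(inner_path, 0) > remote_modified
        if not has_older:
            continue
        if site.checkSendBackLRU(peer, inner_path, remote_modified):
            send_back_skipped += 1  # we already sent this peer a newer version
        else:
            send_back.append(inner_path)
    return send_back[:send_back_limit], send_back_skipped
```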
@@ -842,7 +854,7 @@ class Site(object):
         gevent.joinall(content_threads)
 
     # Publish worker
-    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2):
+    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2, save_to_send_back_lru=False):
         file_size = self.storage.getSize(inner_path)
         content_json_modified = self.content_manager.contents[inner_path]["modified"]
         body = self.storage.read(inner_path)
@@ -877,6 +889,12 @@ class Site(object):
             self.log.error("Publish error: %s" % Debug.formatException(err))
             result = {"exception": Debug.formatException(err)}
 
+        # We add to the send back LRU not only on success, but also on errors.
+        # Some peers return None. (Why?)
+        # Either way, we tried our best to deliver possibly lost updates.
+        if save_to_send_back_lru:
+            self.addToSendBackLRU(peer, inner_path, content_json_modified)
+
         if result and "ok" in result:
             published.append(peer)
             if cb_progress and len(published) <= limit:
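A trimmed, hypothetical sketch of the publisher change: the LRU entry is recorded whether the publish succeeded, failed, or returned None. `publish_once` and `do_publish` are stand-ins for illustration, not ZeroNet functions:

```python
def publish_once(site, peer, inner_path, content_json_modified,
                 do_publish, save_to_send_back_lru=False):
    """do_publish is a stand-in for the actual update request to the peer."""
    try:
        result = do_publish()
    except Exception as err:
        result = {"exception": str(err)}
    # Record the attempt regardless of the outcome: even a failed or None
    # result means we already tried to deliver this version to this peer.
    if save_to_send_back_lru:
        site.addToSendBackLRU(peer, inner_path, content_json_modified)
    return bool(result) and "ok" in result
```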
@@ -4,6 +4,7 @@ import re
 import os
 import time
 import atexit
+import collections
 
 import gevent
 
@@ -27,6 +28,21 @@ class SiteManager(object):
         gevent.spawn(self.saveTimer)
         atexit.register(lambda: self.save(recalculate_size=True))
 
+        # ZeroNet has a desync bug between:
+        # * the time sent in a response to listModified
+        # and
+        # * the time checked on receiving a file.
+        # This leads to the following scenario:
+        # * Request listModified.
+        # * Detect that the remote peer is missing an update.
+        # * Send a newer version of the file back to the peer.
+        # * The peer responds "ok: File not changed".
+        # .....
+        # * Request listModified the next time and do all the same again.
+        # So we keep a list of sent back entries to prevent sending multiple useless updates:
+        # "{site.address} - {peer.key} - {inner_path}" -> modified time of the sent version
+        self.send_back_lru = collections.OrderedDict()
+
     # Load all sites from data/sites.json
     @util.Noparallel()
     def load(self, cleanup=True, startup=False):
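The eviction behaviour of such an OrderedDict based cache is easy to show standalone. This sketch is independent of ZeroNet; `cache_size` stands in for config.send_back_lru_size:

```python
import collections

cache_size = 3
lru = collections.OrderedDict()

for n, key in enumerate(["a", "b", "c", "d"]):
    if lru.get(key) is None:
        lru[key] = n
        while len(lru) > cache_size:
            lru.popitem(last=False)   # drop the least recently used entry
    else:
        lru.move_to_end(key, last=True)  # refresh recency of a known key

print(list(lru))  # ['b', 'c', 'd'] -- 'a' was evicted first
```

Note that in the diff below, addToSendBackLRU refreshes an existing key's recency with move_to_end but keeps the originally stored modified time.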
@@ -220,6 +236,23 @@ class SiteManager(object):
             self.load(startup=True)
         return self.sites
 
+    # Return True if we have already sent <peer> a version of <inner_path>
+    # newer than <remote_modified>, so sending it back can be skipped.
+    # Return False if we never sent <inner_path> to <peer>, or if the version
+    # we sent was not newer, so the send back logic is not suppressed.
+    def checkSendBackLRU(self, site, peer, inner_path, remote_modified):
+        key = site.address + ' - ' + peer.key + ' - ' + inner_path
+        sent_modified = self.send_back_lru.get(key, 0)
+        return remote_modified < sent_modified
+
+    def addToSendBackLRU(self, site, peer, inner_path, modified):
+        key = site.address + ' - ' + peer.key + ' - ' + inner_path
+        if self.send_back_lru.get(key, None) is None:
+            self.send_back_lru[key] = modified
+            while len(self.send_back_lru) > config.send_back_lru_size:
+                self.send_back_lru.popitem(last=False)
+        else:
+            self.send_back_lru.move_to_end(key, last=True)
+
 site_manager = SiteManager()  # Singleton
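To make the check semantics concrete, here is a self-contained sketch of the lookup (not ZeroNet code; the key layout matches the diff, but the address, peer key, and timestamps are made up):

```python
# The cache maps "{site.address} - {peer.key} - {inner_path}" to the
# modified time of the version we sent back to that peer.
send_back_lru = {}

def check_send_back_lru(site_address, peer_key, inner_path, remote_modified):
    key = "%s - %s - %s" % (site_address, peer_key, inner_path)
    sent_modified = send_back_lru.get(key, 0)
    return remote_modified < sent_modified  # True -> suppress the send back

# Pretend we already sent back a version with modified time 1700000100:
send_back_lru["1SiteAddr - 1.2.3.4:15441 - content.json"] = 1700000100

# Peer still reports an older version than what we sent: skip re-sending.
assert check_send_back_lru("1SiteAddr", "1.2.3.4:15441", "content.json", 1700000000)

# Peer reports a version at least as new as what we sent: not suppressed.
assert not check_send_back_lru("1SiteAddr", "1.2.3.4:15441", "content.json", 1700000200)
```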