From 986dedfa7f4ea8e0c1cc6c4bbfe3fccdf15640c6 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Thu, 25 Mar 2021 12:41:53 +0700 Subject: [PATCH] Trying to fix incomplete updates Partially fixes https://github.com/HelloZeroNet/ZeroNet/issues/2476 Approx. every 10th update check is now performed with `since = 0` --- src/Site/Site.py | 111 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 33 deletions(-) diff --git a/src/Site/Site.py b/src/Site/Site.py index 5e15172b..0a8be32d 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -169,6 +169,12 @@ class Site(object): self.background_publishers = {} self.background_publishers_lock = gevent.lock.RLock() + # FZS = forced zero "since" + self.fzs_range = 20 + self.fzs_interval = 30 * 60 + self.fzs_count = random.randint(0, self.fzs_range / 4) + self.fzs_timestamp = 0 + self.content = None # Load content.json self.peers = {} # Key: ip:port, Value: Peer.Peer self.peers_recent = collections.deque(maxlen=150) @@ -530,14 +536,18 @@ class Site(object): self.log.debug("Ended downloadFile pool len: %s, skipped: %s" % (len(inner_paths), num_skipped)) # Update worker, try to find client that supports listModifications command - def updater(self, peers_try, queried, since): + def updater(self, peers_try, queried, need_queries, since): threads = [] while 1: - if not peers_try or len(queried) >= 3: # Stop after 3 successful query + if not peers_try or len(queried) >= need_queries: # Stop after 3 successful query break peer = peers_try.pop(0) + + if peer in queried: + continue + if config.verbose: - self.log.debug("CheckModifications: Try to get updates from: %s Left: %s" % (peer, peers_try)) + self.log.debug("CheckModifications: Trying to get updates from: %s Left: %s" % (peer, peers_try)) res = None with gevent.Timeout(20, exception=False): @@ -548,6 +558,8 @@ class Site(object): queried.append(peer) modified_contents = [] + send_back = [] + send_back_limit = 5 my_modified = self.content_manager.listModified(since) num_old_files = 0 for inner_path, modified in res["modified_files"].items(): # Check if the peer has newer files than we @@ -558,26 +570,56 @@ class Site(object): # We dont have this file or we have older modified_contents.append(inner_path) self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1 - if has_older and num_old_files < 5: - num_old_files += 1 - self.log.debug("CheckModifications: %s client has older version of %s, publishing there (%s/5)..." % (peer, inner_path, num_old_files)) - gevent.spawn(self.publisher, inner_path, [peer], [], 1) + if has_older: + send_back.append(inner_path) + if modified_contents: - self.log.debug("CheckModifications: %s new modified file from %s" % (len(modified_contents), peer)) + self.log.info("CheckModifications: %s new modified files from %s" % (len(modified_contents), peer)) modified_contents.sort(key=lambda inner_path: 0 - res["modified_files"][inner_path]) # Download newest first + for inner_path in modified_contents: + self.log.info("CheckModifications: %s: %s > %s" % ( + inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0) + )) t = gevent.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True) threads.append(t) - if config.verbose: - self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads)) + + if send_back: + self.log.info("CheckModifications: %s has older versions of %s files" % (peer, len(send_back))) + if len(send_back) > send_back_limit: + self.log.info("CheckModifications: choosing %s files to publish back" % (send_back_limit)) + random.shuffle(send_back) + send_back = send_back[0:send_back_limit] + for inner_path in send_back: + self.log.info("CheckModifications: %s: %s < %s" % ( + inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0) + )) + gevent.spawn(self.publisher, inner_path, [peer], [], 1) + + self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads)) gevent.joinall(threads) + # We need, with some rate, to perform the full check of modifications, + # "since the beginning of time", instead of the partial one. + def getForcedZeroSince(self): + now = time.time() + if self.fzs_timestamp + self.fzs_interval > now: + return False + self.fzs_count -= 1 + if self.fzs_count < 1: + self.fzs_count = random.randint(0, self.fzs_range) + self.fzs_timestamp = now + return True + return False + # Check modified content.json files from peers and add modified files to bad_files # Return: Successfully queried peers [Peer, Peer...] def checkModifications(self, since=None): s = time.time() peers_try = [] # Try these peers queried = [] # Successfully queried from these peers - limit = 5 + peer_limit = 10 + updater_limit = 3 + need_queries = 3 # Wait for peers if not self.peers: @@ -588,34 +630,37 @@ class Site(object): if self.peers: break - peers_try = self.getConnectedPeers() - peers_connected_num = len(peers_try) - if peers_connected_num < limit * 2: # Add more, non-connected peers if necessary - peers_try += self.getRecentPeers(limit * 5) + if since is None: + if self.getForcedZeroSince(): + since = 0 + else: + margin = 60 * 60 * 24 + since = self.settings.get("modified", margin) - margin - if since is None: # No since defined, download from last modification time-1day - since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24 + if since == 0: + peer_limit *= 4 + need_queries *= 4 - if config.verbose: - self.log.debug( - "CheckModifications: Try to get listModifications from peers: %s, connected: %s, since: %s" % - (peers_try, peers_connected_num, since) - ) + peers_try = self.getConnectedPeers() + self.getConnectablePeers(peer_limit) + + self.log.debug( + "CheckModifications: Trying to get listModifications from %s peers, %s connected, since: %s" % + (len(peers_try), len(self.getConnectedPeers()), since) + ) updaters = [] - for i in range(3): - updaters.append(gevent.spawn(self.updater, peers_try, queried, since)) + for i in range(updater_limit): + updaters.append(gevent.spawn(self.updater, peers_try, queried, need_queries, since)) - gevent.joinall(updaters, timeout=10) # Wait 10 sec to workers done query modifications + for r in range(10): + gevent.joinall(updaters, timeout=5+r) + if len(queried) >= need_queries or len(peers_try) == 0: + break + self.log.debug("CheckModifications: Waiting... (%s) succesfully queried: %s, left: %s" % + (r + 1, len(queried), len(peers_try))) - if not queried: # Start another 3 thread if first 3 is stuck - peers_try[0:0] = [peer for peer in self.getConnectedPeers() if peer.connection.connected] # Add connected peers - for _ in range(10): - gevent.joinall(updaters, timeout=10) # Wait another 10 sec if none of updaters finished - if queried: - break - - self.log.debug("CheckModifications: Queried listModifications from: %s in %.3fs since %s" % (queried, time.time() - s, since)) + self.log.debug("CheckModifications: Queried listModifications from %s peers in %.3fs since %s" % ( + len(queried), time.time() - s, since)) time.sleep(0.1) return queried