diff --git a/src/Site/Site.py b/src/Site/Site.py index cf0beddb..b5caebf2 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -317,24 +317,38 @@ class Site(object): return valid def pooledDownloadContent(self, inner_paths, pool_size=100, only_if_bad=False): - self.log.debug("New downloadContent pool: len: %s" % len(inner_paths)) + self.log.debug("New downloadContent pool: len: %s, only if bad: %s" % (len(inner_paths), only_if_bad)) self.worker_manager.started_task_num += len(inner_paths) pool = gevent.pool.Pool(pool_size) + num_skipped = 0 + site_size_limit = self.getSizeLimit() * 1024 * 1024 for inner_path in inner_paths: if not only_if_bad or inner_path in self.bad_files: pool.spawn(self.downloadContent, inner_path) + else: + num_skipped += 1 self.worker_manager.started_task_num -= 1 - self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths)) + if self.settings["size"] > site_size_limit * 0.95: + self.log.warning("Site size limit almost reached, aborting downloadContent pool") + for aborted_inner_path in inner_paths: + if aborted_inner_path in self.bad_files: + del self.bad_files[aborted_inner_path] + self.worker_manager.removeSolvedFileTasks(mark_as_good=False) + break + self.log.debug("Ended downloadContent pool len: %s, skipped: %s" % (len(inner_paths), num_skipped)) def pooledDownloadFile(self, inner_paths, pool_size=100, only_if_bad=False): - self.log.debug("New downloadFile pool: len: %s" % len(inner_paths)) + self.log.debug("New downloadFile pool: len: %s, only if bad: %s" % (len(inner_paths), only_if_bad)) self.worker_manager.started_task_num += len(inner_paths) pool = gevent.pool.Pool(pool_size) + num_skipped = 0 for inner_path in inner_paths: if not only_if_bad or inner_path in self.bad_files: pool.spawn(self.needFile, inner_path, update=True) + else: + num_skipped += 1 self.worker_manager.started_task_num -= 1 - self.log.debug("Ended downloadFile pool len: %s" % len(inner_paths)) + self.log.debug("Ended downloadFile pool len: %s, skipped: %s" % (len(inner_paths), num_skipped)) # Update worker, try to find client that supports listModifications command def updater(self, peers_try, queried, since): @@ -369,7 +383,7 @@ class Site(object): if modified_contents: self.log.debug("%s new modified file from %s" % (len(modified_contents), peer)) modified_contents.sort(key=lambda inner_path: 0 - res["modified_files"][inner_path]) # Download newest first - gevent.spawn(self.pooledDownloadContent, modified_contents) + gevent.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True) # Check modified content.json files from peers and add modified files to bad_files # Return: Successfully queried peers [Peer, Peer...] @@ -415,7 +429,7 @@ class Site(object): if queried: break - self.log.debug("Queried listModifications from: %s in %.3fs" % (queried, time.time() - s)) + self.log.debug("Queried listModifications from: %s in %.3fs since %s" % (queried, time.time() - s, since)) time.sleep(0.1) return queried @@ -757,6 +771,7 @@ class Site(object): def addPeer(self, ip, port, return_peer=False, connection=None, source="other"): if not ip or ip == "0.0.0.0": return False + key = "%s:%s" % (ip, port) peer = self.peers.get(key) if peer: # Already has this ip