Stop downloadcontent pool if reached 95% of site limit

This commit is contained in:
shortcutme 2018-09-17 15:27:11 +02:00
parent 577761a6bb
commit 9658c2d553
No known key found for this signature in database
GPG key ID: 5B63BAE6CB9613AE

View file

@ -317,24 +317,38 @@ class Site(object):
return valid return valid
def pooledDownloadContent(self, inner_paths, pool_size=100, only_if_bad=False): def pooledDownloadContent(self, inner_paths, pool_size=100, only_if_bad=False):
self.log.debug("New downloadContent pool: len: %s" % len(inner_paths)) self.log.debug("New downloadContent pool: len: %s, only if bad: %s" % (len(inner_paths), only_if_bad))
self.worker_manager.started_task_num += len(inner_paths) self.worker_manager.started_task_num += len(inner_paths)
pool = gevent.pool.Pool(pool_size) pool = gevent.pool.Pool(pool_size)
num_skipped = 0
site_size_limit = self.getSizeLimit() * 1024 * 1024
for inner_path in inner_paths: for inner_path in inner_paths:
if not only_if_bad or inner_path in self.bad_files: if not only_if_bad or inner_path in self.bad_files:
pool.spawn(self.downloadContent, inner_path) pool.spawn(self.downloadContent, inner_path)
else:
num_skipped += 1
self.worker_manager.started_task_num -= 1 self.worker_manager.started_task_num -= 1
self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths)) if self.settings["size"] > site_size_limit * 0.95:
self.log.warning("Site size limit almost reached, aborting downloadContent pool")
for aborted_inner_path in inner_paths:
if aborted_inner_path in self.bad_files:
del self.bad_files[aborted_inner_path]
self.worker_manager.removeSolvedFileTasks(mark_as_good=False)
break
self.log.debug("Ended downloadContent pool len: %s, skipped: %s" % (len(inner_paths), num_skipped))
def pooledDownloadFile(self, inner_paths, pool_size=100, only_if_bad=False): def pooledDownloadFile(self, inner_paths, pool_size=100, only_if_bad=False):
self.log.debug("New downloadFile pool: len: %s" % len(inner_paths)) self.log.debug("New downloadFile pool: len: %s, only if bad: %s" % (len(inner_paths), only_if_bad))
self.worker_manager.started_task_num += len(inner_paths) self.worker_manager.started_task_num += len(inner_paths)
pool = gevent.pool.Pool(pool_size) pool = gevent.pool.Pool(pool_size)
num_skipped = 0
for inner_path in inner_paths: for inner_path in inner_paths:
if not only_if_bad or inner_path in self.bad_files: if not only_if_bad or inner_path in self.bad_files:
pool.spawn(self.needFile, inner_path, update=True) pool.spawn(self.needFile, inner_path, update=True)
else:
num_skipped += 1
self.worker_manager.started_task_num -= 1 self.worker_manager.started_task_num -= 1
self.log.debug("Ended downloadFile pool len: %s" % len(inner_paths)) self.log.debug("Ended downloadFile pool len: %s, skipped: %s" % (len(inner_paths), num_skipped))
# Update worker, try to find client that supports listModifications command # Update worker, try to find client that supports listModifications command
def updater(self, peers_try, queried, since): def updater(self, peers_try, queried, since):
@ -369,7 +383,7 @@ class Site(object):
if modified_contents: if modified_contents:
self.log.debug("%s new modified file from %s" % (len(modified_contents), peer)) self.log.debug("%s new modified file from %s" % (len(modified_contents), peer))
modified_contents.sort(key=lambda inner_path: 0 - res["modified_files"][inner_path]) # Download newest first modified_contents.sort(key=lambda inner_path: 0 - res["modified_files"][inner_path]) # Download newest first
gevent.spawn(self.pooledDownloadContent, modified_contents) gevent.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True)
# Check modified content.json files from peers and add modified files to bad_files # Check modified content.json files from peers and add modified files to bad_files
# Return: Successfully queried peers [Peer, Peer...] # Return: Successfully queried peers [Peer, Peer...]
@ -415,7 +429,7 @@ class Site(object):
if queried: if queried:
break break
self.log.debug("Queried listModifications from: %s in %.3fs" % (queried, time.time() - s)) self.log.debug("Queried listModifications from: %s in %.3fs since %s" % (queried, time.time() - s, since))
time.sleep(0.1) time.sleep(0.1)
return queried return queried
@ -757,6 +771,7 @@ class Site(object):
def addPeer(self, ip, port, return_peer=False, connection=None, source="other"): def addPeer(self, ip, port, return_peer=False, connection=None, source="other"):
if not ip or ip == "0.0.0.0": if not ip or ip == "0.0.0.0":
return False return False
key = "%s:%s" % (ip, port) key = "%s:%s" % (ip, port)
peer = self.peers.get(key) peer = self.peers.get(key)
if peer: # Already has this ip if peer: # Already has this ip