Use pool to download large ammount of content.json files
This commit is contained in:
parent
8f158cbc8f
commit
4437a99330
1 changed files with 17 additions and 5 deletions
|
@ -12,6 +12,7 @@ import urllib
|
|||
import urllib2
|
||||
|
||||
import gevent
|
||||
import gevent.pool
|
||||
|
||||
import util
|
||||
from lib import bencode
|
||||
|
@ -120,6 +121,7 @@ class Site(object):
|
|||
s = time.time()
|
||||
if config.verbose:
|
||||
self.log.debug("Downloading %s..." % inner_path)
|
||||
|
||||
found = self.needFile(inner_path, update=self.bad_files.get(inner_path))
|
||||
content_inner_dir = helper.getDirname(inner_path)
|
||||
if not found:
|
||||
|
@ -246,6 +248,15 @@ class Site(object):
|
|||
|
||||
return valid
|
||||
|
||||
def pooledDownloadContent(self, inner_paths, pool_size=100):
|
||||
self.log.debug("New downloadContent pool: len: %s" % len(inner_paths))
|
||||
self.worker_manager.started_task_num += len(inner_paths)
|
||||
pool = gevent.pool.Pool(pool_size)
|
||||
for inner_path in inner_paths:
|
||||
pool.spawn(self.downloadContent, inner_path)
|
||||
self.worker_manager.started_task_num -= 1
|
||||
self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths))
|
||||
|
||||
# Update worker, try to find client that supports listModifications command
|
||||
def updater(self, peers_try, queried, since):
|
||||
while 1:
|
||||
|
@ -266,12 +277,13 @@ class Site(object):
|
|||
for inner_path, modified in res["modified_files"].iteritems(): # Check if the peer has newer files than we
|
||||
newer = int(modified) > my_modified.get(inner_path, 0)
|
||||
if newer and inner_path not in self.bad_files and not self.content_manager.isArchived(inner_path, modified):
|
||||
num_modified += 1
|
||||
# We dont have this file or we have older
|
||||
self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1 # Mark as bad file
|
||||
gevent.spawn(self.downloadContent, inner_path) # Download the content.json + the changed files
|
||||
if num_modified > 0:
|
||||
self.log.debug("%s new modified file from %s" % (num_modified, peer))
|
||||
modified_contents.append(inner_path)
|
||||
self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1
|
||||
if modified_contents:
|
||||
self.log.debug("%s new modified file from %s" % (len(modified_contents), peer))
|
||||
modified_contents.sort(key=lambda inner_path: 0 - res["modified_files"][inner_path]) # Download newest first
|
||||
gevent.spawn(self.pooledDownloadContent, modified_contents)
|
||||
|
||||
# Check modified content.json files from peers and add modified files to bad_files
|
||||
# Return: Successfully queried peers [Peer, Peer...]
|
||||
|
|
Loading…
Reference in a new issue