Pooled file downloads to handle large numbers of files better

This commit is contained in:
shortcutme 2016-11-07 23:39:45 +01:00
parent 83b3dc8fbe
commit e6a82d4c64
3 changed files with 100 additions and 11 deletions

View file

@ -179,13 +179,18 @@ class Site(object):
if inner_path == "content.json": if inner_path == "content.json":
gevent.spawn(self.updateHashfield) gevent.spawn(self.updateHashfield)
if self.settings.get("autodownloadoptional"): for file_relative_path in self.content_manager.contents[inner_path].get("files_optional", {}).keys():
for file_relative_path in self.content_manager.contents[inner_path].get("files_optional", {}).keys(): file_inner_path = content_inner_dir + file_relative_path
file_inner_path = content_inner_dir + file_relative_path if file_inner_path not in changed and not self.bad_files.get(file_inner_path):
# Start download and dont wait for finish, return the event continue
res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer) if not self.isDownloadable(file_inner_path):
if res is not True and res is not False: # Need downloading and file is allowed continue
file_threads.append(res) # Append evt # Start download and dont wait for finish, return the event
res = self.pooledNeedFile(
file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer
)
if res is not True and res is not False: # Need downloading and file is allowed
file_threads.append(res) # Append evt
# Wait for includes download # Wait for includes download
include_threads = [] include_threads = []
@ -229,10 +234,13 @@ class Site(object):
if bad_file.endswith("content.json"): if bad_file.endswith("content.json"):
content_inner_paths.append(bad_file) content_inner_paths.append(bad_file)
else: else:
self.needFile(bad_file, update=True, blocking=False) file_inner_paths.append(bad_file)
if content_inner_paths: if content_inner_paths:
self.pooledDownloadContent(content_inner_paths) self.pooledDownloadContent(content_inner_paths, only_if_bad=True)
if file_inner_paths:
self.pooledDownloadFile(file_inner_paths, only_if_bad=True)
# Download all files of the site # Download all files of the site
@util.Noparallel(blocking=False) @util.Noparallel(blocking=False)
@ -254,15 +262,26 @@ class Site(object):
return valid return valid
def pooledDownloadContent(self, inner_paths, pool_size=100): def pooledDownloadContent(self, inner_paths, pool_size=100, only_if_bad=False):
self.log.debug("New downloadContent pool: len: %s" % len(inner_paths)) self.log.debug("New downloadContent pool: len: %s" % len(inner_paths))
self.worker_manager.started_task_num += len(inner_paths) self.worker_manager.started_task_num += len(inner_paths)
pool = gevent.pool.Pool(pool_size) pool = gevent.pool.Pool(pool_size)
for inner_path in inner_paths: for inner_path in inner_paths:
pool.spawn(self.downloadContent, inner_path) if not only_if_bad or inner_path in self.bad_files:
pool.spawn(self.downloadContent, inner_path)
self.worker_manager.started_task_num -= 1 self.worker_manager.started_task_num -= 1
self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths)) self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths))
def pooledDownloadFile(self, inner_paths, pool_size=100, only_if_bad=False):
self.log.debug("New downloadFile pool: len: %s" % len(inner_paths))
self.worker_manager.started_task_num += len(inner_paths)
pool = gevent.pool.Pool(pool_size)
for inner_path in inner_paths:
if not only_if_bad or inner_path in self.bad_files:
pool.spawn(self.needFile, inner_path, update=True)
self.worker_manager.started_task_num -= 1
self.log.debug("Ended downloadFile pool len: %s" % len(inner_paths))
# Update worker, try to find client that supports listModifications command # Update worker, try to find client that supports listModifications command
def updater(self, peers_try, queried, since): def updater(self, peers_try, queried, since):
while 1: while 1:
@ -580,6 +599,10 @@ class Site(object):
return new_site return new_site
@util.Pooled(100)
def pooledNeedFile(self, *args, **kwargs):
return self.needFile(*args, **kwargs)
# Check and download if file not exist # Check and download if file not exist
def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0): def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0):
if self.storage.isFile(inner_path) and not update: # File exist, no need to do anything if self.storage.isFile(inner_path) and not update: # File exist, no need to do anything

65
src/util/Pooled.py Normal file
View file

@ -0,0 +1,65 @@
import gevent
import gevent.event
import gevent.pool
class Pooled(object):
    """Decorator that limits concurrent executions of a function via a gevent pool.

    Each call to the wrapped function returns a gevent AsyncResult immediately;
    the actual work is spawned into a pool of `size` greenlets, so at most
    `size` calls run at the same time.  Callers wait with `evt.get()` /
    `gevent.joinall(...)`.
    """

    def __init__(self, size=100):
        self.pool = gevent.pool.Pool(size)
        self.pooler_running = False  # True while the pooler greenlet is draining the queue
        self.queue = []  # Pending (evt, args, kwargs) calls waiting for a pool slot
        self.func = None  # The wrapped function; set when the decorator is applied

    def waiter(self, evt, args, kwargs):
        # Run the wrapped function inside the pool and propagate its result
        # to the caller's event.
        res = self.func(*args, **kwargs)
        # Use isinstance (not type ==) so AsyncResult subclasses are unwrapped too:
        # if the function itself returned an async result, wait for the final value.
        if isinstance(res, gevent.event.AsyncResult):
            evt.set(res.get())
        else:
            evt.set(res)

    def pooler(self):
        # Drain the queue; pool.spawn blocks when the pool is full, which is
        # what throttles concurrency.  gevent is cooperative, so there is no
        # yield point between the final queue check and clearing the flag.
        while self.queue:
            evt, args, kwargs = self.queue.pop(0)
            self.pool.spawn(self.waiter, evt, args, kwargs)
        self.pooler_running = False

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            evt = gevent.event.AsyncResult()  # Will receive the call's result
            self.queue.append((evt, args, kwargs))
            # Lazily start a single pooler greenlet to feed the pool.
            if not self.pooler_running:
                self.pooler_running = True
                gevent.spawn(self.pooler)
            return evt  # Caller can evt.get() to wait for completion

        wrapper.func_name = func.func_name  # Preserve the original name (Python 2)
        self.func = func
        return wrapper
if __name__ == "__main__":
import gevent
import gevent.pool
import gevent.queue
import gevent.event
import gevent.monkey
import time
gevent.monkey.patch_all()
def addTask(inner_path):
evt = gevent.event.AsyncResult()
gevent.spawn_later(1, lambda: evt.set(True))
return evt
def needFile(inner_path):
return addTask(inner_path)
@Pooled(10)
def pooledNeedFile(inner_path):
return needFile(inner_path)
threads = []
for i in range(100):
threads.append(pooledNeedFile(i))
s = time.time()
gevent.joinall(threads) # Should take 10 second
print time.time() - s

View file

@ -1,2 +1,3 @@
from Event import Event from Event import Event
from Noparallel import Noparallel from Noparallel import Noparallel
from Pooled import Pooled