From e6a82d4c6454068e18c0b2e7a4984993ce555e99 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Mon, 7 Nov 2016 23:39:45 +0100 Subject: [PATCH] Pooled file downloads to handle large ammount of files better --- src/Site/Site.py | 45 ++++++++++++++++++++++-------- src/util/Pooled.py | 65 ++++++++++++++++++++++++++++++++++++++++++++ src/util/__init__.py | 1 + 3 files changed, 100 insertions(+), 11 deletions(-) create mode 100644 src/util/Pooled.py diff --git a/src/Site/Site.py b/src/Site/Site.py index bf131efd..72dcd134 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -179,13 +179,18 @@ class Site(object): if inner_path == "content.json": gevent.spawn(self.updateHashfield) - if self.settings.get("autodownloadoptional"): - for file_relative_path in self.content_manager.contents[inner_path].get("files_optional", {}).keys(): - file_inner_path = content_inner_dir + file_relative_path - # Start download and dont wait for finish, return the event - res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer) - if res is not True and res is not False: # Need downloading and file is allowed - file_threads.append(res) # Append evt + for file_relative_path in self.content_manager.contents[inner_path].get("files_optional", {}).keys(): + file_inner_path = content_inner_dir + file_relative_path + if file_inner_path not in changed and not self.bad_files.get(file_inner_path): + continue + if not self.isDownloadable(file_inner_path): + continue + # Start download and dont wait for finish, return the event + res = self.pooledNeedFile( + file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer + ) + if res is not True and res is not False: # Need downloading and file is allowed + file_threads.append(res) # Append evt # Wait for includes download include_threads = [] @@ -229,10 +234,13 @@ class Site(object): if bad_file.endswith("content.json"): content_inner_paths.append(bad_file) else: - self.needFile(bad_file, update=True, blocking=False) + file_inner_paths.append(bad_file) if content_inner_paths: - self.pooledDownloadContent(content_inner_paths) + self.pooledDownloadContent(content_inner_paths, only_if_bad=True) + + if file_inner_paths: + self.pooledDownloadFile(file_inner_paths, only_if_bad=True) # Download all files of the site @util.Noparallel(blocking=False) @@ -254,15 +262,26 @@ class Site(object): return valid - def pooledDownloadContent(self, inner_paths, pool_size=100): + def pooledDownloadContent(self, inner_paths, pool_size=100, only_if_bad=False): self.log.debug("New downloadContent pool: len: %s" % len(inner_paths)) self.worker_manager.started_task_num += len(inner_paths) pool = gevent.pool.Pool(pool_size) for inner_path in inner_paths: - pool.spawn(self.downloadContent, inner_path) + if not only_if_bad or inner_path in self.bad_files: + pool.spawn(self.downloadContent, inner_path) self.worker_manager.started_task_num -= 1 self.log.debug("Ended downloadContent pool len: %s" % len(inner_paths)) + def pooledDownloadFile(self, inner_paths, pool_size=100, only_if_bad=False): + self.log.debug("New downloadFile pool: len: %s" % len(inner_paths)) + self.worker_manager.started_task_num += len(inner_paths) + pool = gevent.pool.Pool(pool_size) + for inner_path in inner_paths: + if not only_if_bad or inner_path in self.bad_files: + pool.spawn(self.needFile, inner_path, update=True) + self.worker_manager.started_task_num -= 1 + self.log.debug("Ended downloadFile pool len: %s" % len(inner_paths)) + # Update worker, try to find client that supports listModifications command def updater(self, peers_try, queried, since): while 1: @@ -580,6 +599,10 @@ class Site(object): return new_site + @util.Pooled(100) + def pooledNeedFile(self, *args, **kwargs): + return self.needFile(*args, **kwargs) + # Check and download if file not exist def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0): if self.storage.isFile(inner_path) and not update: # File exist, no need to do anything diff --git a/src/util/Pooled.py b/src/util/Pooled.py new file mode 100644 index 00000000..b7751995 --- /dev/null +++ b/src/util/Pooled.py @@ -0,0 +1,65 @@ +import gevent.pool + + +class Pooled(object): + def __init__(self, size=100): + self.pool = gevent.pool.Pool(size) + self.pooler_running = False + self.queue = [] + self.func = None + + def waiter(self, evt, args, kwargs): + res = self.func(*args, **kwargs) + if type(res) == gevent.event.AsyncResult: + evt.set(res.get()) + else: + evt.set(res) + + def pooler(self): + while self.queue: + evt, args, kwargs = self.queue.pop(0) + self.pool.spawn(self.waiter, evt, args, kwargs) + self.pooler_running = False + + def __call__(self, func): + def wrapper(*args, **kwargs): + evt = gevent.event.AsyncResult() + self.queue.append((evt, args, kwargs)) + if not self.pooler_running: + self.pooler_running = True + gevent.spawn(self.pooler) + return evt + wrapper.func_name = func.func_name + self.func = func + + return wrapper + +if __name__ == "__main__": + import gevent + import gevent.pool + import gevent.queue + import gevent.event + import gevent.monkey + import time + + gevent.monkey.patch_all() + + def addTask(inner_path): + evt = gevent.event.AsyncResult() + gevent.spawn_later(1, lambda: evt.set(True)) + return evt + + def needFile(inner_path): + return addTask(inner_path) + + @Pooled(10) + def pooledNeedFile(inner_path): + return needFile(inner_path) + + threads = [] + for i in range(100): + threads.append(pooledNeedFile(i)) + + s = time.time() + gevent.joinall(threads) # Should take 10 second + print time.time() - s diff --git a/src/util/__init__.py b/src/util/__init__.py index c226368e..1c873327 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -1,2 +1,3 @@ from Event import Event from Noparallel import Noparallel +from Pooled import Pooled