diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py
index 65c335a9..f7249d81 100644
--- a/src/File/FileRequest.py
+++ b/src/File/FileRequest.py
@@ -165,7 +165,7 @@ class FileRequest(object):
             peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
             # On complete publish to other peers
             diffs = params.get("diffs", {})
-            site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs, limit=3), "publish_%s" % inner_path)
+            site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs), "publish_%s" % inner_path)
 
             # Load new content file and download changed files in new thread
             def downloader():
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 1581a106..d79d16ec 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -11,6 +11,7 @@ import base64
 
 import gevent
 import gevent.pool
+import gevent.lock
 
 import util
 from Config import config
@@ -61,6 +62,90 @@ class ScaledTimeoutHandler:
         else:
             return None
 
+
+class BackgroundPublisher:
+    def __init__(self, site, published=[], limit=5, inner_path="content.json", diffs={}):
+        self.site = site
+        self.threads = gevent.pool.Pool(None)
+        self.inner_path = inner_path
+        self.stages = [
+            {
+                "interval": ScaledTimeoutHandler(60, 60),
+                "max_tries": 2,
+                "tries": 0,
+                "limit": 0,
+                "done": False
+            },
+            {
+                "interval": ScaledTimeoutHandler(60 * 10, 60 * 10),
+                "max_tries": 5,
+                "tries": 0,
+                "limit": 0,
+                "done": False
+            }
+        ]
+        self.reinit(published=published, limit=limit, diffs=diffs)
+
+    def reinit(self, published=[], limit=5, diffs={}):
+        self.threads.kill()
+        self.published = published
+        self.diffs = diffs
+
+        i = 0
+        for stage in self.stages:
+            stage["nr"] = i
+            stage["limit"] = limit * (2 + i)
+            stage["tries"] = 0
+            stage["done"] = False
+            stage["thread"] = None
+            if i > 0:
+                stage["interval"].done()
+            i += 1
+
+    def isStageComplete(self, stage):
+        if not stage["done"]:
+            stage["done"] = len(self.published) >= stage["limit"]
+        if not stage["done"]:
+            stage["done"] = stage["tries"] >= stage["max_tries"]
+        return stage["done"]
+
+    def isComplete(self):
+        for stage in self.stages:
+            if not self.isStageComplete(stage):
+                return False
+        return True
+
+    def process(self):
+        for stage in self.stages:
+            if not self.isStageComplete(stage):
+                self.processStage(stage)
+                break
+        return self.isComplete()
+
+    def processStage(self, stage):
+        if not stage["interval"].isExpired(0):
+            return
+
+        if len(self.site.peers) < stage["limit"]:
+            self.site.announce(mode="more")
+
+        if not stage["thread"]:
+            peers = list(self.site.peers.values())
+            random.shuffle(peers)
+            stage["thread"] = self.threads.spawn(self.site.publisher,
+                self.inner_path, peers, self.published, stage["limit"], diffs=self.diffs, max_retries=1)
+
+        stage["tries"] += 1
+        stage["interval"].done()
+
+        self.site.log.info("Background publisher: Stage #%s: %s published to %s/%s peers",
+            stage["nr"], self.inner_path, len(self.published), stage["limit"])
+
+    def finalize(self):
+        self.threads.kill()
+        self.site.log.info("Background publisher: Published %s to %s peers", self.inner_path, len(self.published))
+
+
 
 @PluginManager.acceptPlugins
 class Site(object):
@@ -81,6 +166,9 @@ class Site(object):
                 scaler=self.getActivityRating)
         ]
 
+        self.background_publishers = {}
+        self.background_publishers_lock = gevent.lock.RLock()
+
         self.content = None  # Load content.json
         self.peers = {}  # Key: ip:port, Value: Peer.Peer
         self.peers_recent = collections.deque(maxlen=150)
@@ -328,6 +416,11 @@ class Site(object):
                 inner_path, time.time() - s, len(self.worker_manager.tasks)
             ))
+
+        # If no file tasks have been started, worker_manager.checkComplete()
+        # is never called, so call it explicitly.
+        self.greenlet_manager.spawn(self.worker_manager.checkComplete)
+
 
         return True
 
     # Return bad files with less than 3 retry
@@ -662,7 +755,7 @@ class Site(object):
         gevent.joinall(content_threads)
 
     # Publish worker
-    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None):
+    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2):
         file_size = self.storage.getSize(inner_path)
         content_json_modified = self.content_manager.contents[inner_path]["modified"]
         body = self.storage.read(inner_path)
@@ -687,7 +780,7 @@ class Site(object):
             timeout = 10 + int(file_size / 1024)
             result = {"exception": "Timeout"}
 
-            for retry in range(2):
+            for retry in range(max_retries):
                 try:
                     with gevent.Timeout(timeout, False):
                         result = peer.publish(self.address, inner_path, body, content_json_modified, diffs)
@@ -708,6 +801,25 @@ class Site(object):
                 self.log.info("[FAILED] %s: %s" % (peer.key, result))
             time.sleep(0.01)
 
+    def addBackgroundPublisher(self, published=[], limit=5, inner_path="content.json", diffs={}):
+        with self.background_publishers_lock:
+            if self.background_publishers.get(inner_path, None):
+                background_publisher = self.background_publishers[inner_path]
+                background_publisher.reinit(published=published, limit=limit, diffs=diffs)
+            else:
+                background_publisher = BackgroundPublisher(self, published=published, limit=limit, inner_path=inner_path, diffs=diffs)
+                self.background_publishers[inner_path] = background_publisher
+
+            gevent.spawn(background_publisher.process)
+
+    def processBackgroundPublishers(self):
+        with self.background_publishers_lock:
+            for inner_path, background_publisher in list(self.background_publishers.items()):
+                background_publisher.process()
+                if background_publisher.isComplete():
+                    background_publisher.finalize()
+                    del self.background_publishers[inner_path]
+
     # Update content.json on peers
     @util.Noparallel()
     def publish(self, limit="default", inner_path="content.json", diffs={}, cb_progress=None):
@@ -752,12 +864,11 @@ class Site(object):
 
         # Publish more peers in the backgroup
         self.log.info(
-            "Published %s to %s peers, publishing to %s more peers in the background" %
-            (inner_path, len(published), limit)
+            "Published %s to %s peers, publishing to more peers in the background" %
+            (inner_path, len(published))
         )
 
-        for thread in range(2):
-            gevent.spawn(self.publisher, inner_path, peers, published, limit=limit * 2, diffs=diffs)
+        self.addBackgroundPublisher(published=published, limit=limit, inner_path=inner_path, diffs=diffs)
 
         # Send my hashfield to every connected peer if changed
         gevent.spawn(self.sendMyHashfield, 100)
@@ -1296,6 +1407,8 @@ class Site(object):
             with gevent.Timeout(10, exception=False):
                 self.announcer.announcePex()
 
+        self.processBackgroundPublishers()
+
         self.update()
 
         return True
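
Note for reviewers: the staged escalation in BackgroundPublisher is easier to see in isolation. Below is a minimal, dependency-free Python sketch of the same idea, assuming per-stage peer targets of limit * (2 + i) and the per-stage retry caps from the patch; the run_stages name, total_peers, and the random sampling that stands in for Site.publisher are all illustrative and not part of the patch.

    import random

    def run_stages(limit=5, total_peers=40):
        published = set()  # peers successfully published to so far
        stages = [
            {"max_tries": 2, "limit": limit * 2, "tries": 0},  # fast first stage
            {"max_tries": 5, "limit": limit * 3, "tries": 0},  # wider second stage
        ]
        for nr, stage in enumerate(stages):
            # A stage is complete once it reaches its peer target or runs out
            # of tries -- the same test as isStageComplete() in the patch.
            while len(published) < stage["limit"] and stage["tries"] < stage["max_tries"]:
                stage["tries"] += 1
                # Stand-in for Site.publisher: each try reaches a few random peers
                published.update(random.sample(range(total_peers), 6))
                print("Stage #%s try %s: %s/%s peers" % (
                    nr, stage["tries"], len(published), stage["limit"]))
        print("Background publisher: published to %s peers" % len(published))

    run_stages()

In the patch itself the control flow is inverted: processBackgroundPublishers() runs on the periodic-maintenance tick, and each stage's ScaledTimeoutHandler throttles how often processStage() may fire, so a stage's tries are spread over minutes rather than looped through tightly as in this sketch.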