diff --git a/src/Site/SiteAnnouncer.py b/src/Site/SiteAnnouncer.py index b50a01fe..1440eb57 100644 --- a/src/Site/SiteAnnouncer.py +++ b/src/Site/SiteAnnouncer.py @@ -13,6 +13,7 @@ from Debug import Debug from util import helper from greenlet import GreenletExit import util +from util import CircularIterator class AnnounceError(Exception): @@ -30,7 +31,7 @@ class SiteAnnouncer(object): self.stats = {} self.fileserver_port = config.fileserver_port self.peer_id = self.site.connection_server.peer_id - self.last_tracker_id = random.randint(0, 10) + self.tracker_circular_iterator = CircularIterator() self.time_last_announce = 0 def getTrackers(self): @@ -49,15 +50,58 @@ class SiteAnnouncer(object): return trackers - def getAnnouncingTrackers(self, mode): + def shouldTrackerBeTemporarilyIgnored(self, tracker, mode, force): + if not tracker: + return True + + if force: + return False + + now = time.time() + + # Throttle accessing unresponsive trackers + tracker_stats = global_stats[tracker] + delay = min(30 * tracker_stats["num_error"], 60 * 10) + time_announce_allowed = tracker_stats["time_request"] + delay + if now < time_announce_allowed: + return True + + return False + + def getAnnouncingTrackers(self, mode, force): trackers = self.getSupportedTrackers() - if trackers and (mode == "update" or mode == "more"): # Only announce on one tracker, increment the queried tracker id - self.last_tracker_id += 1 - self.last_tracker_id = self.last_tracker_id % len(trackers) - trackers_announcing = [trackers[self.last_tracker_id]] # We only going to use this one + if trackers and (mode == "update" or mode == "more"): + + # Choose just 2 trackers to announce to + + trackers_announcing = [] + + # One is the next in sequence + + self.tracker_circular_iterator.resetSuccessiveCount() + while 1: + tracker = self.tracker_circular_iterator.next(trackers) + if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force): + trackers_announcing.append(tracker) + break + if self.tracker_circular_iterator.isWrapped(): + break + + # And one is just random + + shuffled_trackers = random.sample(trackers, len(trackers)) + for tracker in shuffled_trackers: + if tracker in trackers_announcing: + continue + if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force): + trackers_announcing.append(tracker) + break else: - trackers_announcing = trackers + trackers_announcing = [ + tracker for tracker in trackers + if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force) + ] return trackers_announcing @@ -76,83 +120,18 @@ class SiteAnnouncer(object): def announce(self, force=False, mode="start", pex=True): if time.time() - self.time_last_announce < 30 and not force: return # No reannouncing within 30 secs - if force: - self.log.debug("Force reannounce in mode %s" % mode) + + self.log.debug("announce: force=%s, mode=%s, pex=%s" % (force, mode, pex)) self.fileserver_port = config.fileserver_port self.time_last_announce = time.time() - trackers = self.getAnnouncingTrackers(mode) - - if config.verbose: - self.log.debug("Tracker announcing, trackers: %s" % trackers) - - errors = [] - slow = [] - s = time.time() - threads = [] - num_announced = 0 - - for tracker in trackers: # Start announce threads - tracker_stats = global_stats[tracker] - # Reduce the announce time for trackers that looks unreliable - time_announce_allowed = time.time() - 60 * min(30, tracker_stats["num_error"]) - if tracker_stats["num_error"] > 5 and tracker_stats["time_request"] > time_announce_allowed and not force: - if config.verbose: - self.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"])) - continue - thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode) - threads.append(thread) - thread.tracker = tracker - - time.sleep(0.01) - self.updateWebsocket(trackers="announcing") - - gevent.joinall(threads, timeout=20) # Wait for announce finish - - for thread in threads: - if thread.value is None: - continue - if thread.value is not False: - if thread.value > 1.0: # Takes more than 1 second to announce - slow.append("%.2fs %s" % (thread.value, thread.tracker)) - num_announced += 1 - else: - if thread.ready(): - errors.append(thread.tracker) - else: # Still running - slow.append("30s+ %s" % thread.tracker) - - # Save peers num - self.site.settings["peers"] = len(self.site.peers) - - if len(errors) < len(threads): # At least one tracker finished - if len(trackers) == 1: - announced_to = trackers[0] - else: - announced_to = "%s/%s trackers" % (num_announced, len(threads)) - if mode != "update" or config.verbose: - self.log.debug( - "Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" % - (mode, announced_to, time.time() - s, errors, slow) - ) - else: - if len(threads) > 1: - self.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s)) - if len(threads) == 1 and mode != "start": # Move to next tracker - self.log.debug("Tracker failed, skipping to next one...") - self.site.greenlet_manager.spawnLater(1.0, self.announce, force=force, mode=mode, pex=pex) - - self.updateWebsocket(trackers="announced") + trackers = self.getAnnouncingTrackers(mode, force) + self.log.debug("Chosen trackers: %s" % trackers) + self.announceToTrackers(trackers, force=force, mode=mode) if pex: - self.updateWebsocket(pex="announcing") - if mode == "more": # Need more peers - self.announcePex(need_num=10) - else: - self.announcePex() - - self.updateWebsocket(pex="announced") + self.announcePex() def getTrackerHandler(self, protocol): return None @@ -258,8 +237,62 @@ class SiteAnnouncer(object): ) return time.time() - s + def announceToTrackers(self, trackers, force=False, mode="start"): + errors = [] + slow = [] + s = time.time() + threads = [] + num_announced = 0 + + for tracker in trackers: # Start announce threads + thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode) + threads.append(thread) + thread.tracker = tracker + + time.sleep(0.01) + self.updateWebsocket(trackers="announcing") + + gevent.joinall(threads, timeout=20) # Wait for announce finish + + for thread in threads: + if thread.value is None: + continue + if thread.value is not False: + if thread.value > 1.0: # Takes more than 1 second to announce + slow.append("%.2fs %s" % (thread.value, thread.tracker)) + num_announced += 1 + else: + if thread.ready(): + errors.append(thread.tracker) + else: # Still running + slow.append("30s+ %s" % thread.tracker) + + # Save peers num + self.site.settings["peers"] = len(self.site.peers) + + if len(errors) < len(threads): # At least one tracker finished + if len(trackers) == 1: + announced_to = trackers[0] + else: + announced_to = "%s/%s trackers" % (num_announced, len(threads)) + if mode != "update" or config.verbose: + self.log.debug( + "Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" % + (mode, announced_to, time.time() - s, errors, slow) + ) + else: + if len(threads) > 1: + self.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s)) + if len(threads) > 1 and mode != "start": # Move to next tracker + self.log.debug("Tracker failed, skipping to next one...") + self.site.greenlet_manager.spawnLater(5.0, self.announce, force=force, mode=mode, pex=False) + + self.updateWebsocket(trackers="announced") + @util.Noparallel(blocking=False) - def announcePex(self, query_num=2, need_num=5): + def announcePex(self, query_num=2, need_num=10): + self.updateWebsocket(pex="announcing") + peers = self.site.getConnectedPeers() if len(peers) == 0: # Wait 3s for connections time.sleep(3) @@ -286,6 +319,8 @@ class SiteAnnouncer(object): break self.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added)) + self.updateWebsocket(pex="announced") + def updateWebsocket(self, **kwargs): if kwargs: param = {"event": list(kwargs.items())[0]}