Refactor SiteAnnouncer.announce
This commit is contained in:
parent
8fd88c50f9
commit
adf40dbb6b
1 changed files with 114 additions and 79 deletions
|
@ -13,6 +13,7 @@ from Debug import Debug
|
|||
from util import helper
|
||||
from greenlet import GreenletExit
|
||||
import util
|
||||
from util import CircularIterator
|
||||
|
||||
|
||||
class AnnounceError(Exception):
|
||||
|
@ -30,7 +31,7 @@ class SiteAnnouncer(object):
|
|||
self.stats = {}
|
||||
self.fileserver_port = config.fileserver_port
|
||||
self.peer_id = self.site.connection_server.peer_id
|
||||
self.last_tracker_id = random.randint(0, 10)
|
||||
self.tracker_circular_iterator = CircularIterator()
|
||||
self.time_last_announce = 0
|
||||
|
||||
def getTrackers(self):
|
||||
|
@ -49,15 +50,58 @@ class SiteAnnouncer(object):
|
|||
|
||||
return trackers
|
||||
|
||||
def getAnnouncingTrackers(self, mode):
|
||||
def shouldTrackerBeTemporarilyIgnored(self, tracker, mode, force):
|
||||
if not tracker:
|
||||
return True
|
||||
|
||||
if force:
|
||||
return False
|
||||
|
||||
now = time.time()
|
||||
|
||||
# Throttle accessing unresponsive trackers
|
||||
tracker_stats = global_stats[tracker]
|
||||
delay = min(30 * tracker_stats["num_error"], 60 * 10)
|
||||
time_announce_allowed = tracker_stats["time_request"] + delay
|
||||
if now < time_announce_allowed:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def getAnnouncingTrackers(self, mode, force):
|
||||
trackers = self.getSupportedTrackers()
|
||||
|
||||
if trackers and (mode == "update" or mode == "more"): # Only announce on one tracker, increment the queried tracker id
|
||||
self.last_tracker_id += 1
|
||||
self.last_tracker_id = self.last_tracker_id % len(trackers)
|
||||
trackers_announcing = [trackers[self.last_tracker_id]] # We only going to use this one
|
||||
if trackers and (mode == "update" or mode == "more"):
|
||||
|
||||
# Choose just 2 trackers to announce to
|
||||
|
||||
trackers_announcing = []
|
||||
|
||||
# One is the next in sequence
|
||||
|
||||
self.tracker_circular_iterator.resetSuccessiveCount()
|
||||
while 1:
|
||||
tracker = self.tracker_circular_iterator.next(trackers)
|
||||
if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force):
|
||||
trackers_announcing.append(tracker)
|
||||
break
|
||||
if self.tracker_circular_iterator.isWrapped():
|
||||
break
|
||||
|
||||
# And one is just random
|
||||
|
||||
shuffled_trackers = random.sample(trackers, len(trackers))
|
||||
for tracker in shuffled_trackers:
|
||||
if tracker in trackers_announcing:
|
||||
continue
|
||||
if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force):
|
||||
trackers_announcing.append(tracker)
|
||||
break
|
||||
else:
|
||||
trackers_announcing = trackers
|
||||
trackers_announcing = [
|
||||
tracker for tracker in trackers
|
||||
if not self.shouldTrackerBeTemporarilyIgnored(tracker, mode, force)
|
||||
]
|
||||
|
||||
return trackers_announcing
|
||||
|
||||
|
@ -76,83 +120,18 @@ class SiteAnnouncer(object):
|
|||
def announce(self, force=False, mode="start", pex=True):
|
||||
if time.time() - self.time_last_announce < 30 and not force:
|
||||
return # No reannouncing within 30 secs
|
||||
if force:
|
||||
self.log.debug("Force reannounce in mode %s" % mode)
|
||||
|
||||
self.log.debug("announce: force=%s, mode=%s, pex=%s" % (force, mode, pex))
|
||||
|
||||
self.fileserver_port = config.fileserver_port
|
||||
self.time_last_announce = time.time()
|
||||
|
||||
trackers = self.getAnnouncingTrackers(mode)
|
||||
|
||||
if config.verbose:
|
||||
self.log.debug("Tracker announcing, trackers: %s" % trackers)
|
||||
|
||||
errors = []
|
||||
slow = []
|
||||
s = time.time()
|
||||
threads = []
|
||||
num_announced = 0
|
||||
|
||||
for tracker in trackers: # Start announce threads
|
||||
tracker_stats = global_stats[tracker]
|
||||
# Reduce the announce time for trackers that looks unreliable
|
||||
time_announce_allowed = time.time() - 60 * min(30, tracker_stats["num_error"])
|
||||
if tracker_stats["num_error"] > 5 and tracker_stats["time_request"] > time_announce_allowed and not force:
|
||||
if config.verbose:
|
||||
self.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"]))
|
||||
continue
|
||||
thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode)
|
||||
threads.append(thread)
|
||||
thread.tracker = tracker
|
||||
|
||||
time.sleep(0.01)
|
||||
self.updateWebsocket(trackers="announcing")
|
||||
|
||||
gevent.joinall(threads, timeout=20) # Wait for announce finish
|
||||
|
||||
for thread in threads:
|
||||
if thread.value is None:
|
||||
continue
|
||||
if thread.value is not False:
|
||||
if thread.value > 1.0: # Takes more than 1 second to announce
|
||||
slow.append("%.2fs %s" % (thread.value, thread.tracker))
|
||||
num_announced += 1
|
||||
else:
|
||||
if thread.ready():
|
||||
errors.append(thread.tracker)
|
||||
else: # Still running
|
||||
slow.append("30s+ %s" % thread.tracker)
|
||||
|
||||
# Save peers num
|
||||
self.site.settings["peers"] = len(self.site.peers)
|
||||
|
||||
if len(errors) < len(threads): # At least one tracker finished
|
||||
if len(trackers) == 1:
|
||||
announced_to = trackers[0]
|
||||
else:
|
||||
announced_to = "%s/%s trackers" % (num_announced, len(threads))
|
||||
if mode != "update" or config.verbose:
|
||||
self.log.debug(
|
||||
"Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" %
|
||||
(mode, announced_to, time.time() - s, errors, slow)
|
||||
)
|
||||
else:
|
||||
if len(threads) > 1:
|
||||
self.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s))
|
||||
if len(threads) == 1 and mode != "start": # Move to next tracker
|
||||
self.log.debug("Tracker failed, skipping to next one...")
|
||||
self.site.greenlet_manager.spawnLater(1.0, self.announce, force=force, mode=mode, pex=pex)
|
||||
|
||||
self.updateWebsocket(trackers="announced")
|
||||
trackers = self.getAnnouncingTrackers(mode, force)
|
||||
self.log.debug("Chosen trackers: %s" % trackers)
|
||||
self.announceToTrackers(trackers, force=force, mode=mode)
|
||||
|
||||
if pex:
|
||||
self.updateWebsocket(pex="announcing")
|
||||
if mode == "more": # Need more peers
|
||||
self.announcePex(need_num=10)
|
||||
else:
|
||||
self.announcePex()
|
||||
|
||||
self.updateWebsocket(pex="announced")
|
||||
self.announcePex()
|
||||
|
||||
def getTrackerHandler(self, protocol):
|
||||
return None
|
||||
|
@ -258,8 +237,62 @@ class SiteAnnouncer(object):
|
|||
)
|
||||
return time.time() - s
|
||||
|
||||
def announceToTrackers(self, trackers, force=False, mode="start"):
|
||||
errors = []
|
||||
slow = []
|
||||
s = time.time()
|
||||
threads = []
|
||||
num_announced = 0
|
||||
|
||||
for tracker in trackers: # Start announce threads
|
||||
thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode)
|
||||
threads.append(thread)
|
||||
thread.tracker = tracker
|
||||
|
||||
time.sleep(0.01)
|
||||
self.updateWebsocket(trackers="announcing")
|
||||
|
||||
gevent.joinall(threads, timeout=20) # Wait for announce finish
|
||||
|
||||
for thread in threads:
|
||||
if thread.value is None:
|
||||
continue
|
||||
if thread.value is not False:
|
||||
if thread.value > 1.0: # Takes more than 1 second to announce
|
||||
slow.append("%.2fs %s" % (thread.value, thread.tracker))
|
||||
num_announced += 1
|
||||
else:
|
||||
if thread.ready():
|
||||
errors.append(thread.tracker)
|
||||
else: # Still running
|
||||
slow.append("30s+ %s" % thread.tracker)
|
||||
|
||||
# Save peers num
|
||||
self.site.settings["peers"] = len(self.site.peers)
|
||||
|
||||
if len(errors) < len(threads): # At least one tracker finished
|
||||
if len(trackers) == 1:
|
||||
announced_to = trackers[0]
|
||||
else:
|
||||
announced_to = "%s/%s trackers" % (num_announced, len(threads))
|
||||
if mode != "update" or config.verbose:
|
||||
self.log.debug(
|
||||
"Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" %
|
||||
(mode, announced_to, time.time() - s, errors, slow)
|
||||
)
|
||||
else:
|
||||
if len(threads) > 1:
|
||||
self.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s))
|
||||
if len(threads) > 1 and mode != "start": # Move to next tracker
|
||||
self.log.debug("Tracker failed, skipping to next one...")
|
||||
self.site.greenlet_manager.spawnLater(5.0, self.announce, force=force, mode=mode, pex=False)
|
||||
|
||||
self.updateWebsocket(trackers="announced")
|
||||
|
||||
@util.Noparallel(blocking=False)
|
||||
def announcePex(self, query_num=2, need_num=5):
|
||||
def announcePex(self, query_num=2, need_num=10):
|
||||
self.updateWebsocket(pex="announcing")
|
||||
|
||||
peers = self.site.getConnectedPeers()
|
||||
if len(peers) == 0: # Wait 3s for connections
|
||||
time.sleep(3)
|
||||
|
@ -286,6 +319,8 @@ class SiteAnnouncer(object):
|
|||
break
|
||||
self.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added))
|
||||
|
||||
self.updateWebsocket(pex="announced")
|
||||
|
||||
def updateWebsocket(self, **kwargs):
|
||||
if kwargs:
|
||||
param = {"event": list(kwargs.items())[0]}
|
||||
|
|
Loading…
Reference in a new issue