From 829fd4678133e527d34ac395c6c5bf3da20f8050 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Fri, 30 Oct 2020 14:36:08 +0700 Subject: [PATCH] Redesign the site updating strategy in Site.py, SiteAnnouncer.py, FileServer.py --- src/Config.py | 2 + src/File/FileServer.py | 245 ++++++++++++++++++++++---------------- src/Site/Site.py | 156 ++++++++++++++++++++---- src/Site/SiteAnnouncer.py | 11 +- 4 files changed, 285 insertions(+), 129 deletions(-) diff --git a/src/Config.py b/src/Config.py index 70b284bf..a3d66395 100644 --- a/src/Config.py +++ b/src/Config.py @@ -257,6 +257,8 @@ class Config(object): self.parser.add_argument('--global_connected_limit', help='Max connections', default=512, type=int, metavar='global_connected_limit') self.parser.add_argument('--workers', help='Download workers per site', default=5, type=int, metavar='workers') + self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet tries checking updates for own sites more frequently. This can be used by a third party for revealing the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for any sites.', type='bool', choices=[True, False], default=False) + self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip') self.parser.add_argument('--fileserver_port', help='FileServer bind port (0: randomize)', default=0, type=int, metavar='port') self.parser.add_argument('--fileserver_port_range', help='FileServer randomization range', default="10000-40000", metavar='port') diff --git a/src/File/FileServer.py b/src/File/FileServer.py index fbc3cc85..8294d179 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -10,7 +10,6 @@ from gevent.server import StreamServer import util from util import helper -from util import CircularIterator from Config import config from .FileRequest import FileRequest from Peer import PeerPortchecker @@ -19,16 +18,24 @@ from Connection import ConnectionServer from Plugin import PluginManager from Debug import Debug +log = logging.getLogger("FileServer") + @PluginManager.acceptPlugins class FileServer(ConnectionServer): def __init__(self, ip=config.fileserver_ip, port=config.fileserver_port, ip_type=config.fileserver_ip_type): self.site_manager = SiteManager.site_manager self.portchecker = PeerPortchecker.PeerPortchecker(self) - self.log = logging.getLogger("FileServer") self.ip_type = ip_type self.ip_external_list = [] + # This is wrong: + # self.log = logging.getLogger("FileServer") + # The value of self.log will be overwritten in ConnectionServer.__init__() + + self.check_pool = gevent.pool.Pool(5) + self.check_start_time = 0 + self.supported_ip_types = ["ipv4"] # Outgoing ip_type support if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported(): self.supported_ip_types.append("ipv6") @@ -52,17 +59,17 @@ class FileServer(ConnectionServer): config.arguments.fileserver_port = port ConnectionServer.__init__(self, ip, port, self.handleRequest) - self.log.debug("Supported IP types: %s" % self.supported_ip_types) + log.debug("Supported IP types: %s" % self.supported_ip_types) if ip_type == "dual" and ip == "::": # Also bind to ipv4 addres in dual mode try: - self.log.debug("Binding proxy to %s:%s" % ("::", self.port)) + log.debug("Binding proxy to %s:%s" % ("::", self.port)) self.stream_server_proxy = StreamServer( ("0.0.0.0", self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100 ) except Exception as err: - self.log.info("StreamServer proxy create error: %s" % Debug.formatException(err)) + log.info("StreamServer proxy create error: %s" % Debug.formatException(err)) self.port_opened = {} @@ -71,8 +78,17 @@ class FileServer(ConnectionServer): self.files_parsing = {} self.ui_server = None + def getSiteAddresses(self): + # Avoid saving the site list on the stack, since a site may be deleted + # from the original list while iterating. + # Use the list of addresses instead. + return [ + site.address for site in + sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True) + ] + def getRandomPort(self, ip, port_range_from, port_range_to): - self.log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to)) + log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to)) tried = [] for bind_retry in range(100): port = random.randint(port_range_from, port_range_to) @@ -84,11 +100,11 @@ class FileServer(ConnectionServer): sock.bind((ip, port)) success = True except Exception as err: - self.log.warning("Error binding to port %s: %s" % (port, err)) + log.warning("Error binding to port %s: %s" % (port, err)) success = False sock.close() if success: - self.log.info("Found unused random port: %s" % port) + log.info("Found unused random port: %s" % port) return port else: time.sleep(0.1) @@ -104,16 +120,16 @@ class FileServer(ConnectionServer): sock.connect((ipv6_testip, 80)) local_ipv6 = sock.getsockname()[0] if local_ipv6 == "::1": - self.log.debug("IPv6 not supported, no local IPv6 address") + log.debug("IPv6 not supported, no local IPv6 address") return False else: - self.log.debug("IPv6 supported on IP %s" % local_ipv6) + log.debug("IPv6 supported on IP %s" % local_ipv6) return True except socket.error as err: - self.log.warning("IPv6 not supported: %s" % err) + log.warning("IPv6 not supported: %s" % err) return False except Exception as err: - self.log.error("IPv6 check error: %s" % err) + log.error("IPv6 check error: %s" % err) return False def listenProxy(self): @@ -121,20 +137,20 @@ class FileServer(ConnectionServer): self.stream_server_proxy.serve_forever() except Exception as err: if err.errno == 98: # Address already in use error - self.log.debug("StreamServer proxy listen error: %s" % err) + log.debug("StreamServer proxy listen error: %s" % err) else: - self.log.info("StreamServer proxy listen error: %s" % err) + log.info("StreamServer proxy listen error: %s" % err) # Handle request to fileserver def handleRequest(self, connection, message): if config.verbose: if "params" in message: - self.log.debug( + log.debug( "FileRequest: %s %s %s %s" % (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path")) ) else: - self.log.debug("FileRequest: %s %s" % (str(connection), message["cmd"])) + log.debug("FileRequest: %s %s" % (str(connection), message["cmd"])) req = FileRequest(self, connection) req.route(message["cmd"], message.get("req_id"), message.get("params")) if not self.has_internet and not connection.is_private_ip: @@ -142,7 +158,7 @@ class FileServer(ConnectionServer): self.onInternetOnline() def onInternetOnline(self): - self.log.info("Internet online") + log.info("Internet online") gevent.spawn(self.checkSites, check_files=False, force_port_check=True) # Reload the FileRequest class to prevent restarts in debug mode @@ -153,7 +169,7 @@ class FileServer(ConnectionServer): def portCheck(self): if config.offline: - self.log.info("Offline mode: port check disabled") + log.info("Offline mode: port check disabled") res = {"ipv4": None, "ipv6": None} self.port_opened = res return res @@ -169,7 +185,7 @@ class FileServer(ConnectionServer): } self.ip_external_list = config.ip_external self.port_opened.update(res) - self.log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"])) + log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"])) return res self.port_opened = {} @@ -191,7 +207,7 @@ class FileServer(ConnectionServer): else: res_ipv6 = res_ipv6_thread.get() if res_ipv6["opened"] and not helper.getIpType(res_ipv6["ip"]) == "ipv6": - self.log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"]) + log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"]) res_ipv6["opened"] = False self.ip_external_list = [] @@ -200,7 +216,7 @@ class FileServer(ConnectionServer): self.ip_external_list.append(res_ip["ip"]) SiteManager.peer_blacklist.append((res_ip["ip"], self.port)) - self.log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"])) + log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"])) res = {"ipv4": res_ipv4["opened"], "ipv6": res_ipv6["opened"]} @@ -213,7 +229,7 @@ class FileServer(ConnectionServer): self.ip_external_list.append(ip) res[helper.getIpType(ip)] = True # We have opened port if we have external ip SiteManager.peer_blacklist.append((ip, self.port)) - self.log.debug("External ip found on interfaces: %s" % ip) + log.debug("External ip found on interfaces: %s" % ip) self.port_opened.update(res) @@ -224,79 +240,123 @@ class FileServer(ConnectionServer): # Check site file integrity def checkSite(self, site, check_files=False): - if site.isServing(): - site.announce(mode="startup") # Announce site to tracker - site.update(check_files=check_files) # Update site's content.json and download changed files - site.sendMyHashfield() - site.updateHashfield() + if not site.isServing(): + return + + quick_start = len(site.peers) >= 50 + + if quick_start: + log.debug("Checking site: %s (quick start)" % (site.address)) + else: + log.debug("Checking site: %s" % (site.address)) + + if quick_start: + site.setDelayedStartupAnnounce() + else: + site.announce(mode="startup") + + site.update(check_files=check_files) # Update site's content.json and download changed files + site.sendMyHashfield() + site.updateHashfield() # Check sites integrity @util.Noparallel() def checkSites(self, check_files=False, force_port_check=False): - self.log.debug("Checking sites...") - s = time.time() - sites_checking = False - if not self.port_opened or force_port_check: # Test and open port if not tested yet - if len(self.sites) <= 2: # Don't wait port opening on first startup - sites_checking = True - for address, site in list(self.sites.items()): - gevent.spawn(self.checkSite, site, check_files) + log.info("Checking sites: check_files=%s, force_port_check=%s", check_files, force_port_check) + # Don't wait port opening on first startup. Do the instant check now. + if len(self.sites) <= 2: + sites_checking = True + for address, site in list(self.sites.items()): + gevent.spawn(self.checkSite, site, check_files) + + # Test and open port if not tested yet + if not self.port_opened or force_port_check: self.portCheck() - if not self.port_opened["ipv4"]: self.tor_manager.startOnions() - if not sites_checking: - check_pool = gevent.pool.Pool(5) - # Check sites integrity - for site in sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True): - if not site.isServing(): - continue - check_thread = check_pool.spawn(self.checkSite, site, check_files) # Check in new thread - time.sleep(2) - if site.settings.get("modified", 0) < time.time() - 60 * 60 * 24: # Not so active site, wait some sec to finish - check_thread.join(timeout=5) - self.log.debug("Checksites done in %.3fs" % (time.time() - s)) + site_addresses = self.getSiteAddresses() + + sites_processed = 0 + start_time = time.time() + self.check_start_time = start_time + progress_print_time = time.time() + + # Check sites integrity + for site_address in site_addresses: + site = self.sites.get(site_address, None) + + sites_processed += 1 + + if (not site) or (not site.isServing()): + continue + check_thread = self.check_pool.spawn(self.checkSite, site, check_files) # Check in new thread + + if time.time() - progress_print_time > 60: + progress_print_time = time.time() + time_spent = time.time() - start_time + time_per_site = time_spent / float(sites_processed) + sites_left = len(site_addresses) - sites_processed + time_left = time_per_site * sites_left + log.info("Checking sites: DONE: %d sites in %.2fs (%.2fs per site); LEFT: %d sites in %.2fs", + sites_processed, + time_spent, + time_per_site, + sites_left, + time_left + ) + + if (self.check_start_time != start_time) and self.check_pool.full(): + # Another check is running, throttling... + time.sleep(5) + else: + time.sleep(1) + + log.info("Checking sites: finished in %.3fs" % (time.time() - start_time)) def sitesMaintenanceThread(self): import gc startup = True short_timeout = 2 - long_timeout = 60 * 5 - - circular_iterator = CircularIterator() + long_timeout = 60 * 2 while 1: - if circular_iterator.isWrapped(): - time.sleep(long_timeout) - circular_iterator.resetSuccessiveCount() - gc.collect() # Explicit garbage collection + time.sleep(long_timeout) + gc.collect() # Explicit garbage collection - self.log.debug( - "Running site cleanup, connections: %s, internet: %s" % - (len(self.connections), self.has_internet) + log.debug( + "Starting maintenance cycle: connections=%s, internet=%s", + len(self.connections), self.has_internet + ) + start_time = time.time() + + site_addresses = self.getSiteAddresses() + + sites_processed = 0 + + for site_address in site_addresses: + site = self.sites.get(site_address, None) + if (not site) or (not site.isServing()): + continue + + log.debug("Running maintenance for site: %s", site.address) + + done = site.runPeriodicMaintenance(startup=startup) + site = None + if done: + sites_processed += 1 + time.sleep(short_timeout) + + log.debug("Maintenance cycle finished in %.3fs. Total sites: %d. Processed sites: %d", + time.time() - start_time, + len(site_addresses), + sites_processed ) - site = circular_iterator.next(list(self.sites.values())) - if site: - done = site.runPeriodicMaintenance(startup=startup) - if done: - time.sleep(short_timeout) - site = None - - if circular_iterator.isWrapped(): - startup = False - - def announceSite(self, site): - site.announce(mode="update", pex=False) - active_site = time.time() - site.settings.get("modified", 0) < 24 * 60 * 60 - if site.settings["own"] or active_site: - # Check connections more frequently on own and active sites to speed-up first connections - site.needConnections(check_site_on_reconnect=True) - site.sendMyHashfield(3) - site.updateHashfield(3) + site_addresses = None + startup = False # Periodic reloading of tracker files def reloadTrackerFilesThread(self): @@ -309,24 +369,6 @@ class FileServer(ConnectionServer): time.sleep(interval) config.loadTrackersFile() - # Announce sites every 20 min - def announceSites(self): - time.sleep(5 * 60) # Sites already announced on startup - while 1: - s = time.time() - for address, site in list(self.sites.items()): - if not site.isServing(): - continue - gevent.spawn(self.announceSite, site).join(timeout=10) - time.sleep(1) - taken = time.time() - s - - # Query all trackers one-by-one in 20 minutes evenly distributed - sleep = max(0, 60 * 20 / len(config.trackers) - taken) - - self.log.debug("Site announce tracker done in %.3fs, sleeping for %.3fs..." % (taken, sleep)) - time.sleep(sleep) - # Detects if computer back from wakeup def wakeupWatcher(self): last_time = time.time() @@ -336,7 +378,7 @@ class FileServer(ConnectionServer): is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3 if is_time_changed: # If taken more than 3 minute then the computer was in sleep mode - self.log.info( + log.info( "Wakeup detected: time warp from %0.f to %0.f (%0.f sleep seconds), acting like startup..." % (last_time, time.time(), time.time() - last_time) ) @@ -344,7 +386,7 @@ class FileServer(ConnectionServer): my_ips = socket.gethostbyname_ex('')[2] is_ip_changed = my_ips != last_my_ips if is_ip_changed: - self.log.info("IP change detected from %s to %s" % (last_my_ips, my_ips)) + log.info("IP change detected from %s to %s" % (last_my_ips, my_ips)) if is_time_changed or is_ip_changed: self.checkSites(check_files=False, force_port_check=True) @@ -362,9 +404,9 @@ class FileServer(ConnectionServer): try: self.stream_server.start() except Exception as err: - self.log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err)) + log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err)) if "ui_server" in dir(sys.modules["main"]): - self.log.debug("Stopping UI Server.") + log.debug("Stopping UI Server.") sys.modules["main"].ui_server.stop() return False @@ -378,21 +420,20 @@ class FileServer(ConnectionServer): gevent.spawn(self.checkSites) thread_reaload_tracker_files = gevent.spawn(self.reloadTrackerFilesThread) - thread_announce_sites = gevent.spawn(self.announceSites) thread_sites_maintenance = gevent.spawn(self.sitesMaintenanceThread) thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher) ConnectionServer.listen(self) - self.log.debug("Stopped.") + log.debug("Stopped.") def stop(self): if self.running and self.portchecker.upnp_port_opened: - self.log.debug('Closing port %d' % self.port) + log.debug('Closing port %d' % self.port) try: self.portchecker.portClose(self.port) - self.log.info('Closed port via upnp.') + log.info('Closed port via upnp.') except Exception as err: - self.log.info("Failed at attempt to use upnp to close port: %s" % err) + log.info("Failed at attempt to use upnp to close port: %s" % err) return ConnectionServer.stop(self) diff --git a/src/Site/Site.py b/src/Site/Site.py index b941b035..b01e9aae 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -28,6 +28,38 @@ from File import FileServer from .SiteAnnouncer import SiteAnnouncer from . import SiteManager +class ScaledTimeoutHandler: + def __init__(self, val_min, val_max, handler=None, scaler=None): + self.val_min = val_min + self.val_max = val_max + self.timestamp = 0 + self.handler = handler + self.scaler = scaler + self.log = logging.getLogger("ScaledTimeoutHandler") + + def isExpired(self, scale): + interval = scale * (self.val_max - self.val_min) + self.val_min + expired_at = self.timestamp + interval + now = time.time() + expired = (now > expired_at) + if expired: + self.log.debug( + "Expired: [%d..%d]: scale=%f, interval=%f", + self.val_min, self.val_max, scale, interval) + return expired + + def done(self): + self.timestamp = time.time() + + def run(self, *args, **kwargs): + do_run = kwargs["force"] or self.isExpired(self.scaler()) + if do_run: + result = self.handler(*args, **kwargs) + if result: + self.done() + return result + else: + return None @PluginManager.acceptPlugins class Site(object): @@ -40,8 +72,16 @@ class Site(object): self.log = logging.getLogger("Site:%s" % self.address_short) self.addEventListeners() - self.periodic_maintenance_interval = 60 * 20 - self.periodic_maintenance_timestamp = 0 + self.periodic_maintenance_handlers = [ + ScaledTimeoutHandler(60 * 30, 60 * 2, + handler=self.periodicMaintenanceHandler_announce, + scaler=self.getAnnounceRating), + ScaledTimeoutHandler(60 * 20, 60 * 10, + handler=self.periodicMaintenanceHandler_general, + scaler=self.getActivityRating) + ] + + self.delayed_startup_announce = False self.content = None # Load content.json self.peers = {} # Key: ip:port, Value: Peer.Peer @@ -852,10 +892,63 @@ class Site(object): peer.found(source) return peer + def setDelayedStartupAnnounce(self): + self.delayed_startup_announce = True + + def applyDelayedStartupAnnounce(self): + if self.delayed_startup_announce: + self.delayed_startup_announce = False + self.announce(mode="startup") + return True + return False + def announce(self, *args, **kwargs): if self.isServing(): self.announcer.announce(*args, **kwargs) + def getActivityRating(self, force_safe=False): + age = time.time() - self.settings.get("modified", 0) + + if age < 60 * 60: + rating = 1.0 + elif age < 60 * 60 * 5: + rating = 0.8 + elif age < 60 * 60 * 24: + rating = 0.6 + elif age < 60 * 60 * 24 * 3: + rating = 0.4 + elif age < 60 * 60 * 24 * 7: + rating = 0.2 + else: + rating = 0.0 + + force_safe = force_safe or config.expose_no_ownership + + if (not force_safe) and self.settings["own"]: + rating = min(rating, 0.6) + + return rating + + def getAnnounceRating(self): + # rare frequent + # announces announces + # 0 ------------------- 1 + # activity -------------> -- active site ==> frequent announces + # <---------------- peers -- many peers ==> rare announces + # trackers -------------> -- many trackers ==> frequent announces to iterate over more trackers + + activity_rating = self.getActivityRating(force_safe=True) + + peer_count = len(self.peers) + peer_rating = 1.0 - min(peer_count, 50) / 50.0 + + tracker_count = self.announcer.getSupportedTrackerCount() + tracker_count = max(tracker_count, 1) + tracker_rating = 1.0 - (1.0 / tracker_count) + + v = [activity_rating, peer_rating, tracker_rating] + return sum(v) / float(len(v)) + # The engine tries to maintain the number of active connections: # >= getPreferableActiveConnectionCount() # and @@ -866,18 +959,7 @@ class Site(object): return 0 age = time.time() - self.settings.get("modified", 0) - count = 0 - - if age < 60 * 60: - count = 10 - elif age < 60 * 60 * 5: - count = 8 - elif age < 60 * 60 * 24: - count = 6 - elif age < 60 * 60 * 24 * 3: - count = 4 - elif age < 60 * 60 * 24 * 7: - count = 2 + count = int(10 * self.getActivityRating(force_safe=True)) if len(self.peers) < 50: count = max(count, 5) @@ -957,7 +1039,7 @@ class Site(object): return connected def markConnectedPeersProtected(self): - for peer in site.getConnectedPeers(): + for peer in self.getConnectedPeers(): peer.markProtected() # Return: Probably peers verified to be connectable recently @@ -1099,32 +1181,54 @@ class Site(object): if not self.isServing(): return False - scheduled_time = self.periodic_maintenance_timestamp + self.periodic_maintenance_interval + self.log.debug("runPeriodicMaintenance: startup=%s, force=%s" % (startup, force)) - if time.time() < scheduled_time and not force: + result = False + + for handler in self.periodic_maintenance_handlers: + result = result | bool(handler.run(startup=startup, force=force)) + + return result + + def periodicMaintenanceHandler_general(self, startup=False, force=False): + if not self.isServing(): return False - self.periodic_maintenance_timestamp = time.time() + self.applyDelayedStartupAnnounce() - self.log.debug("runPeriodicMaintenance: startup=%s" % startup) + if not self.peers: + return False + + self.log.debug("periodicMaintenanceHandler_general: startup=%s, force=%s" % (startup, force)) if not startup: self.cleanupPeers() - if self.peers: - with gevent.Timeout(10, exception=False): - self.announcer.announcePex() + self.needConnections(check_site_on_reconnect=True) - # Last check modification failed - if self.content_updated is False: + with gevent.Timeout(10, exception=False): + self.announcer.announcePex() + + self.sendMyHashfield(3) + self.updateHashfield(3) + + if self.content_updated is False: # Last check modification failed self.update() elif self.bad_files: self.retryBadFiles() - self.needConnections(check_site_on_reconnect=True) + return True - self.periodic_maintenance_timestamp = time.time() + def periodicMaintenanceHandler_announce(self, startup=False, force=False): + if not self.isServing(): + return False + self.log.debug("periodicMaintenanceHandler_announce: startup=%s, force=%s" % (startup, force)) + + if self.applyDelayedStartupAnnounce(): + return True + + self.announce(mode="update", pex=False) return True # Send hashfield to peers diff --git a/src/Site/SiteAnnouncer.py b/src/Site/SiteAnnouncer.py index 1440eb57..5a97807e 100644 --- a/src/Site/SiteAnnouncer.py +++ b/src/Site/SiteAnnouncer.py @@ -33,6 +33,7 @@ class SiteAnnouncer(object): self.peer_id = self.site.connection_server.peer_id self.tracker_circular_iterator = CircularIterator() self.time_last_announce = 0 + self.supported_tracker_count = 0 def getTrackers(self): return config.trackers @@ -50,6 +51,12 @@ class SiteAnnouncer(object): return trackers + # Returns a cached value of len(self.getSupportedTrackers()), which can be + # inacurate. + # To be used from Site for estimating available tracker count. + def getSupportedTrackerCount(self): + return self.supported_tracker_count + def shouldTrackerBeTemporarilyIgnored(self, tracker, mode, force): if not tracker: return True @@ -71,6 +78,8 @@ class SiteAnnouncer(object): def getAnnouncingTrackers(self, mode, force): trackers = self.getSupportedTrackers() + self.supported_tracker_count = len(trackers) + if trackers and (mode == "update" or mode == "more"): # Choose just 2 trackers to announce to @@ -116,7 +125,7 @@ class SiteAnnouncer(object): back.append("onion") return back - @util.Noparallel(blocking=False) + @util.Noparallel() def announce(self, force=False, mode="start", pex=True): if time.time() - self.time_last_announce < 30 and not force: return # No reannouncing within 30 secs