From c36cba79806726c014eb904dbe7b1554ace6f5e7 Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Thu, 21 Oct 2021 18:45:08 +0700 Subject: [PATCH] Implement new websocket command serverSetPassiveMode --- src/Connection/ConnectionServer.py | 14 ++--- src/File/FileServer.py | 97 ++++++++++++++++++++++-------- src/Ui/UiWebsocket.py | 13 ++++ 3 files changed, 90 insertions(+), 34 deletions(-) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index ad834c54..bf95a21a 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -142,28 +142,24 @@ class ConnectionServer(object): prev_sizes = {} for i in range(60): sizes = {} - total_size = 0 - for name, pool in self.managed_pools.items(): + for name, pool in list(self.managed_pools.items()): pool.join(timeout=1) size = len(pool) - sizes[name] = size - total_size += size + if size: + sizes[name] = size - if total_size == 0: + if len(sizes) == 0: break if prev_sizes != sizes: s = "" for name, size in sizes.items(): s += "%s pool: %s, " % (name, size) - s += "total: %s" % total_size - self.log.info("Waiting for tasks in managed pools to stop: %s", s) - prev_sizes = sizes - for name, pool in self.managed_pools.items(): + for name, pool in list(self.managed_pools.items()): size = len(pool) if size: self.log.info("Killing %s tasks in %s pool", size, name) diff --git a/src/File/FileServer.py b/src/File/FileServer.py index a0f16b97..5991adf0 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -35,10 +35,15 @@ class FileServer(ConnectionServer): self.recheck_port = True + self.active_mode_thread_pool = gevent.pool.Pool(None) + self.update_pool = gevent.pool.Pool(5) self.update_start_time = 0 self.update_sites_task_next_nr = 1 + self.passive_mode = None + self.active_mode_threads = {} + self.supported_ip_types = ["ipv4"] # Outgoing ip_type support if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported(): self.supported_ip_types.append("ipv6") @@ -64,7 +69,8 @@ class FileServer(ConnectionServer): ConnectionServer.__init__(self, ip, port, self.handleRequest) log.debug("Supported IP types: %s" % self.supported_ip_types) - self.managed_pools["update"] = self.pool + self.managed_pools["active_mode_thread"] = self.active_mode_thread_pool + self.managed_pools["update"] = self.update_pool if ip_type == "dual" and ip == "::": # Also bind to ipv4 addres in dual mode @@ -356,7 +362,7 @@ class FileServer(ConnectionServer): self.sleep(1) self.waitForInternetOnline() - if self.stopping: + if not self.inActiveMode(): break site = self.getSite(site_address) @@ -368,7 +374,7 @@ class FileServer(ConnectionServer): thread = self.update_pool.spawn(self.updateSite, site) - if self.stopping: + if not self.inActiveMode(): break if time.time() - progress_print_time > 60: @@ -387,7 +393,7 @@ class FileServer(ConnectionServer): time_left ) - if self.stopping: + if not self.inActiveMode(): log.info("%s: stopped", task_description) else: log.info("%s: finished in %.2fs", task_description, time.time() - start_time) @@ -414,11 +420,11 @@ class FileServer(ConnectionServer): self.sleep(long_timeout) - while self.running: + while self.inActiveMode(): site = None self.sleep(short_timeout) - if self.stopping: + if not self.inActiveMode(): break site_address, mode = self.peekSiteForVerification() @@ -457,10 +463,10 @@ class FileServer(ConnectionServer): long_timeout = min_long_timeout short_cycle_time_limit = 60 * 2 - while self.running: + while self.inActiveMode(): self.sleep(long_timeout) - if self.stopping: + if not self.inActiveMode(): break start_time = time.time() @@ -477,7 +483,7 @@ class FileServer(ConnectionServer): sites_processed = 0 for site_address in site_addresses: - if self.stopping: + if not self.inActiveMode(): break site = self.getSite(site_address) @@ -536,13 +542,13 @@ class FileServer(ConnectionServer): # performing the update for a random site. It's way better than just # silly pinging a random peer for no profit. log.info("keepAliveThread started") - while self.running: + while self.inActiveMode(): self.waitForInternetOnline() threshold = self.internet_outage_threshold / 2.0 self.sleep(threshold / 2.0) - if self.stopping: + if not self.inActiveMode(): break last_activity_time = max( @@ -570,9 +576,9 @@ class FileServer(ConnectionServer): # and do it more often. log.info("reloadTrackerFilesThread started") interval = 60 * 10 - while self.running: + while self.inActiveMode(): self.sleep(interval) - if self.stopping: + if not self.inActiveMode(): break config.loadTrackersFile() log.info("reloadTrackerFilesThread stopped") @@ -582,9 +588,9 @@ class FileServer(ConnectionServer): log.info("wakeupWatcherThread started") last_time = time.time() last_my_ips = socket.gethostbyname_ex('')[2] - while self.running: + while self.inActiveMode(): self.sleep(30) - if self.stopping: + if not self.inActiveMode(): break is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3 if is_time_changed: @@ -612,6 +618,53 @@ class FileServer(ConnectionServer): last_my_ips = my_ips log.info("wakeupWatcherThread stopped") + def killActiveModeThreads(self): + for key, thread in list(self.active_mode_threads.items()): + if thread: + if not thread.ready(): + self.log.info("killing %s" % key) + gevent.kill(thread) + del self.active_mode_threads[key] + + def setPassiveMode(self, passive_mode): + if passive_mode: + self.leaveActiveMode(); + else: + self.enterActiveMode(); + + def leaveActiveMode(self): + if self.passive_mode: + return + log.info("passive mode is ON"); + self.passive_mode = True + + def enterActiveMode(self): + if not self.passive_mode and self.passive_mode is not None: + return + log.info("passive mode is OFF"); + self.passive_mode = False + self.killActiveModeThreads() + x = self.active_mode_threads + p = self.active_mode_thread_pool + x["thread_keep_alive"] = p.spawn(self.keepAliveThread) + x["thread_wakeup_watcher"] = p.spawn(self.wakeupWatcherThread) + x["thread_sites_verification"] = p.spawn(self.sitesVerificationThread) + x["thread_reload_tracker_files"] = p.spawn(self.reloadTrackerFilesThread) + x["thread_sites_maintenance_full"] = p.spawn(self.sitesMaintenanceThread, mode="full") + x["thread_sites_maintenance_short"] = p.spawn(self.sitesMaintenanceThread, mode="short") + x["thread_initial_site_updater"] = p.spawn(self.updateSites) + + # Returns True, if an active mode thread should keep going, + # i.e active mode is enabled and the server not going to shutdown + def inActiveMode(self): + if self.passive_mode: + return False + if not self.running: + return False + if self.stopping: + return False + return True + # Bind and start serving sites # If passive_mode is False, FileServer starts the full-featured file serving: # * Checks for updates at startup. @@ -648,16 +701,7 @@ class FileServer(ConnectionServer): # Remove this line when self.sites gets completely unused self.getSites() - if not passive_mode: - thread_keep_alive = self.spawn(self.keepAliveThread) - thread_wakeup_watcher = self.spawn(self.wakeupWatcherThread) - thread_sites_verification = self.spawn(self.sitesVerificationThread) - thread_reload_tracker_files = self.spawn(self.reloadTrackerFilesThread) - thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full") - thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short") - - self.sleep(0.1) - self.spawn(self.updateSites) + self.setPassiveMode(passive_mode) ConnectionServer.listen(self) @@ -672,4 +716,7 @@ class FileServer(ConnectionServer): except Exception as err: log.info("Failed at attempt to use upnp to close port: %s" % err) + self.leaveActiveMode(); + gevent.joinall(self.active_mode_threads.values(), timeout=15) + return ConnectionServer.stop(self) diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 85fa904d..77e5b12f 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -318,6 +318,7 @@ class UiWebsocket(object): back["updatesite"] = config.updatesite back["dist_type"] = config.dist_type back["lib_verify_best"] = CryptBitcoin.lib_verify_best + back["passive_mode"] = file_server.passive_mode return back def formatAnnouncerInfo(self, site): @@ -1164,6 +1165,18 @@ class UiWebsocket(object): file_server.portCheck() self.response(to, file_server.port_opened) + @flag.admin + @flag.no_multiuser + def actionServerSetPassiveMode(self, to, passive_mode=False): + import main + file_server = main.file_server + file_server.setPassiveMode(passive_mode) + if passive_mode: + self.cmd("notification", ["info", _["Passive mode enabled"], 5000]) + else: + self.cmd("notification", ["info", _["Passive mode disabled"], 5000]) + self.server.updateWebsocket() + @flag.admin @flag.no_multiuser def actionServerShutdown(self, to, restart=False):