Implement new websocket command serverSetPassiveMode

This commit is contained in:
Vadim Ushakov 2021-10-21 18:45:08 +07:00
parent ddc4861223
commit c36cba7980
3 changed files with 90 additions and 34 deletions

View file

@ -142,28 +142,24 @@ class ConnectionServer(object):
prev_sizes = {}
for i in range(60):
sizes = {}
total_size = 0
for name, pool in self.managed_pools.items():
for name, pool in list(self.managed_pools.items()):
pool.join(timeout=1)
size = len(pool)
sizes[name] = size
total_size += size
if size:
sizes[name] = size
if total_size == 0:
if len(sizes) == 0:
break
if prev_sizes != sizes:
s = ""
for name, size in sizes.items():
s += "%s pool: %s, " % (name, size)
s += "total: %s" % total_size
self.log.info("Waiting for tasks in managed pools to stop: %s", s)
prev_sizes = sizes
for name, pool in self.managed_pools.items():
for name, pool in list(self.managed_pools.items()):
size = len(pool)
if size:
self.log.info("Killing %s tasks in %s pool", size, name)

View file

@ -35,10 +35,15 @@ class FileServer(ConnectionServer):
self.recheck_port = True
self.active_mode_thread_pool = gevent.pool.Pool(None)
self.update_pool = gevent.pool.Pool(5)
self.update_start_time = 0
self.update_sites_task_next_nr = 1
self.passive_mode = None
self.active_mode_threads = {}
self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
self.supported_ip_types.append("ipv6")
@ -64,7 +69,8 @@ class FileServer(ConnectionServer):
ConnectionServer.__init__(self, ip, port, self.handleRequest)
log.debug("Supported IP types: %s" % self.supported_ip_types)
self.managed_pools["update"] = self.pool
self.managed_pools["active_mode_thread"] = self.active_mode_thread_pool
self.managed_pools["update"] = self.update_pool
if ip_type == "dual" and ip == "::":
# Also bind to ipv4 addres in dual mode
@ -356,7 +362,7 @@ class FileServer(ConnectionServer):
self.sleep(1)
self.waitForInternetOnline()
if self.stopping:
if not self.inActiveMode():
break
site = self.getSite(site_address)
@ -368,7 +374,7 @@ class FileServer(ConnectionServer):
thread = self.update_pool.spawn(self.updateSite, site)
if self.stopping:
if not self.inActiveMode():
break
if time.time() - progress_print_time > 60:
@ -387,7 +393,7 @@ class FileServer(ConnectionServer):
time_left
)
if self.stopping:
if not self.inActiveMode():
log.info("%s: stopped", task_description)
else:
log.info("%s: finished in %.2fs", task_description, time.time() - start_time)
@ -414,11 +420,11 @@ class FileServer(ConnectionServer):
self.sleep(long_timeout)
while self.running:
while self.inActiveMode():
site = None
self.sleep(short_timeout)
if self.stopping:
if not self.inActiveMode():
break
site_address, mode = self.peekSiteForVerification()
@ -457,10 +463,10 @@ class FileServer(ConnectionServer):
long_timeout = min_long_timeout
short_cycle_time_limit = 60 * 2
while self.running:
while self.inActiveMode():
self.sleep(long_timeout)
if self.stopping:
if not self.inActiveMode():
break
start_time = time.time()
@ -477,7 +483,7 @@ class FileServer(ConnectionServer):
sites_processed = 0
for site_address in site_addresses:
if self.stopping:
if not self.inActiveMode():
break
site = self.getSite(site_address)
@ -536,13 +542,13 @@ class FileServer(ConnectionServer):
# performing the update for a random site. It's way better than just
# silly pinging a random peer for no profit.
log.info("keepAliveThread started")
while self.running:
while self.inActiveMode():
self.waitForInternetOnline()
threshold = self.internet_outage_threshold / 2.0
self.sleep(threshold / 2.0)
if self.stopping:
if not self.inActiveMode():
break
last_activity_time = max(
@ -570,9 +576,9 @@ class FileServer(ConnectionServer):
# and do it more often.
log.info("reloadTrackerFilesThread started")
interval = 60 * 10
while self.running:
while self.inActiveMode():
self.sleep(interval)
if self.stopping:
if not self.inActiveMode():
break
config.loadTrackersFile()
log.info("reloadTrackerFilesThread stopped")
@ -582,9 +588,9 @@ class FileServer(ConnectionServer):
log.info("wakeupWatcherThread started")
last_time = time.time()
last_my_ips = socket.gethostbyname_ex('')[2]
while self.running:
while self.inActiveMode():
self.sleep(30)
if self.stopping:
if not self.inActiveMode():
break
is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3
if is_time_changed:
@ -612,6 +618,53 @@ class FileServer(ConnectionServer):
last_my_ips = my_ips
log.info("wakeupWatcherThread stopped")
def killActiveModeThreads(self):
for key, thread in list(self.active_mode_threads.items()):
if thread:
if not thread.ready():
self.log.info("killing %s" % key)
gevent.kill(thread)
del self.active_mode_threads[key]
def setPassiveMode(self, passive_mode):
if passive_mode:
self.leaveActiveMode();
else:
self.enterActiveMode();
def leaveActiveMode(self):
if self.passive_mode:
return
log.info("passive mode is ON");
self.passive_mode = True
def enterActiveMode(self):
if not self.passive_mode and self.passive_mode is not None:
return
log.info("passive mode is OFF");
self.passive_mode = False
self.killActiveModeThreads()
x = self.active_mode_threads
p = self.active_mode_thread_pool
x["thread_keep_alive"] = p.spawn(self.keepAliveThread)
x["thread_wakeup_watcher"] = p.spawn(self.wakeupWatcherThread)
x["thread_sites_verification"] = p.spawn(self.sitesVerificationThread)
x["thread_reload_tracker_files"] = p.spawn(self.reloadTrackerFilesThread)
x["thread_sites_maintenance_full"] = p.spawn(self.sitesMaintenanceThread, mode="full")
x["thread_sites_maintenance_short"] = p.spawn(self.sitesMaintenanceThread, mode="short")
x["thread_initial_site_updater"] = p.spawn(self.updateSites)
# Returns True, if an active mode thread should keep going,
# i.e active mode is enabled and the server not going to shutdown
def inActiveMode(self):
if self.passive_mode:
return False
if not self.running:
return False
if self.stopping:
return False
return True
# Bind and start serving sites
# If passive_mode is False, FileServer starts the full-featured file serving:
# * Checks for updates at startup.
@ -648,16 +701,7 @@ class FileServer(ConnectionServer):
# Remove this line when self.sites gets completely unused
self.getSites()
if not passive_mode:
thread_keep_alive = self.spawn(self.keepAliveThread)
thread_wakeup_watcher = self.spawn(self.wakeupWatcherThread)
thread_sites_verification = self.spawn(self.sitesVerificationThread)
thread_reload_tracker_files = self.spawn(self.reloadTrackerFilesThread)
thread_sites_maintenance_full = self.spawn(self.sitesMaintenanceThread, mode="full")
thread_sites_maintenance_short = self.spawn(self.sitesMaintenanceThread, mode="short")
self.sleep(0.1)
self.spawn(self.updateSites)
self.setPassiveMode(passive_mode)
ConnectionServer.listen(self)
@ -672,4 +716,7 @@ class FileServer(ConnectionServer):
except Exception as err:
log.info("Failed at attempt to use upnp to close port: %s" % err)
self.leaveActiveMode();
gevent.joinall(self.active_mode_threads.values(), timeout=15)
return ConnectionServer.stop(self)

View file

@ -318,6 +318,7 @@ class UiWebsocket(object):
back["updatesite"] = config.updatesite
back["dist_type"] = config.dist_type
back["lib_verify_best"] = CryptBitcoin.lib_verify_best
back["passive_mode"] = file_server.passive_mode
return back
def formatAnnouncerInfo(self, site):
@ -1164,6 +1165,18 @@ class UiWebsocket(object):
file_server.portCheck()
self.response(to, file_server.port_opened)
@flag.admin
@flag.no_multiuser
def actionServerSetPassiveMode(self, to, passive_mode=False):
import main
file_server = main.file_server
file_server.setPassiveMode(passive_mode)
if passive_mode:
self.cmd("notification", ["info", _["Passive mode enabled"], 5000])
else:
self.cmd("notification", ["info", _["Passive mode disabled"], 5000])
self.server.updateWebsocket()
@flag.admin
@flag.no_multiuser
def actionServerShutdown(self, to, restart=False):