From e3daa09316bc099e7d7db646f98c1c6a125aaf9e Mon Sep 17 00:00:00 2001 From: Vadim Ushakov Date: Fri, 22 Oct 2021 00:30:42 +0700 Subject: [PATCH] Improve the file server shutdown logic and display the shutdown progress bar in the UI --- plugins/AnnounceLocal/AnnounceLocalPlugin.py | 4 +- src/Connection/ConnectionServer.py | 55 +++++++--- src/Content/ContentManager.py | 2 +- src/Debug/DebugHook.py | 34 ++++-- src/File/FileServer.py | 9 +- src/Site/Site.py | 107 ++++++++++++------- src/Site/SiteStorage.py | 6 +- src/Ui/UiWebsocket.py | 2 +- src/util/GreenletManager.py | 26 ++++- 9 files changed, 170 insertions(+), 75 deletions(-) diff --git a/plugins/AnnounceLocal/AnnounceLocalPlugin.py b/plugins/AnnounceLocal/AnnounceLocalPlugin.py index b9225966..01202774 100644 --- a/plugins/AnnounceLocal/AnnounceLocalPlugin.py +++ b/plugins/AnnounceLocal/AnnounceLocalPlugin.py @@ -131,10 +131,10 @@ class FileServerPlugin(object): gevent.spawn(self.local_announcer.start) return super(FileServerPlugin, self).start(*args, **kwargs) - def stop(self): + def stop(self, ui_websocket=None): if self.local_announcer: self.local_announcer.stop() - res = super(FileServerPlugin, self).stop() + res = super(FileServerPlugin, self).stop(ui_websocket=ui_websocket) return res diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index bf95a21a..f4358965 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -131,33 +131,52 @@ class ConnectionServer(object): return False self.log.debug("Stopped.") - def stop(self): + def stop(self, ui_websocket=None): self.log.debug("Stopping %s" % self.stream_server) self.stopping = True self.running = False self.stopping_event.set() - self.onStop() + self.onStop(ui_websocket=ui_websocket) - def onStop(self): - prev_sizes = {} - for i in range(60): + def onStop(self, ui_websocket=None): + timeout = 30 + start_time = time.time() + join_quantum = 0.1 + prev_msg = None + while True: + if time.time() 
>= start_time + timeout: + break + + total_size = 0 sizes = {} - + timestep = 0 for name, pool in list(self.managed_pools.items()): - pool.join(timeout=1) + timestep += join_quantum + pool.join(timeout=join_quantum) size = len(pool) if size: sizes[name] = size + total_size += size if len(sizes) == 0: break - if prev_sizes != sizes: - s = "" - for name, size in sizes.items(): - s += "%s pool: %s, " % (name, size) - self.log.info("Waiting for tasks in managed pools to stop: %s", s) - prev_sizes = sizes + if timestep < 1: + time.sleep(1 - timestep) + + # format message + s = "" + for name, size in sizes.items(): + s += "%s pool: %s, " % (name, size) + msg = "Waiting for tasks in managed pools to stop: %s" % s + # Prevent flooding to log + if msg != prev_msg: + prev_msg = msg + self.log.info("%s", msg) + + percent = 100 * (time.time() - start_time) / timeout + msg = "File Server: waiting for %s tasks to stop" % total_size + self.sendShutdownProgress(ui_websocket, msg, percent) for name, pool in list(self.managed_pools.items()): size = len(pool) @@ -165,12 +184,20 @@ class ConnectionServer(object): self.log.info("Killing %s tasks in %s pool", size, name) pool.kill() + self.sendShutdownProgress(ui_websocket, "File Server stopped. 
Now to exit.", 100) + if self.thread_checker: gevent.kill(self.thread_checker) self.thread_checker = None if self.stream_server: self.stream_server.stop() + def sendShutdownProgress(self, ui_websocket, message, progress): + if not ui_websocket: + return + ui_websocket.cmd("progress", ["shutdown", message, progress]) + time.sleep(0.01) + # Sleeps the specified amount of time or until ConnectionServer is stopped def sleep(self, t): if t: @@ -178,7 +205,7 @@ class ConnectionServer(object): else: time.sleep(t) - # Spawns a thread that will be waited for on server being stooped (and killed after a timeout) + # Spawns a thread that will be waited for on server being stopped (and killed after a timeout) def spawn(self, *args, **kwargs): thread = self.thread_pool.spawn(*args, **kwargs) return thread diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 7d1263ef..c6a64750 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -239,7 +239,7 @@ class ContentManager(object): if num_removed_bad_files > 0: self.site.worker_manager.removeSolvedFileTasks(mark_as_good=False) - gevent.spawn(self.site.update, since=0) + self.site.spawn(self.site.update, since=0) self.log.debug("Archived removed contents: %s, removed bad files: %s" % (num_removed_contents, num_removed_bad_files)) diff --git a/src/Debug/DebugHook.py b/src/Debug/DebugHook.py index d100a3b8..4a5bfd75 100644 --- a/src/Debug/DebugHook.py +++ b/src/Debug/DebugHook.py @@ -11,20 +11,34 @@ from . import Debug last_error = None -def shutdown(reason="Unknown"): - logging.info("Shutting down (reason: %s)..." 
% reason) +thread_shutdown = None + +def shutdownThread(): import main - if "file_server" in dir(main): - try: - gevent.spawn(main.file_server.stop) - if "ui_server" in dir(main): - gevent.spawn(main.ui_server.stop) - except Exception as err: - print("Proper shutdown error: %s" % err) - sys.exit(0) + try: + if "file_server" in dir(main): + thread = gevent.spawn(main.file_server.stop) + thread.join(timeout=60) + if "ui_server" in dir(main): + thread = gevent.spawn(main.ui_server.stop) + thread.join(timeout=10) + except Exception as err: + print("Error in shutdown thread: %s" % err) + sys.exit(0) else: sys.exit(0) + +def shutdown(reason="Unknown"): + global thread_shutdown + logging.info("Shutting down (reason: %s)..." % reason) + try: + if not thread_shutdown: + thread_shutdown = gevent.spawn(shutdownThread) + except Exception as err: + print("Proper shutdown error: %s" % err) + sys.exit(0) + # Store last error, ignore notify, allow manual error logging def handleError(*args, **kwargs): global last_error diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 5991adf0..6eb1ec5b 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -36,6 +36,7 @@ class FileServer(ConnectionServer): self.recheck_port = True self.active_mode_thread_pool = gevent.pool.Pool(None) + self.site_pool = gevent.pool.Pool(None) self.update_pool = gevent.pool.Pool(5) self.update_start_time = 0 @@ -71,6 +72,7 @@ class FileServer(ConnectionServer): self.managed_pools["active_mode_thread"] = self.active_mode_thread_pool self.managed_pools["update"] = self.update_pool + self.managed_pools["site"] = self.site_pool if ip_type == "dual" and ip == "::": # Also bind to ipv4 addres in dual mode @@ -707,7 +709,7 @@ class FileServer(ConnectionServer): log.info("Stopped.") - def stop(self): + def stop(self, ui_websocket=None): if self.running and self.portchecker.upnp_port_opened: log.debug('Closing port %d' % self.port) try: @@ -716,7 +718,4 @@ class FileServer(ConnectionServer): 
except Exception as err: log.info("Failed at attempt to use upnp to close port: %s" % err) - self.leaveActiveMode(); - gevent.joinall(self.active_mode_threads.values(), timeout=15) - - return ConnectionServer.stop(self) + return ConnectionServer.stop(self, ui_websocket=ui_websocket) diff --git a/src/Site/Site.py b/src/Site/Site.py index ba3c9812..5b228ff9 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -175,25 +175,8 @@ class Site(object): self.fzs_count = random.randint(0, self.fzs_range / 4) self.fzs_timestamp = 0 - self.content = None # Load content.json - self.peers = {} # Key: ip:port, Value: Peer.Peer - self.peers_recent = collections.deque(maxlen=150) - self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself) - self.greenlet_manager = GreenletManager.GreenletManager() # Running greenlets - self.worker_manager = WorkerManager(self) # Handle site download from other peers - self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) - self.content_updated = None # Content.js update time - self.last_online_update = 0 - self.startup_announce_done = 0 - self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] - self.page_requested = False # Page viewed in browser - self.websockets = [] # Active site websocket connections - + ############################################## self.connection_server = None - self.loadSettings(settings) # Load settings from sites.json - self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files - self.content_manager = ContentManager(self) - self.content_manager.loadContents() # Load content.json files if "main" in sys.modules: # import main has side-effects, breaks tests import main if "file_server" in dir(main): # Use global file server by default if possible @@ -203,6 +186,26 @@ class Site(object): self.connection_server = main.file_server else: 
self.connection_server = FileServer() + ############################################## + + self.content = None # Load content.json + self.peers = {} # Key: ip:port, Value: Peer.Peer + self.peers_recent = collections.deque(maxlen=150) + self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself) + self.greenlet_manager = GreenletManager.GreenletManager(self.connection_server.site_pool) # Running greenlets + self.worker_manager = WorkerManager(self) # Handle site download from other peers + self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) + self.content_updated = None # Content.js update time + self.last_online_update = 0 + self.startup_announce_done = 0 + self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout] + self.page_requested = False # Page viewed in browser + self.websockets = [] # Active site websocket connections + + self.loadSettings(settings) # Load settings from sites.json + self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files + self.content_manager = ContentManager(self) + self.content_manager.loadContents() # Load content.json files self.announcer = SiteAnnouncer(self) # Announce and get peer list from other nodes @@ -275,14 +278,32 @@ class Site(object): SiteManager.site_manager.load(False) SiteManager.site_manager.saveDelayed() + # Returns True if any site-related activity should be interrupted + # due to connection server being stopped or site being deleted + def isStopping(self): + return self.connection_server.stopping or self.settings.get("deleting", False) + + # Returns False if any network activity for the site should not happen def isServing(self): if config.offline: return False - elif self.connection_server.stopping: + elif self.isStopping(): return False else: return self.settings["serving"] + # Spawns a thread that will be waited for on server being stopped (and 
killed after a timeout). + # Short cut to self.greenlet_manager.spawn() + def spawn(self, *args, **kwargs): + thread = self.greenlet_manager.spawn(*args, **kwargs) + return thread + + # Spawns a thread that will be waited for on server being stopped (and killed after a timeout). + # Short cut to self.greenlet_manager.spawnLater() + def spawnLater(self, *args, **kwargs): + thread = self.greenlet_manager.spawnLater(*args, **kwargs) + return thread + def getSettingsCache(self): back = {} back["bad_files"] = self.bad_files @@ -418,7 +439,7 @@ class Site(object): # Optionals files if inner_path == "content.json": - gevent.spawn(self.updateHashfield) + self.spawn(self.updateHashfield) for file_relative_path in list(self.content_manager.contents[inner_path].get("files_optional", {}).keys()): file_inner_path = content_inner_dir + file_relative_path @@ -437,7 +458,7 @@ class Site(object): include_threads = [] for file_relative_path in list(self.content_manager.contents[inner_path].get("includes", {}).keys()): file_inner_path = content_inner_dir + file_relative_path - include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer) + include_thread = self.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer) include_threads.append(include_thread) if config.verbose: @@ -517,9 +538,9 @@ class Site(object): ) if self.isAddedRecently(): - gevent.spawn(self.announce, mode="start", force=True) + self.spawn(self.announce, mode="start", force=True) else: - gevent.spawn(self.announce, mode="update") + self.spawn(self.announce, mode="update") if check_size: # Check the size first valid = self.downloadContent("content.json", download_files=False) # Just download content.json files @@ -615,7 +636,7 @@ class Site(object): self.log.info("CheckModifications: %s: %s > %s" % ( inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0) )) - t = gevent.spawn(self.pooledDownloadContent, 
modified_contents, only_if_bad=True) + t = self.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True) threads.append(t) if send_back: @@ -628,7 +649,7 @@ class Site(object): self.log.info("CheckModifications: %s: %s < %s" % ( inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0) )) - gevent.spawn(self.publisher, inner_path, [peer], [], 1) + self.spawn(self.publisher, inner_path, [peer], [], 1) self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads)) gevent.joinall(threads) @@ -685,7 +706,7 @@ class Site(object): updaters = [] for i in range(updater_limit): - updaters.append(gevent.spawn(self.updater, peers_try, queried, need_queries, since)) + updaters.append(self.spawn(self.updater, peers_try, queried, need_queries, since)) for r in range(10): gevent.joinall(updaters, timeout=5+r) @@ -738,13 +759,17 @@ class Site(object): elif check_files: self.updateWebsocket(checking=True) - if verify_files: - self.storage.updateBadFiles(quick_check=False) - self.settings["check_files_timestamp"] = time.time() - self.settings["verify_files_timestamp"] = time.time() - elif check_files: - self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size - self.settings["check_files_timestamp"] = time.time() + if check_files: + if verify_files: + self.storage.updateBadFiles(quick_check=False) # Full-featured checksum verification + else: + self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size + # Don't update the timestamps in case of the application being shut down, + # so we can make another try next time. 
+ if not self.isStopping(): + self.settings["check_files_timestamp"] = time.time() + if verify_files: + self.settings["verify_files_timestamp"] = time.time() if not self.isServing(): self.updateWebsocket(updated=True) @@ -766,7 +791,7 @@ class Site(object): if self.bad_files: self.log.debug("Bad files: %s" % self.bad_files) - gevent.spawn(self.retryBadFiles, force=True) + self.spawn(self.retryBadFiles, force=True) if len(queried) == 0: # Failed to query modifications @@ -856,7 +881,7 @@ class Site(object): background_publisher = BackgroundPublisher(self, published=published, limit=limit, inner_path=inner_path, diffs=diffs) self.background_publishers[inner_path] = background_publisher - gevent.spawn(background_publisher.process) + self.spawn(background_publisher.process) def processBackgroundPublishers(self): with self.background_publishers_lock: @@ -928,7 +953,7 @@ class Site(object): event_done = gevent.event.AsyncResult() for i in range(min(len(peers), limit, threads)): - publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit, diffs, event_done, cb_progress) + publisher = self.spawn(self.publisher, inner_path, peers, published, limit, diffs, event_done, cb_progress) publishers.append(publisher) event_done.get() # Wait for done @@ -946,7 +971,7 @@ class Site(object): self.addBackgroundPublisher(published=published, limit=limit, inner_path=inner_path, diffs=diffs) # Send my hashfield to every connected peer if changed - gevent.spawn(self.sendMyHashfield, 100) + self.spawn(self.sendMyHashfield, 100) return len(published) @@ -1109,7 +1134,7 @@ class Site(object): if not self.content_manager.contents.get("content.json"): # No content.json, download it first! 
self.log.debug("Need content.json first (inner_path: %s, priority: %s)" % (inner_path, priority)) if priority > 0: - gevent.spawn(self.announce) + self.spawn(self.announce) if inner_path != "content.json": # Prevent double download task = self.worker_manager.addTask("content.json", peer) task["evt"].get() @@ -1508,6 +1533,9 @@ class Site(object): # Send hashfield to peers def sendMyHashfield(self, limit=5): + if not self.isServing(): + return False + if not self.content_manager.hashfield: # No optional files return False @@ -1525,6 +1553,9 @@ class Site(object): # Update hashfield def updateHashfield(self, limit=5): + if not self.isServing(): + return False + # Return if no optional files if not self.content_manager.hashfield and not self.content_manager.has_optional_files: return False diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index 4e532788..b89aedbf 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -375,7 +375,7 @@ class SiteStorage(object): # Reopen DB to check changes if self.has_db: self.closeDb("New dbschema") - gevent.spawn(self.getDb) + self.site.spawn(self.getDb) elif not config.disable_db and should_load_to_db and self.has_db: # Load json file to db if config.verbose: self.log.debug("Loading json file to db: %s (file: %s)" % (inner_path, file)) @@ -458,6 +458,10 @@ class SiteStorage(object): i += 1 if i % 50 == 0: time.sleep(0.001) # Context switch to avoid gevent hangs + + if self.site.isStopping(): + break + if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file back["num_content_missing"] += 1 self.log.debug("[MISSING] %s" % content_inner_path) diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 77e5b12f..7e68eb34 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -1187,7 +1187,7 @@ class UiWebsocket(object): return False if restart: main.restart_after_shutdown = True - main.file_server.stop() + main.file_server.stop(ui_websocket=self) 
main.ui_server.stop() if restart: diff --git a/src/util/GreenletManager.py b/src/util/GreenletManager.py index e024233d..d711d09a 100644 --- a/src/util/GreenletManager.py +++ b/src/util/GreenletManager.py @@ -3,17 +3,37 @@ from Debug import Debug class GreenletManager: - def __init__(self): + # pool is either gevent.pool.Pool or GreenletManager. + # if pool is None, new gevent.pool.Pool() is created. + def __init__(self, pool=None): self.greenlets = set() + if not pool: + pool = gevent.pool.Pool(None) + self.pool = pool + + def _spawn_later(self, seconds, *args, **kwargs): + # If pool is another GreenletManager, delegate to it. + if hasattr(self.pool, 'spawnLater'): + return self.pool.spawnLater(seconds, *args, **kwargs) + + # There's gevent.spawn_later(), but there isn't gevent.pool.Pool.spawn_later(). + # Doing manually. + greenlet = self.pool.greenlet_class(*args, **kwargs) + self.pool.add(greenlet) + greenlet.start_later(seconds) + return greenlet + + def _spawn(self, *args, **kwargs): + return self.pool.spawn(*args, **kwargs) def spawnLater(self, *args, **kwargs): - greenlet = gevent.spawn_later(*args, **kwargs) + greenlet = self._spawn_later(*args, **kwargs) greenlet.link(lambda greenlet: self.greenlets.remove(greenlet)) self.greenlets.add(greenlet) return greenlet def spawn(self, *args, **kwargs): - greenlet = gevent.spawn(*args, **kwargs) + greenlet = self._spawn(*args, **kwargs) greenlet.link(lambda greenlet: self.greenlets.remove(greenlet)) self.greenlets.add(greenlet) return greenlet