Improve the file server shutdown logic and display the shutdown progress bar in the UI
This commit is contained in:
parent
77d2d69376
commit
e3daa09316
9 changed files with 170 additions and 75 deletions
|
@ -131,10 +131,10 @@ class FileServerPlugin(object):
|
|||
gevent.spawn(self.local_announcer.start)
|
||||
return super(FileServerPlugin, self).start(*args, **kwargs)
|
||||
|
||||
def stop(self):
|
||||
def stop(self, ui_websocket=None):
|
||||
if self.local_announcer:
|
||||
self.local_announcer.stop()
|
||||
res = super(FileServerPlugin, self).stop()
|
||||
res = super(FileServerPlugin, self).stop(ui_websocket=ui_websocket)
|
||||
return res
|
||||
|
||||
|
||||
|
|
|
@ -131,33 +131,52 @@ class ConnectionServer(object):
|
|||
return False
|
||||
self.log.debug("Stopped.")
|
||||
|
||||
def stop(self):
|
||||
def stop(self, ui_websocket=None):
|
||||
self.log.debug("Stopping %s" % self.stream_server)
|
||||
self.stopping = True
|
||||
self.running = False
|
||||
self.stopping_event.set()
|
||||
self.onStop()
|
||||
self.onStop(ui_websocket=ui_websocket)
|
||||
|
||||
def onStop(self):
|
||||
prev_sizes = {}
|
||||
for i in range(60):
|
||||
def onStop(self, ui_websocket=None):
|
||||
timeout = 30
|
||||
start_time = time.time()
|
||||
join_quantum = 0.1
|
||||
prev_msg = None
|
||||
while True:
|
||||
if time.time() >= start_time + timeout:
|
||||
break
|
||||
|
||||
total_size = 0
|
||||
sizes = {}
|
||||
|
||||
timestep = 0
|
||||
for name, pool in list(self.managed_pools.items()):
|
||||
pool.join(timeout=1)
|
||||
timestep += join_quantum
|
||||
pool.join(timeout=join_quantum)
|
||||
size = len(pool)
|
||||
if size:
|
||||
sizes[name] = size
|
||||
total_size += size
|
||||
|
||||
if len(sizes) == 0:
|
||||
break
|
||||
|
||||
if prev_sizes != sizes:
|
||||
s = ""
|
||||
for name, size in sizes.items():
|
||||
s += "%s pool: %s, " % (name, size)
|
||||
self.log.info("Waiting for tasks in managed pools to stop: %s", s)
|
||||
prev_sizes = sizes
|
||||
if timestep < 1:
|
||||
time.sleep(1 - timestep)
|
||||
|
||||
# format message
|
||||
s = ""
|
||||
for name, size in sizes.items():
|
||||
s += "%s pool: %s, " % (name, size)
|
||||
msg = "Waiting for tasks in managed pools to stop: %s" % s
|
||||
# Prevent flooding to log
|
||||
if msg != prev_msg:
|
||||
prev_msg = msg
|
||||
self.log.info("%s", msg)
|
||||
|
||||
percent = 100 * (time.time() - start_time) / timeout
|
||||
msg = "File Server: waiting for %s tasks to stop" % total_size
|
||||
self.sendShutdownProgress(ui_websocket, msg, percent)
|
||||
|
||||
for name, pool in list(self.managed_pools.items()):
|
||||
size = len(pool)
|
||||
|
@ -165,12 +184,20 @@ class ConnectionServer(object):
|
|||
self.log.info("Killing %s tasks in %s pool", size, name)
|
||||
pool.kill()
|
||||
|
||||
self.sendShutdownProgress(ui_websocket, "File Server stopped. Now to exit.", 100)
|
||||
|
||||
if self.thread_checker:
|
||||
gevent.kill(self.thread_checker)
|
||||
self.thread_checker = None
|
||||
if self.stream_server:
|
||||
self.stream_server.stop()
|
||||
|
||||
def sendShutdownProgress(self, ui_websocket, message, progress):
|
||||
if not ui_websocket:
|
||||
return
|
||||
ui_websocket.cmd("progress", ["shutdown", message, progress])
|
||||
time.sleep(0.01)
|
||||
|
||||
# Sleeps the specified amount of time or until ConnectionServer is stopped
|
||||
def sleep(self, t):
|
||||
if t:
|
||||
|
@ -178,7 +205,7 @@ class ConnectionServer(object):
|
|||
else:
|
||||
time.sleep(t)
|
||||
|
||||
# Spawns a thread that will be waited for on server being stooped (and killed after a timeout)
|
||||
# Spawns a thread that will be waited for on server being stopped (and killed after a timeout)
|
||||
def spawn(self, *args, **kwargs):
|
||||
thread = self.thread_pool.spawn(*args, **kwargs)
|
||||
return thread
|
||||
|
|
|
@ -239,7 +239,7 @@ class ContentManager(object):
|
|||
|
||||
if num_removed_bad_files > 0:
|
||||
self.site.worker_manager.removeSolvedFileTasks(mark_as_good=False)
|
||||
gevent.spawn(self.site.update, since=0)
|
||||
self.site.spawn(self.site.update, since=0)
|
||||
|
||||
self.log.debug("Archived removed contents: %s, removed bad files: %s" % (num_removed_contents, num_removed_bad_files))
|
||||
|
||||
|
|
|
@ -11,20 +11,34 @@ from . import Debug
|
|||
|
||||
last_error = None
|
||||
|
||||
def shutdown(reason="Unknown"):
|
||||
logging.info("Shutting down (reason: %s)..." % reason)
|
||||
thread_shutdown = None
|
||||
|
||||
def shutdownThread():
|
||||
import main
|
||||
if "file_server" in dir(main):
|
||||
try:
|
||||
gevent.spawn(main.file_server.stop)
|
||||
if "ui_server" in dir(main):
|
||||
gevent.spawn(main.ui_server.stop)
|
||||
except Exception as err:
|
||||
print("Proper shutdown error: %s" % err)
|
||||
sys.exit(0)
|
||||
try:
|
||||
if "file_server" in dir(main):
|
||||
thread = gevent.spawn(main.file_server.stop)
|
||||
thread.join(timeout=60)
|
||||
if "ui_server" in dir(main):
|
||||
thread = gevent.spawn(main.ui_server.stop)
|
||||
thread.join(timeout=10)
|
||||
except Exception as err:
|
||||
print("Error in shutdown thread: %s" % err)
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def shutdown(reason="Unknown"):
|
||||
global thread_shutdown
|
||||
logging.info("Shutting down (reason: %s)..." % reason)
|
||||
try:
|
||||
if not thread_shutdown:
|
||||
thread_shutdown = gevent.spawn(shutdownThread)
|
||||
except Exception as err:
|
||||
print("Proper shutdown error: %s" % err)
|
||||
sys.exit(0)
|
||||
|
||||
# Store last error, ignore notify, allow manual error logging
|
||||
def handleError(*args, **kwargs):
|
||||
global last_error
|
||||
|
|
|
@ -36,6 +36,7 @@ class FileServer(ConnectionServer):
|
|||
self.recheck_port = True
|
||||
|
||||
self.active_mode_thread_pool = gevent.pool.Pool(None)
|
||||
self.site_pool = gevent.pool.Pool(None)
|
||||
|
||||
self.update_pool = gevent.pool.Pool(5)
|
||||
self.update_start_time = 0
|
||||
|
@ -71,6 +72,7 @@ class FileServer(ConnectionServer):
|
|||
|
||||
self.managed_pools["active_mode_thread"] = self.active_mode_thread_pool
|
||||
self.managed_pools["update"] = self.update_pool
|
||||
self.managed_pools["site"] = self.site_pool
|
||||
|
||||
if ip_type == "dual" and ip == "::":
|
||||
# Also bind to ipv4 addres in dual mode
|
||||
|
@ -707,7 +709,7 @@ class FileServer(ConnectionServer):
|
|||
|
||||
log.info("Stopped.")
|
||||
|
||||
def stop(self):
|
||||
def stop(self, ui_websocket=None):
|
||||
if self.running and self.portchecker.upnp_port_opened:
|
||||
log.debug('Closing port %d' % self.port)
|
||||
try:
|
||||
|
@ -716,7 +718,4 @@ class FileServer(ConnectionServer):
|
|||
except Exception as err:
|
||||
log.info("Failed at attempt to use upnp to close port: %s" % err)
|
||||
|
||||
self.leaveActiveMode();
|
||||
gevent.joinall(self.active_mode_threads.values(), timeout=15)
|
||||
|
||||
return ConnectionServer.stop(self)
|
||||
return ConnectionServer.stop(self, ui_websocket=ui_websocket)
|
||||
|
|
107
src/Site/Site.py
107
src/Site/Site.py
|
@ -175,25 +175,8 @@ class Site(object):
|
|||
self.fzs_count = random.randint(0, self.fzs_range / 4)
|
||||
self.fzs_timestamp = 0
|
||||
|
||||
self.content = None # Load content.json
|
||||
self.peers = {} # Key: ip:port, Value: Peer.Peer
|
||||
self.peers_recent = collections.deque(maxlen=150)
|
||||
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
|
||||
self.greenlet_manager = GreenletManager.GreenletManager() # Running greenlets
|
||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||
self.content_updated = None # Content.js update time
|
||||
self.last_online_update = 0
|
||||
self.startup_announce_done = 0
|
||||
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
|
||||
self.page_requested = False # Page viewed in browser
|
||||
self.websockets = [] # Active site websocket connections
|
||||
|
||||
##############################################
|
||||
self.connection_server = None
|
||||
self.loadSettings(settings) # Load settings from sites.json
|
||||
self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files
|
||||
self.content_manager = ContentManager(self)
|
||||
self.content_manager.loadContents() # Load content.json files
|
||||
if "main" in sys.modules: # import main has side-effects, breaks tests
|
||||
import main
|
||||
if "file_server" in dir(main): # Use global file server by default if possible
|
||||
|
@ -203,6 +186,26 @@ class Site(object):
|
|||
self.connection_server = main.file_server
|
||||
else:
|
||||
self.connection_server = FileServer()
|
||||
##############################################
|
||||
|
||||
self.content = None # Load content.json
|
||||
self.peers = {} # Key: ip:port, Value: Peer.Peer
|
||||
self.peers_recent = collections.deque(maxlen=150)
|
||||
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
|
||||
self.greenlet_manager = GreenletManager.GreenletManager(self.connection_server.site_pool) # Running greenlets
|
||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||
self.content_updated = None # Content.js update time
|
||||
self.last_online_update = 0
|
||||
self.startup_announce_done = 0
|
||||
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
|
||||
self.page_requested = False # Page viewed in browser
|
||||
self.websockets = [] # Active site websocket connections
|
||||
|
||||
self.loadSettings(settings) # Load settings from sites.json
|
||||
self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files
|
||||
self.content_manager = ContentManager(self)
|
||||
self.content_manager.loadContents() # Load content.json files
|
||||
|
||||
self.announcer = SiteAnnouncer(self) # Announce and get peer list from other nodes
|
||||
|
||||
|
@ -275,14 +278,32 @@ class Site(object):
|
|||
SiteManager.site_manager.load(False)
|
||||
SiteManager.site_manager.saveDelayed()
|
||||
|
||||
# Returns True if any site-related activity should be interrupted
|
||||
# due to connection server being stooped or site being deleted
|
||||
def isStopping(self):
|
||||
return self.connection_server.stopping or self.settings.get("deleting", False)
|
||||
|
||||
# Returns False if any network activity for the site should not happen
|
||||
def isServing(self):
|
||||
if config.offline:
|
||||
return False
|
||||
elif self.connection_server.stopping:
|
||||
elif self.isStopping():
|
||||
return False
|
||||
else:
|
||||
return self.settings["serving"]
|
||||
|
||||
# Spawns a thread that will be waited for on server being stopped (and killed after a timeout).
|
||||
# Short cut to self.greenlet_manager.spawn()
|
||||
def spawn(self, *args, **kwargs):
|
||||
thread = self.greenlet_manager.spawn(*args, **kwargs)
|
||||
return thread
|
||||
|
||||
# Spawns a thread that will be waited for on server being stopped (and killed after a timeout).
|
||||
# Short cut to self.greenlet_manager.spawnLater()
|
||||
def spawnLater(self, *args, **kwargs):
|
||||
thread = self.greenlet_manager.spawnLater(*args, **kwargs)
|
||||
return thread
|
||||
|
||||
def getSettingsCache(self):
|
||||
back = {}
|
||||
back["bad_files"] = self.bad_files
|
||||
|
@ -418,7 +439,7 @@ class Site(object):
|
|||
|
||||
# Optionals files
|
||||
if inner_path == "content.json":
|
||||
gevent.spawn(self.updateHashfield)
|
||||
self.spawn(self.updateHashfield)
|
||||
|
||||
for file_relative_path in list(self.content_manager.contents[inner_path].get("files_optional", {}).keys()):
|
||||
file_inner_path = content_inner_dir + file_relative_path
|
||||
|
@ -437,7 +458,7 @@ class Site(object):
|
|||
include_threads = []
|
||||
for file_relative_path in list(self.content_manager.contents[inner_path].get("includes", {}).keys()):
|
||||
file_inner_path = content_inner_dir + file_relative_path
|
||||
include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer)
|
||||
include_thread = self.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer)
|
||||
include_threads.append(include_thread)
|
||||
|
||||
if config.verbose:
|
||||
|
@ -517,9 +538,9 @@ class Site(object):
|
|||
)
|
||||
|
||||
if self.isAddedRecently():
|
||||
gevent.spawn(self.announce, mode="start", force=True)
|
||||
self.spawn(self.announce, mode="start", force=True)
|
||||
else:
|
||||
gevent.spawn(self.announce, mode="update")
|
||||
self.spawn(self.announce, mode="update")
|
||||
|
||||
if check_size: # Check the size first
|
||||
valid = self.downloadContent("content.json", download_files=False) # Just download content.json files
|
||||
|
@ -615,7 +636,7 @@ class Site(object):
|
|||
self.log.info("CheckModifications: %s: %s > %s" % (
|
||||
inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0)
|
||||
))
|
||||
t = gevent.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True)
|
||||
t = self.spawn(self.pooledDownloadContent, modified_contents, only_if_bad=True)
|
||||
threads.append(t)
|
||||
|
||||
if send_back:
|
||||
|
@ -628,7 +649,7 @@ class Site(object):
|
|||
self.log.info("CheckModifications: %s: %s < %s" % (
|
||||
inner_path, res["modified_files"][inner_path], my_modified.get(inner_path, 0)
|
||||
))
|
||||
gevent.spawn(self.publisher, inner_path, [peer], [], 1)
|
||||
self.spawn(self.publisher, inner_path, [peer], [], 1)
|
||||
|
||||
self.log.debug("CheckModifications: Waiting for %s pooledDownloadContent" % len(threads))
|
||||
gevent.joinall(threads)
|
||||
|
@ -685,7 +706,7 @@ class Site(object):
|
|||
|
||||
updaters = []
|
||||
for i in range(updater_limit):
|
||||
updaters.append(gevent.spawn(self.updater, peers_try, queried, need_queries, since))
|
||||
updaters.append(self.spawn(self.updater, peers_try, queried, need_queries, since))
|
||||
|
||||
for r in range(10):
|
||||
gevent.joinall(updaters, timeout=5+r)
|
||||
|
@ -738,13 +759,17 @@ class Site(object):
|
|||
elif check_files:
|
||||
self.updateWebsocket(checking=True)
|
||||
|
||||
if verify_files:
|
||||
self.storage.updateBadFiles(quick_check=False)
|
||||
self.settings["check_files_timestamp"] = time.time()
|
||||
self.settings["verify_files_timestamp"] = time.time()
|
||||
elif check_files:
|
||||
self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size
|
||||
self.settings["check_files_timestamp"] = time.time()
|
||||
if check_files:
|
||||
if verify_files:
|
||||
self.storage.updateBadFiles(quick_check=False) # Full-featured checksum verification
|
||||
else:
|
||||
self.storage.updateBadFiles(quick_check=True) # Quick check and mark bad files based on file size
|
||||
# Don't update the timestamps in case of the application being shut down,
|
||||
# so we can make another try next time.
|
||||
if not self.isStopping():
|
||||
self.settings["check_files_timestamp"] = time.time()
|
||||
if verify_files:
|
||||
self.settings["verify_files_timestamp"] = time.time()
|
||||
|
||||
if not self.isServing():
|
||||
self.updateWebsocket(updated=True)
|
||||
|
@ -766,7 +791,7 @@ class Site(object):
|
|||
|
||||
if self.bad_files:
|
||||
self.log.debug("Bad files: %s" % self.bad_files)
|
||||
gevent.spawn(self.retryBadFiles, force=True)
|
||||
self.spawn(self.retryBadFiles, force=True)
|
||||
|
||||
if len(queried) == 0:
|
||||
# Failed to query modifications
|
||||
|
@ -856,7 +881,7 @@ class Site(object):
|
|||
background_publisher = BackgroundPublisher(self, published=published, limit=limit, inner_path=inner_path, diffs=diffs)
|
||||
self.background_publishers[inner_path] = background_publisher
|
||||
|
||||
gevent.spawn(background_publisher.process)
|
||||
self.spawn(background_publisher.process)
|
||||
|
||||
def processBackgroundPublishers(self):
|
||||
with self.background_publishers_lock:
|
||||
|
@ -928,7 +953,7 @@ class Site(object):
|
|||
|
||||
event_done = gevent.event.AsyncResult()
|
||||
for i in range(min(len(peers), limit, threads)):
|
||||
publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit, diffs, event_done, cb_progress)
|
||||
publisher = self.spawn(self.publisher, inner_path, peers, published, limit, diffs, event_done, cb_progress)
|
||||
publishers.append(publisher)
|
||||
|
||||
event_done.get() # Wait for done
|
||||
|
@ -946,7 +971,7 @@ class Site(object):
|
|||
self.addBackgroundPublisher(published=published, limit=limit, inner_path=inner_path, diffs=diffs)
|
||||
|
||||
# Send my hashfield to every connected peer if changed
|
||||
gevent.spawn(self.sendMyHashfield, 100)
|
||||
self.spawn(self.sendMyHashfield, 100)
|
||||
|
||||
return len(published)
|
||||
|
||||
|
@ -1109,7 +1134,7 @@ class Site(object):
|
|||
if not self.content_manager.contents.get("content.json"): # No content.json, download it first!
|
||||
self.log.debug("Need content.json first (inner_path: %s, priority: %s)" % (inner_path, priority))
|
||||
if priority > 0:
|
||||
gevent.spawn(self.announce)
|
||||
self.spawn(self.announce)
|
||||
if inner_path != "content.json": # Prevent double download
|
||||
task = self.worker_manager.addTask("content.json", peer)
|
||||
task["evt"].get()
|
||||
|
@ -1508,6 +1533,9 @@ class Site(object):
|
|||
|
||||
# Send hashfield to peers
|
||||
def sendMyHashfield(self, limit=5):
|
||||
if not self.isServing():
|
||||
return False
|
||||
|
||||
if not self.content_manager.hashfield: # No optional files
|
||||
return False
|
||||
|
||||
|
@ -1525,6 +1553,9 @@ class Site(object):
|
|||
|
||||
# Update hashfield
|
||||
def updateHashfield(self, limit=5):
|
||||
if not self.isServing():
|
||||
return False
|
||||
|
||||
# Return if no optional files
|
||||
if not self.content_manager.hashfield and not self.content_manager.has_optional_files:
|
||||
return False
|
||||
|
|
|
@ -375,7 +375,7 @@ class SiteStorage(object):
|
|||
# Reopen DB to check changes
|
||||
if self.has_db:
|
||||
self.closeDb("New dbschema")
|
||||
gevent.spawn(self.getDb)
|
||||
self.site.spawn(self.getDb)
|
||||
elif not config.disable_db and should_load_to_db and self.has_db: # Load json file to db
|
||||
if config.verbose:
|
||||
self.log.debug("Loading json file to db: %s (file: %s)" % (inner_path, file))
|
||||
|
@ -458,6 +458,10 @@ class SiteStorage(object):
|
|||
i += 1
|
||||
if i % 50 == 0:
|
||||
time.sleep(0.001) # Context switch to avoid gevent hangs
|
||||
|
||||
if self.site.isStopping():
|
||||
break
|
||||
|
||||
if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file
|
||||
back["num_content_missing"] += 1
|
||||
self.log.debug("[MISSING] %s" % content_inner_path)
|
||||
|
|
|
@ -1187,7 +1187,7 @@ class UiWebsocket(object):
|
|||
return False
|
||||
if restart:
|
||||
main.restart_after_shutdown = True
|
||||
main.file_server.stop()
|
||||
main.file_server.stop(ui_websocket=self)
|
||||
main.ui_server.stop()
|
||||
|
||||
if restart:
|
||||
|
|
|
@ -3,17 +3,37 @@ from Debug import Debug
|
|||
|
||||
|
||||
class GreenletManager:
|
||||
def __init__(self):
|
||||
# pool is either gevent.pool.Pool or GreenletManager.
|
||||
# if pool is None, new gevent.pool.Pool() is created.
|
||||
def __init__(self, pool=None):
|
||||
self.greenlets = set()
|
||||
if not pool:
|
||||
pool = gevent.pool.Pool(None)
|
||||
self.pool = pool
|
||||
|
||||
def _spawn_later(self, seconds, *args, **kwargs):
|
||||
# If pool is another GreenletManager, delegate to it.
|
||||
if hasattr(self.pool, 'spawnLater'):
|
||||
return self.pool.spawnLater(seconds, *args, **kwargs)
|
||||
|
||||
# There's gevent.spawn_later(), but there isn't gevent.pool.Pool.spawn_later().
|
||||
# Doing manually.
|
||||
greenlet = self.pool.greenlet_class(*args, **kwargs)
|
||||
self.pool.add(greenlet)
|
||||
greenlet.start_later(seconds)
|
||||
return greenlet
|
||||
|
||||
def _spawn(self, *args, **kwargs):
|
||||
return self.pool.spawn(*args, **kwargs)
|
||||
|
||||
def spawnLater(self, *args, **kwargs):
|
||||
greenlet = gevent.spawn_later(*args, **kwargs)
|
||||
greenlet = self._spawn_later(*args, **kwargs)
|
||||
greenlet.link(lambda greenlet: self.greenlets.remove(greenlet))
|
||||
self.greenlets.add(greenlet)
|
||||
return greenlet
|
||||
|
||||
def spawn(self, *args, **kwargs):
|
||||
greenlet = gevent.spawn(*args, **kwargs)
|
||||
greenlet = self._spawn(*args, **kwargs)
|
||||
greenlet.link(lambda greenlet: self.greenlets.remove(greenlet))
|
||||
self.greenlets.add(greenlet)
|
||||
return greenlet
|
||||
|
|
Loading…
Reference in a new issue