diff --git a/plugins/OptionalManager/ContentDbPlugin.py b/plugins/OptionalManager/ContentDbPlugin.py index e7945d93..fd28092b 100644 --- a/plugins/OptionalManager/ContentDbPlugin.py +++ b/plugins/OptionalManager/ContentDbPlugin.py @@ -61,7 +61,7 @@ class ContentDbPlugin(object): if self.need_filling: self.fillTableFileOptional(site) if not self.optional_files_loading: - gevent.spawn_later(1, self.loadFilesOptional) + site.greenlet_manager.spawnLater(1, self.loadFilesOptional) self.optional_files_loading = True def checkTables(self): diff --git a/plugins/PeerDb/PeerDbPlugin.py b/plugins/PeerDb/PeerDbPlugin.py index b4c8787b..addd1d6f 100644 --- a/plugins/PeerDb/PeerDbPlugin.py +++ b/plugins/PeerDb/PeerDbPlugin.py @@ -70,7 +70,7 @@ class ContentDbPlugin(object): def savePeers(self, site, spawn=False): if spawn: # Save peers every hour (+random some secs to not update very site at same time) - gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True) + site.greenlet_manager.spawnLater(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True) if not site.peers: site.log.debug("Peers not saved: No peers found") return @@ -89,8 +89,8 @@ class ContentDbPlugin(object): def initSite(self, site): super(ContentDbPlugin, self).initSite(site) - gevent.spawn_later(0.5, self.loadPeers, site) - gevent.spawn_later(60*60, self.savePeers, site, spawn=True) + site.greenlet_manager.spawnLater(0.5, self.loadPeers, site) + site.greenlet_manager.spawnLater(60*60, self.savePeers, site, spawn=True) def saveAllPeers(self): for site in list(self.sites.values()): diff --git a/src/Site/Site.py b/src/Site/Site.py index 59b5745f..807507ee 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -22,6 +22,7 @@ from .SiteStorage import SiteStorage from Crypt import CryptHash from util import helper from util import Diff +from util import GreenletManager from Plugin import PluginManager from File import FileServer from .SiteAnnouncer import SiteAnnouncer @@ -43,6 +44,7 @@ class Site(object): self.peers = {} # Key: ip:port, Value: Peer.Peer self.peers_recent = collections.deque(maxlen=100) self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself) + self.greenlet_manager = GreenletManager.GreenletManager() # Running greenlets self.worker_manager = WorkerManager(self) # Handle site download from other peers self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept) self.content_updated = None # Content.js update time @@ -1026,14 +1028,21 @@ class Site(object): return self.settings.get("autodownloadoptional") def delete(self): + self.log.debug("Deleting site...") + s = time.time() self.settings["serving"] = False self.saveSettings() + num_greenlets = self.greenlet_manager.stopGreenlets("Site %s deleted" % self.address) self.worker_manager.running = False - self.worker_manager.stopWorkers() - self.storage.deleteFiles() - self.updateWebsocket(deleted=True) - self.content_manager.contents.db.deleteSite(self) + num_workers = self.worker_manager.stopWorkers() SiteManager.site_manager.delete(self.address) + self.content_manager.contents.db.deleteSite(self) + self.updateWebsocket(deleted=True) + self.storage.deleteFiles() + self.log.debug( + "Deleted site in %.3fs (greenlets: %s, workers: %s)" % + (time.time() - s, num_greenlets, num_workers) + ) # - Events - diff --git a/src/Site/SiteAnnouncer.py b/src/Site/SiteAnnouncer.py index 0d6fc9b1..78e47731 100644 --- a/src/Site/SiteAnnouncer.py +++ b/src/Site/SiteAnnouncer.py @@ -95,7 +95,7 @@ class SiteAnnouncer(object): if config.verbose: self.site.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"])) continue - thread = gevent.spawn(self.announceTracker, tracker, mode=mode) + thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode) threads.append(thread) thread.tracker = tracker @@ -135,7 +135,7 @@ class SiteAnnouncer(object): self.site.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s)) if len(threads) == 1 and mode != "start": # Move to next tracker self.site.log.debug("Tracker failed, skipping to next one...") - gevent.spawn_later(1.0, self.announce, force=force, mode=mode, pex=pex) + self.site.greenlet_manager.spawnLater(1.0, self.announce, force=force, mode=mode, pex=pex) self.updateWebsocket(trackers="announced") diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 236b048a..c35b4b93 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -25,7 +25,7 @@ class WorkerManager(object): self.running = True self.time_task_added = 0 self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short) - self.process_taskchecker = gevent.spawn(self.checkTasks) + self.site.greenlet_manager.spawn(self.checkTasks) def __str__(self): return "WorkerManager %s" % self.site.address_short @@ -308,7 +308,7 @@ class WorkerManager(object): if not peers: peers = self.site.getConnectablePeers() for peer in peers: - threads.append(gevent.spawn(peer.updateHashfield, force=find_more)) + threads.append(self.site.greenlet_manager.spawn(peer.updateHashfield, force=find_more)) gevent.joinall(threads, timeout=5) if time_tasks != self.time_task_added: # New task added since start @@ -340,7 +340,7 @@ class WorkerManager(object): peers = self.site.getConnectablePeers(ignore=self.asked_peers) for peer in peers: - threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids))) + threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids))) self.asked_peers.append(peer.key) for i in range(5): @@ -379,7 +379,7 @@ class WorkerManager(object): peers = self.site.getConnectablePeers(ignore=self.asked_peers) for peer in peers: - threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids))) + threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids))) self.asked_peers.append(peer.key) gevent.joinall(threads, timeout=15) @@ -397,17 +397,20 @@ class WorkerManager(object): if time_tasks != self.time_task_added: # New task added since start self.log.debug("New task since start, restarting...") - gevent.spawn_later(0.1, self.startFindOptional) + self.site.greenlet_manager.spawnLater(0.1, self.startFindOptional) else: self.log.debug("startFindOptional ended") # Stop all worker def stopWorkers(self): + num = 0 for worker in list(self.workers.values()): worker.stop() + num += 1 tasks = self.tasks[:] # Copy for task in tasks: # Mark all current task as failed self.failTask(task) + return num # Find workers by task def findWorkers(self, task): @@ -554,7 +557,7 @@ class WorkerManager(object): self.site.onFileDone(task["inner_path"]) task["evt"].set(True) if not self.tasks: - gevent.spawn(self.checkComplete) + self.site.greenlet_manager.spawn(self.checkComplete) # Mark a task failed def failTask(self, task): diff --git a/src/util/GreenletManager.py b/src/util/GreenletManager.py new file mode 100644 index 00000000..6379f97c --- /dev/null +++ b/src/util/GreenletManager.py @@ -0,0 +1,23 @@ +import gevent + + +class GreenletManager: + def __init__(self): + self.greenlets = set() + + def spawnLater(self, *args, **kwargs): + greenlet = gevent.spawn_later(*args, **kwargs) + greenlet.link(lambda greenlet: self.greenlets.remove(greenlet)) + self.greenlets.add(greenlet) + return greenlet + + def spawn(self, *args, **kwargs): + greenlet = gevent.spawn(*args, **kwargs) + greenlet.link(lambda greenlet: self.greenlets.remove(greenlet)) + self.greenlets.add(greenlet) + return greenlet + + def stopGreenlets(self, reason="Stopping greenlets"): + num = len(self.greenlets) + gevent.killall(list(self.greenlets)) + return num