Track and stop site connected greenlets on delete
This commit is contained in:
parent
2ad3493fb0
commit
1c607645c7
6 changed files with 51 additions and 16 deletions
|
@ -61,7 +61,7 @@ class ContentDbPlugin(object):
|
||||||
if self.need_filling:
|
if self.need_filling:
|
||||||
self.fillTableFileOptional(site)
|
self.fillTableFileOptional(site)
|
||||||
if not self.optional_files_loading:
|
if not self.optional_files_loading:
|
||||||
gevent.spawn_later(1, self.loadFilesOptional)
|
site.greenlet_manager.spawnLater(1, self.loadFilesOptional)
|
||||||
self.optional_files_loading = True
|
self.optional_files_loading = True
|
||||||
|
|
||||||
def checkTables(self):
|
def checkTables(self):
|
||||||
|
|
|
@ -70,7 +70,7 @@ class ContentDbPlugin(object):
|
||||||
def savePeers(self, site, spawn=False):
|
def savePeers(self, site, spawn=False):
|
||||||
if spawn:
|
if spawn:
|
||||||
# Save peers every hour (+random some secs to not update very site at same time)
|
# Save peers every hour (+random some secs to not update very site at same time)
|
||||||
gevent.spawn_later(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True)
|
site.greenlet_manager.spawnLater(60 * 60 + random.randint(0, 60), self.savePeers, site, spawn=True)
|
||||||
if not site.peers:
|
if not site.peers:
|
||||||
site.log.debug("Peers not saved: No peers found")
|
site.log.debug("Peers not saved: No peers found")
|
||||||
return
|
return
|
||||||
|
@ -89,8 +89,8 @@ class ContentDbPlugin(object):
|
||||||
|
|
||||||
def initSite(self, site):
|
def initSite(self, site):
|
||||||
super(ContentDbPlugin, self).initSite(site)
|
super(ContentDbPlugin, self).initSite(site)
|
||||||
gevent.spawn_later(0.5, self.loadPeers, site)
|
site.greenlet_manager.spawnLater(0.5, self.loadPeers, site)
|
||||||
gevent.spawn_later(60*60, self.savePeers, site, spawn=True)
|
site.greenlet_manager.spawnLater(60*60, self.savePeers, site, spawn=True)
|
||||||
|
|
||||||
def saveAllPeers(self):
|
def saveAllPeers(self):
|
||||||
for site in list(self.sites.values()):
|
for site in list(self.sites.values()):
|
||||||
|
|
|
@ -22,6 +22,7 @@ from .SiteStorage import SiteStorage
|
||||||
from Crypt import CryptHash
|
from Crypt import CryptHash
|
||||||
from util import helper
|
from util import helper
|
||||||
from util import Diff
|
from util import Diff
|
||||||
|
from util import GreenletManager
|
||||||
from Plugin import PluginManager
|
from Plugin import PluginManager
|
||||||
from File import FileServer
|
from File import FileServer
|
||||||
from .SiteAnnouncer import SiteAnnouncer
|
from .SiteAnnouncer import SiteAnnouncer
|
||||||
|
@ -43,6 +44,7 @@ class Site(object):
|
||||||
self.peers = {} # Key: ip:port, Value: Peer.Peer
|
self.peers = {} # Key: ip:port, Value: Peer.Peer
|
||||||
self.peers_recent = collections.deque(maxlen=100)
|
self.peers_recent = collections.deque(maxlen=100)
|
||||||
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
|
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
|
||||||
|
self.greenlet_manager = GreenletManager.GreenletManager() # Running greenlets
|
||||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||||
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||||
self.content_updated = None # Content.js update time
|
self.content_updated = None # Content.js update time
|
||||||
|
@ -1026,14 +1028,21 @@ class Site(object):
|
||||||
return self.settings.get("autodownloadoptional")
|
return self.settings.get("autodownloadoptional")
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
|
self.log.debug("Deleting site...")
|
||||||
|
s = time.time()
|
||||||
self.settings["serving"] = False
|
self.settings["serving"] = False
|
||||||
self.saveSettings()
|
self.saveSettings()
|
||||||
|
num_greenlets = self.greenlet_manager.stopGreenlets("Site %s deleted" % self.address)
|
||||||
self.worker_manager.running = False
|
self.worker_manager.running = False
|
||||||
self.worker_manager.stopWorkers()
|
num_workers = self.worker_manager.stopWorkers()
|
||||||
self.storage.deleteFiles()
|
|
||||||
self.updateWebsocket(deleted=True)
|
|
||||||
self.content_manager.contents.db.deleteSite(self)
|
|
||||||
SiteManager.site_manager.delete(self.address)
|
SiteManager.site_manager.delete(self.address)
|
||||||
|
self.content_manager.contents.db.deleteSite(self)
|
||||||
|
self.updateWebsocket(deleted=True)
|
||||||
|
self.storage.deleteFiles()
|
||||||
|
self.log.debug(
|
||||||
|
"Deleted site in %.3fs (greenlets: %s, workers: %s)" %
|
||||||
|
(time.time() - s, num_greenlets, num_workers)
|
||||||
|
)
|
||||||
|
|
||||||
# - Events -
|
# - Events -
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ class SiteAnnouncer(object):
|
||||||
if config.verbose:
|
if config.verbose:
|
||||||
self.site.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"]))
|
self.site.log.debug("Tracker %s looks unreliable, announce skipped (error: %s)" % (tracker, tracker_stats["num_error"]))
|
||||||
continue
|
continue
|
||||||
thread = gevent.spawn(self.announceTracker, tracker, mode=mode)
|
thread = self.site.greenlet_manager.spawn(self.announceTracker, tracker, mode=mode)
|
||||||
threads.append(thread)
|
threads.append(thread)
|
||||||
thread.tracker = tracker
|
thread.tracker = tracker
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ class SiteAnnouncer(object):
|
||||||
self.site.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s))
|
self.site.log.error("Announce to %s trackers in %.3fs, failed" % (len(threads), time.time() - s))
|
||||||
if len(threads) == 1 and mode != "start": # Move to next tracker
|
if len(threads) == 1 and mode != "start": # Move to next tracker
|
||||||
self.site.log.debug("Tracker failed, skipping to next one...")
|
self.site.log.debug("Tracker failed, skipping to next one...")
|
||||||
gevent.spawn_later(1.0, self.announce, force=force, mode=mode, pex=pex)
|
self.site.greenlet_manager.spawnLater(1.0, self.announce, force=force, mode=mode, pex=pex)
|
||||||
|
|
||||||
self.updateWebsocket(trackers="announced")
|
self.updateWebsocket(trackers="announced")
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class WorkerManager(object):
|
||||||
self.running = True
|
self.running = True
|
||||||
self.time_task_added = 0
|
self.time_task_added = 0
|
||||||
self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
|
self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
|
||||||
self.process_taskchecker = gevent.spawn(self.checkTasks)
|
self.site.greenlet_manager.spawn(self.checkTasks)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "WorkerManager %s" % self.site.address_short
|
return "WorkerManager %s" % self.site.address_short
|
||||||
|
@ -308,7 +308,7 @@ class WorkerManager(object):
|
||||||
if not peers:
|
if not peers:
|
||||||
peers = self.site.getConnectablePeers()
|
peers = self.site.getConnectablePeers()
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
threads.append(gevent.spawn(peer.updateHashfield, force=find_more))
|
threads.append(self.site.greenlet_manager.spawn(peer.updateHashfield, force=find_more))
|
||||||
gevent.joinall(threads, timeout=5)
|
gevent.joinall(threads, timeout=5)
|
||||||
|
|
||||||
if time_tasks != self.time_task_added: # New task added since start
|
if time_tasks != self.time_task_added: # New task added since start
|
||||||
|
@ -340,7 +340,7 @@ class WorkerManager(object):
|
||||||
peers = self.site.getConnectablePeers(ignore=self.asked_peers)
|
peers = self.site.getConnectablePeers(ignore=self.asked_peers)
|
||||||
|
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
|
threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
|
||||||
self.asked_peers.append(peer.key)
|
self.asked_peers.append(peer.key)
|
||||||
|
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
|
@ -379,7 +379,7 @@ class WorkerManager(object):
|
||||||
peers = self.site.getConnectablePeers(ignore=self.asked_peers)
|
peers = self.site.getConnectablePeers(ignore=self.asked_peers)
|
||||||
|
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
|
threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
|
||||||
self.asked_peers.append(peer.key)
|
self.asked_peers.append(peer.key)
|
||||||
|
|
||||||
gevent.joinall(threads, timeout=15)
|
gevent.joinall(threads, timeout=15)
|
||||||
|
@ -397,17 +397,20 @@ class WorkerManager(object):
|
||||||
|
|
||||||
if time_tasks != self.time_task_added: # New task added since start
|
if time_tasks != self.time_task_added: # New task added since start
|
||||||
self.log.debug("New task since start, restarting...")
|
self.log.debug("New task since start, restarting...")
|
||||||
gevent.spawn_later(0.1, self.startFindOptional)
|
self.site.greenlet_manager.spawnLater(0.1, self.startFindOptional)
|
||||||
else:
|
else:
|
||||||
self.log.debug("startFindOptional ended")
|
self.log.debug("startFindOptional ended")
|
||||||
|
|
||||||
# Stop all worker
|
# Stop all worker
|
||||||
def stopWorkers(self):
|
def stopWorkers(self):
|
||||||
|
num = 0
|
||||||
for worker in list(self.workers.values()):
|
for worker in list(self.workers.values()):
|
||||||
worker.stop()
|
worker.stop()
|
||||||
|
num += 1
|
||||||
tasks = self.tasks[:] # Copy
|
tasks = self.tasks[:] # Copy
|
||||||
for task in tasks: # Mark all current task as failed
|
for task in tasks: # Mark all current task as failed
|
||||||
self.failTask(task)
|
self.failTask(task)
|
||||||
|
return num
|
||||||
|
|
||||||
# Find workers by task
|
# Find workers by task
|
||||||
def findWorkers(self, task):
|
def findWorkers(self, task):
|
||||||
|
@ -554,7 +557,7 @@ class WorkerManager(object):
|
||||||
self.site.onFileDone(task["inner_path"])
|
self.site.onFileDone(task["inner_path"])
|
||||||
task["evt"].set(True)
|
task["evt"].set(True)
|
||||||
if not self.tasks:
|
if not self.tasks:
|
||||||
gevent.spawn(self.checkComplete)
|
self.site.greenlet_manager.spawn(self.checkComplete)
|
||||||
|
|
||||||
# Mark a task failed
|
# Mark a task failed
|
||||||
def failTask(self, task):
|
def failTask(self, task):
|
||||||
|
|
23
src/util/GreenletManager.py
Normal file
23
src/util/GreenletManager.py
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
import gevent
|
||||||
|
|
||||||
|
|
||||||
|
class GreenletManager:
|
||||||
|
def __init__(self):
|
||||||
|
self.greenlets = set()
|
||||||
|
|
||||||
|
def spawnLater(self, *args, **kwargs):
|
||||||
|
greenlet = gevent.spawn_later(*args, **kwargs)
|
||||||
|
greenlet.link(lambda greenlet: self.greenlets.remove(greenlet))
|
||||||
|
self.greenlets.add(greenlet)
|
||||||
|
return greenlet
|
||||||
|
|
||||||
|
def spawn(self, *args, **kwargs):
|
||||||
|
greenlet = gevent.spawn(*args, **kwargs)
|
||||||
|
greenlet.link(lambda greenlet: self.greenlets.remove(greenlet))
|
||||||
|
self.greenlets.add(greenlet)
|
||||||
|
return greenlet
|
||||||
|
|
||||||
|
def stopGreenlets(self, reason="Stopping greenlets"):
|
||||||
|
num = len(self.greenlets)
|
||||||
|
gevent.killall(list(self.greenlets))
|
||||||
|
return num
|
Loading…
Reference in a new issue