Be more persistent in delivering site updates.

Vadim Ushakov 2020-11-04 16:08:01 +07:00
parent 6c8b059f57
commit 7354d712e0
2 changed files with 120 additions and 7 deletions

src/File/FileRequest.py

@@ -165,7 +165,7 @@ class FileRequest(object):
             peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
             # On complete publish to other peers
             diffs = params.get("diffs", {})
-            site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs, limit=3), "publish_%s" % inner_path)
+            site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs), "publish_%s" % inner_path)

             # Load new content file and download changed files in new thread
             def downloader():
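
Dropping the explicit limit=3 lets publish() fall back to its default peer limit and hand the long tail to the background publisher introduced below. Note the named registration: onComplete.once keys the callback by inner_path, so a burst of update requests for the same file queues only one publish. A minimal sketch of that contract (an illustration of the assumed util.Event semantics, not ZeroNet's actual implementation):

    # Hypothetical class illustrating only the behavior the hunk above relies on.
    class OnceEvent:
        def __init__(self):
            self.handlers = {}  # name -> callback, at most one per name

        def once(self, func, name):
            # Re-registering under an existing name is a no-op.
            self.handlers.setdefault(name, func)

        def fire(self):
            handlers, self.handlers = self.handlers, {}
            for func in handlers.values():
                func()

    event = OnceEvent()
    event.once(lambda: print("publish content.json"), "publish_content.json")
    event.once(lambda: print("publish duplicate"), "publish_content.json")  # ignored
    event.fire()  # prints only: publish content.json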

src/Site/Site.py

@@ -11,6 +11,7 @@ import base64
 import gevent
 import gevent.pool
+import gevent.lock

 import util
 from Config import config
@@ -61,6 +62,90 @@ class ScaledTimeoutHandler:
         else:
             return None

+
+class BackgroundPublisher:
+    def __init__(self, site, published=[], limit=5, inner_path="content.json", diffs={}):
+        self.site = site
+        self.threads = gevent.pool.Pool(None)
+        self.inner_path = inner_path
+        self.stages = [
+            {
+                "interval": ScaledTimeoutHandler(60, 60),
+                "max_tries": 2,
+                "tries": 0,
+                "limit": 0,
+                "done": False
+            },
+            {
+                "interval": ScaledTimeoutHandler(60 * 10, 60 * 10),
+                "max_tries": 5,
+                "tries": 0,
+                "limit": 0,
+                "done": False
+            }
+        ]
+        self.reinit(published=published, limit=limit, diffs=diffs)
+
+    def reinit(self, published=[], limit=5, diffs={}):
+        self.threads.kill()
+        self.published = published
+        self.diffs = diffs
+        i = 0
+        for stage in self.stages:
+            stage["nr"] = i
+            stage["limit"] = limit * (2 + i)
+            stage["tries"] = 0
+            stage["done"] = False
+            stage["thread"] = None
+            if i > 0:
+                stage["interval"].done()
+            i += 1
+
+    def isStageComplete(self, stage):
+        if not stage["done"]:
+            stage["done"] = len(self.published) >= stage["limit"]
+        if not stage["done"]:
+            stage["done"] = stage["tries"] >= stage["max_tries"]
+        return stage["done"]
+
+    def isComplete(self):
+        for stage in self.stages:
+            if not self.isStageComplete(stage):
+                return False
+        return True
+
+    def process(self):
+        for stage in self.stages:
+            if not self.isStageComplete(stage):
+                self.processStage(stage)
+                break
+        return self.isComplete()
+
+    def processStage(self, stage):
+        if not stage["interval"].isExpired(0):
+            return
+        if len(self.site.peers) < stage["limit"]:
+            self.site.announce(mode="more")
+        if not stage["thread"]:
+            peers = list(self.site.peers.values())
+            random.shuffle(peers)
+            stage["thread"] = self.threads.spawn(
+                self.site.publisher, self.inner_path, peers, self.published,
+                stage["limit"], diffs=self.diffs, max_retries=1)
+        stage["tries"] += 1
+        stage["interval"].done()
+        self.site.log.info(
+            "Background publisher: Stage #%s: %s published to %s/%s peers",
+            stage["nr"], self.inner_path, len(self.published), stage["limit"])
+
+    def finalize(self):
+        self.threads.kill()
+        self.site.log.info("Background publisher: Published %s to %s peers", self.inner_path, len(self.published))
+
 @PluginManager.acceptPlugins
 class Site(object):
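
BackgroundPublisher escalates in two stages: with the default limit of 5, reinit() targets limit * 2 = 10 peers for stage 0 (at most 2 tries, roughly a minute apart) and limit * 3 = 15 peers for stage 1 (up to 5 tries at roughly ten-minute intervals; the exact cadence depends on ScaledTimeoutHandler). A stage is finished once either its peer target is reached or its try budget is exhausted. A standalone sketch of that bookkeeping, with the timers and the actual publishing stubbed out:

    # Standalone sketch of the stage bookkeeping (timers and publishing omitted).
    limit = 5
    stages = [
        {"max_tries": tries, "limit": limit * (2 + i), "tries": 0, "done": False}
        for i, tries in enumerate([2, 5])
    ]

    def stage_complete(stage, published_count):
        if not stage["done"]:
            stage["done"] = (published_count >= stage["limit"]
                             or stage["tries"] >= stage["max_tries"])
        return stage["done"]

    published_count = 7  # e.g. seven peers accepted the update so far
    for nr, stage in enumerate(stages):
        if not stage_complete(stage, published_count):
            stage["tries"] += 1  # one more publish round for this stage
            print("stage #%d: %d/%d peers, try %d/%d" % (
                nr, published_count, stage["limit"], stage["tries"], stage["max_tries"]))
            break
    # -> stage #0: 7/10 peers, try 1/2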
@@ -81,6 +166,9 @@ class Site(object):
                 scaler=self.getActivityRating)
         ]

+        self.background_publishers = {}
+        self.background_publishers_lock = gevent.lock.RLock()
+
         self.content = None  # Load content.json
         self.peers = {}  # Key: ip:port, Value: Peer.Peer
         self.peers_recent = collections.deque(maxlen=150)
@@ -328,6 +416,11 @@ class Site(object):
                 inner_path, time.time() - s, len(self.worker_manager.tasks)
             ))

+        # If no file tasks have been started, worker_manager.checkComplete()
+        # is never called, so call it explicitly.
+        self.greenlet_manager.spawn(self.worker_manager.checkComplete)
+
         return True

     # Return bad files with less than 3 retry
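
The explicit checkComplete() covers the corner case where an update changes content.json but starts no file download tasks: completion is normally detected when the last task finishes, so with zero tasks the onComplete event registered in FileRequest above would never fire and the queued publish would never run. A reduced sketch of that guard (hypothetical names, mirroring the assumed WorkerManager behavior):

    # With no tasks ever started, only an explicit check can fire the
    # completion event (names are illustrative).
    def check_complete(tasks, on_complete):
        if not tasks:        # nothing pending, or nothing was ever queued
            on_complete()    # e.g. site.onComplete -> queued publish runs

    check_complete([], lambda: print("publish content.json"))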
@@ -662,7 +755,7 @@ class Site(object):
         gevent.joinall(content_threads)

     # Publish worker
-    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None):
+    def publisher(self, inner_path, peers, published, limit, diffs={}, event_done=None, cb_progress=None, max_retries=2):
         file_size = self.storage.getSize(inner_path)
         content_json_modified = self.content_manager.contents[inner_path]["modified"]
         body = self.storage.read(inner_path)
@@ -687,7 +780,7 @@ class Site(object):
             timeout = 10 + int(file_size / 1024)
             result = {"exception": "Timeout"}

-            for retry in range(2):
+            for retry in range(max_retries):
                 try:
                     with gevent.Timeout(timeout, False):
                         result = peer.publish(self.address, inner_path, body, content_json_modified, diffs)
@@ -708,6 +801,25 @@ class Site(object):
                 self.log.info("[FAILED] %s: %s" % (peer.key, result))
             time.sleep(0.01)

+    def addBackgroundPublisher(self, published=[], limit=5, inner_path="content.json", diffs={}):
+        with self.background_publishers_lock:
+            if self.background_publishers.get(inner_path, None):
+                background_publisher = self.background_publishers[inner_path]
+                background_publisher.reinit(published=published, limit=limit, diffs=diffs)
+            else:
+                background_publisher = BackgroundPublisher(self, published=published, limit=limit, inner_path=inner_path, diffs=diffs)
+                self.background_publishers[inner_path] = background_publisher
+            gevent.spawn(background_publisher.process)
+
+    def processBackgroundPublishers(self):
+        with self.background_publishers_lock:
+            for inner_path, background_publisher in list(self.background_publishers.items()):
+                background_publisher.process()
+                if background_publisher.isComplete():
+                    background_publisher.finalize()
+                    del self.background_publishers[inner_path]
+
     # Update content.json on peers
     @util.Noparallel()
     def publish(self, limit="default", inner_path="content.json", diffs={}, cb_progress=None):
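
addBackgroundPublisher keeps one publisher per inner_path and holds background_publishers_lock across the whole get-or-reinit, so a publish racing with the maintenance loop cannot resurrect an entry that processBackgroundPublishers is finalizing and deleting. The same pattern in isolation (a sketch with hypothetical names; threading.RLock stands in for gevent.lock.RLock):

    import threading

    class PublisherRegistry:
        def __init__(self):
            self.items = {}
            self.lock = threading.RLock()

        def get_or_reinit(self, key, make, reinit):
            # Reuse and reset an existing entry under the lock,
            # otherwise create and register a fresh one.
            with self.lock:
                item = self.items.get(key)
                if item is not None:
                    reinit(item)
                else:
                    item = make()
                    self.items[key] = item
                return item

    registry = PublisherRegistry()
    pub = registry.get_or_reinit(
        "content.json",
        make=lambda: {"tries": 0},                 # hypothetical publisher stand-in
        reinit=lambda item: item.update(tries=0))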
@@ -752,12 +864,11 @@ class Site(object):
             # Publish to more peers in the background
             self.log.info(
-                "Published %s to %s peers, publishing to %s more peers in the background" %
-                (inner_path, len(published), limit)
+                "Published %s to %s peers, publishing to more peers in the background" %
+                (inner_path, len(published))
             )

-            for thread in range(2):
-                gevent.spawn(self.publisher, inner_path, peers, published, limit=limit * 2, diffs=diffs)
+            self.addBackgroundPublisher(published=published, limit=limit, inner_path=inner_path, diffs=diffs)

         # Send my hashfield to every connected peer if changed
         gevent.spawn(self.sendMyHashfield, 100)
@@ -1296,6 +1407,8 @@ class Site(object):
         with gevent.Timeout(10, exception=False):
             self.announcer.announcePex()

+        self.processBackgroundPublishers()
+
         self.update()

         return True
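
Retries are driven from the periodic maintenance handler: each cycle announces via PEX, processes the background publishers, then updates. A rough driver sketch (the tick length is an assumption; the real cadence comes from the site's ScaledTimeoutHandler configuration):

    import time

    def maintenance_loop(site, tick=60):  # tick length is an assumption
        while True:
            # Each stage republishes only when its own interval has
            # expired, so frequent ticks stay cheap.
            site.processBackgroundPublishers()
            site.update()
            time.sleep(tick)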