file download queue priority by browser request, newer content json log, peer remove key error fix, peer request error also a connection error, new sites created with own flag
This commit is contained in:
parent
3f974e0bc7
commit
efb1dc3210
6 changed files with 38 additions and 21 deletions
|
@ -11,6 +11,8 @@ class Peer:
|
|||
self.ip = ip
|
||||
self.port = port
|
||||
self.site = site
|
||||
self.key = "%s:%s" % (ip, port)
|
||||
|
||||
self.socket = None
|
||||
self.last_found = None
|
||||
self.added = time.time()
|
||||
|
@ -43,6 +45,7 @@ class Peer:
|
|||
response = msgpack.unpackb(self.socket.recv())
|
||||
if "error" in response:
|
||||
self.log.debug("%s %s error: %s" % (cmd, params, response["error"]))
|
||||
self.onConnectionError()
|
||||
else: # Successful request, reset connection error num
|
||||
self.connection_error = 0
|
||||
return response
|
||||
|
|
|
@ -179,7 +179,7 @@ class Site:
|
|||
|
||||
|
||||
# Check and download if file not exits
|
||||
def needFile(self, inner_path, update=False, blocking=True, peer=None):
|
||||
def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0):
|
||||
if os.path.isfile(self.getPath(inner_path)) and not update: # File exits, no need to do anything
|
||||
return True
|
||||
elif self.settings["serving"] == False: # Site not serving
|
||||
|
@ -189,12 +189,12 @@ class Site:
|
|||
self.log.debug("Need content.json first")
|
||||
self.announce()
|
||||
if inner_path != "content.json": # Prevent double download
|
||||
task = self.worker_manager.addTask("content.json", peer)
|
||||
task = self.worker_manager.addTask("content.json", peer, priority=99999)
|
||||
task.get()
|
||||
self.loadContent()
|
||||
if not self.content: return False
|
||||
|
||||
task = self.worker_manager.addTask(inner_path, peer)
|
||||
task = self.worker_manager.addTask(inner_path, peer, priority=priority)
|
||||
if blocking:
|
||||
return task.get()
|
||||
else:
|
||||
|
@ -330,6 +330,7 @@ class Site:
|
|||
if self.content["modified"] == content["modified"]: # Ignore, have the same content.json
|
||||
return None
|
||||
elif self.content["modified"] > content["modified"]: # We have newer
|
||||
self.log.debug("We have newer content.json (Our: %s, Sent: %s)" % (self.content["modified"], content["modified"]))
|
||||
return False
|
||||
if content["modified"] > time.time()+60*60*24: # Content modified in the far future (allow 1 day window)
|
||||
self.log.error("Content.json modify is in the future!")
|
||||
|
|
|
@ -158,7 +158,7 @@ class UiRequest:
|
|||
else: # File not exits, try to download
|
||||
site = SiteManager.need(match.group("site"), all_file=False)
|
||||
self.sendHeader(content_type=self.getContentType(file_path)) # ?? Get Exception without this
|
||||
result = site.needFile(match.group("inner_path")) # Wait until file downloads
|
||||
result = site.needFile(match.group("inner_path"), priority=1) # Wait until file downloads
|
||||
return self.actionFile(file_path)
|
||||
|
||||
else: # Bad url
|
||||
|
|
|
@ -6,6 +6,7 @@ class UiWebsocket:
|
|||
def __init__(self, ws, site, server):
|
||||
self.ws = ws
|
||||
self.site = site
|
||||
self.log = site.log
|
||||
self.server = server
|
||||
self.next_message_id = 1
|
||||
self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer
|
||||
|
@ -35,7 +36,7 @@ class UiWebsocket:
|
|||
if config.debug: # Allow websocket errors to appear on /Debug
|
||||
import sys
|
||||
sys.modules["src.main"].DebugHook.handleError()
|
||||
self.site.log.error("WebSocket error: %s" % err)
|
||||
self.log.error("WebSocket error: %s" % err)
|
||||
return "Bye."
|
||||
|
||||
|
||||
|
@ -64,9 +65,12 @@ class UiWebsocket:
|
|||
def send(self, message, cb = None):
|
||||
message["id"] = self.next_message_id # Add message id to allow response
|
||||
self.next_message_id += 1
|
||||
self.ws.send(json.dumps(message))
|
||||
if cb: # Callback after client responsed
|
||||
self.waiting_cb[message["id"]] = cb
|
||||
try:
|
||||
self.ws.send(json.dumps(message))
|
||||
if cb: # Callback after client responsed
|
||||
self.waiting_cb[message["id"]] = cb
|
||||
except Exception, err:
|
||||
self.log.debug("Websocket send error: %s" % err)
|
||||
|
||||
|
||||
# Handle incoming messages
|
||||
|
@ -107,7 +111,7 @@ class UiWebsocket:
|
|||
if req["to"] in self.waiting_cb:
|
||||
self.waiting_cb(req["result"]) # Call callback function
|
||||
else:
|
||||
self.site.log.error("Websocket callback not found: %s" % req)
|
||||
self.log.error("Websocket callback not found: %s" % req)
|
||||
|
||||
|
||||
# Send a simple pong answer
|
||||
|
|
|
@ -8,7 +8,7 @@ class WorkerManager:
|
|||
def __init__(self, site):
|
||||
self.site = site
|
||||
self.workers = {} # Key: ip:port, Value: Worker.Worker
|
||||
self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers}
|
||||
self.tasks = [] # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers, "priority": 0}
|
||||
self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
|
||||
self.process_taskchecker = gevent.spawn(self.checkTasks)
|
||||
|
||||
|
@ -40,15 +40,19 @@ class WorkerManager:
|
|||
continue # One reannounce per loop
|
||||
|
||||
|
||||
# Tasks sorted by this
|
||||
def taskSorter(self, task):
|
||||
if task["inner_path"] == "content.json": return 9999 # Content.json always prority
|
||||
if task["inner_path"] == "index.html": return 9998 # index.html also important
|
||||
return task["priority"]-task["workers_num"] # Prefer more priority and less workers
|
||||
|
||||
|
||||
# Returns the next free or less worked task
|
||||
def getTask(self, peer, only_free=False):
|
||||
best_task = None
|
||||
for task in self.tasks: # Find out the task with lowest worker number
|
||||
def getTask(self, peer):
|
||||
self.tasks.sort(key=self.taskSorter, reverse=True) # Sort tasks by priority and worker numbers
|
||||
for task in self.tasks: # Find a task
|
||||
if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task
|
||||
if task["inner_path"] == "content.json": return task # Content.json always prority
|
||||
if not best_task or task["workers_num"] < best_task["workers_num"]: # If task has lower worker number then its better
|
||||
best_task = task
|
||||
return best_task
|
||||
return task
|
||||
|
||||
|
||||
# New peers added to site
|
||||
|
@ -76,21 +80,24 @@ class WorkerManager:
|
|||
if worker.task == task: workers.append(worker)
|
||||
return workers
|
||||
|
||||
|
||||
# Ends and remove a worker
|
||||
def removeWorker(self, worker):
|
||||
worker.running = False
|
||||
del(self.workers[worker.key])
|
||||
if worker.key in self.workers: del(self.workers[worker.key])
|
||||
self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), MAX_WORKERS))
|
||||
|
||||
|
||||
# Create new task and return asyncresult
|
||||
def addTask(self, inner_path, peer=None):
|
||||
def addTask(self, inner_path, peer=None, priority = 0):
|
||||
self.site.onFileStart(inner_path) # First task, trigger site download started
|
||||
task = self.findTask(inner_path)
|
||||
if task: # Already has task for that file
|
||||
if peer and task["peers"]: # This peer has new version too
|
||||
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
||||
task["peers"].append(peer)
|
||||
self.startWorkers()
|
||||
if priority:
|
||||
task["priority"] += priority # Boost on priority
|
||||
return task["evt"]
|
||||
else: # No task for that file yet
|
||||
evt = gevent.event.AsyncResult()
|
||||
|
@ -98,7 +105,7 @@ class WorkerManager:
|
|||
peers = [peer] # Only download from this peer
|
||||
else:
|
||||
peers = None
|
||||
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers}
|
||||
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_start": time.time(), "peers": peers, "priority": priority}
|
||||
self.tasks.append(task)
|
||||
self.log.debug("New task: %s" % task)
|
||||
self.startWorkers()
|
||||
|
|
|
@ -81,6 +81,8 @@ def siteCreate():
|
|||
logging.info("Creating content.json...")
|
||||
site = Site(address)
|
||||
site.signContent(privatekey)
|
||||
site.settings["own"] = True
|
||||
site.saveSettings()
|
||||
|
||||
logging.info("Site created!")
|
||||
|
||||
|
|
Loading…
Reference in a new issue