diff --git a/src/Config.py b/src/Config.py index 77f2e33b..627a6d4a 100644 --- a/src/Config.py +++ b/src/Config.py @@ -4,7 +4,7 @@ import ConfigParser class Config(object): def __init__(self): self.version = "0.2.9" - self.rev = 116 + self.rev = 119 self.parser = self.createArguments() argv = sys.argv[:] # Copy command line arguments argv = self.parseConfig(argv) # Add arguments from config file diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 4decdc62..51c616a6 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -117,6 +117,9 @@ class Connection: # Message loop for connection def messageLoop(self, firstchar=None): + if not self.sock: + self.log("Socket error: No socket found") + return False sock = self.sock try: if not firstchar: firstchar = sock.recv(1) @@ -317,4 +320,5 @@ class Connection: # Little cleanup del self.unpacker del self.sock + self.sock = None self.unpacker = None diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 77afa6a6..fdec9348 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -14,6 +14,7 @@ class ConnectionServer: self.port = port self.last_connection_id = 1 # Connection id incrementer self.log = logging.getLogger("ConnServer") + self.port_opened = None self.connections = [] # Connections self.ips = {} # Connection by ip diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index abb80871..a53aa85e 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -2,6 +2,7 @@ import os, msgpack, shutil, gevent, socket, struct, random from cStringIO import StringIO from Debug import Debug from Config import config +from util import RateLimit FILE_BUFF = 1024*512 @@ -14,6 +15,7 @@ class FileRequest: self.req_id = None self.sites = self.server.sites self.log = server.log + self.responded = False # Responded to the request def unpackAddress(self, packed): @@ -21,24 +23,34 @@ class FileRequest: def send(self, msg): - self.connection.send(msg) + if not self.connection.closed: + self.connection.send(msg) def response(self, msg): + if self.responded: + self.log.debug("Req id %s already responded" % self.req_id) + return if not isinstance(msg, dict): # If msg not a dict create a {"body": msg} msg = {"body": msg} msg["cmd"] = "response" msg["to"] = self.req_id + self.responded = True self.send(msg) # Route file requests def route(self, cmd, req_id, params): self.req_id = req_id + if cmd == "getFile": self.actionGetFile(params) elif cmd == "update": - self.actionUpdate(params) + event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"]) + if not RateLimit.isAllowed(event): # There was already an updat for this file in the last 10 second + self.response({"ok": "File update queued"}) + RateLimit.callAsync(event, 10, self.actionUpdate, params) # If called more than once within 10 sec only keep the last update + elif cmd == "pex": self.actionPex(params) elif cmd == "ping": diff --git a/src/Site/Site.py b/src/Site/Site.py index 517020c0..c15b306c 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -96,7 +96,6 @@ class Site: # Download all file from content.json - @util.Noparallel(blocking=True) def downloadContent(self, inner_path, download_files=True, peer=None): s = time.time() self.log.debug("Downloading %s..." % inner_path) @@ -223,6 +222,7 @@ class Site: # Update content.json on peers + @util.Noparallel() def publish(self, limit=5, inner_path="content.json"): self.log.info( "Publishing to %s/%s peers..." % (limit, len(self.peers)) ) published = [] # Successfully published (Peer) diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index fe6c3ff3..cc8b5e42 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -111,7 +111,7 @@ class UiRequest(object): if self.env["REQUEST_METHOD"] == "OPTIONS": headers.append(("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept")) # Allow json access - if (self.env["REQUEST_METHOD"] == "OPTIONS" or not self.isAjaxRequest()) and status == 200 and (content_type == "text/css" or content_type == "application/javascript" or self.env["REQUEST_METHOD"] == "OPTIONS" or content_type.startswith("image")): # Cache Css, Js, Image files for 10min + if (self.env["REQUEST_METHOD"] == "OPTIONS" or not self.isAjaxRequest()) and status == 200 and (content_type == "text/css" or content_type.startswith("application") or self.env["REQUEST_METHOD"] == "OPTIONS" or content_type.startswith("image")): # Cache Css, Js, Image files for 10min headers.append(("Cache-Control", "public, max-age=600")) # Cache 10 min else: # Images, Css, Js headers.append(("Cache-Control", "no-cache, no-store, private, must-revalidate, max-age=0")) # No caching at all diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 7e874d55..83dacaf1 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -2,7 +2,7 @@ import json, gevent, time, sys, hashlib from Config import config from Site import SiteManager from Debug import Debug -from util import QueryJson +from util import QueryJson, RateLimit from Plugin import PluginManager @PluginManager.acceptPlugins @@ -149,21 +149,6 @@ class UiWebsocket(object): func(req["id"], params) - # - Actions - - - # Do callback on response {"cmd": "response", "to": message_id, "result": result} - def actionResponse(self, to, result): - if to in self.waiting_cb: - self.waiting_cb[to](result) # Call callback function - else: - self.log.error("Websocket callback not found: %s, %s" % (to, result)) - - - # Send a simple pong answer - def actionPing(self, to): - self.response(to, "pong") - - # Format site info def formatSiteInfo(self, site, create_user=True): content = site.content_manager.contents.get("content.json") @@ -198,21 +183,6 @@ class UiWebsocket(object): return ret - # Send site details - def actionSiteInfo(self, to, file_status = None): - ret = self.formatSiteInfo(self.site) - if file_status: # Client queries file status - if self.site.storage.isFile(file_status): # File exits, add event done - ret["event"] = ("file_done", file_status) - self.response(to, ret) - - - # Join to an event channel - def actionChannelJoin(self, to, channel): - if channel not in self.channels: - self.channels.append(channel) - - def formatServerInfo(self): return { "ip_external": bool(sys.modules["main"].file_server.port_opened), @@ -228,6 +198,36 @@ class UiWebsocket(object): } + # - Actions - + + # Do callback on response {"cmd": "response", "to": message_id, "result": result} + def actionResponse(self, to, result): + if to in self.waiting_cb: + self.waiting_cb[to](result) # Call callback function + else: + self.log.error("Websocket callback not found: %s, %s" % (to, result)) + + + # Send a simple pong answer + def actionPing(self, to): + self.response(to, "pong") + + + # Send site details + def actionSiteInfo(self, to, file_status = None): + ret = self.formatSiteInfo(self.site) + if file_status: # Client queries file status + if self.site.storage.isFile(file_status): # File exits, add event done + ret["event"] = ("file_done", file_status) + self.response(to, ret) + + + # Join to an event channel + def actionChannelJoin(self, to, channel): + if channel not in self.channels: + self.channels.append(channel) + + # Server variables def actionServerInfo(self, to): ret = self.formatServerInfo() @@ -261,18 +261,28 @@ class UiWebsocket(object): site.saveSettings() site.announce() - published = site.publish(5, inner_path) # Publish to 5 peer + event_name = "publish %s %s" % (site.address, inner_path) + thread = RateLimit.callAsync(event_name, 7, site.publish, 5, inner_path) # Only publish once in 7 second to 5 peers + notification = "linked" not in dir(thread) # Only display notification on first callback + thread.linked = True + thread.link(lambda thread: self.cbSitePublish(to, thread, notification)) # At the end callback with request id and thread + + + # Callback of site publish + def cbSitePublish(self, to, thread, notification=True): + site = self.site + published = thread.value if published>0: # Successfuly published - self.cmd("notification", ["done", "Content published to %s peers." % published, 5000]) + if notification: self.cmd("notification", ["done", "Content published to %s peers." % published, 5000]) self.response(to, "ok") - site.updateWebsocket() # Send updated site data to local websocket clients + if notification: site.updateWebsocket() # Send updated site data to local websocket clients else: if len(site.peers) == 0: - self.cmd("notification", ["info", "No peers found, but your content is ready to access."]) + if notification: self.cmd("notification", ["info", "No peers found, but your content is ready to access."]) self.response(to, "No peers found, but your content is ready to access.") else: - self.cmd("notification", ["error", "Content publish failed."]) + if notification: self.cmd("notification", ["error", "Content publish failed."]) self.response(to, "Content publish failed.") @@ -326,6 +336,7 @@ class UiWebsocket(object): return self.response(to, body) + # - Admin actions - # List all site info diff --git a/src/Ui/media/img/favicon.psd b/src/Ui/media/img/favicon.psd index 1eea4ccf..1cf6f35f 100644 Binary files a/src/Ui/media/img/favicon.psd and b/src/Ui/media/img/favicon.psd differ diff --git a/src/Ui/media/img/logo.psd b/src/Ui/media/img/logo.psd new file mode 100644 index 00000000..3babd0c0 Binary files /dev/null and b/src/Ui/media/img/logo.psd differ diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py index 553a5d69..0129ed4a 100644 --- a/src/Worker/Worker.py +++ b/src/Worker/Worker.py @@ -61,7 +61,7 @@ class Worker: self.task = None else: # Hash failed self.manager.log.debug("%s: Hash failed: %s, failed peers: %s" % (self.key, task["inner_path"], len(task["failed"]))) - task["failed"].append(self.key) + task["failed"].append(self.peer) self.task = None self.peer.hash_failed += 1 if self.peer.hash_failed >= 3: # Broken peer diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 62fc8d06..783d30f7 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -76,7 +76,7 @@ class WorkerManager: self.tasks.sort(key=self.taskSorter, reverse=True) # Sort tasks by priority and worker numbers for task in self.tasks: # Find a task if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task - if peer.key in task["failed"]: continue # Peer already tried to solve this, but failed + if peer in task["failed"]: continue # Peer already tried to solve this, but failed return task @@ -145,6 +145,12 @@ class WorkerManager: task["peers"].append(peer) self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) self.startWorkers([peer]) + elif peer and peer in task["failed"]: + task["failed"].remove(peer) # New update arrived, remove the peer from failed peers + self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) + self.startWorkers([peer]) + + if priority: task["priority"] += priority # Boost on priority return task["evt"] diff --git a/src/util/Event.py b/src/util/Event.py index 850e920d..0eab1c63 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -25,9 +25,26 @@ class Event(list): return self -if __name__ == "__main__": + + +def testBenchmark(): def say(pre, text): print "%s Say: %s" % (pre, text) + + import time + s = time.time() + onChanged = Event() + for i in range(1000): + onChanged.once(lambda pre: say(pre, "once"), "once") + print "Created 1000 once in %.3fs" % (time.time()-s) + onChanged("#1") + + + +def testUsage(): + def say(pre, text): + print "%s Say: %s" % (pre, text) + onChanged = Event() onChanged.once(lambda pre: say(pre, "once")) onChanged.once(lambda pre: say(pre, "once")) @@ -37,3 +54,7 @@ if __name__ == "__main__": onChanged("#1") onChanged("#2") onChanged("#3") + + +if __name__ == "__main__": + testBenchmark() diff --git a/src/util/Noparallel.py b/src/util/Noparallel.py index 1af0699f..68521944 100644 --- a/src/util/Noparallel.py +++ b/src/util/Noparallel.py @@ -1,5 +1,6 @@ import gevent, time + class Noparallel(object): # Only allow function running once in same time def __init__(self,blocking=True): self.threads = {} @@ -30,15 +31,21 @@ class Noparallel(object): # Only allow function running once in same time if key in self.threads: del(self.threads[key]) # Allowing it to run again return ret else: # No blocking just return the thread + thread.link(lambda thread: self.cleanup(key, thread)) return thread wrapper.func_name = func.func_name return wrapper + # Cleanup finished threads + def cleanup(self, key, thread): + if key in self.threads: del(self.threads[key]) + + class Test(): @Noparallel() - def count(self): - for i in range(5): + def count(self, num=5): + for i in range(num): print self, i time.sleep(1) return "%s return:%s" % (self, i) @@ -46,8 +53,8 @@ class Test(): class TestNoblock(): @Noparallel(blocking=False) - def count(self): - for i in range(5): + def count(self, num=5): + for i in range(num): print self, i time.sleep(1) return "%s return:%s" % (self, i) @@ -104,11 +111,33 @@ def testNoblocking(): print thread1.value, thread2.value, thread3.value, thread4.value print "Done." + +def testBenchmark(): + import time + def printThreadNum(): + import gc + from greenlet import greenlet + objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] + print "Greenlets: %s" % len(objs) + + printThreadNum() + test = TestNoblock() + s = time.time() + for i in range(3): + gevent.spawn(test.count, i+1) + print "Created in %.3fs" % (time.time()-s) + printThreadNum() + time.sleep(5) + + + if __name__ == "__main__": from gevent import monkey monkey.patch_all() + testBenchmark() print "Testing blocking mode..." testBlocking() print "Testing noblocking mode..." testNoblocking() + print [instance.threads for instance in registry] diff --git a/src/util/RateLimit.py b/src/util/RateLimit.py new file mode 100644 index 00000000..3693f0b3 --- /dev/null +++ b/src/util/RateLimit.py @@ -0,0 +1,106 @@ +import time +import gevent +import logging + +log = logging.getLogger("RateLimit") + +called_db = {} +queue_db = {} + +# Register event as called +# Return: None +def called(event): + called_db[event] = time.time() + + +# Check if calling event is allowed +# Return: True if allowed False if not +def isAllowed(event, allowed_again=10): + last_called = called_db.get(event) + if not last_called: # Its not called before + return True + elif time.time()-last_called >= allowed_again: + del called_db[event] # Delete last call time to save memory + return True + else: + return False + + +def callQueue(event): + func, args, kwargs, thread = queue_db[event] + log.debug("Calling: %s" % event) + del called_db[event] + del queue_db[event] + return func(*args, **kwargs) + + + +# Rate limit and delay function call if needed, If the function called again within the rate limit interval then previous queued call will be dropped +# Return: Immedietly gevent thread +def callAsync(event, allowed_again=10, func=None, *args, **kwargs): + if isAllowed(event): # Not called recently, call it now + called(event) + # print "Calling now" + return gevent.spawn(func, *args, **kwargs) + else: # Called recently, schedule it for later + time_left = allowed_again-max(0, time.time()-called_db[event]) + log.debug("Added to queue (%.2fs left): %s " % (time_left, event)) + if not queue_db.get(event): # Function call not queued yet + thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later + queue_db[event] = (func, args, kwargs, thread) + return thread + else: # Function call already queued, just update the parameters + thread = queue_db[event][3] + queue_db[event] = (func, args, kwargs, thread) + return thread + + +# Rate limit and delay function call if needed +# Return: Wait for execution/delay then return value +def call(event, allowed_again=10, func=None, *args, **kwargs): + if isAllowed(event): # Not called recently, call it now + called(event) + # print "Calling now" + return func(*args, **kwargs) + + else: # Called recently, schedule it for later + time_left = max(0, allowed_again-(time.time()-called_db[event])) + # print "Time left: %s" % time_left, args, kwargs + log.debug("Calling sync (%.2fs left): %s" % (time_left, event)) + time.sleep(time_left) + called(event) + back = func(*args, **kwargs) + if event in called_db: + del called_db[event] + return back + + + +if __name__ == "__main__": + from gevent import monkey + monkey.patch_all() + import random + + def publish(inner_path): + print "Publishing %s..." % inner_path + return 1 + + def cb(thread): + print "Value:", thread.value + + print "Testing async spam requests rate limit to 1/sec..." + for i in range(3000): + thread = callAsync("publish content.json", 1, publish, "content.json %s" % i) + time.sleep(float(random.randint(1,20))/100000) + print thread.link(cb) + print "Done" + + time.sleep(2) + + print "Testing sync spam requests rate limit to 1/sec..." + for i in range(5): + call("publish data.json", 1, publish, "data.json %s" % i) + time.sleep(float(random.randint(1,100))/100) + print "Done" + + print called_db, queue_db