diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 218ec17a..8609b26d 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -105,53 +105,66 @@ class UiRequestPlugin(object): # Objects - yield "

Objects in memory:
" + + obj_count = {} + for obj in gc.get_objects(): + obj_type = str(type(obj)) + if not obj_type in obj_count: + obj_count[obj_type] = [0, 0] + obj_count[obj_type][0] += 1 # Count + obj_count[obj_type][1] += float(sys.getsizeof(obj))/1024 # Size + + yield "

Objects in memory (total: %s, %.2fkb):
" % (len(obj_count), sum([stat[1] for stat in obj_count.values()])) + + for obj, stat in sorted(obj_count.items(), key=lambda x: x[1][0], reverse=True): # Sorted by count + yield " - %.1fkb = %s x %s
" % (stat[1], stat[0], cgi.escape(obj)) + from greenlet import greenlet objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] yield "
Greenlets (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Worker import Worker objs = [obj for obj in gc.get_objects() if isinstance(obj, Worker)] yield "
Workers (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Connection import Connection objs = [obj for obj in gc.get_objects() if isinstance(obj, Connection)] yield "
Connections (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from msgpack import Unpacker objs = [obj for obj in gc.get_objects() if isinstance(obj, Unpacker)] yield "
Msgpack unpacker (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Site import Site objs = [obj for obj in gc.get_objects() if isinstance(obj, Site)] yield "
Sites (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) objs = [obj for obj in gc.get_objects() if isinstance(obj, self.server.log.__class__)] yield "
Loggers (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj.name))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj.name))) objs = [obj for obj in gc.get_objects() if isinstance(obj, UiRequest)] yield "
UiRequest (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) objs = [(key, val) for key, val in sys.modules.iteritems() if val is not None] objs.sort() @@ -159,4 +172,4 @@ class UiRequestPlugin(object): for module_name, module in objs: yield " - %.3fkb: %s %s
" % (self.getObjSize(module, hpy), module_name, cgi.escape(repr(module))) - yield "Done in %.3f" % (time.time()-s) + yield "Done in %.1f" % (time.time()-s) diff --git a/plugins/Zeroname/updater/zeroname_updater.py b/plugins/Zeroname/updater/zeroname_updater.py index ef71b112..e7fcda25 100644 --- a/plugins/Zeroname/updater/zeroname_updater.py +++ b/plugins/Zeroname/updater/zeroname_updater.py @@ -1,5 +1,5 @@ from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException -import time, json, os, sys, re +import time, json, os, sys, re, socket def publish(): @@ -47,6 +47,7 @@ def processNameOp(domain, value): def processBlock(block_id): print "Processing block #%s..." % block_id + s = time.time() block_hash = rpc.getblockhash(block_id) block = rpc.getblock(block_hash) @@ -58,7 +59,7 @@ def processBlock(block_id): if "scriptPubKey" in vout and "nameOp" in vout["scriptPubKey"] and "name" in vout["scriptPubKey"]["nameOp"]: name_op = vout["scriptPubKey"]["nameOp"] updated += processNameOp(name_op["name"].replace("d/", ""), name_op["value"]) - print "Done (updated %s)." % updated + print "Done in %.3fs (updated %s)." % (time.time()-s, updated) if updated: publish() @@ -98,16 +99,23 @@ for block_id in range(config["lastprocessed"], last_block+1): #processBlock(223911) # Testing while 1: - print "Waiting for new block..." + print "Waiting for new block", + sys.stdout.flush() while 1: try: rpc = AuthServiceProxy(rpc_url, timeout=60*5) if (int(rpc.getinfo()["blocks"]) > last_block): break time.sleep(1) rpc.waitforblock() + print "Found" break # Block found - except Exception, err: # Timeout - print "Exception", err + except socket.timeout: # Timeout + print ".", + sys.stdout.flush() + except Exception, err: + print "Exception", err.__class__, err + time.sleep(5) + last_block = int(rpc.getinfo()["blocks"]) for block_id in range(config["lastprocessed"]+1, last_block+1): processBlock(block_id) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index e5f6d05c..34a6c02e 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -24,7 +24,6 @@ class Connection: self.type = "?" self.server = server - self.log = logging.getLogger(str(self)) self.unpacker = None # Stream incoming socket messages here self.req_id = 0 # Last request id self.handshake = {} # Handshake info got from peer @@ -50,19 +49,31 @@ class Connection: self.last_req_time = 0 self.last_cmd = None + self.name = None + self.updateName() + self.waiting_requests = {} # Waiting sent requests + def updateName(self): + self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) + + def __str__(self): - return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) + return self.name + def __repr__(self): return "<%s>" % self.__str__() + def log(self, text): + self.server.log.debug("%s > %s" % (self.name, text)) + + # Open connection to peer and wait for handshake def connect(self): - self.log.debug("Connecting...") + self.log("Connecting...") self.type = "out" self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.ip, self.port)) @@ -79,14 +90,14 @@ class Connection: try: firstchar = sock.recv(1) # Find out if pure socket or zeromq except Exception, err: - self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatiblity: forward data to zmq - if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ") + if config.debug_socket: self.log("Fallback incoming connection to ZeroMQ") self.protocol = "zeromq" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) @@ -99,7 +110,7 @@ class Connection: self.server.forward(self, sock, zmq_sock) self.close() # Forward ended close connection else: - self.config.debug("ZeroMQ Server not running, exiting!") + self.log("ZeroMQ Server not running, exiting!") else: # Normal socket self.messageLoop(firstchar) @@ -110,16 +121,16 @@ class Connection: try: if not firstchar: firstchar = sock.recv(1) except Exception, err: - self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatibility to zmq self.sock.close() # Close normal socket del firstchar if zmq: - if config.debug_socket: self.log.debug("Connecting as ZeroMQ") + if config.debug_socket: self.log("Connecting as ZeroMQ") self.protocol = "zeromq" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done @@ -133,12 +144,12 @@ class Connection: zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port)) self.zmq_sock = zmq_sock except Exception, err: - if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) else: return False # No zeromq connection supported else: # Normal socket self.protocol = "v2" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done @@ -152,7 +163,6 @@ class Connection: self.incomplete_buff_recv += 1 self.bytes_recv += len(buff) if not self.unpacker: - self.log.debug("Unpacker created") self.unpacker = msgpack.Unpacker() self.unpacker.feed(buff) for message in self.unpacker: @@ -161,7 +171,7 @@ class Connection: message = None buf = None except Exception, err: - if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) self.close() # MessageLoop ended, close connection @@ -184,17 +194,17 @@ class Connection: del self.waiting_requests[message["to"]] elif message["to"] == 0: # Other peers handshake ping = time.time()-self.start_time - if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping)) + if config.debug_socket: self.log("Got handshake response: %s, ping: %s" % (message, ping)) self.last_ping_delay = ping self.handshake = message self.port = message["fileserver_port"] # Set peer fileserver port else: - self.log.debug("Unknown response: %s" % message) + self.log("Unknown response: %s" % message) elif message.get("cmd"): # Handhsake request if message["cmd"] == "handshake": self.handshake = message["params"] self.port = self.handshake["fileserver_port"] # Set peer fileserver port - if config.debug_socket: self.log.debug("Handshake request: %s" % message) + if config.debug_socket: self.log("Handshake request: %s" % message) data = self.handshakeInfo() data["cmd"] = "response" data["to"] = message["req_id"] @@ -202,7 +212,7 @@ class Connection: else: self.server.handleRequest(self, message) else: # Old style response, no req_id definied - if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys()) + if config.debug_socket: self.log("Old style response, waiting: %s" % self.waiting_requests.keys()) last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true self.waiting_requests[last_req_id].set(message) del self.waiting_requests[last_req_id] # Remove from waiting request @@ -211,13 +221,13 @@ class Connection: # Send data to connection def send(self, message): - if config.debug_socket: self.log.debug("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) + if config.debug_socket: self.log("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) self.last_send_time = time.time() if self.protocol == "zeromq": if self.zmq_sock: # Outgoing connection self.zmq_queue.append(message) if self.zmq_working: - self.log.debug("ZeroMQ already working...") + self.log("ZeroMQ already working...") return while self.zmq_queue: self.zmq_working = True @@ -233,14 +243,13 @@ class Connection: self.bytes_sent += len(data) self.sock.sendall(data) self.last_sent_time = time.time() - if config.debug_socket: self.log.debug("Sent: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) return True # Create and send a request to peer def request(self, cmd, params={}): if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: # Last command sent more than 10 sec ago, timeout - self.log.debug("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) + self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) self.close() return False @@ -262,7 +271,7 @@ class Connection: try: response = self.request("ping") except Exception, err: - self.log.debug("Ping error: %s" % Debug.formatException(err)) + self.log("Ping error: %s" % Debug.formatException(err)) if response and "body" in response and response["body"] == "Pong!": self.last_ping_delay = time.time()-s return True @@ -279,7 +288,7 @@ class Connection: self.connected = False self.event_connected.set(False) - if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) + if config.debug_socket: self.log("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) for request in self.waiting_requests.values(): # Mark pending requests failed request.set(False) self.waiting_requests = {} @@ -293,7 +302,7 @@ class Connection: self.sock.shutdown(gevent.socket.SHUT_WR) self.sock.close() except Exception, err: - if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err)) + if config.debug_socket: self.log("Close error: %s" % Debug.formatException(err)) # Little cleanup del self.unpacker diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index baf3349e..862e2f72 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -66,7 +66,8 @@ class ConnectionServer: succ = connection.event_connected.get() # Wait for connection if not succ: raise Exception("Connection event return error") return connection - if ip in self.ips: # Find connection by ip + # Find connection by ip + if ip in self.ips: connection = self.ips[ip] if not connection.connected: succ = connection.event_connected.get() # Wait for connection @@ -87,7 +88,9 @@ class ConnectionServer: self.connections.append(connection) succ = connection.connect() if not succ: + connection.close() raise Exception("Connection event return error") + except Exception, err: self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) connection.close() @@ -97,6 +100,7 @@ class ConnectionServer: def removeConnection(self, connection): + self.log.debug("Removing %s..." % connection) if self.ips.get(connection.ip) == connection: # Delete if same as in registry del self.ips[connection.ip] if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry @@ -115,10 +119,10 @@ class ConnectionServer: if connection.unpacker and idle > 30: # Delete the unpacker if not needed del connection.unpacker connection.unpacker = None - connection.log.debug("Unpacker deleted") + connection.log("Unpacker deleted") if idle > 60*60: # Wake up after 1h - connection.log.debug("[Cleanup] After wakeup, idle: %s" % idle) + connection.log("[Cleanup] After wakeup, idle: %s" % idle) connection.close() elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec @@ -130,15 +134,15 @@ class ConnectionServer: connection.close() elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle - connection.log.debug("[Cleanup] Connection buff stalled") + connection.log("[Cleanup] Connection buff stalled") connection.close() elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec - connection.log.debug("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)) + connection.log("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)) connection.close() elif idle > 60 and connection.protocol == "?": # No connection after 1 min - connection.log.debug("[Cleanup] Connect timeout: %s" % idle) + connection.log("[Cleanup] Connect timeout: %s" % idle) connection.close() diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 242704b1..e5baa739 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -131,7 +131,7 @@ class ContentManager: # Create and sign a content.json # Return: The new content if filewrite = False - def sign(self, inner_path = "content.json", privatekey=None, filewrite=True): + def sign(self, inner_path = "content.json", privatekey=None, filewrite=True, update_changed_files=False): content = self.contents.get(inner_path) if not content: # Content not exits yet, load default one self.log.info("File %s not exits yet, loading default values..." % inner_path) @@ -146,6 +146,7 @@ class ContentManager: self.log.info("Opening site data directory: %s..." % directory) hashed_files = {} + changed_files = [inner_path] for root, dirs, files in os.walk(directory): for file_name in files: file_path = self.site.storage.getPath("%s/%s" % (root.strip("/"), file_name)) @@ -157,6 +158,14 @@ class ContentManager: sha512sum = CryptHash.sha512sum(file_path) # Calculate sha512 sum of file self.log.info("- %s (SHA512: %s)" % (file_inner_path, sha512sum)) hashed_files[file_inner_path] = {"sha512": sha512sum, "size": os.path.getsize(file_path)} + if file_inner_path in content["files"].keys() and hashed_files[file_inner_path]["sha512"] != content["files"][file_inner_path].get("sha512"): + changed_files.append(file_path) + + + self.log.debug("Changed files: %s" % changed_files) + if update_changed_files: + for file_path in changed_files: + self.site.storage.onUpdated(file_path) # Generate new content.json self.log.info("Adding timestamp and sha512sums to new content.json...") diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 9373f9fb..1f982cb7 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -63,7 +63,7 @@ class FileRequest: if params["inner_path"].endswith("content.json"): # Download every changed file from peer peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer - site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"])) # On complete publish to other peers + site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"]), "publish_%s" % params["inner_path"]) # On complete publish to other peers gevent.spawn( lambda: site.downloadContent(params["inner_path"], peer=peer) ) # Load new content file and download changed files in new thread diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 65a303f4..e32ed4e5 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -129,6 +129,10 @@ class FileServer(ConnectionServer): for inner_path in site.bad_files: site.bad_files[inner_path] = 0 + # Retry failed files + if site.bad_files: + site.retryBadFiles() + # In passive mode keep 5 active peer connection to get the updates if self.port_opened == False: site.needConnections() @@ -152,7 +156,7 @@ class FileServer(ConnectionServer): # Bind and start serving sites def start(self, check_sites = True): - self.log = logging.getLogger(__name__) + self.log = logging.getLogger("FileServer") if config.debug: # Auto reload FileRequest on change diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index cc13daad..60a453fc 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -39,7 +39,9 @@ class Peer: try: self.connection = self.connection_server.getConnection(self.ip, self.port) except Exception, err: + self.onConnectionError() self.log.debug("Getting connection error: %s (connection_error: %s, hash_failed: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed)) + self.connection = None def __str__(self): return "Peer %-12s" % self.ip diff --git a/src/Site/Site.py b/src/Site/Site.py index e196009f..a2fbd5f2 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -94,8 +94,6 @@ class Site: - - # Download all file from content.json @util.Noparallel(blocking=True) def downloadContent(self, inner_path, download_files=True, peer=None): @@ -141,6 +139,12 @@ class Site: return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3] + # Retry download bad files + def retryBadFiles(self): + for bad_file in self.bad_files.keys(): + self.needFile(bad_file, update=True, blocking=False) + + # Download all files of the site @util.Noparallel(blocking=False) def download(self, check_size=False): @@ -181,11 +185,11 @@ class Site: # Publish worker - def publisher(self, inner_path, peers, published, limit, event_done): + def publisher(self, inner_path, peers, published, limit, event_done=None): timeout = 5+int(self.storage.getSize(inner_path)/1024) # Timeout: 5sec + size in kb while 1: if not peers or len(published) >= limit: - event_done.set(True) + if event_done: event_done.set(True) break # All peers done, or published engouht peer = peers.pop(0) result = {"exception": "Timeout"} @@ -207,7 +211,7 @@ class Site: published.append(peer) self.log.info("[OK] %s: %s" % (peer.key, result["ok"])) else: - self.log.info("[ERROR] %s: %s" % (peer.key, result)) + self.log.info("[FAILED] %s: %s" % (peer.key, result)) # Update content.json on peers diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index db98359d..157e5648 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -128,7 +128,12 @@ class SiteStorage: else: # Simple string open(file_path, "wb").write(content) del content + self.onUpdated(inner_path) + + # Site content updated + def onUpdated(self, inner_path): + file_path = self.getPath(inner_path) # Update Sql cache if inner_path == "dbschema.json": self.has_db = self.isFile("dbschema.json") @@ -138,6 +143,7 @@ class SiteStorage: self.getDb().loadJson(file_path) + # Load and parse json file def loadJson(self, inner_path): return json.load(self.open(inner_path)) @@ -197,7 +203,7 @@ class SiteStorage: ok = self.site.content_manager.verifyFile(file_inner_path, open(file_path, "rb")) if not ok: - self.log.error("[ERROR] %s" % file_inner_path) + self.log.debug("[CHNAGED] %s" % file_inner_path) bad_files.append(file_inner_path) self.log.debug("%s verified: %s files, quick_check: %s, bad files: %s" % (content_inner_path, len(content["files"]), quick_check, bad_files)) diff --git a/src/main.py b/src/main.py index 99dfd0a6..0acfbfb5 100644 --- a/src/main.py +++ b/src/main.py @@ -119,7 +119,7 @@ def siteSign(address, privatekey=None, inner_path="content.json"): if not privatekey: # If no privatekey in args then ask it now import getpass privatekey = getpass.getpass("Private key (input hidden):") - site.content_manager.sign(inner_path=inner_path, privatekey=privatekey) + site.content_manager.sign(inner_path=inner_path, privatekey=privatekey, update_changed_files=True) def siteVerify(address): @@ -199,8 +199,11 @@ def sitePublish(address, peer_ip=None, peer_port=15441, inner_path="content.json logging.info("Gathering peers from tracker") site.announce() # Gather peers site.publish(20, inner_path) # Push to 20 peers - logging.info("Serving files....") + time.sleep(1) + logging.info("Serving files...") gevent.joinall([file_server_thread]) + logging.info("Done.") + # Crypto commands diff --git a/src/util/Event.py b/src/util/Event.py index 4b4e7c96..ae7d0a53 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -12,9 +12,16 @@ class Event(list): return "Event(%s)" % list.__repr__(self) - def once(self, func): + def once(self, func, name=None): func.once = True - self.append(func) + func.name = None + if name: # Dont function with same name twice + names = [f.name for f in self] + if name not in names: + func.name = name + self.append(func) + else: + self.append(func) return self @@ -23,6 +30,9 @@ if __name__ == "__main__": print "%s Say: %s" % (pre, text) onChanged = Event() onChanged.once(lambda pre: say(pre, "once")) + onChanged.once(lambda pre: say(pre, "once")) + onChanged.once(lambda pre: say(pre, "namedonce"), "namedonce") + onChanged.once(lambda pre: say(pre, "namedonce"), "namedonce") onChanged.append(lambda pre: say(pre, "always")) onChanged("#1") onChanged("#2")