From 67783bd494f5039b1f915b010d2d044300683e78 Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Wed, 8 Apr 2015 01:57:55 +0200 Subject: [PATCH] total object stat, proper zeronet updater timeout catch, connection using connectionserver logger, trigger sitestorage onupdated when signing new file, named once events, only publish once same updated files, retry bad files every 20 min, trigger connection error on failed connection --- plugins/Stats/StatsPlugin.py | 31 +++++++--- plugins/Zeroname/updater/zeroname_updater.py | 18 ++++-- src/Connection/Connection.py | 59 +++++++++++--------- src/Connection/ConnectionServer.py | 16 ++++-- src/Content/ContentManager.py | 11 +++- src/File/FileRequest.py | 2 +- src/File/FileServer.py | 6 +- src/Peer/Peer.py | 2 + src/Site/Site.py | 14 +++-- src/Site/SiteStorage.py | 8 ++- src/main.py | 7 ++- src/util/Event.py | 14 ++++- 12 files changed, 130 insertions(+), 58 deletions(-) diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 218ec17a..8609b26d 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -105,53 +105,66 @@ class UiRequestPlugin(object): # Objects - yield "

Objects in memory:
" + + obj_count = {} + for obj in gc.get_objects(): + obj_type = str(type(obj)) + if not obj_type in obj_count: + obj_count[obj_type] = [0, 0] + obj_count[obj_type][0] += 1 # Count + obj_count[obj_type][1] += float(sys.getsizeof(obj))/1024 # Size + + yield "

Objects in memory (total: %s, %.2fkb):
" % (len(obj_count), sum([stat[1] for stat in obj_count.values()])) + + for obj, stat in sorted(obj_count.items(), key=lambda x: x[1][0], reverse=True): # Sorted by count + yield " - %.1fkb = %s x %s
" % (stat[1], stat[0], cgi.escape(obj)) + from greenlet import greenlet objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] yield "
Greenlets (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Worker import Worker objs = [obj for obj in gc.get_objects() if isinstance(obj, Worker)] yield "
Workers (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Connection import Connection objs = [obj for obj in gc.get_objects() if isinstance(obj, Connection)] yield "
Connections (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from msgpack import Unpacker objs = [obj for obj in gc.get_objects() if isinstance(obj, Unpacker)] yield "
Msgpack unpacker (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) from Site import Site objs = [obj for obj in gc.get_objects() if isinstance(obj, Site)] yield "
Sites (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) objs = [obj for obj in gc.get_objects() if isinstance(obj, self.server.log.__class__)] yield "
Loggers (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj.name))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj.name))) objs = [obj for obj in gc.get_objects() if isinstance(obj, UiRequest)] yield "
UiRequest (%s):
" % len(objs) for obj in objs: - yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + yield " - %.1fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) objs = [(key, val) for key, val in sys.modules.iteritems() if val is not None] objs.sort() @@ -159,4 +172,4 @@ class UiRequestPlugin(object): for module_name, module in objs: yield " - %.3fkb: %s %s
" % (self.getObjSize(module, hpy), module_name, cgi.escape(repr(module))) - yield "Done in %.3f" % (time.time()-s) + yield "Done in %.1f" % (time.time()-s) diff --git a/plugins/Zeroname/updater/zeroname_updater.py b/plugins/Zeroname/updater/zeroname_updater.py index ef71b112..e7fcda25 100644 --- a/plugins/Zeroname/updater/zeroname_updater.py +++ b/plugins/Zeroname/updater/zeroname_updater.py @@ -1,5 +1,5 @@ from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException -import time, json, os, sys, re +import time, json, os, sys, re, socket def publish(): @@ -47,6 +47,7 @@ def processNameOp(domain, value): def processBlock(block_id): print "Processing block #%s..." % block_id + s = time.time() block_hash = rpc.getblockhash(block_id) block = rpc.getblock(block_hash) @@ -58,7 +59,7 @@ def processBlock(block_id): if "scriptPubKey" in vout and "nameOp" in vout["scriptPubKey"] and "name" in vout["scriptPubKey"]["nameOp"]: name_op = vout["scriptPubKey"]["nameOp"] updated += processNameOp(name_op["name"].replace("d/", ""), name_op["value"]) - print "Done (updated %s)." % updated + print "Done in %.3fs (updated %s)." % (time.time()-s, updated) if updated: publish() @@ -98,16 +99,23 @@ for block_id in range(config["lastprocessed"], last_block+1): #processBlock(223911) # Testing while 1: - print "Waiting for new block..." + print "Waiting for new block", + sys.stdout.flush() while 1: try: rpc = AuthServiceProxy(rpc_url, timeout=60*5) if (int(rpc.getinfo()["blocks"]) > last_block): break time.sleep(1) rpc.waitforblock() + print "Found" break # Block found - except Exception, err: # Timeout - print "Exception", err + except socket.timeout: # Timeout + print ".", + sys.stdout.flush() + except Exception, err: + print "Exception", err.__class__, err + time.sleep(5) + last_block = int(rpc.getinfo()["blocks"]) for block_id in range(config["lastprocessed"]+1, last_block+1): processBlock(block_id) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index e5f6d05c..34a6c02e 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -24,7 +24,6 @@ class Connection: self.type = "?" self.server = server - self.log = logging.getLogger(str(self)) self.unpacker = None # Stream incoming socket messages here self.req_id = 0 # Last request id self.handshake = {} # Handshake info got from peer @@ -50,19 +49,31 @@ class Connection: self.last_req_time = 0 self.last_cmd = None + self.name = None + self.updateName() + self.waiting_requests = {} # Waiting sent requests + def updateName(self): + self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) + + def __str__(self): - return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) + return self.name + def __repr__(self): return "<%s>" % self.__str__() + def log(self, text): + self.server.log.debug("%s > %s" % (self.name, text)) + + # Open connection to peer and wait for handshake def connect(self): - self.log.debug("Connecting...") + self.log("Connecting...") self.type = "out" self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.ip, self.port)) @@ -79,14 +90,14 @@ class Connection: try: firstchar = sock.recv(1) # Find out if pure socket or zeromq except Exception, err: - self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatiblity: forward data to zmq - if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ") + if config.debug_socket: self.log("Fallback incoming connection to ZeroMQ") self.protocol = "zeromq" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) @@ -99,7 +110,7 @@ class Connection: self.server.forward(self, sock, zmq_sock) self.close() # Forward ended close connection else: - self.config.debug("ZeroMQ Server not running, exiting!") + self.log("ZeroMQ Server not running, exiting!") else: # Normal socket self.messageLoop(firstchar) @@ -110,16 +121,16 @@ class Connection: try: if not firstchar: firstchar = sock.recv(1) except Exception, err: - self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatibility to zmq self.sock.close() # Close normal socket del firstchar if zmq: - if config.debug_socket: self.log.debug("Connecting as ZeroMQ") + if config.debug_socket: self.log("Connecting as ZeroMQ") self.protocol = "zeromq" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done @@ -133,12 +144,12 @@ class Connection: zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port)) self.zmq_sock = zmq_sock except Exception, err: - if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) else: return False # No zeromq connection supported else: # Normal socket self.protocol = "v2" - self.log.name = str(self) + self.updateName() self.connected = True self.event_connected.set(self.protocol) # Mark handshake as done @@ -152,7 +163,6 @@ class Connection: self.incomplete_buff_recv += 1 self.bytes_recv += len(buff) if not self.unpacker: - self.log.debug("Unpacker created") self.unpacker = msgpack.Unpacker() self.unpacker.feed(buff) for message in self.unpacker: @@ -161,7 +171,7 @@ class Connection: message = None buf = None except Exception, err: - if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err)) + if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) self.close() # MessageLoop ended, close connection @@ -184,17 +194,17 @@ class Connection: del self.waiting_requests[message["to"]] elif message["to"] == 0: # Other peers handshake ping = time.time()-self.start_time - if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping)) + if config.debug_socket: self.log("Got handshake response: %s, ping: %s" % (message, ping)) self.last_ping_delay = ping self.handshake = message self.port = message["fileserver_port"] # Set peer fileserver port else: - self.log.debug("Unknown response: %s" % message) + self.log("Unknown response: %s" % message) elif message.get("cmd"): # Handhsake request if message["cmd"] == "handshake": self.handshake = message["params"] self.port = self.handshake["fileserver_port"] # Set peer fileserver port - if config.debug_socket: self.log.debug("Handshake request: %s" % message) + if config.debug_socket: self.log("Handshake request: %s" % message) data = self.handshakeInfo() data["cmd"] = "response" data["to"] = message["req_id"] @@ -202,7 +212,7 @@ class Connection: else: self.server.handleRequest(self, message) else: # Old style response, no req_id definied - if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys()) + if config.debug_socket: self.log("Old style response, waiting: %s" % self.waiting_requests.keys()) last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true self.waiting_requests[last_req_id].set(message) del self.waiting_requests[last_req_id] # Remove from waiting request @@ -211,13 +221,13 @@ class Connection: # Send data to connection def send(self, message): - if config.debug_socket: self.log.debug("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) + if config.debug_socket: self.log("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) self.last_send_time = time.time() if self.protocol == "zeromq": if self.zmq_sock: # Outgoing connection self.zmq_queue.append(message) if self.zmq_working: - self.log.debug("ZeroMQ already working...") + self.log("ZeroMQ already working...") return while self.zmq_queue: self.zmq_working = True @@ -233,14 +243,13 @@ class Connection: self.bytes_sent += len(data) self.sock.sendall(data) self.last_sent_time = time.time() - if config.debug_socket: self.log.debug("Sent: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id"))) return True # Create and send a request to peer def request(self, cmd, params={}): if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: # Last command sent more than 10 sec ago, timeout - self.log.debug("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) + self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) self.close() return False @@ -262,7 +271,7 @@ class Connection: try: response = self.request("ping") except Exception, err: - self.log.debug("Ping error: %s" % Debug.formatException(err)) + self.log("Ping error: %s" % Debug.formatException(err)) if response and "body" in response and response["body"] == "Pong!": self.last_ping_delay = time.time()-s return True @@ -279,7 +288,7 @@ class Connection: self.connected = False self.event_connected.set(False) - if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) + if config.debug_socket: self.log("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) for request in self.waiting_requests.values(): # Mark pending requests failed request.set(False) self.waiting_requests = {} @@ -293,7 +302,7 @@ class Connection: self.sock.shutdown(gevent.socket.SHUT_WR) self.sock.close() except Exception, err: - if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err)) + if config.debug_socket: self.log("Close error: %s" % Debug.formatException(err)) # Little cleanup del self.unpacker diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index baf3349e..862e2f72 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -66,7 +66,8 @@ class ConnectionServer: succ = connection.event_connected.get() # Wait for connection if not succ: raise Exception("Connection event return error") return connection - if ip in self.ips: # Find connection by ip + # Find connection by ip + if ip in self.ips: connection = self.ips[ip] if not connection.connected: succ = connection.event_connected.get() # Wait for connection @@ -87,7 +88,9 @@ class ConnectionServer: self.connections.append(connection) succ = connection.connect() if not succ: + connection.close() raise Exception("Connection event return error") + except Exception, err: self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) connection.close() @@ -97,6 +100,7 @@ class ConnectionServer: def removeConnection(self, connection): + self.log.debug("Removing %s..." % connection) if self.ips.get(connection.ip) == connection: # Delete if same as in registry del self.ips[connection.ip] if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry @@ -115,10 +119,10 @@ class ConnectionServer: if connection.unpacker and idle > 30: # Delete the unpacker if not needed del connection.unpacker connection.unpacker = None - connection.log.debug("Unpacker deleted") + connection.log("Unpacker deleted") if idle > 60*60: # Wake up after 1h - connection.log.debug("[Cleanup] After wakeup, idle: %s" % idle) + connection.log("[Cleanup] After wakeup, idle: %s" % idle) connection.close() elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec @@ -130,15 +134,15 @@ class ConnectionServer: connection.close() elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle - connection.log.debug("[Cleanup] Connection buff stalled") + connection.log("[Cleanup] Connection buff stalled") connection.close() elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec - connection.log.debug("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)) + connection.log("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)) connection.close() elif idle > 60 and connection.protocol == "?": # No connection after 1 min - connection.log.debug("[Cleanup] Connect timeout: %s" % idle) + connection.log("[Cleanup] Connect timeout: %s" % idle) connection.close() diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 242704b1..e5baa739 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -131,7 +131,7 @@ class ContentManager: # Create and sign a content.json # Return: The new content if filewrite = False - def sign(self, inner_path = "content.json", privatekey=None, filewrite=True): + def sign(self, inner_path = "content.json", privatekey=None, filewrite=True, update_changed_files=False): content = self.contents.get(inner_path) if not content: # Content not exits yet, load default one self.log.info("File %s not exits yet, loading default values..." % inner_path) @@ -146,6 +146,7 @@ class ContentManager: self.log.info("Opening site data directory: %s..." % directory) hashed_files = {} + changed_files = [inner_path] for root, dirs, files in os.walk(directory): for file_name in files: file_path = self.site.storage.getPath("%s/%s" % (root.strip("/"), file_name)) @@ -157,6 +158,14 @@ class ContentManager: sha512sum = CryptHash.sha512sum(file_path) # Calculate sha512 sum of file self.log.info("- %s (SHA512: %s)" % (file_inner_path, sha512sum)) hashed_files[file_inner_path] = {"sha512": sha512sum, "size": os.path.getsize(file_path)} + if file_inner_path in content["files"].keys() and hashed_files[file_inner_path]["sha512"] != content["files"][file_inner_path].get("sha512"): + changed_files.append(file_path) + + + self.log.debug("Changed files: %s" % changed_files) + if update_changed_files: + for file_path in changed_files: + self.site.storage.onUpdated(file_path) # Generate new content.json self.log.info("Adding timestamp and sha512sums to new content.json...") diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 9373f9fb..1f982cb7 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -63,7 +63,7 @@ class FileRequest: if params["inner_path"].endswith("content.json"): # Download every changed file from peer peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer - site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"])) # On complete publish to other peers + site.onComplete.once(lambda: site.publish(inner_path=params["inner_path"]), "publish_%s" % params["inner_path"]) # On complete publish to other peers gevent.spawn( lambda: site.downloadContent(params["inner_path"], peer=peer) ) # Load new content file and download changed files in new thread diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 65a303f4..e32ed4e5 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -129,6 +129,10 @@ class FileServer(ConnectionServer): for inner_path in site.bad_files: site.bad_files[inner_path] = 0 + # Retry failed files + if site.bad_files: + site.retryBadFiles() + # In passive mode keep 5 active peer connection to get the updates if self.port_opened == False: site.needConnections() @@ -152,7 +156,7 @@ class FileServer(ConnectionServer): # Bind and start serving sites def start(self, check_sites = True): - self.log = logging.getLogger(__name__) + self.log = logging.getLogger("FileServer") if config.debug: # Auto reload FileRequest on change diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index cc13daad..60a453fc 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -39,7 +39,9 @@ class Peer: try: self.connection = self.connection_server.getConnection(self.ip, self.port) except Exception, err: + self.onConnectionError() self.log.debug("Getting connection error: %s (connection_error: %s, hash_failed: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed)) + self.connection = None def __str__(self): return "Peer %-12s" % self.ip diff --git a/src/Site/Site.py b/src/Site/Site.py index e196009f..a2fbd5f2 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -94,8 +94,6 @@ class Site: - - # Download all file from content.json @util.Noparallel(blocking=True) def downloadContent(self, inner_path, download_files=True, peer=None): @@ -141,6 +139,12 @@ class Site: return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3] + # Retry download bad files + def retryBadFiles(self): + for bad_file in self.bad_files.keys(): + self.needFile(bad_file, update=True, blocking=False) + + # Download all files of the site @util.Noparallel(blocking=False) def download(self, check_size=False): @@ -181,11 +185,11 @@ class Site: # Publish worker - def publisher(self, inner_path, peers, published, limit, event_done): + def publisher(self, inner_path, peers, published, limit, event_done=None): timeout = 5+int(self.storage.getSize(inner_path)/1024) # Timeout: 5sec + size in kb while 1: if not peers or len(published) >= limit: - event_done.set(True) + if event_done: event_done.set(True) break # All peers done, or published engouht peer = peers.pop(0) result = {"exception": "Timeout"} @@ -207,7 +211,7 @@ class Site: published.append(peer) self.log.info("[OK] %s: %s" % (peer.key, result["ok"])) else: - self.log.info("[ERROR] %s: %s" % (peer.key, result)) + self.log.info("[FAILED] %s: %s" % (peer.key, result)) # Update content.json on peers diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index db98359d..157e5648 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -128,7 +128,12 @@ class SiteStorage: else: # Simple string open(file_path, "wb").write(content) del content + self.onUpdated(inner_path) + + # Site content updated + def onUpdated(self, inner_path): + file_path = self.getPath(inner_path) # Update Sql cache if inner_path == "dbschema.json": self.has_db = self.isFile("dbschema.json") @@ -138,6 +143,7 @@ class SiteStorage: self.getDb().loadJson(file_path) + # Load and parse json file def loadJson(self, inner_path): return json.load(self.open(inner_path)) @@ -197,7 +203,7 @@ class SiteStorage: ok = self.site.content_manager.verifyFile(file_inner_path, open(file_path, "rb")) if not ok: - self.log.error("[ERROR] %s" % file_inner_path) + self.log.debug("[CHNAGED] %s" % file_inner_path) bad_files.append(file_inner_path) self.log.debug("%s verified: %s files, quick_check: %s, bad files: %s" % (content_inner_path, len(content["files"]), quick_check, bad_files)) diff --git a/src/main.py b/src/main.py index 99dfd0a6..0acfbfb5 100644 --- a/src/main.py +++ b/src/main.py @@ -119,7 +119,7 @@ def siteSign(address, privatekey=None, inner_path="content.json"): if not privatekey: # If no privatekey in args then ask it now import getpass privatekey = getpass.getpass("Private key (input hidden):") - site.content_manager.sign(inner_path=inner_path, privatekey=privatekey) + site.content_manager.sign(inner_path=inner_path, privatekey=privatekey, update_changed_files=True) def siteVerify(address): @@ -199,8 +199,11 @@ def sitePublish(address, peer_ip=None, peer_port=15441, inner_path="content.json logging.info("Gathering peers from tracker") site.announce() # Gather peers site.publish(20, inner_path) # Push to 20 peers - logging.info("Serving files....") + time.sleep(1) + logging.info("Serving files...") gevent.joinall([file_server_thread]) + logging.info("Done.") + # Crypto commands diff --git a/src/util/Event.py b/src/util/Event.py index 4b4e7c96..ae7d0a53 100644 --- a/src/util/Event.py +++ b/src/util/Event.py @@ -12,9 +12,16 @@ class Event(list): return "Event(%s)" % list.__repr__(self) - def once(self, func): + def once(self, func, name=None): func.once = True - self.append(func) + func.name = None + if name: # Dont function with same name twice + names = [f.name for f in self] + if name not in names: + func.name = name + self.append(func) + else: + self.append(func) return self @@ -23,6 +30,9 @@ if __name__ == "__main__": print "%s Say: %s" % (pre, text) onChanged = Event() onChanged.once(lambda pre: say(pre, "once")) + onChanged.once(lambda pre: say(pre, "once")) + onChanged.once(lambda pre: say(pre, "namedonce"), "namedonce") + onChanged.once(lambda pre: say(pre, "namedonce"), "namedonce") onChanged.append(lambda pre: say(pre, "always")) onChanged("#1") onChanged("#2")