diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 72d3be22..218ec17a 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -64,7 +64,8 @@ class UiRequestPlugin(object): except Exception, err: pass - yield "Connections (%s):
" % len(main.file_server.connections) + # Connections + yield "Connections (%s):
" % len(main.file_server.connections) yield "" yield "" for connection in main.file_server.connections: @@ -87,6 +88,25 @@ class UiRequestPlugin(object): ]) yield "
id protocol type ip ping buffidle open delay sent received last sent waiting version peerid
" + + # Sites + yield "

Sites:" + yield "" + yield "" + for site in self.server.sites.values(): + yield self.formatTableRow([ + ("%s", site.address), + ("%s", len(site.peers)), + ("%s", len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected])), + ("%s", [peer.connection.id for peer in site.peers.values() if peer.connection and peer.connection.connected]), + ("%s", len(site.content_manager.contents)), + ]) + yield "
address peers connected content.json
" + + + # Objects + yield "

Objects in memory:
" + from greenlet import greenlet objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] yield "
Greenlets (%s):
" % len(objs) @@ -108,6 +128,13 @@ class UiRequestPlugin(object): yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + from msgpack import Unpacker + objs = [obj for obj in gc.get_objects() if isinstance(obj, Unpacker)] + yield "
Msgpack unpacker (%s):
" % len(objs) + for obj in objs: + yield " - %.3fkb: %s
" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj))) + + from Site import Site objs = [obj for obj in gc.get_objects() if isinstance(obj, Site)] yield "
Sites (%s):
" % len(objs) diff --git a/plugins/Zeroname/updater/zeroname_updater.py b/plugins/Zeroname/updater/zeroname_updater.py index f6df3d51..ef71b112 100644 --- a/plugins/Zeroname/updater/zeroname_updater.py +++ b/plugins/Zeroname/updater/zeroname_updater.py @@ -107,9 +107,10 @@ while 1: rpc.waitforblock() break # Block found except Exception, err: # Timeout - pass + print "Exception", err last_block = int(rpc.getinfo()["blocks"]) - processBlock(last_block) + for block_id in range(config["lastprocessed"]+1, last_block+1): + processBlock(block_id) config["lastprocessed"] = last_block open(config_path, "w").write(json.dumps(config, indent=2)) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index eb8f9282..e5f6d05c 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -79,8 +79,7 @@ class Connection: try: firstchar = sock.recv(1) # Find out if pure socket or zeromq except Exception, err: - if self.log: - self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatiblity: forward data to zmq @@ -111,7 +110,7 @@ class Connection: try: if not firstchar: firstchar = sock.recv(1) except Exception, err: - if self.log: self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) + self.log.debug("Socket firstchar error: %s" % Debug.formatException(err)) self.close() return False if firstchar == "\xff": # Backward compatibility to zmq @@ -277,6 +276,7 @@ class Connection: def close(self): if self.closed: return False # Already closed self.closed = True + self.connected = False self.event_connected.set(False) if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv)) @@ -296,7 +296,6 @@ class Connection: if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err)) # Little cleanup - del self.log del self.unpacker del self.sock self.unpacker = None diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index 83b4e533..242704b1 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -157,8 +157,6 @@ class ContentManager: sha512sum = CryptHash.sha512sum(file_path) # Calculate sha512 sum of file self.log.info("- %s (SHA512: %s)" % (file_inner_path, sha512sum)) hashed_files[file_inner_path] = {"sha512": sha512sum, "size": os.path.getsize(file_path)} - if inner_path == "content.json": # Backward compatibility to root conten.json - hashed_files[file_inner_path]["sha1"] = CryptHash.sha1sum(file_path) # Generate new content.json self.log.info("Adding timestamp and sha512sums to new content.json...") diff --git a/src/File/FileServer.py b/src/File/FileServer.py index 0538c527..65a303f4 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -101,6 +101,8 @@ class FileServer(ConnectionServer): if site.settings["serving"]: site.announce() # Announce site to tracker site.update() # Update site's content.json and download changed files + if self.port_opened == False: # In passive mode keep 5 active peer connection to get the updates + site.needConnections() # Check sites integrity @@ -112,6 +114,7 @@ class FileServer(ConnectionServer): for address, site in self.sites.items(): # Check sites integrity gevent.spawn(self.checkSite, site) # Check in new thread time.sleep(2) # Prevent too quick request + site = None # Announce sites every 20 min @@ -121,10 +124,19 @@ class FileServer(ConnectionServer): for address, site in self.sites.items(): if site.settings["serving"]: site.announce() # Announce site to tracker - for inner_path in site.bad_files: # Reset bad file retry counter + + # Reset bad file retry counter + for inner_path in site.bad_files: site.bad_files[inner_path] = 0 + + # In passive mode keep 5 active peer connection to get the updates + if self.port_opened == False: + site.needConnections() + time.sleep(2) # Prevent too quick request + site = None + # Detects if computer back from wakeup def wakeupWatcher(self): diff --git a/src/Site/Site.py b/src/Site/Site.py index b0592b61..d6fa3cab 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -211,7 +211,7 @@ class Site: # Update content.json on peers - def publish(self, limit=3, inner_path="content.json"): + def publish(self, limit=5, inner_path="content.json"): self.log.info( "Publishing to %s/%s peers..." % (limit, len(self.peers)) ) published = [] # Successfuly published (Peer) publishers = [] # Publisher threads @@ -317,6 +317,27 @@ class Site: self.log.error("Announced to %s trackers, failed" % len(SiteManager.TRACKERS)) + # Need open connections + def needConnections(self): + need = min(len(self.peers)/2, 10) # Connect to half of total peers, but max 10 + need = max(need, 5) # But minimum 5 peers + need = min(len(self.peers), need) # Max total peers + + connected = 0 + for peer in self.peers.values(): # Check current connected number + if peer.connection and peer.connection.connected: + connected += 1 + + self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers))) + + if connected < need: # Need more than we have + for peer in self.peers.values(): + if not peer.connection or not peer.connection.connected: # No peer connection or disconnected + peer.connect() + if peer.connection and peer.connection.connected: connected += 1 # Successfully connected + if connected >= need: break + return connected + # - Events - diff --git a/src/util/UpnpPunch.py b/src/util/UpnpPunch.py index 27608de1..2f86b689 100644 --- a/src/util/UpnpPunch.py +++ b/src/util/UpnpPunch.py @@ -191,6 +191,7 @@ def open_port(port=15441, desc="UpnpPunch"): local_ips = list(set(local_ips)) # Delete duplicates logging.debug("Found local ips: %s" % local_ips) + local_ips = local_ips*3 # Retry every ip 3 times for local_ip in local_ips: logging.debug("Trying using local ip: %s" % local_ip)