site connection stats, msgpack unpacker stats, make sure we don't skip any namecoin blocks, no more sha1 hash in content.json, keep 5 open connections in passive mode, publish new content to 5 peers, retry UPnP 3 times, keep connection loggers
parent 2491814070
commit 9c5176a8cb
7 changed files with 70 additions and 11 deletions
@@ -64,7 +64,8 @@ class UiRequestPlugin(object):
         except Exception, err:
             pass

-        yield "Connections (%s):<br>" % len(main.file_server.connections)
+        # Connections
+        yield "<b>Connections</b> (%s):<br>" % len(main.file_server.connections)
         yield "<table><tr> <th>id</th> <th>protocol</th> <th>type</th> <th>ip</th> <th>ping</th> <th>buff</th>"
         yield "<th>idle</th> <th>open</th> <th>delay</th> <th>sent</th> <th>received</th> <th>last sent</th> <th>waiting</th> <th>version</th> <th>peerid</th> </tr>"
         for connection in main.file_server.connections:
@@ -87,6 +88,25 @@ class UiRequestPlugin(object):
             ])
         yield "</table>"

+        # Sites
+        yield "<br><br><b>Sites</b>:"
+        yield "<table>"
+        yield "<tr><th>address</th> <th>peers</th> <th colspan=2>connected</th> <th>content.json</th> </tr>"
+        for site in self.server.sites.values():
+            yield self.formatTableRow([
+                ("%s", site.address),
+                ("%s", len(site.peers)),
+                ("%s", len([peer for peer in site.peers.values() if peer.connection and peer.connection.connected])),
+                ("%s", [peer.connection.id for peer in site.peers.values() if peer.connection and peer.connection.connected]),
+                ("%s", len(site.content_manager.contents)),
+            ])
+        yield "</table>"
+
+
+        # Objects
+        yield "<br><br><b>Objects in memory:</b><br>"
+
         from greenlet import greenlet
         objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
         yield "<br>Greenlets (%s):<br>" % len(objs)
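Note: formatTableRow itself is not part of this diff. Judging from the call sites, it takes a list of (format, value) pairs and emits one table row; a minimal sketch of such a helper follows (assumed, not the project's actual implementation, which may escape or format differently):

import cgi

def formatTableRow(row):
    # row is a list of (format, value) pairs, e.g. ("%s", site.address);
    # each pair becomes one <td> cell of the emitted row
    cells = ["<td>%s</td>" % cgi.escape(fmt % val) for fmt, val in row]
    return "<tr>%s</tr>" % "".join(cells)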
@@ -108,6 +128,13 @@ class UiRequestPlugin(object):
             yield " - %.3fkb: %s<br>" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj)))


+        from msgpack import Unpacker
+        objs = [obj for obj in gc.get_objects() if isinstance(obj, Unpacker)]
+        yield "<br>Msgpack unpacker (%s):<br>" % len(objs)
+        for obj in objs:
+            yield " - %.3fkb: %s<br>" % (self.getObjSize(obj, hpy), cgi.escape(repr(obj)))
+
+
         from Site import Site
         objs = [obj for obj in gc.get_objects() if isinstance(obj, Site)]
         yield "<br>Sites (%s):<br>" % len(objs)
@@ -107,9 +107,10 @@ while 1:
             rpc.waitforblock()
             break # Block found
         except Exception, err: # Timeout
-            pass
+            print "Exception", err
     last_block = int(rpc.getinfo()["blocks"])
-    processBlock(last_block)
+    for block_id in range(config["lastprocessed"]+1, last_block+1):
+        processBlock(block_id)

     config["lastprocessed"] = last_block
     open(config_path, "w").write(json.dumps(config, indent=2))
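This is the "make sure we don't skip any namecoin blocks" fix: processing only last_block loses every block that arrived between two wakeups, while iterating from the last processed height catches up. The same idea in isolation (names taken from the hunk, the function boundary is an assumption):

def catchUp(rpc, config, processBlock):
    # Process every block exactly once, even if several arrived
    # while we were waiting or waitforblock() timed out
    last_block = int(rpc.getinfo()["blocks"])
    for block_id in range(config["lastprocessed"] + 1, last_block + 1):
        processBlock(block_id)
    config["lastprocessed"] = last_block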
@@ -79,7 +79,6 @@ class Connection:
         try:
             firstchar = sock.recv(1) # Find out if pure socket or zeromq
         except Exception, err:
-            if self.log:
-                self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
+            self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
             self.close()
             return False
@@ -111,7 +110,7 @@ class Connection:
         try:
             if not firstchar: firstchar = sock.recv(1)
         except Exception, err:
-            if self.log: self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
+            self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
             self.close()
             return False
         if firstchar == "\xff": # Backward compatibility to zmq
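The one-byte peek above is protocol sniffing: zeromq streams begin with 0xff, anything else is treated as a raw msgpack stream. A sketch of the dispatch (handler names hypothetical):

firstchar = sock.recv(1)  # Blocks until the peer sends its first byte
if firstchar == "\xff":
    handleZmq(sock, firstchar)      # hypothetical zmq-compatible handler
else:
    handleMsgpack(sock, firstchar)  # hypothetical default msgpack handler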
@@ -277,6 +276,7 @@ class Connection:
     def close(self):
         if self.closed: return False # Already closed
         self.closed = True
+        self.connected = False
         self.event_connected.set(False)

         if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv))
@@ -296,7 +296,6 @@ class Connection:
             if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err))

         # Little cleanup
-        del self.log
         del self.unpacker
         del self.sock
         self.unpacker = None
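This hunk is the "keep connection loggers" item: self.log is no longer deleted during cleanup. A likely motivation (my inference, not stated in the commit) is that debug calls can still fire on a closing connection, and a deleted attribute would turn them into AttributeErrors. A toy model of the new behavior:

import logging

class Conn(object):  # Toy model, not the real Connection class
    def __init__(self):
        self.log = logging.getLogger("Conn")

    def close(self):
        self.sock = None
        self.unpacker = None
        # self.log is intentionally kept: a late call such as
        # self.log.debug("Close error: ...") must not raise AttributeError;
        # a logging.Logger is cheap to retain.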
@@ -157,8 +157,6 @@ class ContentManager:
                 sha512sum = CryptHash.sha512sum(file_path) # Calculate sha512 sum of file
                 self.log.info("- %s (SHA512: %s)" % (file_inner_path, sha512sum))
                 hashed_files[file_inner_path] = {"sha512": sha512sum, "size": os.path.getsize(file_path)}
-                if inner_path == "content.json": # Backward compatibility to root conten.json
-                    hashed_files[file_inner_path]["sha1"] = CryptHash.sha1sum(file_path)

         # Generate new content.json
         self.log.info("Adding timestamp and sha512sums to new content.json...")
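"No more sha1 hash to content.json": newly signed content.json files now record sha512 and size only. Illustrative shape of one hashed_files entry after this change (values invented):

hashed_files = {}
hashed_files["index.html"] = {
    "sha512": "9b7f3a...",  # hex digest from CryptHash.sha512sum(file_path)
    "size": 1422,           # os.path.getsize(file_path)
}
# The conditional "sha1" key for the root content.json is gone.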
@@ -101,6 +101,8 @@ class FileServer(ConnectionServer):
             if site.settings["serving"]:
                 site.announce() # Announce site to tracker
                 site.update() # Update site's content.json and download changed files
+                if self.port_opened == False: # In passive mode keep 5 active peer connection to get the updates
+                    site.needConnections()


         # Check sites integrity
@@ -112,6 +114,7 @@ class FileServer(ConnectionServer):
         for address, site in self.sites.items(): # Check sites integrity
             gevent.spawn(self.checkSite, site) # Check in new thread
             time.sleep(2) # Prevent too quick request
+        site = None


         # Announce sites every 20 min
@@ -121,10 +124,19 @@ class FileServer(ConnectionServer):
         for address, site in self.sites.items():
             if site.settings["serving"]:
                 site.announce() # Announce site to tracker
-                for inner_path in site.bad_files: # Reset bad file retry counter
+
+                # Reset bad file retry counter
+                for inner_path in site.bad_files:
                     site.bad_files[inner_path] = 0
+
+                # In passive mode keep 5 active peer connection to get the updates
+                if self.port_opened == False:
+                    site.needConnections()
+
             time.sleep(2) # Prevent too quick request
+
+        site = None


     # Detects if computer back from wakeup
     def wakeupWatcher(self):
@@ -211,7 +211,7 @@ class Site:


     # Update content.json on peers
-    def publish(self, limit=3, inner_path="content.json"):
+    def publish(self, limit=5, inner_path="content.json"):
         self.log.info( "Publishing to %s/%s peers..." % (limit, len(self.peers)) )
         published = [] # Successfuly published (Peer)
         publishers = [] # Publisher threads
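"Publish got content to 5 peers": the default fan-out for pushing a freshly signed content.json rises from 3 to 5. The publisher loop itself is outside this hunk; a self-contained sketch of how a bounded, parallel publish could look with gevent (send_update and the structure are assumptions inferred from the published/publishers names above):

import gevent

def publishTo(peers, send_update, limit=5):
    # Fan out to at most `limit` peers in parallel; send_update(peer)
    # returns True when the peer accepted the new content.json
    published = []
    def worker(peer):
        if send_update(peer):
            published.append(peer)
    publishers = [gevent.spawn(worker, peer) for peer in peers[:limit]]
    gevent.joinall(publishers)  # Wait for every publisher greenlet
    return published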
@@ -317,6 +317,27 @@ class Site:
             self.log.error("Announced to %s trackers, failed" % len(SiteManager.TRACKERS))


+    # Need open connections
+    def needConnections(self):
+        need = min(len(self.peers)/2, 10) # Connect to half of total peers, but max 10
+        need = max(need, 5) # But minimum 5 peers
+        need = min(len(self.peers), need) # Max total peers
+
+        connected = 0
+        for peer in self.peers.values(): # Check current connected number
+            if peer.connection and peer.connection.connected:
+                connected += 1
+
+        self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers)))
+
+        if connected < need: # Need more than we have
+            for peer in self.peers.values():
+                if not peer.connection or not peer.connection.connected: # No peer connection or disconnected
+                    peer.connect()
+                    if peer.connection and peer.connection.connected: connected += 1 # Successfully connected
+                    if connected >= need: break
+        return connected
+
+
     # - Events -
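The target count is clamped three times: half the peer list, capped at 10, floored at 5, then capped again at the number of peers actually known (so a site with 3 peers needs 3, not 5). Worked through with Python 2 integer division:

def targetConnections(peer_count):
    need = min(peer_count / 2, 10)  # Half of the known peers, at most 10
    need = max(need, 5)             # ...but at least 5
    return min(peer_count, need)    # ...and never more peers than exist

# targetConnections(3)  -> 3   (floor of 5, limited by 3 known peers)
# targetConnections(8)  -> 5   (8/2 = 4, raised to the minimum of 5)
# targetConnections(30) -> 10  (30/2 = 15, capped at 10)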
@@ -191,6 +191,7 @@ def open_port(port=15441, desc="UpnpPunch"):

     local_ips = list(set(local_ips)) # Delete duplicates
     logging.debug("Found local ips: %s" % local_ips)
+    local_ips = local_ips*3 # Retry every ip 3 times

     for local_ip in local_ips:
         logging.debug("Trying using local ip: %s" % local_ip)
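"UPnP retry 3 times" is implemented by list repetition: multiplying the list makes every candidate ip appear three times in the loop, so each gets up to three punch attempts. Note that the whole sequence is repeated, so retries of a given ip are not consecutive:

local_ips = ["192.168.1.2", "10.0.0.3"]
local_ips = local_ips * 3  # Repeats the sequence, not each element
# -> ['192.168.1.2', '10.0.0.3',
#     '192.168.1.2', '10.0.0.3',
#     '192.168.1.2', '10.0.0.3']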