more detailed connection statistics, first char recv bugfix, double connection bugfix, websocket send queue, loading screen hide bugfix on slow connection, disable user reload
This commit is contained in:
parent
31d4609a3b
commit
e8368a8da1
10 changed files with 131 additions and 56 deletions
|
@ -15,15 +15,15 @@ class Connection:
|
|||
self.port = port
|
||||
self.peer_id = None # Bittorrent style peer id (not used yet)
|
||||
self.id = server.last_connection_id
|
||||
self.protocol = "?"
|
||||
server.last_connection_id += 1
|
||||
self.protocol = "?"
|
||||
|
||||
self.server = server
|
||||
self.log = logging.getLogger(str(self))
|
||||
self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here
|
||||
self.req_id = 0 # Last request id
|
||||
self.handshake = None # Handshake info got from peer
|
||||
self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received
|
||||
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
||||
self.closed = False
|
||||
|
||||
self.zmq_sock = None # Zeromq sock if outgoing connection
|
||||
|
@ -31,8 +31,19 @@ class Connection:
|
|||
self.zmq_working = False # Zmq currently working, just add to queue
|
||||
self.forward_thread = None # Zmq forwarder thread
|
||||
|
||||
# Stats
|
||||
self.start_time = time.time()
|
||||
self.last_recv_time = 0
|
||||
self.last_message_time = 0
|
||||
self.last_send_time = 0
|
||||
self.last_sent_time = 0
|
||||
self.incomplete_buff_recv = 0
|
||||
self.bytes_recv = 0
|
||||
self.bytes_sent = 0
|
||||
self.last_ping_delay = None
|
||||
self.last_req_time = 0
|
||||
|
||||
self.waiting_requests = {} # Waiting sent requests
|
||||
if not sock: self.connect() # Not an incoming connection, connect to peer
|
||||
|
||||
|
||||
def __str__(self):
|
||||
|
@ -44,12 +55,13 @@ class Connection:
|
|||
|
||||
# Open connection to peer and wait for handshake
|
||||
def connect(self):
|
||||
self.log.debug("Connecting...")
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.connect((self.ip, self.port))
|
||||
# Detect protocol
|
||||
self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
|
||||
gevent.spawn(self.messageLoop)
|
||||
return self.event_handshake.get() # Wait for handshake
|
||||
return self.event_connected.get() # Wait for first char
|
||||
|
||||
|
||||
|
||||
|
@ -61,7 +73,7 @@ class Connection:
|
|||
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol)
|
||||
self.event_connected.set(self.protocol)
|
||||
|
||||
if self.server.zmq_running:
|
||||
zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
@ -80,14 +92,19 @@ class Connection:
|
|||
# Message loop for connection
|
||||
def messageLoop(self, firstchar=None):
|
||||
sock = self.sock
|
||||
if not firstchar: firstchar = sock.recv(1)
|
||||
try:
|
||||
if not firstchar: firstchar = sock.recv(1)
|
||||
except Exception, err:
|
||||
self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.close()
|
||||
return False
|
||||
if firstchar == "\xff": # Backward compatibility to zmq
|
||||
self.sock.close() # Close normal socket
|
||||
if zmq:
|
||||
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol) # Mark handshake as done
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
try:
|
||||
context = zmq.Context()
|
||||
|
@ -105,7 +122,7 @@ class Connection:
|
|||
else: # Normal socket
|
||||
self.protocol = "v2"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol) # Mark handshake as done
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
unpacker = self.unpacker
|
||||
unpacker.feed(firstchar) # Feed the first char we already requested
|
||||
|
@ -113,31 +130,18 @@ class Connection:
|
|||
while True:
|
||||
buff = sock.recv(16*1024)
|
||||
if not buff: break # Connection closed
|
||||
self.last_recv_time = time.time()
|
||||
self.incomplete_buff_recv += 1
|
||||
self.bytes_recv += len(buff)
|
||||
unpacker.feed(buff)
|
||||
for message in unpacker:
|
||||
self.incomplete_buff_recv = 0
|
||||
self.handleMessage(message)
|
||||
except Exception, err:
|
||||
self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
|
||||
|
||||
# Read one line (not used)
|
||||
def recvLine(self):
|
||||
sock = self.sock
|
||||
data = sock.recv(16*1024)
|
||||
if not data: return
|
||||
if not data.endswith("\n"): # Multipart, read until \n
|
||||
buff = StringIO()
|
||||
buff.write(data)
|
||||
while not data.endswith("\n"):
|
||||
data = sock.recv(16*1024)
|
||||
if not data: break
|
||||
buff.write(data)
|
||||
return buff.getvalue().strip("\n")
|
||||
|
||||
return data.strip("\n")
|
||||
|
||||
|
||||
# My handshake info
|
||||
def handshakeInfo(self):
|
||||
return {
|
||||
|
@ -150,12 +154,15 @@ class Connection:
|
|||
|
||||
# Handle incoming message
|
||||
def handleMessage(self, message):
|
||||
self.last_message_time = time.time()
|
||||
if message.get("cmd") == "response": # New style response
|
||||
if message["to"] in self.waiting_requests:
|
||||
self.waiting_requests[message["to"]].set(message) # Set the response to event
|
||||
del self.waiting_requests[message["to"]]
|
||||
elif message["to"] == 0: # Other peers handshake
|
||||
if config.debug_socket: self.log.debug("Got handshake response: %s" % message)
|
||||
ping = time.time()-self.start_time
|
||||
if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping))
|
||||
self.last_ping_delay = ping
|
||||
self.handshake = message
|
||||
self.port = message["fileserver_port"] # Set peer fileserver port
|
||||
else:
|
||||
|
@ -180,29 +187,35 @@ class Connection:
|
|||
|
||||
|
||||
# Send data to connection
|
||||
def send(self, data):
|
||||
if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd"))
|
||||
def send(self, message):
|
||||
if config.debug_socket: self.log.debug("Send: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))
|
||||
self.last_send_time = time.time()
|
||||
if self.protocol == "zeromq":
|
||||
if self.zmq_sock: # Outgoing connection
|
||||
self.zmq_queue.append(data)
|
||||
self.zmq_queue.append(message)
|
||||
if self.zmq_working:
|
||||
self.log.debug("ZeroMQ already working...")
|
||||
return
|
||||
while self.zmq_queue:
|
||||
self.zmq_working = True
|
||||
data = self.zmq_queue.pop(0)
|
||||
self.zmq_sock.send(msgpack.packb(data))
|
||||
message = self.zmq_queue.pop(0)
|
||||
self.zmq_sock.send(msgpack.packb(message))
|
||||
self.handleMessage(msgpack.unpackb(self.zmq_sock.recv()))
|
||||
self.zmq_working = False
|
||||
|
||||
else: # Incoming request
|
||||
self.server.zmq_sock.send(msgpack.packb(data))
|
||||
self.server.zmq_sock.send(msgpack.packb(message))
|
||||
else: # Normal connection
|
||||
self.sock.sendall(msgpack.packb(data))
|
||||
data = msgpack.packb(message)
|
||||
self.bytes_sent += len(data)
|
||||
self.sock.sendall(data)
|
||||
self.last_sent_time = time.time()
|
||||
if config.debug_socket: self.log.debug("Sent: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))
|
||||
|
||||
|
||||
# Create and send a request to peer
|
||||
def request(self, cmd, params={}):
|
||||
self.last_req_time = time.time()
|
||||
self.req_id += 1
|
||||
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
||||
event = gevent.event.AsyncResult() # Create new event for response
|
||||
|
|
|
@ -13,7 +13,7 @@ class ConnectionServer:
|
|||
self.ip = ip
|
||||
self.port = port
|
||||
self.last_connection_id = 1 # Connection id incrementer
|
||||
self.log = logging.getLogger(__name__)
|
||||
self.log = logging.getLogger("ConnServer")
|
||||
|
||||
self.connections = [] # Connections
|
||||
self.ips = {} # Connection by ip
|
||||
|
@ -57,18 +57,25 @@ class ConnectionServer:
|
|||
|
||||
|
||||
|
||||
def connect(self, ip=None, port=None, peer_id=None):
|
||||
def getConnection(self, ip=None, port=None, peer_id=None):
|
||||
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
|
||||
return self.peer_ids.get(peer_id)
|
||||
connection = self.peer_ids.get(peer_id)
|
||||
connection.event_connected.get() # Wait for connection
|
||||
return connection
|
||||
if ip in self.ips: # Find connection by ip
|
||||
return self.ips[ip]
|
||||
connection = self.ips[ip]
|
||||
connection.event_connected.get() # Wait for connection
|
||||
return connection
|
||||
|
||||
# No connection found yet
|
||||
try:
|
||||
connection = Connection(self, ip, port)
|
||||
self.ips[ip] = connection
|
||||
self.connections.append(connection)
|
||||
connection.connect()
|
||||
except Exception, err:
|
||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
raise err
|
||||
return connection
|
||||
|
||||
|
@ -77,10 +84,10 @@ class ConnectionServer:
|
|||
def removeConnection(self, connection):
|
||||
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
|
||||
del self.ips[connection.ip]
|
||||
if connection in self.connections:
|
||||
self.connections.remove(connection)
|
||||
if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry
|
||||
del self.peer_ids[connection.peer_id]
|
||||
if connection in self.connections:
|
||||
self.connections.remove(connection)
|
||||
|
||||
|
||||
def zmqServer(self):
|
||||
|
@ -204,7 +211,7 @@ def testZmqSlowClient(num):
|
|||
|
||||
def testConnection():
|
||||
global server
|
||||
connection = server.connect("127.0.0.1", 1234)
|
||||
connection = server.getConnection("127.0.0.1", 1234)
|
||||
connection.send({"res": "Sending: Hello!"})
|
||||
print connection
|
||||
|
||||
|
|
|
@ -22,9 +22,9 @@ class FileServer(ConnectionServer):
|
|||
# Handle request to fileserver
|
||||
def handleRequest(self, connection, message):
|
||||
if "params" in message:
|
||||
self.log.debug("FileRequest: %s %s %s" % (message["cmd"], message["params"].get("site"), message["params"].get("inner_path")))
|
||||
self.log.debug("FileRequest: %s %s %s %s" % (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path")))
|
||||
else:
|
||||
self.log.debug("FileRequest: %s" % req["cmd"])
|
||||
self.log.debug("FileRequest: %s %s" % (str(connection), req["cmd"]))
|
||||
req = FileRequest(self, connection)
|
||||
req.route(message["cmd"], message.get("req_id"), message.get("params"))
|
||||
|
||||
|
|
|
@ -28,15 +28,18 @@ class Peer:
|
|||
|
||||
# Connect to host
|
||||
def connect(self):
|
||||
if self.connection: self.connection.close()
|
||||
if not self.log: self.log = logging.getLogger("Peer:%s:%s %s" % (self.ip, self.port, self.site.address_short))
|
||||
if self.connection:
|
||||
self.log.debug("Getting connection (Closing %s)..." % self.connection)
|
||||
self.connection.close()
|
||||
else:
|
||||
self.log.debug("Getting connection...")
|
||||
self.connection = None
|
||||
if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port))
|
||||
|
||||
self.log.debug("Connecting...")
|
||||
try:
|
||||
self.connection = self.connection_server.connect(self.ip, self.port)
|
||||
self.connection = self.connection_server.getConnection(self.ip, self.port)
|
||||
except Exception, err:
|
||||
self.log.debug("Connecting error: %s" % Debug.formatException(err))
|
||||
self.log.debug("Getting connection error: %s" % Debug.formatException(err))
|
||||
self.onConnectionError()
|
||||
|
||||
def __str__(self):
|
||||
|
@ -118,6 +121,7 @@ class Peer:
|
|||
break # All fine, exit from for loop
|
||||
# Timeout reached or bad response
|
||||
self.onConnectionError()
|
||||
self.connect()
|
||||
time.sleep(1)
|
||||
|
||||
if response_time:
|
||||
|
|
|
@ -416,6 +416,9 @@ class Site:
|
|||
self.needFile("content.json", update=True) # Force update to fix corrupt file
|
||||
self.content_manager.loadContent() # Reload content.json
|
||||
for content_inner_path, content in self.content_manager.contents.items():
|
||||
if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file
|
||||
self.log.error("[MISSING] %s" % content_inner_path)
|
||||
bad_files.append(content_inner_path)
|
||||
for file_relative_path in content["files"].keys():
|
||||
file_inner_path = self.content_manager.toDir(content_inner_path)+file_relative_path # Relative to content.json
|
||||
file_inner_path = file_inner_path.strip("/") # Strip leading /
|
||||
|
|
|
@ -276,22 +276,54 @@ class UiRequest:
|
|||
raise Exception("Here is your console")
|
||||
|
||||
|
||||
def formatTableRow(self, row):
|
||||
back = []
|
||||
for format, val in row:
|
||||
if val == None:
|
||||
formatted = "n/a"
|
||||
elif format == "since":
|
||||
if val:
|
||||
formatted = "%.0f" % (time.time()-val)
|
||||
else:
|
||||
formatted = "n/a"
|
||||
else:
|
||||
formatted = format % val
|
||||
back.append("<td>%s</td>" % formatted)
|
||||
return "<tr>%s</tr>" % "".join(back)
|
||||
|
||||
def actionStats(self):
|
||||
import gc, sys
|
||||
from greenlet import greenlet
|
||||
greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
|
||||
self.sendHeader()
|
||||
main = sys.modules["src.main"]
|
||||
yield """
|
||||
<style>
|
||||
* { font-family: monospace }
|
||||
table * { text-align: right; padding: 0px 10px }
|
||||
</style>
|
||||
"""
|
||||
|
||||
yield "<pre>"
|
||||
yield "Connections (%s):<br>" % len(main.file_server.connections)
|
||||
yield "<table><tr> <th>id</th> <th>protocol</th> <th>ip</th> <th>zmqs</th> <th>ping</th> <th>buff</th> <th>idle</th> <th>delay</th> <th>sent</th> <th>received</th> </tr>"
|
||||
for connection in main.file_server.connections:
|
||||
yield "%s: %s %s<br>" % (connection.protocol, connection.ip, connection.zmq_sock)
|
||||
yield self.formatTableRow([
|
||||
("%3d", connection.id),
|
||||
("%s", connection.protocol),
|
||||
("%s", connection.ip),
|
||||
("%s", bool(connection.zmq_sock)),
|
||||
("%6.3f", connection.last_ping_delay),
|
||||
("%s", connection.incomplete_buff_recv),
|
||||
("since", max(connection.last_send_time, connection.last_recv_time)),
|
||||
("%.3f", connection.last_sent_time-connection.last_send_time),
|
||||
("%.0fkB", connection.bytes_sent/1024),
|
||||
("%.0fkB", connection.bytes_recv/1024)
|
||||
])
|
||||
yield "</table>"
|
||||
|
||||
yield "Greenlets (%s):<br>" % len(greenlets)
|
||||
for thread in greenlets:
|
||||
yield " - %s<br>" % cgi.escape(repr(thread))
|
||||
yield "</pre>"
|
||||
|
||||
|
||||
# - Tests -
|
||||
|
|
|
@ -15,6 +15,8 @@ class UiWebsocket:
|
|||
self.next_message_id = 1
|
||||
self.waiting_cb = {} # Waiting for callback. Key: message_id, Value: function pointer
|
||||
self.channels = [] # Channels joined to
|
||||
self.sending = False # Currently sending to client
|
||||
self.send_queue = [] # Messages to send to client
|
||||
|
||||
|
||||
# Start listener loop
|
||||
|
@ -69,10 +71,16 @@ class UiWebsocket:
|
|||
def send(self, message, cb = None):
|
||||
message["id"] = self.next_message_id # Add message id to allow response
|
||||
self.next_message_id += 1
|
||||
if cb: # Callback after client responsed
|
||||
self.waiting_cb[message["id"]] = cb
|
||||
if self.sending: return # Already sending
|
||||
self.send_queue.append(message)
|
||||
try:
|
||||
self.ws.send(json.dumps(message))
|
||||
if cb: # Callback after client responsed
|
||||
self.waiting_cb[message["id"]] = cb
|
||||
while self.send_queue:
|
||||
self.sending = True
|
||||
message = self.send_queue.pop(0)
|
||||
self.ws.send(json.dumps(message))
|
||||
self.sending = False
|
||||
except Exception, err:
|
||||
self.log.debug("Websocket send error: %s" % Debug.formatException(err))
|
||||
|
||||
|
@ -177,7 +185,7 @@ class UiWebsocket:
|
|||
"next_size_limit": site.getNextSizeLimit(),
|
||||
"last_downloads": len(site.last_downloads),
|
||||
"peers": site.settings.get("peers", len(site.peers)),
|
||||
"tasks": len([task["inner_path"] for task in site.worker_manager.tasks]),
|
||||
"tasks": len(site.worker_manager.tasks),
|
||||
"content": content
|
||||
}
|
||||
if site.settings["serving"] and content: ret["peers"] += 1 # Add myself if serving
|
||||
|
|
|
@ -249,6 +249,10 @@ class Wrapper
|
|||
@ws.cmd "siteSetLimit", [site_info.next_size_limit], (res) =>
|
||||
@notifications.add("size_limit", "done", res, 5000)
|
||||
return false
|
||||
|
||||
if @loading.screen_visible and @inner_loaded and site_info.settings.size < site_info.size_limit*1024*1024 # Loading screen still visible, but inner loaded
|
||||
@loading.hideScreen()
|
||||
|
||||
@site_info = site_info
|
||||
|
||||
|
||||
|
|
|
@ -1057,6 +1057,9 @@ jQuery.extend( jQuery.easing,
|
|||
})(this));
|
||||
}
|
||||
}
|
||||
if (this.loading.screen_visible && this.inner_loaded && site_info.settings.size < site_info.size_limit * 1024 * 1024) {
|
||||
this.loading.hideScreen();
|
||||
}
|
||||
return this.site_info = site_info;
|
||||
};
|
||||
|
||||
|
|
|
@ -58,8 +58,9 @@ def getCurrent():
|
|||
|
||||
# Debug: Reload User.py
|
||||
def reload():
|
||||
import imp
|
||||
return False # Disabled
|
||||
"""import imp
|
||||
global users, User
|
||||
User = imp.load_source("User", "src/User/User.py").User # Reload source
|
||||
users.clear() # Remove all items
|
||||
load()
|
||||
load()"""
|
||||
|
|
Loading…
Reference in a new issue