more detailed stats, memory optimalizations, connection pinging and timeout, request timeout, validate content after signing, only recompile changed coffeescripts, remove unnecessary js logs
This commit is contained in:
parent
bd7e76628b
commit
b35d21d643
13 changed files with 222 additions and 59 deletions
|
@ -17,12 +17,14 @@ class Connection:
|
|||
self.id = server.last_connection_id
|
||||
server.last_connection_id += 1
|
||||
self.protocol = "?"
|
||||
self.type = "?"
|
||||
|
||||
self.server = server
|
||||
self.log = logging.getLogger(str(self))
|
||||
self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here
|
||||
self.req_id = 0 # Last request id
|
||||
self.handshake = None # Handshake info got from peer
|
||||
self.handshake = {} # Handshake info got from peer
|
||||
self.connected = False
|
||||
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
||||
self.closed = False
|
||||
|
||||
|
@ -42,6 +44,7 @@ class Connection:
|
|||
self.bytes_sent = 0
|
||||
self.last_ping_delay = None
|
||||
self.last_req_time = 0
|
||||
self.last_cmd = None
|
||||
|
||||
self.waiting_requests = {} # Waiting sent requests
|
||||
|
||||
|
@ -56,6 +59,7 @@ class Connection:
|
|||
# Open connection to peer and wait for handshake
|
||||
def connect(self):
|
||||
self.log.debug("Connecting...")
|
||||
self.type = "out"
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.connect((self.ip, self.port))
|
||||
# Detect protocol
|
||||
|
@ -67,12 +71,14 @@ class Connection:
|
|||
|
||||
# Handle incoming connection
|
||||
def handleIncomingConnection(self, sock):
|
||||
self.type = "in"
|
||||
firstchar = sock.recv(1) # Find out if pure socket or zeromq
|
||||
if firstchar == "\xff": # Backward compatiblity: forward data to zmq
|
||||
if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ")
|
||||
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol)
|
||||
|
||||
if self.server.zmq_running:
|
||||
|
@ -100,10 +106,12 @@ class Connection:
|
|||
return False
|
||||
if firstchar == "\xff": # Backward compatibility to zmq
|
||||
self.sock.close() # Close normal socket
|
||||
del firstchar
|
||||
if zmq:
|
||||
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
try:
|
||||
|
@ -116,12 +124,13 @@ class Connection:
|
|||
zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||
self.zmq_sock = zmq_sock
|
||||
except Exception, err:
|
||||
self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
else:
|
||||
return False # No zeromq connection supported
|
||||
else: # Normal socket
|
||||
self.protocol = "v2"
|
||||
self.log.name = str(self)
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
unpacker = self.unpacker
|
||||
|
@ -137,8 +146,10 @@ class Connection:
|
|||
for message in unpacker:
|
||||
self.incomplete_buff_recv = 0
|
||||
self.handleMessage(message)
|
||||
message = None
|
||||
buf = None
|
||||
except Exception, err:
|
||||
self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
|
||||
|
||||
|
@ -188,7 +199,7 @@ class Connection:
|
|||
|
||||
# Send data to connection
|
||||
def send(self, message):
|
||||
if config.debug_socket: self.log.debug("Send: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))
|
||||
if config.debug_socket: self.log.debug("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
self.last_send_time = time.time()
|
||||
if self.protocol == "zeromq":
|
||||
if self.zmq_sock: # Outgoing connection
|
||||
|
@ -210,27 +221,50 @@ class Connection:
|
|||
self.bytes_sent += len(data)
|
||||
self.sock.sendall(data)
|
||||
self.last_sent_time = time.time()
|
||||
if config.debug_socket: self.log.debug("Sent: %s, to: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("req_id")))
|
||||
if config.debug_socket: self.log.debug("Sent: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
return True
|
||||
|
||||
|
||||
# Create and send a request to peer
|
||||
def request(self, cmd, params={}):
|
||||
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: # Last command sent more than 10 sec ago, timeout
|
||||
self.log.debug("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
|
||||
self.close()
|
||||
return False
|
||||
|
||||
self.last_req_time = time.time()
|
||||
self.last_cmd = cmd
|
||||
self.req_id += 1
|
||||
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
||||
event = gevent.event.AsyncResult() # Create new event for response
|
||||
self.waiting_requests[self.req_id] = event
|
||||
self.send(data) # Send request
|
||||
res = event.get() # Wait until event solves
|
||||
|
||||
return res
|
||||
|
||||
|
||||
def ping(self):
|
||||
s = time.time()
|
||||
response = None
|
||||
with gevent.Timeout(10.0, False):
|
||||
try:
|
||||
response = self.request("ping")
|
||||
except Exception, err:
|
||||
self.log.debug("Ping error: %s" % Debug.formatException(err))
|
||||
if response and "body" in response and response["body"] == "Pong!":
|
||||
self.last_ping_delay = time.time()-s
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
|
||||
# Close connection
|
||||
def close(self):
|
||||
if self.closed: return False # Already closed
|
||||
self.closed = True
|
||||
if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s..." % len(self.waiting_requests))
|
||||
if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv))
|
||||
for request in self.waiting_requests.values(): # Mark pending requests failed
|
||||
request.set(False)
|
||||
self.waiting_requests = {}
|
||||
|
@ -245,3 +279,8 @@ class Connection:
|
|||
self.sock.close()
|
||||
except Exception, err:
|
||||
if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err))
|
||||
|
||||
# Little cleanup
|
||||
del self.log
|
||||
del self.unpacker
|
||||
del self.sock
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from gevent.server import StreamServer
|
||||
from gevent.pool import Pool
|
||||
import socket, os, logging, random, string
|
||||
import socket, os, logging, random, string, time
|
||||
import gevent, msgpack
|
||||
import cStringIO as StringIO
|
||||
from Debug import Debug
|
||||
|
@ -20,6 +20,8 @@ class ConnectionServer:
|
|||
self.peer_ids = {} # Connections by peer_ids
|
||||
|
||||
self.running = True
|
||||
self.thread_checker = gevent.spawn(self.checkConnections)
|
||||
|
||||
self.zmq_running = False
|
||||
self.zmq_last_connection = None # Last incoming message client
|
||||
|
||||
|
@ -60,14 +62,20 @@ class ConnectionServer:
|
|||
def getConnection(self, ip=None, port=None, peer_id=None):
|
||||
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
|
||||
connection = self.peer_ids.get(peer_id)
|
||||
connection.event_connected.get() # Wait for connection
|
||||
if not connection.connected: connection.event_connected.get() # Wait for connection
|
||||
return connection
|
||||
if ip in self.ips: # Find connection by ip
|
||||
connection = self.ips[ip]
|
||||
connection.event_connected.get() # Wait for connection
|
||||
if not connection.connected: connection.event_connected.get() # Wait for connection
|
||||
return connection
|
||||
|
||||
# No connection found yet
|
||||
# Recover from connection pool
|
||||
for connection in self.connections:
|
||||
if connection.ip == ip:
|
||||
if not connection.connected: connection.event_connected.get() # Wait for connection
|
||||
return connection
|
||||
|
||||
# No connection found
|
||||
try:
|
||||
connection = Connection(self, ip, port)
|
||||
self.ips[ip] = connection
|
||||
|
@ -90,6 +98,33 @@ class ConnectionServer:
|
|||
self.connections.remove(connection)
|
||||
|
||||
|
||||
|
||||
def checkConnections(self):
|
||||
while self.running:
|
||||
time.sleep(60) # Sleep 1 min
|
||||
for connection in self.connections[:]: # Make a copy
|
||||
if connection.protocol == "zeromq": continue # No stat on ZeroMQ sockets
|
||||
idle = time.time() - max(connection.last_recv_time, connection.start_time)
|
||||
|
||||
if idle > 60*60: # Wake up after 1h
|
||||
connection.close()
|
||||
|
||||
elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec
|
||||
if connection.protocol == "?": connection.close() # Got no handshake response, close it
|
||||
else:
|
||||
if not connection.ping(): # send ping request
|
||||
connection.close()
|
||||
|
||||
elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle
|
||||
connection.log.debug("[Cleanup] Connection buff stalled, content: %s" % connection.u.read_bytes(1024))
|
||||
connection.close()
|
||||
|
||||
elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec
|
||||
connection.log.debug("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time))
|
||||
connection.close()
|
||||
|
||||
|
||||
|
||||
def zmqServer(self):
|
||||
self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port)
|
||||
try:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue