rev134, Removed ZeroMQ dependency and support, GC after every stat page, GC call stat command, Streaming files directly to socket without msgpack overhead, Use listModified to query changed content.json files, Fix urllib memory leak onolder pythons, Fix security tests, Sitemanager testsuite, Announce on site resume, Site publish serves files max 60s
This commit is contained in:
parent
34b7cb0292
commit
eddf3eb8fc
14 changed files with 286 additions and 248 deletions
|
@ -3,17 +3,10 @@ from cStringIO import StringIO
|
|||
import gevent, msgpack
|
||||
from Config import config
|
||||
from Debug import Debug
|
||||
zmq = None
|
||||
if not config.disable_zeromq:
|
||||
try:
|
||||
import zmq.green as zmq
|
||||
except:
|
||||
zmq = None
|
||||
|
||||
|
||||
from util import StreamingMsgpack
|
||||
|
||||
class Connection(object):
|
||||
__slots__ = ("sock", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "connected", "event_connected", "closed", "zmq_sock", "zmq_queue", "zmq_working", "forward_thread", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests")
|
||||
__slots__ = ("sock", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "connected", "event_connected", "closed", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests")
|
||||
|
||||
def __init__(self, server, ip, port, sock=None):
|
||||
self.sock = sock
|
||||
|
@ -33,11 +26,6 @@ class Connection(object):
|
|||
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
||||
self.closed = False
|
||||
|
||||
self.zmq_sock = None # Zeromq sock if outgoing connection
|
||||
self.zmq_queue = [] # Messages queued to send
|
||||
self.zmq_working = False # Zmq currently working, just add to queue
|
||||
self.forward_thread = None # Zmq forwarder thread
|
||||
|
||||
# Stats
|
||||
self.start_time = time.time()
|
||||
self.last_recv_time = 0
|
||||
|
@ -82,103 +70,46 @@ class Connection(object):
|
|||
# Detect protocol
|
||||
self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
|
||||
gevent.spawn(self.messageLoop)
|
||||
return self.event_connected.get() # Wait for first char
|
||||
return self.event_connected.get() # Wait for handshake
|
||||
|
||||
|
||||
|
||||
# Handle incoming connection
|
||||
def handleIncomingConnection(self, sock):
|
||||
self.type = "in"
|
||||
try:
|
||||
firstchar = sock.recv(1) # Find out if pure socket or zeromq
|
||||
except Exception, err:
|
||||
self.log("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.close()
|
||||
return False
|
||||
if firstchar == "\xff": # Backward compatiblity: forward data to zmq
|
||||
if config.debug_socket: self.log("Fallback incoming connection to ZeroMQ")
|
||||
|
||||
self.protocol = "zeromq"
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol)
|
||||
|
||||
if self.server.zmq_running:
|
||||
zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
zmq_sock.connect(("127.0.0.1", self.server.zmq_port))
|
||||
zmq_sock.send(firstchar)
|
||||
|
||||
self.forward_thread = gevent.spawn(self.server.forward, self, zmq_sock, sock)
|
||||
self.server.forward(self, sock, zmq_sock)
|
||||
self.close() # Forward ended close connection
|
||||
else:
|
||||
self.log("ZeroMQ Server not running, exiting!")
|
||||
else: # Normal socket
|
||||
self.messageLoop(firstchar)
|
||||
self.messageLoop()
|
||||
|
||||
|
||||
# Message loop for connection
|
||||
def messageLoop(self, firstchar=None):
|
||||
def messageLoop(self):
|
||||
if not self.sock:
|
||||
self.log("Socket error: No socket found")
|
||||
return False
|
||||
self.protocol = "v2"
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
|
||||
self.unpacker = msgpack.Unpacker()
|
||||
sock = self.sock
|
||||
try:
|
||||
if not firstchar: firstchar = sock.recv(1)
|
||||
while True:
|
||||
buff = sock.recv(16*1024)
|
||||
if not buff: break # Connection closed
|
||||
self.last_recv_time = time.time()
|
||||
self.incomplete_buff_recv += 1
|
||||
self.bytes_recv += len(buff)
|
||||
self.server.bytes_recv += len(buff)
|
||||
if not self.unpacker:
|
||||
self.unpacker = msgpack.Unpacker()
|
||||
self.unpacker.feed(buff)
|
||||
for message in self.unpacker:
|
||||
self.incomplete_buff_recv = 0
|
||||
self.handleMessage(message)
|
||||
message = None
|
||||
buf = None
|
||||
except Exception, err:
|
||||
self.log("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.close()
|
||||
return False
|
||||
if firstchar == "\xff": # Backward compatibility to zmq
|
||||
self.sock.close() # Close normal socket
|
||||
del firstchar
|
||||
if zmq:
|
||||
if config.debug_socket: self.log("Connecting as ZeroMQ")
|
||||
self.protocol = "zeromq"
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
try:
|
||||
context = zmq.Context()
|
||||
zmq_sock = context.socket(zmq.REQ)
|
||||
zmq_sock.hwm = 1
|
||||
zmq_sock.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive
|
||||
zmq_sock.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send
|
||||
zmq_sock.setsockopt(zmq.LINGER, 500) # Wait for zmq_sock close
|
||||
zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||
self.zmq_sock = zmq_sock
|
||||
except Exception, err:
|
||||
if not self.closed: self.log("Socket error: %s" % Debug.formatException(err))
|
||||
else:
|
||||
return False # No zeromq connection supported
|
||||
else: # Normal socket
|
||||
self.protocol = "v2"
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
self.unpacker = msgpack.Unpacker()
|
||||
self.unpacker.feed(firstchar) # Feed the first char we already requested
|
||||
try:
|
||||
while True:
|
||||
buff = sock.recv(16*1024)
|
||||
if not buff: break # Connection closed
|
||||
self.last_recv_time = time.time()
|
||||
self.incomplete_buff_recv += 1
|
||||
self.bytes_recv += len(buff)
|
||||
self.server.bytes_recv += len(buff)
|
||||
if not self.unpacker:
|
||||
self.unpacker = msgpack.Unpacker()
|
||||
self.unpacker.feed(buff)
|
||||
for message in self.unpacker:
|
||||
self.incomplete_buff_recv = 0
|
||||
self.handleMessage(message)
|
||||
message = None
|
||||
buf = None
|
||||
except Exception, err:
|
||||
if not self.closed: self.log("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
if not self.closed: self.log("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
|
||||
|
||||
# My handshake info
|
||||
|
@ -209,6 +140,7 @@ class Connection(object):
|
|||
self.port = 0
|
||||
else:
|
||||
self.port = message["fileserver_port"] # Set peer fileserver port
|
||||
self.event_connected.set(True) # Mark handshake as done
|
||||
else:
|
||||
self.log("Unknown response: %s" % message)
|
||||
elif message.get("cmd"): # Handhsake request
|
||||
|
@ -218,6 +150,7 @@ class Connection(object):
|
|||
self.port = 0
|
||||
else:
|
||||
self.port = self.handshake["fileserver_port"] # Set peer fileserver port
|
||||
self.event_connected.set(True) # Mark handshake as done
|
||||
if config.debug_socket: self.log("Handshake request: %s" % message)
|
||||
data = self.handshakeInfo()
|
||||
data["cmd"] = "response"
|
||||
|
@ -234,26 +167,17 @@ class Connection(object):
|
|||
|
||||
|
||||
# Send data to connection
|
||||
def send(self, message):
|
||||
if config.debug_socket: self.log("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
def send(self, message, streaming=False):
|
||||
if config.debug_socket: self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), streaming, message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
self.last_send_time = time.time()
|
||||
if self.protocol == "zeromq":
|
||||
if self.zmq_sock: # Outgoing connection
|
||||
self.zmq_queue.append(message)
|
||||
if self.zmq_working:
|
||||
self.log("ZeroMQ already working...")
|
||||
return
|
||||
while self.zmq_queue:
|
||||
self.zmq_working = True
|
||||
message = self.zmq_queue.pop(0)
|
||||
self.zmq_sock.send(msgpack.packb(message))
|
||||
self.handleMessage(msgpack.unpackb(self.zmq_sock.recv()))
|
||||
self.zmq_working = False
|
||||
|
||||
else: # Incoming request
|
||||
self.server.zmq_sock.send(msgpack.packb(message))
|
||||
else: # Normal connection
|
||||
if streaming:
|
||||
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
|
||||
message = None
|
||||
self.bytes_sent += bytes_sent
|
||||
self.server.bytes_sent += bytes_sent
|
||||
else:
|
||||
data = msgpack.packb(message)
|
||||
message = None
|
||||
self.bytes_sent += len(data)
|
||||
self.server.bytes_sent += len(data)
|
||||
self.sock.sendall(data)
|
||||
|
@ -292,8 +216,6 @@ class Connection(object):
|
|||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
|
||||
# Close connection
|
||||
|
@ -309,10 +231,6 @@ class Connection(object):
|
|||
self.waiting_requests = {}
|
||||
self.server.removeConnection(self) # Remove connection from server registry
|
||||
try:
|
||||
if self.forward_thread:
|
||||
self.forward_thread.kill(exception=Debug.Notify("Closing connection"))
|
||||
if self.zmq_sock:
|
||||
self.zmq_sock.close()
|
||||
if self.sock:
|
||||
self.sock.shutdown(gevent.socket.SHUT_WR)
|
||||
self.sock.close()
|
||||
|
@ -320,7 +238,5 @@ class Connection(object):
|
|||
if config.debug_socket: self.log("Close error: %s" % err)
|
||||
|
||||
# Little cleanup
|
||||
del self.unpacker
|
||||
del self.sock
|
||||
self.sock = None
|
||||
self.unpacker = None
|
||||
|
|
|
@ -26,17 +26,12 @@ class ConnectionServer:
|
|||
self.bytes_recv = 0
|
||||
self.bytes_sent = 0
|
||||
|
||||
self.zmq_running = False
|
||||
self.zmq_last_connection = None # Last incoming message client
|
||||
|
||||
self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid
|
||||
|
||||
if port: # Listen server on a port
|
||||
self.zmq_port = port-1
|
||||
self.pool = Pool(1000) # do not accept more than 1000 connections
|
||||
self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100)
|
||||
if request_handler: self.handleRequest = request_handler
|
||||
gevent.spawn(self.zmqServer) # Start ZeroMQ server for backward compatibility
|
||||
|
||||
|
||||
|
||||
|
@ -63,45 +58,48 @@ class ConnectionServer:
|
|||
|
||||
|
||||
|
||||
def getConnection(self, ip=None, port=None, peer_id=None):
|
||||
def getConnection(self, ip=None, port=None, peer_id=None, create=True):
|
||||
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
|
||||
connection = self.peer_ids.get(peer_id)
|
||||
if not connection.connected:
|
||||
if not connection.connected and create:
|
||||
succ = connection.event_connected.get() # Wait for connection
|
||||
if not succ: raise Exception("Connection event return error")
|
||||
return connection
|
||||
# Find connection by ip
|
||||
if ip in self.ips:
|
||||
connection = self.ips[ip]
|
||||
if not connection.connected:
|
||||
if not connection.connected and create:
|
||||
succ = connection.event_connected.get() # Wait for connection
|
||||
if not succ: raise Exception("Connection event return error")
|
||||
return connection
|
||||
# Recover from connection pool
|
||||
for connection in self.connections:
|
||||
if connection.ip == ip:
|
||||
if not connection.connected:
|
||||
if not connection.connected and create:
|
||||
succ = connection.event_connected.get() # Wait for connection
|
||||
if not succ: raise Exception("Connection event return error")
|
||||
return connection
|
||||
|
||||
# No connection found
|
||||
if port == 0:
|
||||
raise Exception("This peer is not connectable")
|
||||
try:
|
||||
connection = Connection(self, ip, port)
|
||||
self.ips[ip] = connection
|
||||
self.connections.append(connection)
|
||||
succ = connection.connect()
|
||||
if not succ:
|
||||
connection.close()
|
||||
raise Exception("Connection event return error")
|
||||
if create: # Allow to create new connection if not found
|
||||
if port == 0:
|
||||
raise Exception("This peer is not connectable")
|
||||
try:
|
||||
connection = Connection(self, ip, port)
|
||||
self.ips[ip] = connection
|
||||
self.connections.append(connection)
|
||||
succ = connection.connect()
|
||||
if not succ:
|
||||
connection.close()
|
||||
raise Exception("Connection event return error")
|
||||
|
||||
except Exception, err:
|
||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
raise err
|
||||
return connection
|
||||
except Exception, err:
|
||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
raise err
|
||||
return connection
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
@ -132,12 +130,8 @@ class ConnectionServer:
|
|||
connection.close()
|
||||
|
||||
elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec
|
||||
if connection.protocol == "zeromq":
|
||||
if idle > 50*60 and not connection.ping(): # Only ping every 50 sec
|
||||
connection.close()
|
||||
else:
|
||||
if not connection.ping(): # send ping request
|
||||
connection.close()
|
||||
if not connection.ping(): # send ping request
|
||||
connection.close()
|
||||
|
||||
elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle
|
||||
connection.log("[Cleanup] Connection buff stalled")
|
||||
|
@ -153,52 +147,6 @@ class ConnectionServer:
|
|||
|
||||
|
||||
|
||||
def zmqServer(self):
|
||||
if config.disable_zeromq:
|
||||
self.log.debug("ZeroMQ disabled by config")
|
||||
return False
|
||||
self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port)
|
||||
try:
|
||||
import zmq.green as zmq
|
||||
context = zmq.Context()
|
||||
self.zmq_sock = context.socket(zmq.REP)
|
||||
self.zmq_sock.bind("tcp://127.0.0.1:%s" % self.zmq_port)
|
||||
self.zmq_sock.hwm = 1
|
||||
self.zmq_sock.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive
|
||||
self.zmq_sock.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send
|
||||
self.zmq_running = True
|
||||
except Exception, err:
|
||||
self.log.debug("ZeroMQ start error: %s" % Debug.formatException(err))
|
||||
return False
|
||||
|
||||
while True:
|
||||
try:
|
||||
data = self.zmq_sock.recv()
|
||||
if not data: break
|
||||
message = msgpack.unpackb(data)
|
||||
self.zmq_last_connection.handleMessage(message)
|
||||
except Exception, err:
|
||||
self.log.debug("ZMQ Server error: %s" % Debug.formatException(err))
|
||||
self.zmq_sock.send(msgpack.packb({"error": "%s" % err}, use_bin_type=True))
|
||||
|
||||
|
||||
# Forward incoming data to other socket
|
||||
def forward(self, connection, source, dest):
|
||||
data = True
|
||||
try:
|
||||
while data:
|
||||
data = source.recv(16*1024)
|
||||
self.zmq_last_connection = connection
|
||||
if data:
|
||||
dest.sendall(data)
|
||||
else:
|
||||
source.shutdown(socket.SHUT_RD)
|
||||
dest.shutdown(socket.SHUT_WR)
|
||||
except Exception, err:
|
||||
self.log.debug("%s ZMQ forward error: %s" % (connection.ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
|
||||
|
||||
# -- TESTING --
|
||||
|
||||
def testCreateServer():
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue