total object stat, proper zeronet updater timeout catch, connection using connectionserver logger, trigger sitestorage onupdated when signing new file, named once events, only publish once same updated files, retry bad files every 20 min, trigger connection error on failed connection
This commit is contained in:
parent
d361f66362
commit
67783bd494
12 changed files with 130 additions and 58 deletions
|
@ -24,7 +24,6 @@ class Connection:
|
|||
self.type = "?"
|
||||
|
||||
self.server = server
|
||||
self.log = logging.getLogger(str(self))
|
||||
self.unpacker = None # Stream incoming socket messages here
|
||||
self.req_id = 0 # Last request id
|
||||
self.handshake = {} # Handshake info got from peer
|
||||
|
@ -50,19 +49,31 @@ class Connection:
|
|||
self.last_req_time = 0
|
||||
self.last_cmd = None
|
||||
|
||||
self.name = None
|
||||
self.updateName()
|
||||
|
||||
self.waiting_requests = {} # Waiting sent requests
|
||||
|
||||
|
||||
def updateName(self):
|
||||
self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
|
||||
return self.name
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s>" % self.__str__()
|
||||
|
||||
|
||||
def log(self, text):
|
||||
self.server.log.debug("%s > %s" % (self.name, text))
|
||||
|
||||
|
||||
# Open connection to peer and wait for handshake
|
||||
def connect(self):
|
||||
self.log.debug("Connecting...")
|
||||
self.log("Connecting...")
|
||||
self.type = "out"
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.connect((self.ip, self.port))
|
||||
|
@ -79,14 +90,14 @@ class Connection:
|
|||
try:
|
||||
firstchar = sock.recv(1) # Find out if pure socket or zeromq
|
||||
except Exception, err:
|
||||
self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.log("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.close()
|
||||
return False
|
||||
if firstchar == "\xff": # Backward compatiblity: forward data to zmq
|
||||
if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ")
|
||||
if config.debug_socket: self.log("Fallback incoming connection to ZeroMQ")
|
||||
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol)
|
||||
|
||||
|
@ -99,7 +110,7 @@ class Connection:
|
|||
self.server.forward(self, sock, zmq_sock)
|
||||
self.close() # Forward ended close connection
|
||||
else:
|
||||
self.config.debug("ZeroMQ Server not running, exiting!")
|
||||
self.log("ZeroMQ Server not running, exiting!")
|
||||
else: # Normal socket
|
||||
self.messageLoop(firstchar)
|
||||
|
||||
|
@ -110,16 +121,16 @@ class Connection:
|
|||
try:
|
||||
if not firstchar: firstchar = sock.recv(1)
|
||||
except Exception, err:
|
||||
self.log.debug("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.log("Socket firstchar error: %s" % Debug.formatException(err))
|
||||
self.close()
|
||||
return False
|
||||
if firstchar == "\xff": # Backward compatibility to zmq
|
||||
self.sock.close() # Close normal socket
|
||||
del firstchar
|
||||
if zmq:
|
||||
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
|
||||
if config.debug_socket: self.log("Connecting as ZeroMQ")
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
|
@ -133,12 +144,12 @@ class Connection:
|
|||
zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||
self.zmq_sock = zmq_sock
|
||||
except Exception, err:
|
||||
if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
if not self.closed: self.log("Socket error: %s" % Debug.formatException(err))
|
||||
else:
|
||||
return False # No zeromq connection supported
|
||||
else: # Normal socket
|
||||
self.protocol = "v2"
|
||||
self.log.name = str(self)
|
||||
self.updateName()
|
||||
self.connected = True
|
||||
self.event_connected.set(self.protocol) # Mark handshake as done
|
||||
|
||||
|
@ -152,7 +163,6 @@ class Connection:
|
|||
self.incomplete_buff_recv += 1
|
||||
self.bytes_recv += len(buff)
|
||||
if not self.unpacker:
|
||||
self.log.debug("Unpacker created")
|
||||
self.unpacker = msgpack.Unpacker()
|
||||
self.unpacker.feed(buff)
|
||||
for message in self.unpacker:
|
||||
|
@ -161,7 +171,7 @@ class Connection:
|
|||
message = None
|
||||
buf = None
|
||||
except Exception, err:
|
||||
if not self.closed: self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
if not self.closed: self.log("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
|
||||
|
||||
|
@ -184,17 +194,17 @@ class Connection:
|
|||
del self.waiting_requests[message["to"]]
|
||||
elif message["to"] == 0: # Other peers handshake
|
||||
ping = time.time()-self.start_time
|
||||
if config.debug_socket: self.log.debug("Got handshake response: %s, ping: %s" % (message, ping))
|
||||
if config.debug_socket: self.log("Got handshake response: %s, ping: %s" % (message, ping))
|
||||
self.last_ping_delay = ping
|
||||
self.handshake = message
|
||||
self.port = message["fileserver_port"] # Set peer fileserver port
|
||||
else:
|
||||
self.log.debug("Unknown response: %s" % message)
|
||||
self.log("Unknown response: %s" % message)
|
||||
elif message.get("cmd"): # Handhsake request
|
||||
if message["cmd"] == "handshake":
|
||||
self.handshake = message["params"]
|
||||
self.port = self.handshake["fileserver_port"] # Set peer fileserver port
|
||||
if config.debug_socket: self.log.debug("Handshake request: %s" % message)
|
||||
if config.debug_socket: self.log("Handshake request: %s" % message)
|
||||
data = self.handshakeInfo()
|
||||
data["cmd"] = "response"
|
||||
data["to"] = message["req_id"]
|
||||
|
@ -202,7 +212,7 @@ class Connection:
|
|||
else:
|
||||
self.server.handleRequest(self, message)
|
||||
else: # Old style response, no req_id definied
|
||||
if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys())
|
||||
if config.debug_socket: self.log("Old style response, waiting: %s" % self.waiting_requests.keys())
|
||||
last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
|
||||
self.waiting_requests[last_req_id].set(message)
|
||||
del self.waiting_requests[last_req_id] # Remove from waiting request
|
||||
|
@ -211,13 +221,13 @@ class Connection:
|
|||
|
||||
# Send data to connection
|
||||
def send(self, message):
|
||||
if config.debug_socket: self.log.debug("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
if config.debug_socket: self.log("Send: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
self.last_send_time = time.time()
|
||||
if self.protocol == "zeromq":
|
||||
if self.zmq_sock: # Outgoing connection
|
||||
self.zmq_queue.append(message)
|
||||
if self.zmq_working:
|
||||
self.log.debug("ZeroMQ already working...")
|
||||
self.log("ZeroMQ already working...")
|
||||
return
|
||||
while self.zmq_queue:
|
||||
self.zmq_working = True
|
||||
|
@ -233,14 +243,13 @@ class Connection:
|
|||
self.bytes_sent += len(data)
|
||||
self.sock.sendall(data)
|
||||
self.last_sent_time = time.time()
|
||||
if config.debug_socket: self.log.debug("Sent: %s, to: %s, site: %s, inner_path: %s, req_id: %s" % (message.get("cmd"), message.get("to"), message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"), message.get("req_id")))
|
||||
return True
|
||||
|
||||
|
||||
# Create and send a request to peer
|
||||
def request(self, cmd, params={}):
|
||||
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: # Last command sent more than 10 sec ago, timeout
|
||||
self.log.debug("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
|
||||
self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
|
||||
self.close()
|
||||
return False
|
||||
|
||||
|
@ -262,7 +271,7 @@ class Connection:
|
|||
try:
|
||||
response = self.request("ping")
|
||||
except Exception, err:
|
||||
self.log.debug("Ping error: %s" % Debug.formatException(err))
|
||||
self.log("Ping error: %s" % Debug.formatException(err))
|
||||
if response and "body" in response and response["body"] == "Pong!":
|
||||
self.last_ping_delay = time.time()-s
|
||||
return True
|
||||
|
@ -279,7 +288,7 @@ class Connection:
|
|||
self.connected = False
|
||||
self.event_connected.set(False)
|
||||
|
||||
if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv))
|
||||
if config.debug_socket: self.log("Closing connection, waiting_requests: %s, buff: %s..." % (len(self.waiting_requests), self.incomplete_buff_recv))
|
||||
for request in self.waiting_requests.values(): # Mark pending requests failed
|
||||
request.set(False)
|
||||
self.waiting_requests = {}
|
||||
|
@ -293,7 +302,7 @@ class Connection:
|
|||
self.sock.shutdown(gevent.socket.SHUT_WR)
|
||||
self.sock.close()
|
||||
except Exception, err:
|
||||
if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err))
|
||||
if config.debug_socket: self.log("Close error: %s" % Debug.formatException(err))
|
||||
|
||||
# Little cleanup
|
||||
del self.unpacker
|
||||
|
|
|
@ -66,7 +66,8 @@ class ConnectionServer:
|
|||
succ = connection.event_connected.get() # Wait for connection
|
||||
if not succ: raise Exception("Connection event return error")
|
||||
return connection
|
||||
if ip in self.ips: # Find connection by ip
|
||||
# Find connection by ip
|
||||
if ip in self.ips:
|
||||
connection = self.ips[ip]
|
||||
if not connection.connected:
|
||||
succ = connection.event_connected.get() # Wait for connection
|
||||
|
@ -87,7 +88,9 @@ class ConnectionServer:
|
|||
self.connections.append(connection)
|
||||
succ = connection.connect()
|
||||
if not succ:
|
||||
connection.close()
|
||||
raise Exception("Connection event return error")
|
||||
|
||||
except Exception, err:
|
||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
|
@ -97,6 +100,7 @@ class ConnectionServer:
|
|||
|
||||
|
||||
def removeConnection(self, connection):
|
||||
self.log.debug("Removing %s..." % connection)
|
||||
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
|
||||
del self.ips[connection.ip]
|
||||
if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry
|
||||
|
@ -115,10 +119,10 @@ class ConnectionServer:
|
|||
if connection.unpacker and idle > 30: # Delete the unpacker if not needed
|
||||
del connection.unpacker
|
||||
connection.unpacker = None
|
||||
connection.log.debug("Unpacker deleted")
|
||||
connection.log("Unpacker deleted")
|
||||
|
||||
if idle > 60*60: # Wake up after 1h
|
||||
connection.log.debug("[Cleanup] After wakeup, idle: %s" % idle)
|
||||
connection.log("[Cleanup] After wakeup, idle: %s" % idle)
|
||||
connection.close()
|
||||
|
||||
elif idle > 20*60 and connection.last_send_time < time.time()-10: # Idle more than 20 min and we not send request in last 10 sec
|
||||
|
@ -130,15 +134,15 @@ class ConnectionServer:
|
|||
connection.close()
|
||||
|
||||
elif idle > 10 and connection.incomplete_buff_recv > 0: # Incompelte data with more than 10 sec idle
|
||||
connection.log.debug("[Cleanup] Connection buff stalled")
|
||||
connection.log("[Cleanup] Connection buff stalled")
|
||||
connection.close()
|
||||
|
||||
elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec
|
||||
connection.log.debug("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time))
|
||||
connection.log("[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time))
|
||||
connection.close()
|
||||
|
||||
elif idle > 60 and connection.protocol == "?": # No connection after 1 min
|
||||
connection.log.debug("[Cleanup] Connect timeout: %s" % idle)
|
||||
connection.log("[Cleanup] Connect timeout: %s" % idle)
|
||||
connection.close()
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue