Reason for connection closing
This commit is contained in:
parent
75100c335f
commit
bad2a79f05
5 changed files with 41 additions and 49 deletions
|
@ -79,7 +79,7 @@ class Connection(object):
|
||||||
def badAction(self, weight=1):
|
def badAction(self, weight=1):
|
||||||
self.bad_actions += weight
|
self.bad_actions += weight
|
||||||
if self.bad_actions > 40:
|
if self.bad_actions > 40:
|
||||||
self.close()
|
self.close("Too many bad actions")
|
||||||
elif self.bad_actions > 20:
|
elif self.bad_actions > 20:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
@ -166,7 +166,7 @@ class Connection(object):
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
if not self.closed:
|
if not self.closed:
|
||||||
self.log("Socket error: %s" % Debug.formatException(err))
|
self.log("Socket error: %s" % Debug.formatException(err))
|
||||||
self.close() # MessageLoop ended, close connection
|
self.close("MessageLoop ended") # MessageLoop ended, close connection
|
||||||
|
|
||||||
# My handshake info
|
# My handshake info
|
||||||
def getHandshakeInfo(self):
|
def getHandshakeInfo(self):
|
||||||
|
@ -257,8 +257,7 @@ class Connection(object):
|
||||||
self.sock_wrapped = True
|
self.sock_wrapped = True
|
||||||
|
|
||||||
if not self.sock_wrapped and self.cert_pin:
|
if not self.sock_wrapped and self.cert_pin:
|
||||||
self.log("Crypt connection error: Socket not encrypted, but certificate pin present")
|
self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
|
||||||
self.close()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
self.setHandshake(message)
|
self.setHandshake(message)
|
||||||
|
@ -296,11 +295,10 @@ class Connection(object):
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.log("Crypt connection error: %s, adding peerid %s as broken ssl." % (err, message["params"]["peer_id"]))
|
self.log("Crypt connection error: %s, adding peerid %s as broken ssl." % (err, message["params"]["peer_id"]))
|
||||||
self.server.broken_ssl_peer_ids[message["params"]["peer_id"]] = True
|
self.server.broken_ssl_peer_ids[message["params"]["peer_id"]] = True
|
||||||
self.close()
|
self.close("Broken ssl")
|
||||||
|
|
||||||
if not self.sock_wrapped and self.cert_pin:
|
if not self.sock_wrapped and self.cert_pin:
|
||||||
self.log("Crypt connection error: Socket not encrypted, but certificate pin present")
|
self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
|
||||||
self.close()
|
|
||||||
|
|
||||||
# Stream socket directly to a file
|
# Stream socket directly to a file
|
||||||
def handleStream(self, message):
|
def handleStream(self, message):
|
||||||
|
@ -367,8 +365,7 @@ class Connection(object):
|
||||||
self.server.bytes_sent += len(data)
|
self.server.bytes_sent += len(data)
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.log("Send errror: %s" % Debug.formatException(err))
|
self.close("Send errror: %s" % Debug.formatException(err))
|
||||||
self.close()
|
|
||||||
return False
|
return False
|
||||||
self.last_sent_time = time.time()
|
self.last_sent_time = time.time()
|
||||||
return True
|
return True
|
||||||
|
@ -393,8 +390,7 @@ class Connection(object):
|
||||||
def request(self, cmd, params={}, stream_to=None):
|
def request(self, cmd, params={}, stream_to=None):
|
||||||
# Last command sent more than 10 sec ago, timeout
|
# Last command sent more than 10 sec ago, timeout
|
||||||
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
|
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
|
||||||
self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
|
self.close("Request %s timeout: %.3fs" % (self.last_cmd, time.time() - self.last_send_time))
|
||||||
self.close()
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.last_req_time = time.time()
|
self.last_req_time = time.time()
|
||||||
|
@ -424,7 +420,7 @@ class Connection(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Close connection
|
# Close connection
|
||||||
def close(self):
|
def close(self, reason="Unknown"):
|
||||||
if self.closed:
|
if self.closed:
|
||||||
return False # Already closed
|
return False # Already closed
|
||||||
self.closed = True
|
self.closed = True
|
||||||
|
@ -432,10 +428,9 @@ class Connection(object):
|
||||||
if self.event_connected:
|
if self.event_connected:
|
||||||
self.event_connected.set(False)
|
self.event_connected.set(False)
|
||||||
|
|
||||||
if config.debug_socket:
|
|
||||||
self.log(
|
self.log(
|
||||||
"Closing connection, waiting_requests: %s, buff: %s..." %
|
"Closing connection: %s, waiting_requests: %s, sites: %s, buff: %s..." %
|
||||||
(len(self.waiting_requests), self.incomplete_buff_recv)
|
(reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
|
||||||
)
|
)
|
||||||
for request in self.waiting_requests.values(): # Mark pending requests failed
|
for request in self.waiting_requests.values(): # Mark pending requests failed
|
||||||
request.set(False)
|
request.set(False)
|
||||||
|
|
|
@ -138,19 +138,17 @@ class ConnectionServer:
|
||||||
self.connections.append(connection)
|
self.connections.append(connection)
|
||||||
succ = connection.connect()
|
succ = connection.connect()
|
||||||
if not succ:
|
if not succ:
|
||||||
connection.close()
|
connection.close("Connection event return error")
|
||||||
raise Exception("Connection event return error")
|
raise Exception("Connection event return error")
|
||||||
|
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||||
connection.close()
|
|
||||||
raise err
|
raise err
|
||||||
return connection
|
return connection
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def removeConnection(self, connection):
|
def removeConnection(self, connection):
|
||||||
self.log.debug("Removing %s..." % connection)
|
|
||||||
# Delete if same as in registry
|
# Delete if same as in registry
|
||||||
if self.ips.get(connection.ip) == connection:
|
if self.ips.get(connection.ip) == connection:
|
||||||
del self.ips[connection.ip]
|
del self.ips[connection.ip]
|
||||||
|
@ -182,42 +180,40 @@ class ConnectionServer:
|
||||||
connection.unpacker = None
|
connection.unpacker = None
|
||||||
|
|
||||||
elif connection.last_cmd == "announce" and idle > 20: # Bootstrapper connection close after 20 sec
|
elif connection.last_cmd == "announce" and idle > 20: # Bootstrapper connection close after 20 sec
|
||||||
connection.log("[Cleanup] Tracker connection: %s" % idle)
|
connection.close("[Cleanup] Tracker connection: %s" % idle)
|
||||||
connection.close()
|
|
||||||
|
|
||||||
if idle > 60 * 60:
|
if idle > 60 * 60:
|
||||||
# Wake up after 1h
|
# Wake up after 1h
|
||||||
connection.log("[Cleanup] After wakeup, idle: %s" % idle)
|
connection.close("[Cleanup] After wakeup, idle: %s" % idle)
|
||||||
connection.close()
|
|
||||||
|
|
||||||
elif idle > 20 * 60 and connection.last_send_time < time.time() - 10:
|
elif idle > 20 * 60 and connection.last_send_time < time.time() - 10:
|
||||||
# Idle more than 20 min and we have not sent request in last 10 sec
|
# Idle more than 20 min and we have not sent request in last 10 sec
|
||||||
if not connection.ping():
|
if not connection.ping():
|
||||||
connection.close()
|
connection.close("[Cleanup] Ping timeout")
|
||||||
|
|
||||||
elif idle > 10 and connection.incomplete_buff_recv > 0:
|
elif idle > 10 and connection.incomplete_buff_recv > 0:
|
||||||
# Incomplete data with more than 10 sec idle
|
# Incomplete data with more than 10 sec idle
|
||||||
connection.log("[Cleanup] Connection buff stalled")
|
connection.close("[Cleanup] Connection buff stalled")
|
||||||
connection.close()
|
|
||||||
|
|
||||||
elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10:
|
elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10:
|
||||||
# Sent command and no response in 10 sec
|
# Sent command and no response in 10 sec
|
||||||
connection.log(
|
connection.close(
|
||||||
"[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time)
|
"[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd, time.time() - connection.last_send_time)
|
||||||
)
|
)
|
||||||
connection.close()
|
|
||||||
|
|
||||||
elif idle > 60 and connection.protocol == "?": # No connection after 1 min
|
|
||||||
connection.log("[Cleanup] Connect timeout: %s" % idle)
|
connection.log("[Cleanup] Connect timeout: %s" % idle)
|
||||||
connection.close()
|
connection.close()
|
||||||
|
elif idle > 30 and connection.protocol == "?": # No connection after 30 sec
|
||||||
|
|
||||||
elif idle < 60 and connection.bad_actions > 40:
|
elif idle < 60 and connection.bad_actions > 40:
|
||||||
connection.log("[Cleanup] Too many bad actions: %s" % connection.bad_actions)
|
connection.close(
|
||||||
connection.close()
|
"[Cleanup] Too many bad actions: %s" % connection.bad_actions
|
||||||
|
)
|
||||||
|
|
||||||
elif idle > 5*60 and connection.sites == 0:
|
elif idle > 5*60 and connection.sites == 0:
|
||||||
connection.log("[Cleanup] No site for connection")
|
connection.close(
|
||||||
connection.close()
|
"[Cleanup] No site for connection"
|
||||||
|
)
|
||||||
|
|
||||||
elif run_i % 30 == 0:
|
elif run_i % 30 == 0:
|
||||||
# Reset bad action counter every 30 min
|
# Reset bad action counter every 30 min
|
||||||
|
|
|
@ -77,7 +77,7 @@ class FileRequest(object):
|
||||||
self.log.debug("Delay %s %s, cpu_time used by connection: %.3fs" % (self.connection.ip, cmd, self.connection.cpu_time))
|
self.log.debug("Delay %s %s, cpu_time used by connection: %.3fs" % (self.connection.ip, cmd, self.connection.cpu_time))
|
||||||
time.sleep(self.connection.cpu_time)
|
time.sleep(self.connection.cpu_time)
|
||||||
if self.connection.cpu_time > 5:
|
if self.connection.cpu_time > 5:
|
||||||
self.connection.close()
|
self.connection.close("Cpu time: %.3fs" % self.connection.cpu_time)
|
||||||
if func:
|
if func:
|
||||||
func(params)
|
func(params)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -63,7 +63,7 @@ class Peer(object):
|
||||||
def connect(self, connection=None):
|
def connect(self, connection=None):
|
||||||
if self.connection:
|
if self.connection:
|
||||||
self.log("Getting connection (Closing %s)..." % self.connection)
|
self.log("Getting connection (Closing %s)..." % self.connection)
|
||||||
self.connection.close()
|
self.connection.close("Connection change")
|
||||||
else:
|
else:
|
||||||
self.log("Getting connection...")
|
self.log("Getting connection...")
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ class Peer(object):
|
||||||
self.connection.sites += 1
|
self.connection.sites += 1
|
||||||
|
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.onConnectionError()
|
self.onConnectionError("Getting connection error")
|
||||||
self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
|
self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
|
||||||
(Debug.formatException(err), self.connection_error, self.hash_failed))
|
(Debug.formatException(err), self.connection_error, self.hash_failed))
|
||||||
self.connection = None
|
self.connection = None
|
||||||
|
@ -119,7 +119,7 @@ class Peer(object):
|
||||||
if not self.connection or self.connection.closed:
|
if not self.connection or self.connection.closed:
|
||||||
self.connect()
|
self.connect()
|
||||||
if not self.connection:
|
if not self.connection:
|
||||||
self.onConnectionError()
|
self.onConnectionError("Reconnect error")
|
||||||
return None # Connection failed
|
return None # Connection failed
|
||||||
|
|
||||||
self.log("Send request: %s %s" % (params.get("site", ""), cmd))
|
self.log("Send request: %s %s" % (params.get("site", ""), cmd))
|
||||||
|
@ -131,7 +131,8 @@ class Peer(object):
|
||||||
raise Exception("Send error")
|
raise Exception("Send error")
|
||||||
if "error" in res:
|
if "error" in res:
|
||||||
self.log("%s error: %s" % (cmd, res["error"]))
|
self.log("%s error: %s" % (cmd, res["error"]))
|
||||||
self.onConnectionError()
|
self.onConnectionError("Response error")
|
||||||
|
break
|
||||||
else: # Successful request, reset connection error num
|
else: # Successful request, reset connection error num
|
||||||
self.connection_error = 0
|
self.connection_error = 0
|
||||||
self.time_response = time.time()
|
self.time_response = time.time()
|
||||||
|
@ -141,7 +142,7 @@ class Peer(object):
|
||||||
self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
|
self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.onConnectionError()
|
self.onConnectionError("Request error")
|
||||||
self.log(
|
self.log(
|
||||||
"%s (connection_error: %s, hash_failed: %s, retry: %s)" %
|
"%s (connection_error: %s, hash_failed: %s, retry: %s)" %
|
||||||
(Debug.formatException(err), self.connection_error, self.hash_failed, retry)
|
(Debug.formatException(err), self.connection_error, self.hash_failed, retry)
|
||||||
|
@ -222,7 +223,7 @@ class Peer(object):
|
||||||
response_time = time.time() - s
|
response_time = time.time() - s
|
||||||
break # All fine, exit from for loop
|
break # All fine, exit from for loop
|
||||||
# Timeout reached or bad response
|
# Timeout reached or bad response
|
||||||
self.onConnectionError()
|
self.onConnectionError("Ping timeout")
|
||||||
self.connect()
|
self.connect()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
@ -313,20 +314,20 @@ class Peer(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Stop and remove from site
|
# Stop and remove from site
|
||||||
def remove(self):
|
def remove(self, reason="Removing"):
|
||||||
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
||||||
if self.site and self.key in self.site.peers:
|
if self.site and self.key in self.site.peers:
|
||||||
del(self.site.peers[self.key])
|
del(self.site.peers[self.key])
|
||||||
if self.connection:
|
if self.connection:
|
||||||
self.connection.close()
|
self.connection.close(reason)
|
||||||
|
|
||||||
# - EVENTS -
|
# - EVENTS -
|
||||||
|
|
||||||
# On connection error
|
# On connection error
|
||||||
def onConnectionError(self):
|
def onConnectionError(self, reason="Unknown"):
|
||||||
self.connection_error += 1
|
self.connection_error += 1
|
||||||
if self.connection_error >= 3: # Dead peer
|
if self.connection_error >= 3: # Dead peer
|
||||||
self.remove()
|
self.remove("Peer connection: %s" % reason)
|
||||||
|
|
||||||
# Done working with peer
|
# Done working with peer
|
||||||
def onWorkerDone(self):
|
def onWorkerDone(self):
|
||||||
|
|
|
@ -467,7 +467,7 @@ class Site(object):
|
||||||
self.log.info("[OK] %s: %s %s/%s" % (peer.key, result["ok"], len(published), limit))
|
self.log.info("[OK] %s: %s %s/%s" % (peer.key, result["ok"], len(published), limit))
|
||||||
else:
|
else:
|
||||||
if result == {"exception": "Timeout"}:
|
if result == {"exception": "Timeout"}:
|
||||||
peer.onConnectionError()
|
peer.onConnectionError("Publish timeout")
|
||||||
self.log.info("[FAILED] %s: %s" % (peer.key, result))
|
self.log.info("[FAILED] %s: %s" % (peer.key, result))
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
@ -935,7 +935,7 @@ class Site(object):
|
||||||
if peer.connection and not peer.connection.connected:
|
if peer.connection and not peer.connection.connected:
|
||||||
peer.connection = None # Dead connection
|
peer.connection = None # Dead connection
|
||||||
if time.time() - peer.time_found > ttl: # Not found on tracker or via pex in last 4 hour
|
if time.time() - peer.time_found > ttl: # Not found on tracker or via pex in last 4 hour
|
||||||
peer.remove()
|
peer.remove("Time found expired")
|
||||||
removed += 1
|
removed += 1
|
||||||
if removed > len(peers) * 0.1: # Don't remove too much at once
|
if removed > len(peers) * 0.1: # Don't remove too much at once
|
||||||
break
|
break
|
||||||
|
|
Loading…
Reference in a new issue