diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 23fdf03c..bf37dde7 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -79,7 +79,7 @@ class Connection(object): def badAction(self, weight=1): self.bad_actions += weight if self.bad_actions > 40: - self.close() + self.close("Too many bad actions") elif self.bad_actions > 20: time.sleep(5) @@ -166,7 +166,7 @@ class Connection(object): except Exception, err: if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) - self.close() # MessageLoop ended, close connection + self.close("MessageLoop ended") # MessageLoop ended, close connection # My handshake info def getHandshakeInfo(self): @@ -257,8 +257,7 @@ class Connection(object): self.sock_wrapped = True if not self.sock_wrapped and self.cert_pin: - self.log("Crypt connection error: Socket not encrypted, but certificate pin present") - self.close() + self.close("Crypt connection error: Socket not encrypted, but certificate pin present") return self.setHandshake(message) @@ -296,11 +295,10 @@ class Connection(object): except Exception, err: self.log("Crypt connection error: %s, adding peerid %s as broken ssl." % (err, message["params"]["peer_id"])) self.server.broken_ssl_peer_ids[message["params"]["peer_id"]] = True - self.close() + self.close("Broken ssl") if not self.sock_wrapped and self.cert_pin: - self.log("Crypt connection error: Socket not encrypted, but certificate pin present") - self.close() + self.close("Crypt connection error: Socket not encrypted, but certificate pin present") # Stream socket directly to a file def handleStream(self, message): @@ -367,8 +365,7 @@ class Connection(object): self.server.bytes_sent += len(data) self.sock.sendall(data) except Exception, err: - self.log("Send errror: %s" % Debug.formatException(err)) - self.close() + self.close("Send errror: %s" % Debug.formatException(err)) return False self.last_sent_time = time.time() return True @@ -393,8 +390,7 @@ class Connection(object): def request(self, cmd, params={}, stream_to=None): # Last command sent more than 10 sec ago, timeout if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: - self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time)) - self.close() + self.close("Request %s timeout: %.3fs" % (self.last_cmd, time.time() - self.last_send_time)) return False self.last_req_time = time.time() @@ -424,7 +420,7 @@ class Connection(object): return False # Close connection - def close(self): + def close(self, reason="Unknown"): if self.closed: return False # Already closed self.closed = True @@ -432,11 +428,10 @@ class Connection(object): if self.event_connected: self.event_connected.set(False) - if config.debug_socket: - self.log( - "Closing connection, waiting_requests: %s, buff: %s..." % - (len(self.waiting_requests), self.incomplete_buff_recv) - ) + self.log( + "Closing connection: %s, waiting_requests: %s, sites: %s, buff: %s..." % + (reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv) + ) for request in self.waiting_requests.values(): # Mark pending requests failed request.set(False) self.waiting_requests = {} diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index fad9260b..f9d9c9da 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -138,19 +138,17 @@ class ConnectionServer: self.connections.append(connection) succ = connection.connect() if not succ: - connection.close() + connection.close("Connection event return error") raise Exception("Connection event return error") except Exception, err: - self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err))) - connection.close() + connection.close("%s Connect error: %s" % (ip, Debug.formatException(err))) raise err return connection else: return None def removeConnection(self, connection): - self.log.debug("Removing %s..." % connection) # Delete if same as in registry if self.ips.get(connection.ip) == connection: del self.ips[connection.ip] @@ -182,42 +180,40 @@ class ConnectionServer: connection.unpacker = None elif connection.last_cmd == "announce" and idle > 20: # Bootstrapper connection close after 20 sec - connection.log("[Cleanup] Tracker connection: %s" % idle) - connection.close() + connection.close("[Cleanup] Tracker connection: %s" % idle) if idle > 60 * 60: # Wake up after 1h - connection.log("[Cleanup] After wakeup, idle: %s" % idle) - connection.close() + connection.close("[Cleanup] After wakeup, idle: %s" % idle) elif idle > 20 * 60 and connection.last_send_time < time.time() - 10: # Idle more than 20 min and we have not sent request in last 10 sec if not connection.ping(): - connection.close() + connection.close("[Cleanup] Ping timeout") elif idle > 10 and connection.incomplete_buff_recv > 0: # Incomplete data with more than 10 sec idle - connection.log("[Cleanup] Connection buff stalled") - connection.close() + connection.close("[Cleanup] Connection buff stalled") elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10: # Sent command and no response in 10 sec - connection.log( - "[Cleanup] Command %s timeout: %s" % (connection.last_cmd, time.time() - connection.last_send_time) + connection.close( + "[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd, time.time() - connection.last_send_time) ) - connection.close() - elif idle > 60 and connection.protocol == "?": # No connection after 1 min connection.log("[Cleanup] Connect timeout: %s" % idle) connection.close() + elif idle > 30 and connection.protocol == "?": # No connection after 30 sec elif idle < 60 and connection.bad_actions > 40: - connection.log("[Cleanup] Too many bad actions: %s" % connection.bad_actions) - connection.close() + connection.close( + "[Cleanup] Too many bad actions: %s" % connection.bad_actions + ) elif idle > 5*60 and connection.sites == 0: - connection.log("[Cleanup] No site for connection") - connection.close() + connection.close( + "[Cleanup] No site for connection" + ) elif run_i % 30 == 0: # Reset bad action counter every 30 min diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 0ad03b77..4d5e6074 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -77,7 +77,7 @@ class FileRequest(object): self.log.debug("Delay %s %s, cpu_time used by connection: %.3fs" % (self.connection.ip, cmd, self.connection.cpu_time)) time.sleep(self.connection.cpu_time) if self.connection.cpu_time > 5: - self.connection.close() + self.connection.close("Cpu time: %.3fs" % self.connection.cpu_time) if func: func(params) else: diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py index ad5cf8dd..e05bc346 100644 --- a/src/Peer/Peer.py +++ b/src/Peer/Peer.py @@ -63,7 +63,7 @@ class Peer(object): def connect(self, connection=None): if self.connection: self.log("Getting connection (Closing %s)..." % self.connection) - self.connection.close() + self.connection.close("Connection change") else: self.log("Getting connection...") @@ -83,7 +83,7 @@ class Peer(object): self.connection.sites += 1 except Exception, err: - self.onConnectionError() + self.onConnectionError("Getting connection error") self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed)) self.connection = None @@ -119,7 +119,7 @@ class Peer(object): if not self.connection or self.connection.closed: self.connect() if not self.connection: - self.onConnectionError() + self.onConnectionError("Reconnect error") return None # Connection failed self.log("Send request: %s %s" % (params.get("site", ""), cmd)) @@ -131,7 +131,8 @@ class Peer(object): raise Exception("Send error") if "error" in res: self.log("%s error: %s" % (cmd, res["error"])) - self.onConnectionError() + self.onConnectionError("Response error") + break else: # Successful request, reset connection error num self.connection_error = 0 self.time_response = time.time() @@ -141,7 +142,7 @@ class Peer(object): self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd)) break else: - self.onConnectionError() + self.onConnectionError("Request error") self.log( "%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry) @@ -222,7 +223,7 @@ class Peer(object): response_time = time.time() - s break # All fine, exit from for loop # Timeout reached or bad response - self.onConnectionError() + self.onConnectionError("Ping timeout") self.connect() time.sleep(1) @@ -313,20 +314,20 @@ class Peer(object): return True # Stop and remove from site - def remove(self): + def remove(self, reason="Removing"): self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed)) if self.site and self.key in self.site.peers: del(self.site.peers[self.key]) if self.connection: - self.connection.close() + self.connection.close(reason) # - EVENTS - # On connection error - def onConnectionError(self): + def onConnectionError(self, reason="Unknown"): self.connection_error += 1 if self.connection_error >= 3: # Dead peer - self.remove() + self.remove("Peer connection: %s" % reason) # Done working with peer def onWorkerDone(self): diff --git a/src/Site/Site.py b/src/Site/Site.py index c627108a..f3b601fd 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -467,7 +467,7 @@ class Site(object): self.log.info("[OK] %s: %s %s/%s" % (peer.key, result["ok"], len(published), limit)) else: if result == {"exception": "Timeout"}: - peer.onConnectionError() + peer.onConnectionError("Publish timeout") self.log.info("[FAILED] %s: %s" % (peer.key, result)) time.sleep(0.01) @@ -935,7 +935,7 @@ class Site(object): if peer.connection and not peer.connection.connected: peer.connection = None # Dead connection if time.time() - peer.time_found > ttl: # Not found on tracker or via pex in last 4 hour - peer.remove() + peer.remove("Time found expired") removed += 1 if removed > len(peers) * 0.1: # Don't remove too much at once break