diff --git a/src/Config.py b/src/Config.py index 15a0c87f..a9e05a6f 100644 --- a/src/Config.py +++ b/src/Config.py @@ -285,6 +285,8 @@ class Config(object): self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet tries checking updates for own sites more frequently. This can be used by a third party for revealing the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for any sites.', type='bool', choices=[True, False], default=False) + self.parser.add_argument('--simultaneous_connection_throttle_threshold', help='Throttle opening new connections when the number of outgoing connections in not fully established state exceeds the threshold.', default=15, type=int, metavar='simultaneous_connection_throttle_threshold') + self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip') self.parser.add_argument('--fileserver_port', help='FileServer bind port (0: randomize)', default=0, type=int, metavar='port') self.parser.add_argument('--fileserver_port_range', help='FileServer randomization range', default="10000-40000", metavar='port') diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index c147ee35..44665b2f 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -17,7 +17,7 @@ from util import helper class Connection(object): __slots__ = ( "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type", - "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection", + "handshake", "crypt", "connected", "connecting", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock", "last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams" ) @@ -50,6 +50,7 @@ class Connection(object): self.crypt = None # Connection encryption method self.sock_wrapped = False # Socket wrapped to encryption + self.connecting = False self.connected = False self.event_connected = gevent.event.AsyncResult() # Solves on handshake received self.closed = False @@ -118,6 +119,15 @@ class Connection(object): # Open connection to peer and wait for handshake def connect(self): + self.connecting = True + try: + return self._connect() + except Exception as err: + self.connecting = False + self.connected = False + raise + + def _connect(self): self.updateOnlineStatus(outgoing_activity=True) if not self.event_connected or self.event_connected.ready(): @@ -236,6 +246,7 @@ class Connection(object): self.protocol = "v2" self.updateName() self.connected = True + self.connecting = False buff_len = 0 req_len = 0 self.unpacker_bytes = 0 @@ -634,6 +645,7 @@ class Connection(object): return False # Already closed self.closed = True self.connected = False + self.connecting = False if self.event_connected: self.event_connected.set(False) diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 9f24e377..9c16b774 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -298,6 +298,7 @@ class ConnectionServer(object): raise Exception("This peer is blacklisted") try: + #self.log.info("Connection to: %s:%s", ip, port) if has_per_site_onion: # Lock connection to site connection = Connection(self, ip, port, target_onion=site_onion, is_tracker_connection=is_tracker_connection) else: @@ -312,6 +313,7 @@ class ConnectionServer(object): raise Exception("Connection event return error") except Exception as err: + #self.log.info("Connection error (%s, %s): %s", ip, port, Debug.formatException(err)) connection.close("%s Connect error: %s" % (ip, Debug.formatException(err))) raise err @@ -429,6 +431,21 @@ class ConnectionServer(object): )) return num_closed + # Returns True if we should slow down opening new connections as at the moment + # there are too many connections being established and not connected completely + # (not entered the message loop yet). + def shouldThrottleNewConnections(self): + threshold = config.simultaneous_connection_throttle_threshold + if len(self.connections) <= threshold: + return False + nr_connections_being_established = 0 + for connection in self.connections[:]: # Make a copy + if connection.connecting and not connection.connected and connection.type == "out": + nr_connections_being_established += 1 + if nr_connections_being_established > threshold: + return True + return False + # Internet outage detection def updateOnlineStatus(self, connection, outgoing_activity=False, successful_activity=False): diff --git a/src/File/FileServer.py b/src/File/FileServer.py index ac4b8c55..66cefd39 100644 --- a/src/File/FileServer.py +++ b/src/File/FileServer.py @@ -401,6 +401,9 @@ class FileServer(ConnectionServer): self.sleep(1) self.waitForInternetOnline() + while self.isActiveMode() and self.shouldThrottleNewConnections(): + self.sleep(1) + if not self.isActiveMode(): break @@ -463,6 +466,9 @@ class FileServer(ConnectionServer): self.sleep(short_timeout) self.waitForInternetOnline() + while self.isActiveMode() and self.shouldThrottleNewConnections(): + self.sleep(1) + if not self.isActiveMode(): break @@ -509,6 +515,9 @@ class FileServer(ConnectionServer): while self.isActiveMode(): self.sleep(long_timeout) + while self.isActiveMode() and self.shouldThrottleNewConnections(): + self.sleep(1) + if not self.isActiveMode(): break @@ -591,6 +600,10 @@ class FileServer(ConnectionServer): threshold = self.internet_outage_threshold / 2.0 self.sleep(threshold / 2.0) + + while self.isActiveMode() and self.shouldThrottleNewConnections(): + self.sleep(1) + if not self.isActiveMode(): break