Add new configuration option: simultaneous_connection_throttle_threshold
This commit is contained in:
parent
1863043505
commit
e000eae046
4 changed files with 45 additions and 1 deletions
|
@ -285,6 +285,8 @@ class Config(object):
|
||||||
|
|
||||||
self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet tries checking updates for own sites more frequently. This can be used by a third party for revealing the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for any sites.', type='bool', choices=[True, False], default=False)
|
self.parser.add_argument('--expose_no_ownership', help='By default, ZeroNet tries checking updates for own sites more frequently. This can be used by a third party for revealing the network addresses of a site owner. If this option is enabled, ZeroNet performs the checks in the same way for any sites.', type='bool', choices=[True, False], default=False)
|
||||||
|
|
||||||
|
self.parser.add_argument('--simultaneous_connection_throttle_threshold', help='Throttle opening new connections when the number of outgoing connections in not fully established state exceeds the threshold.', default=15, type=int, metavar='simultaneous_connection_throttle_threshold')
|
||||||
|
|
||||||
self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip')
|
self.parser.add_argument('--fileserver_ip', help='FileServer bind address', default="*", metavar='ip')
|
||||||
self.parser.add_argument('--fileserver_port', help='FileServer bind port (0: randomize)', default=0, type=int, metavar='port')
|
self.parser.add_argument('--fileserver_port', help='FileServer bind port (0: randomize)', default=0, type=int, metavar='port')
|
||||||
self.parser.add_argument('--fileserver_port_range', help='FileServer randomization range', default="10000-40000", metavar='port')
|
self.parser.add_argument('--fileserver_port_range', help='FileServer randomization range', default="10000-40000", metavar='port')
|
||||||
|
|
|
@ -17,7 +17,7 @@ from util import helper
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
__slots__ = (
|
__slots__ = (
|
||||||
"sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type",
|
"sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type",
|
||||||
"handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection",
|
"handshake", "crypt", "connected", "connecting", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection",
|
||||||
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
|
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
|
||||||
"last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams"
|
"last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams"
|
||||||
)
|
)
|
||||||
|
@ -50,6 +50,7 @@ class Connection(object):
|
||||||
self.crypt = None # Connection encryption method
|
self.crypt = None # Connection encryption method
|
||||||
self.sock_wrapped = False # Socket wrapped to encryption
|
self.sock_wrapped = False # Socket wrapped to encryption
|
||||||
|
|
||||||
|
self.connecting = False
|
||||||
self.connected = False
|
self.connected = False
|
||||||
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
||||||
self.closed = False
|
self.closed = False
|
||||||
|
@ -118,6 +119,15 @@ class Connection(object):
|
||||||
|
|
||||||
# Open connection to peer and wait for handshake
|
# Open connection to peer and wait for handshake
|
||||||
def connect(self):
|
def connect(self):
|
||||||
|
self.connecting = True
|
||||||
|
try:
|
||||||
|
return self._connect()
|
||||||
|
except Exception as err:
|
||||||
|
self.connecting = False
|
||||||
|
self.connected = False
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _connect(self):
|
||||||
self.updateOnlineStatus(outgoing_activity=True)
|
self.updateOnlineStatus(outgoing_activity=True)
|
||||||
|
|
||||||
if not self.event_connected or self.event_connected.ready():
|
if not self.event_connected or self.event_connected.ready():
|
||||||
|
@ -236,6 +246,7 @@ class Connection(object):
|
||||||
self.protocol = "v2"
|
self.protocol = "v2"
|
||||||
self.updateName()
|
self.updateName()
|
||||||
self.connected = True
|
self.connected = True
|
||||||
|
self.connecting = False
|
||||||
buff_len = 0
|
buff_len = 0
|
||||||
req_len = 0
|
req_len = 0
|
||||||
self.unpacker_bytes = 0
|
self.unpacker_bytes = 0
|
||||||
|
@ -634,6 +645,7 @@ class Connection(object):
|
||||||
return False # Already closed
|
return False # Already closed
|
||||||
self.closed = True
|
self.closed = True
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
self.connecting = False
|
||||||
if self.event_connected:
|
if self.event_connected:
|
||||||
self.event_connected.set(False)
|
self.event_connected.set(False)
|
||||||
|
|
||||||
|
|
|
@ -298,6 +298,7 @@ class ConnectionServer(object):
|
||||||
raise Exception("This peer is blacklisted")
|
raise Exception("This peer is blacklisted")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
#self.log.info("Connection to: %s:%s", ip, port)
|
||||||
if has_per_site_onion: # Lock connection to site
|
if has_per_site_onion: # Lock connection to site
|
||||||
connection = Connection(self, ip, port, target_onion=site_onion, is_tracker_connection=is_tracker_connection)
|
connection = Connection(self, ip, port, target_onion=site_onion, is_tracker_connection=is_tracker_connection)
|
||||||
else:
|
else:
|
||||||
|
@ -312,6 +313,7 @@ class ConnectionServer(object):
|
||||||
raise Exception("Connection event return error")
|
raise Exception("Connection event return error")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
#self.log.info("Connection error (%s, %s): %s", ip, port, Debug.formatException(err))
|
||||||
connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
@ -429,6 +431,21 @@ class ConnectionServer(object):
|
||||||
))
|
))
|
||||||
return num_closed
|
return num_closed
|
||||||
|
|
||||||
|
# Returns True if we should slow down opening new connections as at the moment
|
||||||
|
# there are too many connections being established and not connected completely
|
||||||
|
# (not entered the message loop yet).
|
||||||
|
def shouldThrottleNewConnections(self):
|
||||||
|
threshold = config.simultaneous_connection_throttle_threshold
|
||||||
|
if len(self.connections) <= threshold:
|
||||||
|
return False
|
||||||
|
nr_connections_being_established = 0
|
||||||
|
for connection in self.connections[:]: # Make a copy
|
||||||
|
if connection.connecting and not connection.connected and connection.type == "out":
|
||||||
|
nr_connections_being_established += 1
|
||||||
|
if nr_connections_being_established > threshold:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
# Internet outage detection
|
# Internet outage detection
|
||||||
def updateOnlineStatus(self, connection, outgoing_activity=False, successful_activity=False):
|
def updateOnlineStatus(self, connection, outgoing_activity=False, successful_activity=False):
|
||||||
|
|
||||||
|
|
|
@ -401,6 +401,9 @@ class FileServer(ConnectionServer):
|
||||||
self.sleep(1)
|
self.sleep(1)
|
||||||
self.waitForInternetOnline()
|
self.waitForInternetOnline()
|
||||||
|
|
||||||
|
while self.isActiveMode() and self.shouldThrottleNewConnections():
|
||||||
|
self.sleep(1)
|
||||||
|
|
||||||
if not self.isActiveMode():
|
if not self.isActiveMode():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -463,6 +466,9 @@ class FileServer(ConnectionServer):
|
||||||
self.sleep(short_timeout)
|
self.sleep(short_timeout)
|
||||||
self.waitForInternetOnline()
|
self.waitForInternetOnline()
|
||||||
|
|
||||||
|
while self.isActiveMode() and self.shouldThrottleNewConnections():
|
||||||
|
self.sleep(1)
|
||||||
|
|
||||||
if not self.isActiveMode():
|
if not self.isActiveMode():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -509,6 +515,9 @@ class FileServer(ConnectionServer):
|
||||||
while self.isActiveMode():
|
while self.isActiveMode():
|
||||||
self.sleep(long_timeout)
|
self.sleep(long_timeout)
|
||||||
|
|
||||||
|
while self.isActiveMode() and self.shouldThrottleNewConnections():
|
||||||
|
self.sleep(1)
|
||||||
|
|
||||||
if not self.isActiveMode():
|
if not self.isActiveMode():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -591,6 +600,10 @@ class FileServer(ConnectionServer):
|
||||||
threshold = self.internet_outage_threshold / 2.0
|
threshold = self.internet_outage_threshold / 2.0
|
||||||
|
|
||||||
self.sleep(threshold / 2.0)
|
self.sleep(threshold / 2.0)
|
||||||
|
|
||||||
|
while self.isActiveMode() and self.shouldThrottleNewConnections():
|
||||||
|
self.sleep(1)
|
||||||
|
|
||||||
if not self.isActiveMode():
|
if not self.isActiveMode():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue