Raise exception instead of using assert
This commit is contained in:
parent
80bfccd9d3
commit
ff32f822ba
7 changed files with 27 additions and 12 deletions
|
@ -2,7 +2,9 @@ import array
|
||||||
|
|
||||||
|
|
||||||
def packPiecefield(data):
|
def packPiecefield(data):
|
||||||
assert isinstance(data, bytes) or isinstance(data, bytearray)
|
if not isinstance(data, bytes) and not isinstance(data, bytearray):
|
||||||
|
raise Exception("Invalid data type: %s" % type(data))
|
||||||
|
|
||||||
res = []
|
res = []
|
||||||
if not data:
|
if not data:
|
||||||
return array.array("H", b"")
|
return array.array("H", b"")
|
||||||
|
@ -46,7 +48,9 @@ def unpackPiecefield(data):
|
||||||
|
|
||||||
|
|
||||||
def spliceBit(data, idx, bit):
|
def spliceBit(data, idx, bit):
|
||||||
assert bit == b"\x00" or bit == b"\x01"
|
if bit != b"\x00" and bit != b"\x01":
|
||||||
|
raise Exception("Invalid bit: %s" % bit)
|
||||||
|
|
||||||
if len(data) < idx:
|
if len(data) < idx:
|
||||||
data = data.ljust(idx + 1, b"\x00")
|
data = data.ljust(idx + 1, b"\x00")
|
||||||
return data[:idx] + bit + data[idx+ 1:]
|
return data[:idx] + bit + data[idx+ 1:]
|
||||||
|
@ -63,7 +67,8 @@ class BigfilePiecefield(Piecefield):
|
||||||
self.data = b""
|
self.data = b""
|
||||||
|
|
||||||
def frombytes(self, s):
|
def frombytes(self, s):
|
||||||
assert isinstance(s, bytes) or isinstance(s, bytearray)
|
if not isinstance(s, bytes) and not isinstance(s, bytearray):
|
||||||
|
raise Exception("Invalid type: %s" % type(s))
|
||||||
self.data = s
|
self.data = s
|
||||||
|
|
||||||
def tobytes(self):
|
def tobytes(self):
|
||||||
|
@ -91,7 +96,8 @@ class BigfilePiecefieldPacked(Piecefield):
|
||||||
self.data = b""
|
self.data = b""
|
||||||
|
|
||||||
def frombytes(self, data):
|
def frombytes(self, data):
|
||||||
assert isinstance(data, bytes) or isinstance(data, bytearray)
|
if not isinstance(data, bytes) and not isinstance(data, bytearray):
|
||||||
|
raise Exception("Invalid type: %s" % type(data))
|
||||||
self.data = packPiecefield(data).tobytes()
|
self.data = packPiecefield(data).tobytes()
|
||||||
|
|
||||||
def tobytes(self):
|
def tobytes(self):
|
||||||
|
|
|
@ -137,7 +137,9 @@ class UiWebsocketPlugin(object):
|
||||||
@PluginManager.registerTo("User")
|
@PluginManager.registerTo("User")
|
||||||
class UserPlugin(object):
|
class UserPlugin(object):
|
||||||
def getEncryptPrivatekey(self, address, param_index=0):
|
def getEncryptPrivatekey(self, address, param_index=0):
|
||||||
assert param_index >= 0 and param_index <= 1000
|
if param_index < 0 or param_index > 1000:
|
||||||
|
raise Exception("Param_index out of range")
|
||||||
|
|
||||||
site_data = self.getSiteData(address)
|
site_data = self.getSiteData(address)
|
||||||
|
|
||||||
if site_data.get("cert"): # Different privatekey for different cert provider
|
if site_data.get("cert"): # Different privatekey for different cert provider
|
||||||
|
@ -153,7 +155,9 @@ class UserPlugin(object):
|
||||||
return site_data["encrypt_privatekey_%s" % index]
|
return site_data["encrypt_privatekey_%s" % index]
|
||||||
|
|
||||||
def getEncryptPublickey(self, address, param_index=0):
|
def getEncryptPublickey(self, address, param_index=0):
|
||||||
assert param_index >= 0 and param_index <= 1000
|
if param_index < 0 or param_index > 1000:
|
||||||
|
raise Exception("Param_index out of range")
|
||||||
|
|
||||||
site_data = self.getSiteData(address)
|
site_data = self.getSiteData(address)
|
||||||
|
|
||||||
if site_data.get("cert"): # Different privatekey for different cert provider
|
if site_data.get("cert"): # Different privatekey for different cert provider
|
||||||
|
|
|
@ -13,7 +13,8 @@ class WsLogStreamer(logging.StreamHandler):
|
||||||
self.ui_websocket = ui_websocket
|
self.ui_websocket = ui_websocket
|
||||||
|
|
||||||
if filter:
|
if filter:
|
||||||
assert SafeRe.isSafePattern(filter)
|
if not SafeRe.isSafePattern(filter):
|
||||||
|
raise Exception("Not a safe prex pattern")
|
||||||
self.filter_re = re.compile(".*" + filter)
|
self.filter_re = re.compile(".*" + filter)
|
||||||
else:
|
else:
|
||||||
self.filter_re = None
|
self.filter_re = None
|
||||||
|
|
|
@ -52,7 +52,8 @@ class CryptConnectionManager:
|
||||||
sock_wrapped = ssl.wrap_socket(sock, ciphers=ciphers)
|
sock_wrapped = ssl.wrap_socket(sock, ciphers=ciphers)
|
||||||
if cert_pin:
|
if cert_pin:
|
||||||
cert_hash = hashlib.sha256(sock_wrapped.getpeercert(True)).hexdigest()
|
cert_hash = hashlib.sha256(sock_wrapped.getpeercert(True)).hexdigest()
|
||||||
assert cert_hash == cert_pin, "Socket certificate does not match (%s != %s)" % (cert_hash, cert_pin)
|
if cert_hash != cert_pin:
|
||||||
|
raise Exception("Socket certificate does not match (%s != %s)" % (cert_hash, cert_pin))
|
||||||
return sock_wrapped
|
return sock_wrapped
|
||||||
else:
|
else:
|
||||||
return sock
|
return sock
|
||||||
|
|
|
@ -33,7 +33,7 @@ class TestHelper:
|
||||||
with pytest.raises(socket.error):
|
with pytest.raises(socket.error):
|
||||||
helper.packAddress("999.1.1.1", 1)
|
helper.packAddress("999.1.1.1", 1)
|
||||||
|
|
||||||
with pytest.raises(AssertionError):
|
with pytest.raises(Exception):
|
||||||
helper.unpackAddress("X")
|
helper.unpackAddress("X")
|
||||||
|
|
||||||
def testGetDirname(self):
|
def testGetDirname(self):
|
||||||
|
|
|
@ -159,12 +159,14 @@ class TorManager(object):
|
||||||
else:
|
else:
|
||||||
res_auth = self.send("AUTHENTICATE", conn)
|
res_auth = self.send("AUTHENTICATE", conn)
|
||||||
|
|
||||||
assert "250 OK" in res_auth, "Authenticate error %s" % res_auth
|
if "250 OK" not in res_auth:
|
||||||
|
raise Exception("Authenticate error %s" % res_auth)
|
||||||
|
|
||||||
# Version 0.2.7.5 required because ADD_ONION support
|
# Version 0.2.7.5 required because ADD_ONION support
|
||||||
res_version = self.send("GETINFO version", conn)
|
res_version = self.send("GETINFO version", conn)
|
||||||
version = re.search(r'version=([0-9\.]+)', res_version).group(1)
|
version = re.search(r'version=([0-9\.]+)', res_version).group(1)
|
||||||
assert float(version.replace(".", "0", 2)) >= 207.5, "Tor version >=0.2.7.5 required, found: %s" % version
|
if float(version.replace(".", "0", 2)) < 207.5:
|
||||||
|
raise Exception("Tor version >=0.2.7.5 required, found: %s" % version)
|
||||||
|
|
||||||
self.setStatus("Connected (%s)" % res_auth)
|
self.setStatus("Connected (%s)" % res_auth)
|
||||||
self.event_started.set(True)
|
self.event_started.set(True)
|
||||||
|
|
|
@ -110,7 +110,8 @@ def unpackAddress(packed):
|
||||||
if len(packed) == 18:
|
if len(packed) == 18:
|
||||||
return socket.inet_ntop(socket.AF_INET6, packed[0:16]), struct.unpack_from("H", packed, 16)[0]
|
return socket.inet_ntop(socket.AF_INET6, packed[0:16]), struct.unpack_from("H", packed, 16)[0]
|
||||||
else:
|
else:
|
||||||
assert len(packed) == 6, "Invalid length ip4 packed address: %s" % len(packed)
|
if len(packed) != 6:
|
||||||
|
raise Exception("Invalid length ip4 packed address: %s" % len(packed))
|
||||||
return socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]
|
return socket.inet_ntoa(packed[0:4]), struct.unpack_from("H", packed, 4)[0]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue