diff --git a/CHANGELOG.md b/CHANGELOG.md index 172f5f7b..edfcada8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - fix chromium compatibility (@caryoscelus) - better fix of local sites leak (@caryoscelus) - ipython-based repl via --repl for debug/interactive development (@caryoscelus) +- optional blocking of compromised id certificates for spam protection (@caryoscelus) - various improvements ### zeronet-conservancy 0.7.10 (2023-07-26) (18d35d3bed4f0683e99) diff --git a/src/Config.py b/src/Config.py index ba519a6b..72e10d92 100644 --- a/src/Config.py +++ b/src/Config.py @@ -311,6 +311,8 @@ class Config: self.parser.add_argument('--download-optional', choices=["manual", "auto"], default="manual") + self.parser.add_argument('--lax-cert-check', action=argparse.BooleanOptionalAction, default=True, help="Enabling lax cert check allows users getting site writing priviledges by employing compromized (i.e. with leaked private keys) cert issuer. Disable for spam protection") + self.parser.add_argument('--tor', help='enable: Use only for Tor peers, always: Use Tor for every connection', choices=["disable", "enable", "always"], default='enable') self.parser.add_argument('--tor-controller', help='Tor controller address', metavar='ip:port', default='127.0.0.1:9051') self.parser.add_argument('--tor-proxy', help='Tor proxy address', metavar='ip:port', default='127.0.0.1:9050') diff --git a/src/Content/ContentManager.py b/src/Content/ContentManager.py index a06ba523..938f3d6f 100644 --- a/src/Content/ContentManager.py +++ b/src/Content/ContentManager.py @@ -29,7 +29,17 @@ class SignError(Exception): @PluginManager.acceptPlugins -class ContentManager(object): +class ContentManager: + """Manage site content verifying and other related stuff""" + + def loadBadCerts(): + try: + with open(f'{config.data_dir}/badcerts.json') as f: + return set(json.load(f)) + except FileNotFoundError: + return set() + + bad_certs = loadBadCerts() def __init__(self, site): self.site = site @@ -38,6 +48,13 @@ class ContentManager(object): self.hashfield = PeerHashfield() self.has_optional_files = False + def addBadCert(self, sign): + addr = CryptBitcoin.get_sign_address_64('compromised', sign) + if addr: + self.bad_certs.add(addr) + with open(f'{config.data_dir}/badcerts.json', 'w') as f: + json.dump(list(self.bad_certs), f) + # Load all content.json files def loadContents(self): if len(self.contents) == 0: @@ -478,6 +495,9 @@ class ContentManager(object): return self.getUserContentRules(parent_content, inner_path, content) return False + def isGoodCert(self, cert): + return cert not in self.bad_certs + # Get rules for a user file # Return: The rules of the file or False if not allowed def getUserContentRules(self, parent_content, inner_path, content): @@ -511,7 +531,20 @@ class ContentManager(object): banned = False if "signers" in rules: rules["signers"] = rules["signers"][:] # Make copy of the signers - for permission_pattern, permission_rules in list(user_contents["permission_rules"].items()): # Regexp rules + + if content is not None: + name, domain = content['cert_user_id'].rsplit('@', 1) + cert_addresses = parent_content['user_contents']['cert_signers'].get(domain) + else: + cert_addresses = None + # to prevent spam, if cert is compromised, only allow personal rules + if config.lax_cert_check or cert_addresses is None or self.isGoodCert(cert_addresses[0]): + permission_rules = user_contents.get('permission_rules', {}).items() + if not self.isGoodCert(cert_addresses[0]): + self.log.warning('Accepting compromised certificate! This may lead to spam attack. Turn off with --no_lax_cert_check. Default behaviour may change in the future.') + else: + permission_rules = {} + for permission_pattern, permission_rules in permission_rules: # Regexp rules if not SafeRe.match(permission_pattern, user_urn): continue # Rule is not valid for user if permission_rules is None: @@ -892,9 +925,9 @@ class ContentManager(object): raise VerifyError("No rules") # Check include size limit - if rules.get("max_size") is not None: # Include size limit - if content_size > rules["max_size"]: - raise VerifyError("Include too large %sB > %sB" % (content_size, rules["max_size"])) + max_size = rules.get("max_size", 0) + if content_size > max_size: + raise VerifyError(f'Include too large {content_size}B > {max_size}B') if rules.get("max_size_optional") is not None: # Include optional files limit if content_size_optional > rules["max_size_optional"]: diff --git a/src/Crypt/CryptBitcoin.py b/src/Crypt/CryptBitcoin.py index a0807187..25377f5d 100644 --- a/src/Crypt/CryptBitcoin.py +++ b/src/Crypt/CryptBitcoin.py @@ -4,6 +4,9 @@ import binascii import time import hashlib +from collections.abc import Container +from typing import Optional + from util.Electrum import dbl_format from Config import config @@ -69,7 +72,8 @@ def privatekeyToAddress(privatekey): # Return address from private key return False -def sign(data, privatekey): # Return sign to data using private key +def sign(data: str, privatekey: str) -> str: + """Sign data with privatekey, return base64 string signature""" if privatekey.startswith("23") and len(privatekey) > 52: return None # Old style private key not supported return base64.b64encode(sslcurve.sign( @@ -79,13 +83,13 @@ def sign(data, privatekey): # Return sign to data using private key hash=dbl_format )).decode() - -def verify(data, valid_address, sign, lib_verify=None): # Verify data using address and sign +def get_sign_address_64(data: str, sign: str, lib_verify=None) -> Optional[str]: + """Returns pubkey/address of signer if any""" if not lib_verify: lib_verify = lib_verify_best if not sign: - return False + return None if lib_verify == "libsecp256k1": sign_address = libsecp256k1message.recover_address(data.encode("utf8"), sign).decode("utf8") @@ -95,10 +99,23 @@ def verify(data, valid_address, sign, lib_verify=None): # Verify data using add else: raise Exception("No library enabled for signature verification") - if type(valid_address) is list: # Any address in the list - return sign_address in valid_address - else: # One possible address - return sign_address == valid_address + return sign_address + +def verify(*args, **kwargs): + """Default verify, see verify64""" + return verify64(*args, **kwargs) + +def verify64(data: str, addresses: str | Container[str], sign: str, lib_verify=None) -> bool: + """Verify that sign is a valid signature for data by one of addresses + + Expecting signature to be in base64 + """ + sign_address = get_sign_address_64(data, sign, lib_verify) + + if isinstance(addresses, str): + return sign_address == addresses + else: + return sign_address in addresses def isValidAddress(addr): '''Check if provided address is valid bitcoin address''' diff --git a/src/Ui/UiWebsocket.py b/src/Ui/UiWebsocket.py index 086e4444..a87abd62 100644 --- a/src/Ui/UiWebsocket.py +++ b/src/Ui/UiWebsocket.py @@ -407,6 +407,9 @@ class UiWebsocket(object): def actionSiteBadFiles(self, to): return list(self.site.bad_files.keys()) + def actionBadCert(self, to, sign): + self.site.content_manager.addBadCert(sign) + # Join to an event channel def actionChannelJoin(self, to, channels): if type(channels) != list: