Merge pull request #254 from caryoscelus/filter-cert

Optional ignoring of compromised id certificates
This commit is contained in:
caryoscelus 2024-05-08 23:53:39 +00:00 committed by GitHub
commit e9eec9bb27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 69 additions and 13 deletions

View file

@ -10,6 +10,7 @@
- fix chromium compatibility (@caryoscelus)
- better fix of local sites leak (@caryoscelus)
- ipython-based repl via --repl for debug/interactive development (@caryoscelus)
- optional blocking of compromised id certificates for spam protection (@caryoscelus)
- various improvements
### zeronet-conservancy 0.7.10 (2023-07-26) (18d35d3bed4f0683e99)

View file

@ -311,6 +311,8 @@ class Config:
self.parser.add_argument('--download-optional', choices=["manual", "auto"], default="manual")
self.parser.add_argument('--lax-cert-check', action=argparse.BooleanOptionalAction, default=True, help="Enabling lax cert check allows users getting site writing priviledges by employing compromized (i.e. with leaked private keys) cert issuer. Disable for spam protection")
self.parser.add_argument('--tor', help='enable: Use only for Tor peers, always: Use Tor for every connection', choices=["disable", "enable", "always"], default='enable')
self.parser.add_argument('--tor-controller', help='Tor controller address', metavar='ip:port', default='127.0.0.1:9051')
self.parser.add_argument('--tor-proxy', help='Tor proxy address', metavar='ip:port', default='127.0.0.1:9050')

View file

@ -29,7 +29,17 @@ class SignError(Exception):
@PluginManager.acceptPlugins
class ContentManager(object):
class ContentManager:
"""Manage site content verifying and other related stuff"""
def loadBadCerts():
try:
with open(f'{config.data_dir}/badcerts.json') as f:
return set(json.load(f))
except FileNotFoundError:
return set()
bad_certs = loadBadCerts()
def __init__(self, site):
self.site = site
@ -38,6 +48,13 @@ class ContentManager(object):
self.hashfield = PeerHashfield()
self.has_optional_files = False
def addBadCert(self, sign):
addr = CryptBitcoin.get_sign_address_64('compromised', sign)
if addr:
self.bad_certs.add(addr)
with open(f'{config.data_dir}/badcerts.json', 'w') as f:
json.dump(list(self.bad_certs), f)
# Load all content.json files
def loadContents(self):
if len(self.contents) == 0:
@ -478,6 +495,9 @@ class ContentManager(object):
return self.getUserContentRules(parent_content, inner_path, content)
return False
def isGoodCert(self, cert):
return cert not in self.bad_certs
# Get rules for a user file
# Return: The rules of the file or False if not allowed
def getUserContentRules(self, parent_content, inner_path, content):
@ -511,7 +531,20 @@ class ContentManager(object):
banned = False
if "signers" in rules:
rules["signers"] = rules["signers"][:] # Make copy of the signers
for permission_pattern, permission_rules in list(user_contents["permission_rules"].items()): # Regexp rules
if content is not None:
name, domain = content['cert_user_id'].rsplit('@', 1)
cert_addresses = parent_content['user_contents']['cert_signers'].get(domain)
else:
cert_addresses = None
# to prevent spam, if cert is compromised, only allow personal rules
if config.lax_cert_check or cert_addresses is None or self.isGoodCert(cert_addresses[0]):
permission_rules = user_contents.get('permission_rules', {}).items()
if not self.isGoodCert(cert_addresses[0]):
self.log.warning('Accepting compromised certificate! This may lead to spam attack. Turn off with --no_lax_cert_check. Default behaviour may change in the future.')
else:
permission_rules = {}
for permission_pattern, permission_rules in permission_rules: # Regexp rules
if not SafeRe.match(permission_pattern, user_urn):
continue # Rule is not valid for user
if permission_rules is None:
@ -892,9 +925,9 @@ class ContentManager(object):
raise VerifyError("No rules")
# Check include size limit
if rules.get("max_size") is not None: # Include size limit
if content_size > rules["max_size"]:
raise VerifyError("Include too large %sB > %sB" % (content_size, rules["max_size"]))
max_size = rules.get("max_size", 0)
if content_size > max_size:
raise VerifyError(f'Include too large {content_size}B > {max_size}B')
if rules.get("max_size_optional") is not None: # Include optional files limit
if content_size_optional > rules["max_size_optional"]:

View file

@ -4,6 +4,9 @@ import binascii
import time
import hashlib
from collections.abc import Container
from typing import Optional
from util.Electrum import dbl_format
from Config import config
@ -69,7 +72,8 @@ def privatekeyToAddress(privatekey): # Return address from private key
return False
def sign(data, privatekey): # Return sign to data using private key
def sign(data: str, privatekey: str) -> str:
"""Sign data with privatekey, return base64 string signature"""
if privatekey.startswith("23") and len(privatekey) > 52:
return None # Old style private key not supported
return base64.b64encode(sslcurve.sign(
@ -79,13 +83,13 @@ def sign(data, privatekey): # Return sign to data using private key
hash=dbl_format
)).decode()
def verify(data, valid_address, sign, lib_verify=None): # Verify data using address and sign
def get_sign_address_64(data: str, sign: str, lib_verify=None) -> Optional[str]:
"""Returns pubkey/address of signer if any"""
if not lib_verify:
lib_verify = lib_verify_best
if not sign:
return False
return None
if lib_verify == "libsecp256k1":
sign_address = libsecp256k1message.recover_address(data.encode("utf8"), sign).decode("utf8")
@ -95,10 +99,23 @@ def verify(data, valid_address, sign, lib_verify=None): # Verify data using add
else:
raise Exception("No library enabled for signature verification")
if type(valid_address) is list: # Any address in the list
return sign_address in valid_address
else: # One possible address
return sign_address == valid_address
return sign_address
def verify(*args, **kwargs):
"""Default verify, see verify64"""
return verify64(*args, **kwargs)
def verify64(data: str, addresses: str | Container[str], sign: str, lib_verify=None) -> bool:
"""Verify that sign is a valid signature for data by one of addresses
Expecting signature to be in base64
"""
sign_address = get_sign_address_64(data, sign, lib_verify)
if isinstance(addresses, str):
return sign_address == addresses
else:
return sign_address in addresses
def isValidAddress(addr):
'''Check if provided address is valid bitcoin address'''

View file

@ -407,6 +407,9 @@ class UiWebsocket(object):
def actionSiteBadFiles(self, to):
return list(self.site.bad_files.keys())
def actionBadCert(self, to, sign):
self.site.content_manager.addBadCert(sign)
# Join to an event channel
def actionChannelJoin(self, to, channels):
if type(channels) != list: