Add option to ignore compromised certificates

This would allow to stop spam attacks without pausing sites that allow
compromised certificates and without action from site owners. However,
without mass adoption of adding individual permissions on sites for
valid users of such certificates, this will also block updates from
such users.

In any case this is more of a temporary measure in case we face such
an attack before the whole user id issue is resolved
This commit is contained in:
caryoscelus 2023-12-22 12:59:04 +00:00
parent ee0829d95e
commit 31d94a16b6
No known key found for this signature in database
GPG key ID: 254EDDB85B66CB1F
3 changed files with 43 additions and 5 deletions

View file

@ -297,6 +297,8 @@ class Config(object):
self.parser.add_argument('--download-optional', choices=["manual", "auto"], default="manual") self.parser.add_argument('--download-optional', choices=["manual", "auto"], default="manual")
self.parser.add_argument('--lax-cert-check', action=argparse.BooleanOptionalAction, default=True, help="Enabling lax cert check allows users getting site writing priviledges by employing compromized (i.e. with leaked private keys) cert issuer. Disable for spam protection")
self.parser.add_argument('--tor', help='enable: Use only for Tor peers, always: Use Tor for every connection', choices=["disable", "enable", "always"], default='enable') self.parser.add_argument('--tor', help='enable: Use only for Tor peers, always: Use Tor for every connection', choices=["disable", "enable", "always"], default='enable')
self.parser.add_argument('--tor-controller', help='Tor controller address', metavar='ip:port', default='127.0.0.1:9051') self.parser.add_argument('--tor-controller', help='Tor controller address', metavar='ip:port', default='127.0.0.1:9051')
self.parser.add_argument('--tor-proxy', help='Tor proxy address', metavar='ip:port', default='127.0.0.1:9050') self.parser.add_argument('--tor-proxy', help='Tor proxy address', metavar='ip:port', default='127.0.0.1:9050')

View file

@ -29,7 +29,17 @@ class SignError(Exception):
@PluginManager.acceptPlugins @PluginManager.acceptPlugins
class ContentManager(object): class ContentManager:
"""Manage site content verifying and other related stuff"""
def loadBadCerts():
try:
with open(f'{config.data_dir}/badcerts.json') as f:
return set(json.load(f))
except FileNotFoundError:
return set()
bad_certs = loadBadCerts()
def __init__(self, site): def __init__(self, site):
self.site = site self.site = site
@ -38,6 +48,13 @@ class ContentManager(object):
self.hashfield = PeerHashfield() self.hashfield = PeerHashfield()
self.has_optional_files = False self.has_optional_files = False
def addBadCert(self, sign):
addr = CryptBitcoin.get_sign_address_64('compromised', sign)
if addr:
self.bad_certs.add(addr)
with open(f'{config.data_dir}/badcerts.json', 'w') as f:
json.dump(list(self.bad_certs), f)
# Load all content.json files # Load all content.json files
def loadContents(self): def loadContents(self):
if len(self.contents) == 0: if len(self.contents) == 0:
@ -478,6 +495,9 @@ class ContentManager(object):
return self.getUserContentRules(parent_content, inner_path, content) return self.getUserContentRules(parent_content, inner_path, content)
return False return False
def isGoodCert(self, cert):
return cert not in self.bad_certs
# Get rules for a user file # Get rules for a user file
# Return: The rules of the file or False if not allowed # Return: The rules of the file or False if not allowed
def getUserContentRules(self, parent_content, inner_path, content): def getUserContentRules(self, parent_content, inner_path, content):
@ -511,7 +531,20 @@ class ContentManager(object):
banned = False banned = False
if "signers" in rules: if "signers" in rules:
rules["signers"] = rules["signers"][:] # Make copy of the signers rules["signers"] = rules["signers"][:] # Make copy of the signers
for permission_pattern, permission_rules in list(user_contents["permission_rules"].items()): # Regexp rules
if content is not None:
name, domain = content['cert_user_id'].rsplit('@', 1)
cert_addresses = parent_content['user_contents']['cert_signers'].get(domain)
else:
cert_addresses = None
# to prevent spam, if cert is compromised, only allow personal rules
if config.lax_cert_check or cert_addresses is None or self.isGoodCert(cert_addresses[0]):
permission_rules = user_contents.get('permission_rules', {}).items()
if not self.isGoodCert(cert_addresses[0]):
self.log.warning('Accepting compromised certificate! This may lead to spam attack. Turn off with --no_lax_cert_check. Default behaviour may change in the future.')
else:
permission_rules = {}
for permission_pattern, permission_rules in permission_rules: # Regexp rules
if not SafeRe.match(permission_pattern, user_urn): if not SafeRe.match(permission_pattern, user_urn):
continue # Rule is not valid for user continue # Rule is not valid for user
if permission_rules is None: if permission_rules is None:
@ -892,9 +925,9 @@ class ContentManager(object):
raise VerifyError("No rules") raise VerifyError("No rules")
# Check include size limit # Check include size limit
if rules.get("max_size") is not None: # Include size limit max_size = rules.get("max_size", 0)
if content_size > rules["max_size"]: if content_size > max_size:
raise VerifyError("Include too large %sB > %sB" % (content_size, rules["max_size"])) raise VerifyError(f'Include too large {content_size}B > {max_size}B')
if rules.get("max_size_optional") is not None: # Include optional files limit if rules.get("max_size_optional") is not None: # Include optional files limit
if content_size_optional > rules["max_size_optional"]: if content_size_optional > rules["max_size_optional"]:

View file

@ -405,6 +405,9 @@ class UiWebsocket(object):
def actionSiteBadFiles(self, to): def actionSiteBadFiles(self, to):
return list(self.site.bad_files.keys()) return list(self.site.bad_files.keys())
def actionBadCert(self, to, sign):
self.site.content_manager.addBadCert(sign)
# Join to an event channel # Join to an event channel
def actionChannelJoin(self, to, channels): def actionChannelJoin(self, to, channels):
if type(channels) != list: if type(channels) != list: