rev247, Fix content validation error caused by ContentManager refactor, Refactor unittests make use of testdata and format with PEP8
This commit is contained in:
parent
1f53212d62
commit
f63b711972
13 changed files with 508 additions and 488 deletions
|
@ -4,7 +4,7 @@ import ConfigParser
|
|||
class Config(object):
|
||||
def __init__(self):
|
||||
self.version = "0.3.1"
|
||||
self.rev = 246
|
||||
self.rev = 247
|
||||
self.parser = self.createArguments()
|
||||
argv = sys.argv[:] # Copy command line arguments
|
||||
argv = self.parseConfig(argv) # Add arguments from config file
|
||||
|
|
|
@ -165,7 +165,7 @@ class ContentManager(object):
|
|||
user_urn = "%s/%s" % (content["cert_auth_type"], content["cert_user_id"]) # web/nofish@zeroid.bit
|
||||
|
||||
rules = copy.copy(user_contents["permissions"].get(content["cert_user_id"], {})) # Default rules by username
|
||||
if not rules:
|
||||
if rules == False:
|
||||
return False # User banned
|
||||
if "signers" in rules:
|
||||
rules["signers"] = rules["signers"][:] # Make copy of the signers
|
||||
|
|
794
src/Test/test.py
794
src/Test/test.py
|
@ -1,394 +1,416 @@
|
|||
import sys, os, unittest, urllib, time
|
||||
sys.path.append(os.path.abspath("src")) # Imports relative to src dir
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
import urllib
|
||||
import time
|
||||
sys.path.append(os.path.abspath("src")) # Imports relative to src dir
|
||||
|
||||
from Config import config
|
||||
config.data_dir = "src/Test/testdata" # Use test data for unittests
|
||||
config.data_dir = "src/Test/testdata" # Use test data for unittests
|
||||
|
||||
from Crypt import CryptBitcoin
|
||||
from Ui import UiRequest
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
|
||||
def testMediaRoute(self):
|
||||
try:
|
||||
urllib.urlopen("http://127.0.0.1:43110").read()
|
||||
except Exception, err:
|
||||
raise unittest.SkipTest(err)
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/./sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/../config.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
|
||||
|
||||
def testBitcoinSignOld(self):
|
||||
s = time.time()
|
||||
privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
sign = CryptBitcoin.signOld("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.signOld("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time()-s),
|
||||
|
||||
|
||||
def testBitcoinSign(self):
|
||||
s = time.time()
|
||||
privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C"
|
||||
privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time()-s),
|
||||
|
||||
|
||||
|
||||
def testBitcoinSignCompressed(self):
|
||||
raise unittest.SkipTest("Not supported yet")
|
||||
s = time.time()
|
||||
privatekey = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKbZAgWdpFo1UETZSKdgHaNN2J"
|
||||
privatekey_bad = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKsZAgWdpFo1UETZSKdgHaNN2J"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
print sign
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time()-s),
|
||||
|
||||
|
||||
def testTrackers(self):
|
||||
raise unittest.SkipTest("Notyet")
|
||||
from Site import SiteManager
|
||||
from lib.subtl.subtl import UdpTrackerClient
|
||||
import hashlib
|
||||
|
||||
ok = 0
|
||||
for protocol, ip, port in SiteManager.TRACKERS:
|
||||
address = "test"
|
||||
if protocol == "udp":
|
||||
tracker = UdpTrackerClient(ip, port)
|
||||
peers = None
|
||||
try:
|
||||
tracker.connect()
|
||||
tracker.poll_once()
|
||||
tracker.announce(info_hash=hashlib.sha1(address).hexdigest(), num_want=5)
|
||||
back = tracker.poll_once()
|
||||
peers = back["response"]["peers"]
|
||||
except Exception, err:
|
||||
peers = None
|
||||
print "Tracker error: %s://%s:%s %s" % (protocol, ip, port, err)
|
||||
if peers != None:
|
||||
ok += 1
|
||||
|
||||
self.assertEqual(ok, len(SiteManager.TRACKERS))
|
||||
|
||||
|
||||
def testDb(self):
|
||||
print "Importing db..."
|
||||
from Db import Db
|
||||
for db_path in [os.path.abspath("%s/test/zeronet.db" % config.data_dir), "%s/test/zeronet.db" % config.data_dir]:
|
||||
print "Creating db using %s..." % db_path,
|
||||
schema = {
|
||||
"db_name": "TestDb",
|
||||
"db_file": "%s/test/zeronet.db" % config.data_dir,
|
||||
"map": {
|
||||
"data.json": {
|
||||
"to_table": {
|
||||
"test": "test"
|
||||
}
|
||||
}
|
||||
},
|
||||
"tables": {
|
||||
"test": {
|
||||
"cols": [
|
||||
["test_id", "INTEGER"],
|
||||
["title", "TEXT"],
|
||||
],
|
||||
"indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"],
|
||||
"schema_changed": 1426195822
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if os.path.isfile("%s/test/zeronet.db" % config.data_dir): os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
db = Db(schema, "%s/test/zeronet.db" % config.data_dir)
|
||||
db.checkTables()
|
||||
db.close()
|
||||
|
||||
# Cleanup
|
||||
os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
os.rmdir("%s/test/" % config.data_dir)
|
||||
|
||||
|
||||
def testContentManagerIncludes(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
|
||||
site = Site("1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ")
|
||||
# Include info
|
||||
rules = site.content_manager.getRules("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json")
|
||||
self.assertEqual(rules["signers"], ['1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB'])
|
||||
self.assertEqual(rules["user_name"], 'testuser4')
|
||||
self.assertEqual(rules["max_size"], 10000)
|
||||
self.assertEqual(rules["includes_allowed"], False)
|
||||
self.assertEqual(rules["files_allowed"], 'data.json')
|
||||
# Valid signers
|
||||
self.assertEqual(
|
||||
site.content_manager.getValidSigners("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json"),
|
||||
['1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB', '1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ']
|
||||
)
|
||||
self.assertEqual(site.content_manager.getValidSigners("data/content.json"), ['1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ'])
|
||||
self.assertEqual(site.content_manager.getValidSigners("content.json"), ['1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ'])
|
||||
|
||||
# Data validation
|
||||
data_dict = {
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "be589f313e7b2d8b9b41280e603e8ba72c3f74d3cfdb771a7c418a0a64598035",
|
||||
"size": 216
|
||||
}
|
||||
},
|
||||
"modified": 1428591454.423,
|
||||
"signs": {
|
||||
"1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB": "HM1sv686/aIdgqyFF2t0NmZY5pv1TALo6H0zOmOJ63VOnNg2LSCpbuubb+IcHTUIJq3StUDo6okczJDeowyjOUo="
|
||||
}
|
||||
}
|
||||
# Normal data
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), True)
|
||||
# Too large
|
||||
data_dict["files"]["data.json"]["size"] = 200000
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), False)
|
||||
data_dict["files"]["data.json"]["size"] = 216 # Reset
|
||||
# Not allowed file
|
||||
data_dict["files"]["data.html"] = data_dict["files"]["data.json"]
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), False)
|
||||
del data_dict["files"]["data.html"] # Reset
|
||||
# Should work again
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/users/1BhcaqWViN1YBnNgXb5aq5NtEhKtKdKZMB/content.json", data, ignore_same=False), True)
|
||||
|
||||
|
||||
def testUserContentRules(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
|
||||
site = Site("1Hb9rY98TNnA6TYeozJv4w36bqEiBn6x8Y")
|
||||
user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
|
||||
# File info for not exist file
|
||||
self.assertEqual(site.content_manager.getFileInfo("data/users/notexist/data.json")["content_inner_path"], "data/users/notexist/content.json")
|
||||
self.assertEqual(site.content_manager.getValidSigners("data/users/notexist/data.json"), ["notexist", "1Hb9rY98TNnA6TYeozJv4w36bqEiBn6x8Y"])
|
||||
|
||||
# File info for exsitsing file
|
||||
file_info = site.content_manager.getFileInfo("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/data.json")
|
||||
valid_signers = site.content_manager.getValidSigners(file_info["content_inner_path"], user_content)
|
||||
self.assertEqual(valid_signers, ['14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet', '1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C', '1Hb9rY98TNnA6TYeozJv4w36bqEiBn6x8Y'])
|
||||
|
||||
# Known user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "nofish@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 100000)
|
||||
|
||||
# Unknown user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 10000)
|
||||
|
||||
# User with more size limit by auth type
|
||||
user_content["cert_auth_type"] = "bitmsg"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 20000)
|
||||
|
||||
# Banned user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "bad@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertFalse(rules)
|
||||
|
||||
|
||||
def testUserContentCert(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C"
|
||||
user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
|
||||
cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
|
||||
cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
|
||||
|
||||
site = Site("1Hb9rY98TNnA6TYeozJv4w36bqEiBn6x8Y")
|
||||
#user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
# site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] = user_content # Add to content manager
|
||||
# Check if the user file is loaded
|
||||
self.assertTrue("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents)
|
||||
user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
|
||||
cert_content = site.content_manager.contents["data/users/content.json"]
|
||||
# Override cert signer
|
||||
cert_content["user_contents"]["cert_signers"]["zeroid.bit"] = ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]
|
||||
|
||||
|
||||
# Valid cert providers
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["cert_signers"], {"zeroid.bit": ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]} )
|
||||
|
||||
# Add cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"].split("@")[0]), cert_priv
|
||||
)
|
||||
|
||||
# Verify cert
|
||||
self.assertTrue(site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content))
|
||||
self.assertFalse(site.content_manager.verifyCert("data/users/badaddress/content.json", user_content))
|
||||
|
||||
|
||||
# Sign user content
|
||||
#signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
signed_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
|
||||
# Test user cert
|
||||
self.assertTrue(site.content_manager.verifyFile("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", StringIO(json.dumps(signed_content)), ignore_same=False))
|
||||
|
||||
# Test banned user
|
||||
site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][user_content["cert_user_id"]] = False
|
||||
self.assertFalse(site.content_manager.verifyFile("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", StringIO(json.dumps(signed_content)), ignore_same=False))
|
||||
|
||||
# Test invalid cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv
|
||||
)
|
||||
signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
self.assertFalse(site.content_manager.verifyFile("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", StringIO(json.dumps(signed_content)), ignore_same=False))
|
||||
|
||||
|
||||
def testClone(self):
|
||||
from Site import Site
|
||||
from Site import SiteManager
|
||||
from User import UserManager
|
||||
import shutil
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") # Privatekey: 5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv
|
||||
self.assertEqual(site.storage.directory, "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
|
||||
# Remove old files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertFalse(os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json"))
|
||||
|
||||
# Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc
|
||||
new_site = site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1) # Privatekey: 5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee
|
||||
|
||||
# Check if clone was successful
|
||||
self.assertEqual(new_site.address, "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertTrue(new_site.storage.isFile("content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("index.html"))
|
||||
self.assertTrue(new_site.storage.isFile("data/users/content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("data/zeroblog.db"))
|
||||
|
||||
|
||||
# Test re-cloning (updating)
|
||||
|
||||
# Changes in non-data files should be overwritten
|
||||
new_site.storage.write("index.html", "this will be overwritten")
|
||||
self.assertEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
# Changes in data file should be kept after re-cloning
|
||||
changed_contentjson = new_site.storage.loadJson("content.json")
|
||||
changed_contentjson["description"] = "Update Description Test"
|
||||
new_site.storage.writeJson("content.json", changed_contentjson)
|
||||
|
||||
changed_data = new_site.storage.loadJson("data/data.json")
|
||||
changed_data["title"] = "UpdateTest"
|
||||
new_site.storage.writeJson("data/data.json", changed_data)
|
||||
|
||||
# Re-clone the site
|
||||
site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
self.assertEqual(new_site.storage.loadJson("data/data.json")["title"], "UpdateTest")
|
||||
self.assertEqual(new_site.storage.loadJson("content.json")["description"], "Update Description Test")
|
||||
self.assertNotEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
|
||||
def testUserNewsite(self):
|
||||
from User import UserManager
|
||||
user = UserManager.user_manager.get()
|
||||
user.sites = {} # Reset user data
|
||||
self.assertEqual(user.master_address, "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc")
|
||||
self.assertEqual(user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc"), 1458664252141532163166741013621928587528255888800826689784628722366466547364755811L)
|
||||
|
||||
address, address_index, site_data = user.getNewSiteData()
|
||||
self.assertEqual(CryptBitcoin.hdPrivatekey(user.master_seed, address_index), site_data["privatekey"]) # Re-generate privatekey based on address_index
|
||||
|
||||
user.sites = {} # Reset user data
|
||||
|
||||
self.assertNotEqual(user.getSiteData(address)["auth_address"], address) # Site address and auth address is different
|
||||
self.assertEqual(user.getSiteData(address)["auth_privatekey"], site_data["auth_privatekey"]) # Re-generate auth_privatekey for site
|
||||
|
||||
|
||||
|
||||
def testSslCert(self):
|
||||
from Crypt import CryptConnection
|
||||
# Remove old certs
|
||||
if os.path.isfile("%s/cert-rsa.pem" % config.data_dir): os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
if os.path.isfile("%s/key-rsa.pem" % config.data_dir): os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
CryptConnection.manager.loadCerts()
|
||||
|
||||
self.assertIn("tls-rsa", CryptConnection.manager.crypt_supported)
|
||||
self.assertEqual(CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]), "tls-rsa")
|
||||
self.assertTrue(os.path.isfile("%s/cert-rsa.pem" % config.data_dir))
|
||||
self.assertTrue(os.path.isfile("%s/key-rsa.pem" % config.data_dir))
|
||||
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
if not os.path.isfile("%s/sites.json" % config.data_dir):
|
||||
open("%s/sites.json" % config.data_dir, "w").write("{}")
|
||||
if not os.path.isfile("%s/users.json" % config.data_dir):
|
||||
open("%s/users.json" % config.data_dir, "w").write("""
|
||||
{
|
||||
"15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
|
||||
"certs": {},
|
||||
"master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
|
||||
"sites": {}
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.isfile("src/Test/testdata/users.json"):
|
||||
os.unlink("src/Test/testdata/users.json")
|
||||
if os.path.isfile("src/Test/testdata/sites.json"):
|
||||
os.unlink("src/Test/testdata/sites.json")
|
||||
|
||||
def testMediaRoute(self):
|
||||
try:
|
||||
urllib.urlopen("http://127.0.0.1:43110").read()
|
||||
except Exception, err:
|
||||
raise unittest.SkipTest(err)
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/./sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/../config.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
|
||||
def testBitcoinSignOld(self):
|
||||
s = time.time()
|
||||
privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
sign = CryptBitcoin.signOld("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.signOld("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testBitcoinSign(self):
|
||||
s = time.time()
|
||||
privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C"
|
||||
privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testBitcoinSignCompressed(self):
|
||||
raise unittest.SkipTest("Not supported yet")
|
||||
s = time.time()
|
||||
privatekey = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKbZAgWdpFo1UETZSKdgHaNN2J"
|
||||
privatekey_bad = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKsZAgWdpFo1UETZSKdgHaNN2J"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testTrackers(self):
|
||||
raise unittest.SkipTest("Notyet")
|
||||
from Site import SiteManager
|
||||
from lib.subtl.subtl import UdpTrackerClient
|
||||
import hashlib
|
||||
|
||||
ok = 0
|
||||
for protocol, ip, port in SiteManager.TRACKERS:
|
||||
address = "test"
|
||||
if protocol == "udp":
|
||||
tracker = UdpTrackerClient(ip, port)
|
||||
peers = None
|
||||
try:
|
||||
tracker.connect()
|
||||
tracker.poll_once()
|
||||
tracker.announce(info_hash=hashlib.sha1(address).hexdigest(), num_want=5)
|
||||
back = tracker.poll_once()
|
||||
peers = back["response"]["peers"]
|
||||
except Exception, err:
|
||||
peers = None
|
||||
print "Tracker error: %s://%s:%s %s" % (protocol, ip, port, err)
|
||||
if peers is not None:
|
||||
ok += 1
|
||||
|
||||
self.assertEqual(ok, len(SiteManager.TRACKERS))
|
||||
|
||||
def testDb(self):
|
||||
from Db import Db
|
||||
for db_path in [os.path.abspath("%s/test/zeronet.db" % config.data_dir), "%s/test/zeronet.db" % config.data_dir]:
|
||||
print "Creating db using %s..." % db_path,
|
||||
schema = {
|
||||
"db_name": "TestDb",
|
||||
"db_file": "%s/test/zeronet.db" % config.data_dir,
|
||||
"map": {
|
||||
"data.json": {
|
||||
"to_table": {
|
||||
"test": "test"
|
||||
}
|
||||
}
|
||||
},
|
||||
"tables": {
|
||||
"test": {
|
||||
"cols": [
|
||||
["test_id", "INTEGER"],
|
||||
["title", "TEXT"],
|
||||
],
|
||||
"indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"],
|
||||
"schema_changed": 1426195822
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if os.path.isfile("%s/test/zeronet.db" % config.data_dir):
|
||||
os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
db = Db(schema, "%s/test/zeronet.db" % config.data_dir)
|
||||
db.checkTables()
|
||||
db.close()
|
||||
|
||||
# Cleanup
|
||||
os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
os.rmdir("%s/test/" % config.data_dir)
|
||||
|
||||
def testContentManagerIncludes(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
# Include info
|
||||
rules = site.content_manager.getRules("data/test_include/content.json")
|
||||
self.assertEqual(rules["signers"], ['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo'])
|
||||
self.assertEqual(rules["user_name"], 'test')
|
||||
self.assertEqual(rules["max_size"], 20000)
|
||||
self.assertEqual(rules["includes_allowed"], False)
|
||||
self.assertEqual(rules["files_allowed"], 'data.json')
|
||||
# Valid signers
|
||||
self.assertEqual(
|
||||
site.content_manager.getValidSigners("data/test_include/content.json"),
|
||||
['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']
|
||||
)
|
||||
self.assertEqual(
|
||||
site.content_manager.getValidSigners("data/users/content.json"),
|
||||
['1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']
|
||||
)
|
||||
self.assertEqual(site.content_manager.getValidSigners("content.json"), ['1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'])
|
||||
|
||||
# Data validation
|
||||
data_dict = {
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "369d4e780cc80504285f13774ca327fe725eed2d813aad229e62356b07365906",
|
||||
"size": 505
|
||||
}
|
||||
},
|
||||
"modified": 1431451896.656619,
|
||||
"signs": {
|
||||
"15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo": "G2QC+ZIozPQQ/XiOEOMzfekOP8ipi+rKaTy/R/3MnDf98mLIhSSA8927FW6D/ZyP7HHuII2y9d0zbAk+rr8ksQM="
|
||||
}
|
||||
}
|
||||
# Normal data
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True)
|
||||
# Too large
|
||||
data_dict["files"]["data.json"]["size"] = 200000 # Emulate 2MB sized data.json
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False)
|
||||
data_dict["files"]["data.json"]["size"] = 505 # Reset
|
||||
# Not allowed file
|
||||
data_dict["files"]["data.html"] = data_dict["files"]["data.json"]
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False)
|
||||
del data_dict["files"]["data.html"] # Reset
|
||||
# Should work again
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True)
|
||||
|
||||
def testUserContentRules(self):
|
||||
from Site import Site
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
user_content = site.storage.loadJson("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json")
|
||||
|
||||
# File info for not exist file
|
||||
self.assertEqual(site.content_manager.getFileInfo("data/users/notexist/data.json")["content_inner_path"], "data/users/notexist/content.json")
|
||||
self.assertEqual(site.content_manager.getValidSigners("data/users/notexist/data.json"), ["notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"])
|
||||
|
||||
# File info for exsitsing file
|
||||
file_info = site.content_manager.getFileInfo("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/data.json")
|
||||
valid_signers = site.content_manager.getValidSigners(file_info["content_inner_path"], user_content)
|
||||
self.assertEqual(valid_signers, ['14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet', '1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'])
|
||||
|
||||
# Known user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "nofish@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 100000)
|
||||
|
||||
# Unknown user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 10000)
|
||||
|
||||
# User with more size limit by auth type
|
||||
user_content["cert_auth_type"] = "bitmsg"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 15000)
|
||||
|
||||
# Banned user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "bad@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertFalse(rules)
|
||||
|
||||
def testUserContentCert(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
# user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C"
|
||||
user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
|
||||
# cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
|
||||
cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
# user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
# site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] = user_content # Add to content manager
|
||||
# Check if the user file is loaded
|
||||
self.assertTrue("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents)
|
||||
user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
|
||||
cert_content = site.content_manager.contents["data/users/content.json"]
|
||||
# Override cert signer
|
||||
cert_content["user_contents"]["cert_signers"]["zeroid.bit"] = ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]
|
||||
|
||||
# Valid cert providers
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["cert_signers"], {"zeroid.bit": ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]})
|
||||
|
||||
# Add cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"].split("@")[0]), cert_priv
|
||||
)
|
||||
|
||||
# Verify cert
|
||||
self.assertTrue(site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content))
|
||||
self.assertFalse(site.content_manager.verifyCert("data/users/badaddress/content.json", user_content))
|
||||
|
||||
# Sign user content
|
||||
signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
|
||||
# Test user cert
|
||||
self.assertTrue(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
# Test banned user
|
||||
site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][user_content["cert_user_id"]] = False
|
||||
self.assertFalse(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
# Test invalid cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv
|
||||
)
|
||||
signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
self.assertFalse(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
def testClone(self):
|
||||
from Site import Site
|
||||
import shutil
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") # Privatekey: 5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv
|
||||
self.assertEqual(site.storage.directory, "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
|
||||
# Remove old files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertFalse(os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json"))
|
||||
|
||||
# Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc
|
||||
new_site = site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1)
|
||||
|
||||
# Check if clone was successful
|
||||
self.assertEqual(new_site.address, "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertTrue(new_site.storage.isFile("content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("index.html"))
|
||||
self.assertTrue(new_site.storage.isFile("data/users/content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("data/zeroblog.db"))
|
||||
|
||||
# Test re-cloning (updating)
|
||||
|
||||
# Changes in non-data files should be overwritten
|
||||
new_site.storage.write("index.html", "this will be overwritten")
|
||||
self.assertEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
# Changes in data file should be kept after re-cloning
|
||||
changed_contentjson = new_site.storage.loadJson("content.json")
|
||||
changed_contentjson["description"] = "Update Description Test"
|
||||
new_site.storage.writeJson("content.json", changed_contentjson)
|
||||
|
||||
changed_data = new_site.storage.loadJson("data/data.json")
|
||||
changed_data["title"] = "UpdateTest"
|
||||
new_site.storage.writeJson("data/data.json", changed_data)
|
||||
|
||||
# Re-clone the site
|
||||
site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
self.assertEqual(new_site.storage.loadJson("data/data.json")["title"], "UpdateTest")
|
||||
self.assertEqual(new_site.storage.loadJson("content.json")["description"], "Update Description Test")
|
||||
self.assertNotEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
# Delete created files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
new_site.storage.closeDb()
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
def testUserNewsite(self):
|
||||
from User import UserManager
|
||||
user = UserManager.user_manager.get()
|
||||
user.sites = {} # Reset user data
|
||||
self.assertEqual(user.master_address, "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc")
|
||||
self.assertEqual(
|
||||
user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc"),
|
||||
1458664252141532163166741013621928587528255888800826689784628722366466547364755811L
|
||||
)
|
||||
|
||||
address, address_index, site_data = user.getNewSiteData()
|
||||
self.assertEqual(CryptBitcoin.hdPrivatekey(user.master_seed, address_index), site_data["privatekey"]) # Re-generate privatekey based on address_index
|
||||
|
||||
user.sites = {} # Reset user data
|
||||
|
||||
self.assertNotEqual(user.getSiteData(address)["auth_address"], address) # Site address and auth address is different
|
||||
self.assertEqual(user.getSiteData(address)["auth_privatekey"], site_data["auth_privatekey"]) # Re-generate auth_privatekey for site
|
||||
|
||||
def testSslCert(self):
|
||||
from Crypt import CryptConnection
|
||||
# Remove old certs
|
||||
if os.path.isfile("%s/cert-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
if os.path.isfile("%s/key-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
CryptConnection.manager.loadCerts()
|
||||
|
||||
self.assertIn("tls-rsa", CryptConnection.manager.crypt_supported)
|
||||
self.assertEqual(CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]), "tls-rsa")
|
||||
self.assertTrue(os.path.isfile("%s/cert-rsa.pem" % config.data_dir))
|
||||
self.assertTrue(os.path.isfile("%s/key-rsa.pem" % config.data_dir))
|
||||
|
||||
# Remove created files
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import logging
|
||||
logging.getLogger().setLevel(level=logging.DEBUG)
|
||||
#unittest.main(verbosity=2)
|
||||
unittest.main(verbosity=2, defaultTest="TestCase.testClone")
|
||||
|
||||
import logging
|
||||
logging.getLogger().setLevel(level=logging.CRITICAL)
|
||||
unittest.main(verbosity=2)
|
||||
# unittest.main(verbosity=2, defaultTest="TestCase.testUserContentCert")
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
{
|
||||
"address": "1BLogC9LN4oPDcruNz3qo1ysa133E9AGg8",
|
||||
"address": "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT",
|
||||
"background-color": "white",
|
||||
"description": "Blogging platform Demo",
|
||||
"domain": "Blog.ZeroNetwork.bit",
|
||||
|
@ -88,13 +88,9 @@
|
|||
"sha512": "8a42b98962faea74618113166886be488c09dad10ca47fe97005edc5fb40cc00",
|
||||
"size": 723
|
||||
},
|
||||
"img/post/slides.png": {
|
||||
"sha512": "1933db3b90ab93465befa1bd0843babe38173975e306286e08151be9992f767e",
|
||||
"size": 14439
|
||||
},
|
||||
"index.html": {
|
||||
"sha512": "4989c09b0f8ebcb84200bf39ac387d0cb8387846f44c993c553f5638472dc9c6",
|
||||
"size": 4665
|
||||
"sha512": "c4039ebfc4cb6f116cac05e803a18644ed70404474a572f0d8473f4572f05df3",
|
||||
"size": 4667
|
||||
},
|
||||
"js/all.js": {
|
||||
"sha512": "034c97535f3c9b3fbebf2dcf61a38711dae762acf1a99168ae7ddc7e265f582c",
|
||||
|
@ -104,20 +100,30 @@
|
|||
"ignore": "((js|css)/(?!all.(js|css))|data/.*db|data/users/.*/.*)",
|
||||
"includes": {
|
||||
"data/users/content.json": {
|
||||
"signers": [],
|
||||
"signers": ["1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f"],
|
||||
"signers_required": 1
|
||||
},
|
||||
"data/test_include/content.json": {
|
||||
"added": 1424976057,
|
||||
"files_allowed": "data.json",
|
||||
"includes_allowed": false,
|
||||
"max_size": 20000,
|
||||
"signers": [ "15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo" ],
|
||||
"signers_required": 1,
|
||||
"user_id": 47,
|
||||
"user_name": "test"
|
||||
}
|
||||
},
|
||||
"modified": 1433033839.187,
|
||||
"modified": 1434801613.8,
|
||||
"sign": [
|
||||
23288026089939741768236374425021091801956985530578920099613839712827977534400,
|
||||
96520925254285126876579021588512602074631663615139311676453830936350975122022
|
||||
107584248894581661953399064048991976924739126704981340547735906786807630121376,
|
||||
14052922268999375798453683972186312380248481029778104103759595432459320456230
|
||||
],
|
||||
"signers_sign": "G7W/oNvczE5nPTFYVOqv8+GOpQd23LS/Dc1Q6xQ1NRDDHlYzmoSE63UQ7Za05kD0rwIYXbuUSr8z8p6RhZmnUs8=",
|
||||
"signers_sign": "HDNmWJHM2diYln4pkdL+qYOvgE7MdwayzeG+xEUZBgp1HtOjBJS+knDEVQsBkjcOPicDG2it1r6R1eQrmogqSP0=",
|
||||
"signs": {
|
||||
"1BLogC9LN4oPDcruNz3qo1ysa133E9AGg8": "G9i0Q/ZkhZ0H5O8ofEvXbEgGhwVJYbLxyxOwsGYxgtzIaBjiDg/HKe9l7nPRqDD3/bRG9oPH5kIQd4152vY3lI8="
|
||||
"1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": "HO9esgmZhPJqOG+fEpStwK+u5P/+4Kx5VGApNnbsyA0lBfEV6aWviIP6FBlP3sZSH/EMuoEw42lToRmLppbzmuM="
|
||||
},
|
||||
"signs_required": 1,
|
||||
"title": "ZeroBlog",
|
||||
"zeronet_version": "0.3.0"
|
||||
"zeronet_version": "0.3.1"
|
||||
}
|
12
src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT/data/test_include/content.json
vendored
Normal file
12
src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT/data/test_include/content.json
vendored
Normal file
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "f6ea25af270ba6a67c90a053c34da8ba94e6cf2177d4ee7979fd517a31ca6479",
|
||||
"size": 74
|
||||
}
|
||||
},
|
||||
"modified": 1424976057.772182,
|
||||
"signs": {
|
||||
"1TaLk3zM7ZRskJvrh3ZNCDVGXvkJusPKQ": "G1Jy36d3LLu+Lh8ikGqOozyiYZ+NvF8QF1OdC6PfDt26bflPPQ0gwWw8AQdFmc/S3BMBDNt0tJshiiJcRK46j/c="
|
||||
}
|
||||
}
|
37
src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT/data/test_include/data.json
vendored
Normal file
37
src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT/data/test_include/data.json
vendored
Normal file
|
@ -0,0 +1,37 @@
|
|||
{
|
||||
"next_topic_id": 1,
|
||||
"topics": [],
|
||||
"next_message_id": 5,
|
||||
"comments": {
|
||||
"1@2": [
|
||||
{
|
||||
"comment_id": 1,
|
||||
"body": "New user test!",
|
||||
"added": 1423442049
|
||||
},
|
||||
{
|
||||
"comment_id": 2,
|
||||
"body": "test 321",
|
||||
"added": 1423531445
|
||||
},
|
||||
{
|
||||
"comment_id": 3,
|
||||
"body": "0.2.4 test.",
|
||||
"added": 1424133003
|
||||
}
|
||||
]
|
||||
},
|
||||
"topic_votes": {
|
||||
"1@2": 1,
|
||||
"1@6": 1,
|
||||
"1@69": 1,
|
||||
"607@69": 1
|
||||
},
|
||||
"comment_votes": {
|
||||
"35@2": 1,
|
||||
"7@64": 1,
|
||||
"8@64": 1,
|
||||
"50@2": 1,
|
||||
"13@77": 1
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"cert_auth_type": "web",
|
||||
"cert_sign": "HBsTrjTmv+zD1iY93tSci8n9DqdEtYwzxJmRppn4/b+RYktcANGm5tXPOb+Duw3AJcgWDcGUvQVgN1D9QAwIlCw=",
|
||||
"cert_user_id": "toruser@zeroid.bit",
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "4868b5e6d70a55d137db71c2e276bda80437e0235ac670962acc238071296b45",
|
||||
"size": 168
|
||||
}
|
||||
},
|
||||
"modified": 1432491109.11,
|
||||
"signs": {
|
||||
"1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9": "HMy7ZwwqE0Sk8O+5hTx/ejFW6KtIDbID6fGblCodUTpz4mJZ5GwApBHSVLMYL43vvGT/vKZOiQoJ5tQTeFVbbkk="
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"next_comment_id": 2,
|
||||
"comment": [
|
||||
{
|
||||
"comment_id": 1,
|
||||
"body": "hello from Tor!",
|
||||
"post_id": 38,
|
||||
"date_added": 1432491109
|
||||
}
|
||||
],
|
||||
"comment_vote": {}
|
||||
}
|
|
@ -12,7 +12,8 @@
|
|||
"permission_rules": {
|
||||
".*": {
|
||||
"files_allowed": "data.json",
|
||||
"max_size": 10000
|
||||
"max_size": 10000,
|
||||
"signers": [ "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet" ]
|
||||
},
|
||||
"bitid/.*@zeroid.bit": { "max_size": 40000 },
|
||||
"bitmsg/.*@zeroid.bit": { "max_size": 15000 }
|
||||
|
|
26
src/Test/testdata/cert-rsa.pem
vendored
26
src/Test/testdata/cert-rsa.pem
vendored
|
@ -1,26 +0,0 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIEVzCCAz+gAwIBAgIJAMT6z6CFolWoMA0GCSqGSIb3DQEBCwUAMH8xCzAJBgNV
|
||||
BAYTAlVTMQswCQYDVQQIDAJOWTERMA8GA1UEBwwITmV3IFlvcmsxFTATBgNVBAoM
|
||||
DEV4YW1wbGUsIExMQzEYMBYGA1UEAwwPRXhhbXBsZSBDb21wYW55MR8wHQYJKoZI
|
||||
hvcNAQkBFhB0ZXN0QGV4YW1wbGUuY29tMB4XDTE1MDYwNzIyNDcwMVoXDTE1MDcw
|
||||
NzIyNDcwMVowfzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5ZMREwDwYDVQQHDAhO
|
||||
ZXcgWW9yazEVMBMGA1UECgwMRXhhbXBsZSwgTExDMRgwFgYDVQQDDA9FeGFtcGxl
|
||||
IENvbXBhbnkxHzAdBgkqhkiG9w0BCQEWEHRlc3RAZXhhbXBsZS5jb20wggEiMA0G
|
||||
CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCxQLG4PNW2okO2+seSyXduFDj6RsEE
|
||||
uLKmPrLzS8phW+VKZ7WaAnCQOX4ZX5oUNLIHSbsi57JBEzSxSMAbipaKilRbLtp4
|
||||
ObwDMUS+CTR2cohaYf7jCYrkWCeUWw/QT3zd65KXnysNmdPt8MTsgipiwACHPBZ6
|
||||
lMfzYPv0LoYNSubXAGp3gkXdh8oSFd1s4Dz0UC0g7D+McstyN72BWpQxkKpir8wN
|
||||
NYaXX02vXdXag1LHcG3tvQYE35Ssp0D7AyDOfnWOXIJBosPqAPdC9Bn2EYqs105u
|
||||
KxJvLL2fwRC5OWyFlxgl2/WbaVlxPTG+tCt1Z8PzSt/Ba4beXNXWZvdPAgMBAAGj
|
||||
gdUwgdIwHQYDVR0OBBYEFHpqhZLf14i9oSNLl6JCGn0Oj3ykMB8GA1UdIwQYMBaA
|
||||
FHpqhZLf14i9oSNLl6JCGn0Oj3ykMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMEoG
|
||||
A1UdEQRDMEGCC2V4YW1wbGUuY29tgg93d3cuZXhhbXBsZS5jb22CEG1haWwuZXhh
|
||||
bXBsZS5jb22CD2Z0cC5leGFtcGxlLmNvbTAsBglghkgBhvhCAQ0EHxYdT3BlblNT
|
||||
TCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwDQYJKoZIhvcNAQELBQADggEBADtW79uP
|
||||
gSB3iiOjvGTZqiYY0Uiw6CwUVtDGHSCv/daOboGnCsdKe4TcmjRg+GG8FQGxTpcg
|
||||
OJouUiCb+ATWnn0af+3wQefzMB1HISwhw+c62PcR6h+eFLITDJxVem/zK7Hhxwpq
|
||||
z8ud25+W1fLED2QdKIN7RyhtAS0bTq/XoZpkC0FXNa3GY2+HzP3gdIr4QGd+q+pf
|
||||
HWGg/+I9m7Uk/8trxvcQH+adohJXMIYJB8mGcHprtO28megMzQjzlnycu+fHDM/L
|
||||
hb6VxJU5rYe/Kfx2QHlCiRibTidCiaG1LJnpYxyNLF6pavstbziCfiruP6ki20+r
|
||||
bMzABTIXLaYxeUQ=
|
||||
-----END CERTIFICATE-----
|
28
src/Test/testdata/key-rsa.pem
vendored
28
src/Test/testdata/key-rsa.pem
vendored
|
@ -1,28 +0,0 @@
|
|||
-----BEGIN PRIVATE KEY-----
|
||||
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCxQLG4PNW2okO2
|
||||
+seSyXduFDj6RsEEuLKmPrLzS8phW+VKZ7WaAnCQOX4ZX5oUNLIHSbsi57JBEzSx
|
||||
SMAbipaKilRbLtp4ObwDMUS+CTR2cohaYf7jCYrkWCeUWw/QT3zd65KXnysNmdPt
|
||||
8MTsgipiwACHPBZ6lMfzYPv0LoYNSubXAGp3gkXdh8oSFd1s4Dz0UC0g7D+Mcsty
|
||||
N72BWpQxkKpir8wNNYaXX02vXdXag1LHcG3tvQYE35Ssp0D7AyDOfnWOXIJBosPq
|
||||
APdC9Bn2EYqs105uKxJvLL2fwRC5OWyFlxgl2/WbaVlxPTG+tCt1Z8PzSt/Ba4be
|
||||
XNXWZvdPAgMBAAECggEAFKWmjgF4G4gXPy9DJBQXt1jfsjefsD8bgEX5bmG7t0+P
|
||||
C/W8r63tEr+/spDyT7w7qKGsDMrUdsQX9Ta2Of8Qvh6S6PQyOqLvBagti71iwRFi
|
||||
VLTpjeTEqwihw6Q/2VIJB4izULoXt8PdbfAH4EzhRxN6fEZBkBHYoL8BWlY5Az8u
|
||||
tg6gjsoZFP1AiSByir0d+tf5adDVZieo5O8WYahi95TGqtkoM49YzX74I2dJ1jWY
|
||||
MNqMYxoPwvf+olNB3rMPZIGb4VMpNzsgbV73fNVyCMa9TNJSQUkELrrkym3us45w
|
||||
6kaBE7uU3wciQjeZOnRyDL4MnKGSUb0eJY0JjkQnkQKBgQDYx6GeXXDl6rcCovUR
|
||||
gdAvJco29rT4OPi1xbqHf9mUoBuNdXpwd2/bo4NlBfEVrZlSMZ/Vl2eTLAaYm6JI
|
||||
WwshIjnRU8Pe/4cffDsVRuXmdWPduXeqHqo8jwKDT9hOYcUiidWUgRdbfo108NQs
|
||||
XSvnGXXS0je58GZMTeC+A9Q1gwKBgQDRUlOkr2g7j5w2nuo2yRIKuRAdI4A1UZA/
|
||||
xddw3e6RLLq4vTF/6X0Wb1bsrGUhX9aJ/khWa7arN45q/KJfxwnV+I7wuPw3VQKA
|
||||
zH5tF1It7bSl7FgYFXJMqYkC77V7IdQL/PuM0vS8oewlpZInDCpJjPf/7bG7iUaw
|
||||
/Bb74N5ZRQKBgBGz+9ba+qVMDbYBaNINL9sp0uG6M/0xad4uT5VRM4uXp6hdt6oH
|
||||
lvLw34IYgh+rFaJIuyzOOH8kUUWVMCOIi9gg22fk11IWvAouMwUBzTSM0aMBymvy
|
||||
JSTc6O+gTaHZCihP6Uk/YZDvPM4X/LvCwBsXUS/uSu68Wx5QHdJmraXRAoGAGbr8
|
||||
/Slyrp+gnDY8pC9jQF4vVOWgRO2Zxb0UFpOxV7cf3MWk6AxTjAZzsPQgGlIllSDk
|
||||
03q6IaHap9wWOZ/F3b+IEp8qocKZZCu+/rn3KB4oLp021v8L5dCRPwMoU9J8tlyK
|
||||
r2zfGLDuzlHj/VjJefESKyuUxXDCd88FJEEoE+ECgYEAmsaUuUNP4/VH32DOzVEt
|
||||
wjhYvrph9fAcv67Mm+hbnBn7ACAnCJa0Ny8ZGzHSKck4WW+Drtyn6tv8naTsPnJn
|
||||
GYfZeQ6uqk46F2sMOJHpmRsoJ+1Jt3GtoEedqms2o3OsnMbseiBsPhLr4GaJOCc7
|
||||
+c+EexCfljR19TVV4EDhVRw=
|
||||
-----END PRIVATE KEY-----
|
19
src/Test/testdata/sites.json
vendored
19
src/Test/testdata/sites.json
vendored
|
@ -1,19 +0,0 @@
|
|||
{
|
||||
"159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL": {
|
||||
"auth_key": "4YV2yD0tRsQkujR313PyTaLI",
|
||||
"own": false,
|
||||
"permissions": [],
|
||||
"serving": true,
|
||||
"size": 0,
|
||||
"wrapper_key": "eRdtnDMCarLs"
|
||||
},
|
||||
"1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT": {
|
||||
"auth_key": "VyVxJU0AJqYGNTvITwASeSJA",
|
||||
"modified": 1433079881.276243,
|
||||
"own": false,
|
||||
"permissions": [],
|
||||
"serving": true,
|
||||
"size": 722338,
|
||||
"wrapper_key": "amGBy0GlD6nD"
|
||||
}
|
||||
}
|
12
src/Test/testdata/users.json
vendored
12
src/Test/testdata/users.json
vendored
|
@ -1,12 +0,0 @@
|
|||
{
|
||||
"15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
|
||||
"certs": {},
|
||||
"master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
|
||||
"sites": {
|
||||
"1LzBg7WGjJXwJtre2iKwbZtRXezWzUMXDJ": {
|
||||
"auth_address": "15DZCFDwPjFGf9E2DFVQzZJ2voTWZEesRX",
|
||||
"auth_privatekey": "5KC4tMJdA8wJkNiVTuWCjNVBm7eeXWCSTRAnGABRaH5RhMEYyet"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue