Rev423, Rewrite and reorganize test using pytest, New PhantomJS based browser tests, Fix html requests error without wrapper nonce, Indent json files with 1 space
This commit is contained in:
parent
93312ae129
commit
e00537ba57
16 changed files with 575 additions and 584 deletions
|
@ -8,7 +8,7 @@ class Config(object):
|
|||
|
||||
def __init__(self, argv):
|
||||
self.version = "0.3.2"
|
||||
self.rev = 420
|
||||
self.rev = 423
|
||||
self.argv = argv
|
||||
self.action = None
|
||||
self.createParser()
|
||||
|
|
|
@ -178,7 +178,7 @@ class SiteStorage:
|
|||
|
||||
# Write formatted json file
|
||||
def writeJson(self, inner_path, data):
|
||||
content = json.dumps(data, indent=2, sort_keys=True)
|
||||
content = json.dumps(data, indent=1, sort_keys=True)
|
||||
# Make it a little more compact by removing unnecessary white space
|
||||
|
||||
def compact_list(match):
|
||||
|
|
|
@ -1,140 +0,0 @@
|
|||
import time
|
||||
import socket
|
||||
import msgpack
|
||||
|
||||
|
||||
print "Connecting..."
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive 10000 ping request...",
|
||||
s = time.time()
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
req = sock.recv(16 * 1024)
|
||||
print time.time() - s, repr(req), time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive, decode 10000 ping request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
unpacker.feed(sock.recv(16 * 1024))
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
print "Found:", req, "x", reqs, time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(1000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
unpacker.feed(sock.recv(16 * 1024))
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
sock.close()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
print "Found:", req, "x", reqs, time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: Request, receive, decode 10000 x 10k data request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
|
||||
|
||||
"""buff = StringIO()
|
||||
data = sock.recv(16*1024)
|
||||
buff.write(data)
|
||||
if not data:
|
||||
break
|
||||
while not data.endswith("\n"):
|
||||
data = sock.recv(16*1024)
|
||||
if not data: break
|
||||
buff.write(data)
|
||||
req = msgpack.unpackb(buff.getvalue().strip("\n"))
|
||||
reqs += 1"""
|
||||
|
||||
req_found = False
|
||||
while not req_found:
|
||||
buff = sock.recv(16 * 1024)
|
||||
unpacker.feed(buff)
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
req_found = True
|
||||
break # Only process one request
|
||||
print "Found:", len(req["res"]), "x", reqs, time.time() - s
|
||||
|
||||
|
||||
print "10 Threaded: Request, receive, decode 10000 x 10k data request...",
|
||||
import gevent
|
||||
s = time.time()
|
||||
reqs = 0
|
||||
req = None
|
||||
|
||||
|
||||
def requester():
|
||||
global reqs, req
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
unpacker = msgpack.Unpacker()
|
||||
for i in range(1000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
|
||||
|
||||
req_found = False
|
||||
while not req_found:
|
||||
buff = sock.recv(16 * 1024)
|
||||
unpacker.feed(buff)
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
req_found = True
|
||||
break # Only process one request
|
||||
|
||||
threads = []
|
||||
for i in range(10):
|
||||
threads.append(gevent.spawn(requester))
|
||||
gevent.joinall(threads)
|
||||
print "Found:", len(req["res"]), "x", reqs, time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: ZeroMQ Send, receive 1000 ping request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1234')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Ping"}))
|
||||
req = zmq_sock.recv(16 * 1024)
|
||||
print "Found:", req, time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1234')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
|
||||
req = msgpack.unpackb(zmq_sock.recv(1024 * 1024))
|
||||
print "Found:", len(req["res"]), time.time() - s
|
||||
|
||||
|
||||
print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1233')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
|
||||
req = msgpack.unpackb(zmq_sock.recv(1024 * 1024))
|
||||
print "Found:", len(req["res"]), time.time() - s
|
31
src/Test/TestConfig.py
Normal file
31
src/Test/TestConfig.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
import pytest
|
||||
|
||||
import Config
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestUser:
|
||||
def testParse(self):
|
||||
# Defaults
|
||||
config_test = Config.Config("zeronet.py".split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
assert not config_test.debug
|
||||
assert not config_test.debug_socket
|
||||
|
||||
# Test parse command line with unknown parameters (ui_password)
|
||||
config_test = Config.Config("zeronet.py --debug --debug_socket --ui_password hello".split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
assert config_test.debug
|
||||
assert config_test.debug_socket
|
||||
with pytest.raises(AttributeError):
|
||||
config_test.ui_password
|
||||
|
||||
# More complex test
|
||||
args = "zeronet.py --unknown_arg --debug --debug_socket --ui_restrict 127.0.0.1 1.2.3.4 "
|
||||
args += "--another_unknown argument --use_openssl False siteSign address privatekey --inner_path users/content.json"
|
||||
config_test = Config.Config(args.split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
assert config_test.debug
|
||||
assert "1.2.3.4" in config_test.ui_restrict
|
||||
assert not config_test.use_openssl
|
||||
assert config_test.inner_path == "users/content.json"
|
68
src/Test/TestContent.py
Normal file
68
src/Test/TestContent.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import json
|
||||
from cStringIO import StringIO
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestContent:
|
||||
def testIncludes(self, site):
|
||||
# Rules defined in parent content.json
|
||||
rules = site.content_manager.getRules("data/test_include/content.json")
|
||||
|
||||
assert rules["signers"] == ["15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo"] # Valid signer
|
||||
assert rules["user_name"] == "test" # Extra data
|
||||
assert rules["max_size"] == 20000 # Max size of files
|
||||
assert not rules["includes_allowed"] # Don't allow more includes
|
||||
assert rules["files_allowed"] == "data.json" # Allowed file pattern
|
||||
|
||||
# Valid signers for "data/test_include/content.json"
|
||||
valid_signers = site.content_manager.getValidSigners("data/test_include/content.json")
|
||||
assert "15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo" in valid_signers # Extra valid signer defined in parent content.json
|
||||
assert "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT" in valid_signers # The site itself
|
||||
assert len(valid_signers) == 2 # No more
|
||||
|
||||
# Valid signers for "data/users/content.json"
|
||||
valid_signers = site.content_manager.getValidSigners("data/users/content.json")
|
||||
assert "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f" in valid_signers # Extra valid signer defined in parent content.json
|
||||
assert "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT" in valid_signers # The site itself
|
||||
assert len(valid_signers) == 2
|
||||
|
||||
# Valid signers for root content.json
|
||||
assert site.content_manager.getValidSigners("content.json") == ["1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"]
|
||||
|
||||
def testSizelimit(self, site):
|
||||
# Data validation
|
||||
data_dict = {
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "369d4e780cc80504285f13774ca327fe725eed2d813aad229e62356b07365906",
|
||||
"size": 505
|
||||
}
|
||||
},
|
||||
"modified": 1431451896.656619,
|
||||
"signs": {
|
||||
"15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo":
|
||||
"G2QC+ZIozPQQ/XiOEOMzfekOP8ipi+rKaTy/R/3MnDf98mLIhSSA8927FW6D/ZyP7HHuII2y9d0zbAk+rr8ksQM="
|
||||
}
|
||||
}
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
|
||||
# Normal data
|
||||
assert site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False)
|
||||
|
||||
# Too large
|
||||
data_dict["files"]["data.json"]["size"] = 200000 # Emulate 2MB sized data.json
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
assert not site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False)
|
||||
data_dict["files"]["data.json"]["size"] = 505 # Reset
|
||||
|
||||
# Not allowed file
|
||||
data_dict["files"]["notallowed.exe"] = data_dict["files"]["data.json"]
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
assert not site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False)
|
||||
del data_dict["files"]["notallowed.exe"] # Reset
|
||||
|
||||
# Should work again
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
assert site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False)
|
44
src/Test/TestCryptBitcoin.py
Normal file
44
src/Test/TestCryptBitcoin.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
from Crypt import CryptBitcoin
|
||||
|
||||
|
||||
class TestCryptBitcoin:
|
||||
def testSignOld(self):
|
||||
privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
|
||||
# Get address by privatekey
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
assert address == "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez"
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
assert not address_bad == "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez"
|
||||
|
||||
# Text signing
|
||||
sign = CryptBitcoin.signOld("hello", privatekey)
|
||||
assert CryptBitcoin.verify("hello", address, sign) # Original text
|
||||
assert not CryptBitcoin.verify("not hello", address, sign) # Different text
|
||||
|
||||
# Signed by bad privatekey
|
||||
sign_bad = CryptBitcoin.signOld("hello", privatekey_bad)
|
||||
assert not CryptBitcoin.verify("hello", address, sign_bad)
|
||||
|
||||
def testSign(self):
|
||||
privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C"
|
||||
privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC"
|
||||
|
||||
# Get address by privatekey
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
assert address == "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz"
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
assert address_bad != "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz"
|
||||
|
||||
# Text signing
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
assert CryptBitcoin.verify("hello", address, sign)
|
||||
assert not CryptBitcoin.verify("not hello", address, sign)
|
||||
|
||||
# Signed by bad privatekey
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
assert not CryptBitcoin.verify("hello", address, sign_bad)
|
27
src/Test/TestCryptConnection.py
Normal file
27
src/Test/TestCryptConnection.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
import os
|
||||
|
||||
from Config import config
|
||||
from Crypt import CryptConnection
|
||||
|
||||
|
||||
class TestCryptConnection:
|
||||
def testSslCert(self):
|
||||
# Remove old certs
|
||||
if os.path.isfile("%s/cert-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
if os.path.isfile("%s/key-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
# Generate certs
|
||||
CryptConnection.manager.loadCerts()
|
||||
|
||||
assert "tls-rsa" in CryptConnection.manager.crypt_supported
|
||||
assert CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]) == "tls-rsa" # It should choose the known crypt
|
||||
|
||||
# Check openssl cert generation
|
||||
assert os.path.isfile("%s/cert-rsa.pem" % config.data_dir)
|
||||
assert os.path.isfile("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
# Remove created files
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
55
src/Test/TestDb.py
Normal file
55
src/Test/TestDb.py
Normal file
|
@ -0,0 +1,55 @@
|
|||
import os
|
||||
|
||||
from Config import config
|
||||
from Db import Db
|
||||
|
||||
|
||||
class TestDb:
|
||||
def testCheckTables(self):
|
||||
db_path = "%s/zeronet.db" % config.data_dir
|
||||
schema = {
|
||||
"db_name": "TestDb",
|
||||
"db_file": "%s/zeronet.db" % config.data_dir,
|
||||
"map": {
|
||||
"data.json": {
|
||||
"to_table": {
|
||||
"test": "test"
|
||||
}
|
||||
}
|
||||
},
|
||||
"tables": {
|
||||
"test": {
|
||||
"cols": [
|
||||
["test_id", "INTEGER"],
|
||||
["title", "TEXT"],
|
||||
],
|
||||
"indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"],
|
||||
"schema_changed": 1426195822
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if os.path.isfile(db_path):
|
||||
os.unlink(db_path)
|
||||
db = Db(schema, db_path)
|
||||
db.checkTables()
|
||||
db.close()
|
||||
|
||||
# Verify tables
|
||||
assert os.path.isfile(db_path)
|
||||
db = Db(schema, db_path)
|
||||
|
||||
tables = [row["name"] for row in db.execute("SELECT name FROM sqlite_master WHERE type='table'")]
|
||||
assert "keyvalue" in tables # To store simple key -> value
|
||||
assert "json" in tables # Json file path registry
|
||||
assert "test" in tables # The table defined in dbschema.json
|
||||
|
||||
# Verify test table
|
||||
cols = [col["name"] for col in db.execute("PRAGMA table_info(test)")]
|
||||
assert "test_id" in cols
|
||||
assert "title" in cols
|
||||
|
||||
db.close()
|
||||
|
||||
# Cleanup
|
||||
os.unlink(db_path)
|
54
src/Test/TestSite.py
Normal file
54
src/Test/TestSite.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
import shutil
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestSite:
|
||||
def testClone(self, site):
|
||||
assert site.storage.directory == "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"
|
||||
|
||||
# Remove old files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
assert not os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json")
|
||||
|
||||
# Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc
|
||||
new_site = site.clone(
|
||||
"159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1
|
||||
)
|
||||
|
||||
# Check if clone was successful
|
||||
assert new_site.address == "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"
|
||||
assert new_site.storage.isFile("content.json")
|
||||
assert new_site.storage.isFile("index.html")
|
||||
assert new_site.storage.isFile("data/users/content.json")
|
||||
assert new_site.storage.isFile("data/zeroblog.db")
|
||||
|
||||
# Test re-cloning (updating)
|
||||
|
||||
# Changes in non-data files should be overwritten
|
||||
new_site.storage.write("index.html", "this will be overwritten")
|
||||
assert new_site.storage.read("index.html"), "this will be overwritten"
|
||||
|
||||
# Changes in data file should be kept after re-cloning
|
||||
changed_contentjson = new_site.storage.loadJson("content.json")
|
||||
changed_contentjson["description"] = "Update Description Test"
|
||||
new_site.storage.writeJson("content.json", changed_contentjson)
|
||||
|
||||
changed_data = new_site.storage.loadJson("data/data.json")
|
||||
changed_data["title"] = "UpdateTest"
|
||||
new_site.storage.writeJson("data/data.json", changed_data)
|
||||
|
||||
# Re-clone the site
|
||||
site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
assert new_site.storage.loadJson("data/data.json")["title"] == "UpdateTest"
|
||||
assert new_site.storage.loadJson("content.json")["description"] == "Update Description Test"
|
||||
assert new_site.storage.read("index.html") != "this will be overwritten"
|
||||
|
||||
# Delete created files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
new_site.storage.closeDb()
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
23
src/Test/TestUser.py
Normal file
23
src/Test/TestUser.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
import pytest
|
||||
|
||||
from Crypt import CryptBitcoin
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestUser:
|
||||
def testNewsite(self, user):
|
||||
user.sites = {} # Reset user data
|
||||
assert user.master_address == "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc"
|
||||
address_index = 1458664252141532163166741013621928587528255888800826689784628722366466547364755811L
|
||||
assert user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc") == address_index
|
||||
|
||||
# Re-generate privatekey based on address_index
|
||||
address, address_index, site_data = user.getNewSiteData()
|
||||
assert CryptBitcoin.hdPrivatekey(user.master_seed, address_index) == site_data["privatekey"]
|
||||
|
||||
user.sites = {} # Reset user data
|
||||
|
||||
# Site address and auth address is different
|
||||
assert user.getSiteData(address)["auth_address"] != address
|
||||
# Re-generate auth_privatekey for site
|
||||
assert user.getSiteData(address)["auth_privatekey"] == site_data["auth_privatekey"]
|
118
src/Test/TestUserContent.py
Normal file
118
src/Test/TestUserContent.py
Normal file
|
@ -0,0 +1,118 @@
|
|||
import json
|
||||
from cStringIO import StringIO
|
||||
|
||||
import pytest
|
||||
|
||||
from Crypt import CryptBitcoin
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestUserContent:
|
||||
def testSigners(self, site):
|
||||
# File info for not existing user file
|
||||
file_info = site.content_manager.getFileInfo("data/users/notexist/data.json")
|
||||
assert file_info["content_inner_path"] == "data/users/notexist/content.json"
|
||||
valid_signers = site.content_manager.getValidSigners("data/users/notexist/content.json")
|
||||
assert valid_signers == ["notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"]
|
||||
|
||||
# File info for exsitsing user file
|
||||
valid_signers = site.content_manager.getValidSigners("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
assert '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT' in valid_signers # The site address
|
||||
assert '14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet' in valid_signers # Admin user definied in data/users/content.json
|
||||
assert '1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C' in valid_signers # The user itself
|
||||
assert len(valid_signers) == 3 # No more valid signers
|
||||
|
||||
def testRules(self, site):
|
||||
# We going to manipulate it this test rules based on data/users/content.json
|
||||
user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
|
||||
# Known user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "nofish@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
assert rules["max_size"] == 100000
|
||||
|
||||
# Unknown user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
assert rules["max_size"] == 10000
|
||||
|
||||
# User with more size limit based on auth type
|
||||
user_content["cert_auth_type"] = "bitmsg"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
assert rules["max_size"] == 15000
|
||||
|
||||
# Banned user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "bad@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
assert rules is False
|
||||
|
||||
def testCert(self, site):
|
||||
# user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C"
|
||||
user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
|
||||
# cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
|
||||
cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
|
||||
|
||||
# Check if the user file is loaded
|
||||
assert "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents
|
||||
user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
|
||||
rules_content = site.content_manager.contents["data/users/content.json"]
|
||||
|
||||
# Override valid cert signers for the test
|
||||
rules_content["user_contents"]["cert_signers"]["zeroid.bit"] = [
|
||||
"14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet",
|
||||
"1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"
|
||||
]
|
||||
|
||||
# Check valid cert signers
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
assert rules["cert_signers"] == {"zeroid.bit": [
|
||||
"14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet",
|
||||
"1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"
|
||||
]}
|
||||
|
||||
# Sign a valid cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (
|
||||
user_content["cert_auth_type"],
|
||||
user_content["cert_user_id"].split("@")[0]
|
||||
), cert_priv)
|
||||
|
||||
# Verify cert
|
||||
assert site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
|
||||
# Verify if the cert is valid for other address
|
||||
assert not site.content_manager.verifyCert("data/users/badaddress/content.json", user_content)
|
||||
|
||||
# Sign user content
|
||||
signed_content = site.content_manager.sign(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
|
||||
)
|
||||
|
||||
# Test user cert
|
||||
assert site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
)
|
||||
|
||||
# Test banned user
|
||||
cert_user_id = user_content["cert_user_id"] # My username
|
||||
site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][cert_user_id] = False
|
||||
assert not site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
)
|
||||
|
||||
# Test invalid cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv
|
||||
)
|
||||
signed_content = site.content_manager.sign(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False
|
||||
)
|
||||
assert not site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
)
|
77
src/Test/TestWeb.py
Normal file
77
src/Test/TestWeb.py
Normal file
|
@ -0,0 +1,77 @@
|
|||
import urllib
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support.expected_conditions import staleness_of
|
||||
from selenium.common.exceptions import NoSuchElementException
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
class WaitForPageLoad(object):
|
||||
def __init__(self, browser):
|
||||
self.browser = browser
|
||||
|
||||
def __enter__(self):
|
||||
self.old_page = self.browser.find_element_by_tag_name('html')
|
||||
|
||||
def __exit__(self, *args):
|
||||
WebDriverWait(self.browser, 20).until(staleness_of(self.old_page))
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestWeb:
|
||||
def testFileSecurity(self, site_url):
|
||||
assert "Forbidden" in urllib.urlopen("%s/media/./sites.json" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/media/../config.py" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json" % site_url).read()
|
||||
assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py" % site_url).read()
|
||||
|
||||
def testHomepage(self, browser, site_url):
|
||||
browser.get("%s" % site_url)
|
||||
assert browser.title == "ZeroHello - ZeroNet"
|
||||
|
||||
def testLinkSecurity(self, browser, site_url):
|
||||
browser.get("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/test/security.html" % site_url)
|
||||
assert browser.title == "ZeroHello - ZeroNet"
|
||||
assert browser.current_url == "%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/test/security.html" % site_url
|
||||
|
||||
# Switch to inner frame
|
||||
browser.switch_to.frame(browser.find_element_by_id("inner-iframe"))
|
||||
assert "wrapper_nonce" in browser.current_url
|
||||
browser.switch_to.default_content()
|
||||
|
||||
# Clicking on links without target
|
||||
browser.switch_to.frame(browser.find_element_by_id("inner-iframe"))
|
||||
with WaitForPageLoad(browser):
|
||||
browser.find_element_by_id("link_to_current").click()
|
||||
assert "wrapper_nonce" not in browser.current_url # The browser object back to default content
|
||||
assert "Forbidden" not in browser.page_source
|
||||
# Check if we have frame inside frame
|
||||
browser.switch_to.frame(browser.find_element_by_id("inner-iframe"))
|
||||
with pytest.raises(NoSuchElementException):
|
||||
assert not browser.find_element_by_id("inner-iframe")
|
||||
browser.switch_to.default_content()
|
||||
|
||||
# Clicking on link with target=_top
|
||||
browser.switch_to.frame(browser.find_element_by_id("inner-iframe"))
|
||||
with WaitForPageLoad(browser):
|
||||
browser.find_element_by_id("link_to_top").click()
|
||||
assert "wrapper_nonce" not in browser.current_url # The browser object back to default content
|
||||
assert "Forbidden" not in browser.page_source
|
||||
browser.switch_to.default_content()
|
||||
|
||||
# Try to escape from inner_frame
|
||||
browser.switch_to.frame(browser.find_element_by_id("inner-iframe"))
|
||||
assert "wrapper_nonce" in browser.current_url # Make sure we are inside of the inner-iframe
|
||||
with WaitForPageLoad(browser):
|
||||
browser.execute_script("window.top.location = window.location")
|
||||
assert "wrapper_nonce" in browser.current_url # We try to use nonce-ed html without iframe
|
||||
assert "Forbidden" in browser.page_source # Only allow to use nonce once-time
|
||||
browser.switch_to.default_content()
|
71
src/Test/conftest.py
Normal file
71
src/Test/conftest.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
import os
|
||||
import sys
|
||||
import urllib
|
||||
|
||||
import pytest
|
||||
|
||||
# Config
|
||||
if sys.platform == "win32":
|
||||
PHANTOMJS_PATH = "tools/phantomjs/bin/phantomjs.exe"
|
||||
else:
|
||||
PHANTOMJS_PATH = "phantomjs"
|
||||
SITE_URL = "http://127.0.0.1:43110"
|
||||
|
||||
# Imports relative to src dir
|
||||
sys.path.append(
|
||||
os.path.abspath(os.path.dirname(__file__)+"/..")
|
||||
)
|
||||
from Config import config
|
||||
config.argv = ["none"] # Dont pass any argv to config parser
|
||||
config.parse()
|
||||
config.data_dir = "src/Test/testdata" # Use test data for unittests
|
||||
|
||||
from Site import Site
|
||||
from User import UserManager
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def resetSettings(request):
|
||||
os.chdir(os.path.abspath(os.path.dirname(__file__)+"/../..")) # Set working dir
|
||||
open("%s/sites.json" % config.data_dir, "w").write("{}")
|
||||
open("%s/users.json" % config.data_dir, "w").write("""
|
||||
{
|
||||
"15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
|
||||
"certs": {},
|
||||
"master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
|
||||
"sites": {}
|
||||
}
|
||||
}
|
||||
""")
|
||||
def cleanup():
|
||||
os.unlink("%s/sites.json" % config.data_dir)
|
||||
os.unlink("%s/users.json" % config.data_dir)
|
||||
request.addfinalizer(cleanup)
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def site():
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
return site
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user():
|
||||
user = UserManager.user_manager.get()
|
||||
return user
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def browser():
|
||||
try:
|
||||
from selenium import webdriver
|
||||
browser = webdriver.PhantomJS(executable_path=PHANTOMJS_PATH, service_log_path=os.path.devnull)
|
||||
browser.set_window_size(1400, 1000)
|
||||
except Exception, err:
|
||||
raise pytest.skip("Test requires selenium + phantomjs: %s" % err)
|
||||
return browser
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def site_url():
|
||||
try:
|
||||
urllib.urlopen(SITE_URL).read()
|
||||
except Exception, err:
|
||||
raise pytest.skip("Test requires zeronet client running: %s" % err)
|
||||
return SITE_URL
|
4
src/Test/pytest.ini
Normal file
4
src/Test/pytest.ini
Normal file
|
@ -0,0 +1,4 @@
|
|||
[pytest]
|
||||
python_files = Test*.py
|
||||
addopts = -rsxX -v
|
||||
|
441
src/Test/test.py
441
src/Test/test.py
|
@ -1,441 +0,0 @@
|
|||
import sys
|
||||
import os
|
||||
import unittest
|
||||
import urllib
|
||||
import time
|
||||
sys.path.append(os.path.abspath("src")) # Imports relative to src dir
|
||||
|
||||
from Config import config
|
||||
config.parse()
|
||||
config.data_dir = "src/Test/testdata" # Use test data for unittests
|
||||
|
||||
from Crypt import CryptBitcoin
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if not os.path.isfile("%s/sites.json" % config.data_dir):
|
||||
open("%s/sites.json" % config.data_dir, "w").write("{}")
|
||||
if not os.path.isfile("%s/users.json" % config.data_dir):
|
||||
open("%s/users.json" % config.data_dir, "w").write("""
|
||||
{
|
||||
"15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
|
||||
"certs": {},
|
||||
"master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
|
||||
"sites": {}
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.isfile("src/Test/testdata/users.json"):
|
||||
os.unlink("src/Test/testdata/users.json")
|
||||
if os.path.isfile("src/Test/testdata/sites.json"):
|
||||
os.unlink("src/Test/testdata/sites.json")
|
||||
|
||||
def testMediaRoute(self):
|
||||
try:
|
||||
urllib.urlopen("http://127.0.0.1:43110").read()
|
||||
except Exception, err:
|
||||
raise unittest.SkipTest(err)
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/./sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/../config.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read())
|
||||
self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read())
|
||||
|
||||
def testBitcoinSignOld(self):
|
||||
s = time.time()
|
||||
privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez")
|
||||
|
||||
sign = CryptBitcoin.signOld("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.signOld("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testBitcoinSign(self):
|
||||
s = time.time()
|
||||
privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C"
|
||||
privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testBitcoinSignCompressed(self):
|
||||
raise unittest.SkipTest("Not supported yet")
|
||||
s = time.time()
|
||||
privatekey = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKbZAgWdpFo1UETZSKdgHaNN2J"
|
||||
privatekey_bad = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKsZAgWdpFo1UETZSKdgHaNN2J"
|
||||
|
||||
address = CryptBitcoin.privatekeyToAddress(privatekey)
|
||||
self.assertEqual(address, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad)
|
||||
self.assertNotEqual(address_bad, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f")
|
||||
|
||||
sign = CryptBitcoin.sign("hello", privatekey)
|
||||
|
||||
self.assertTrue(CryptBitcoin.verify("hello", address, sign))
|
||||
self.assertFalse(CryptBitcoin.verify("not hello", address, sign))
|
||||
|
||||
sign_bad = CryptBitcoin.sign("hello", privatekey_bad)
|
||||
self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad))
|
||||
|
||||
print "Taken: %.3fs, " % (time.time() - s),
|
||||
|
||||
def testTrackers(self):
|
||||
raise unittest.SkipTest("Notyet")
|
||||
from Site import SiteManager
|
||||
from lib.subtl.subtl import UdpTrackerClient
|
||||
import hashlib
|
||||
|
||||
ok = 0
|
||||
for protocol, ip, port in SiteManager.TRACKERS:
|
||||
address = "test"
|
||||
if protocol == "udp":
|
||||
tracker = UdpTrackerClient(ip, port)
|
||||
peers = None
|
||||
try:
|
||||
tracker.connect()
|
||||
tracker.poll_once()
|
||||
tracker.announce(info_hash=hashlib.sha1(address).hexdigest(), num_want=5)
|
||||
back = tracker.poll_once()
|
||||
peers = back["response"]["peers"]
|
||||
except Exception, err:
|
||||
peers = None
|
||||
print "Tracker error: %s://%s:%s %s" % (protocol, ip, port, err)
|
||||
if peers is not None:
|
||||
ok += 1
|
||||
|
||||
self.assertEqual(ok, len(SiteManager.TRACKERS))
|
||||
|
||||
def testDb(self):
|
||||
from Db import Db
|
||||
for db_path in [os.path.abspath("%s/test/zeronet.db" % config.data_dir), "%s/test/zeronet.db" % config.data_dir]:
|
||||
print "Creating db using %s..." % db_path,
|
||||
schema = {
|
||||
"db_name": "TestDb",
|
||||
"db_file": "%s/test/zeronet.db" % config.data_dir,
|
||||
"map": {
|
||||
"data.json": {
|
||||
"to_table": {
|
||||
"test": "test"
|
||||
}
|
||||
}
|
||||
},
|
||||
"tables": {
|
||||
"test": {
|
||||
"cols": [
|
||||
["test_id", "INTEGER"],
|
||||
["title", "TEXT"],
|
||||
],
|
||||
"indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"],
|
||||
"schema_changed": 1426195822
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if os.path.isfile("%s/test/zeronet.db" % config.data_dir):
|
||||
os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
db = Db(schema, "%s/test/zeronet.db" % config.data_dir)
|
||||
db.checkTables()
|
||||
db.close()
|
||||
|
||||
# Cleanup
|
||||
os.unlink("%s/test/zeronet.db" % config.data_dir)
|
||||
os.rmdir("%s/test/" % config.data_dir)
|
||||
|
||||
def testContentManagerIncludes(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
# Include info
|
||||
rules = site.content_manager.getRules("data/test_include/content.json")
|
||||
self.assertEqual(rules["signers"], ['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo'])
|
||||
self.assertEqual(rules["user_name"], 'test')
|
||||
self.assertEqual(rules["max_size"], 20000)
|
||||
self.assertEqual(rules["includes_allowed"], False)
|
||||
self.assertEqual(rules["files_allowed"], 'data.json')
|
||||
# Valid signers
|
||||
self.assertEqual(
|
||||
site.content_manager.getValidSigners("data/test_include/content.json"),
|
||||
['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']
|
||||
)
|
||||
self.assertEqual(
|
||||
site.content_manager.getValidSigners("data/users/content.json"),
|
||||
['1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']
|
||||
)
|
||||
self.assertEqual(site.content_manager.getValidSigners("content.json"), ['1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'])
|
||||
|
||||
# Data validation
|
||||
data_dict = {
|
||||
"files": {
|
||||
"data.json": {
|
||||
"sha512": "369d4e780cc80504285f13774ca327fe725eed2d813aad229e62356b07365906",
|
||||
"size": 505
|
||||
}
|
||||
},
|
||||
"modified": 1431451896.656619,
|
||||
"signs": {
|
||||
"15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo": "G2QC+ZIozPQQ/XiOEOMzfekOP8ipi+rKaTy/R/3MnDf98mLIhSSA8927FW6D/ZyP7HHuII2y9d0zbAk+rr8ksQM="
|
||||
}
|
||||
}
|
||||
# Normal data
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True)
|
||||
# Too large
|
||||
data_dict["files"]["data.json"]["size"] = 200000 # Emulate 2MB sized data.json
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False)
|
||||
data_dict["files"]["data.json"]["size"] = 505 # Reset
|
||||
# Not allowed file
|
||||
data_dict["files"]["data.html"] = data_dict["files"]["data.json"]
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False)
|
||||
del data_dict["files"]["data.html"] # Reset
|
||||
# Should work again
|
||||
data = StringIO(json.dumps(data_dict))
|
||||
self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True)
|
||||
|
||||
def testUserContentRules(self):
|
||||
from Site import Site
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
user_content = site.storage.loadJson("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json")
|
||||
|
||||
# File info for not exist file
|
||||
self.assertEqual(site.content_manager.getFileInfo("data/users/notexist/data.json")["content_inner_path"], "data/users/notexist/content.json")
|
||||
self.assertEqual(site.content_manager.getValidSigners("data/users/notexist/data.json"), ["notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"])
|
||||
|
||||
# File info for exsitsing file
|
||||
file_info = site.content_manager.getFileInfo("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/data.json")
|
||||
valid_signers = site.content_manager.getValidSigners(file_info["content_inner_path"], user_content)
|
||||
self.assertEqual(valid_signers, ['14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet', '1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'])
|
||||
|
||||
# Known user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "nofish@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 100000)
|
||||
|
||||
# Unknown user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 10000)
|
||||
|
||||
# User with more size limit by auth type
|
||||
user_content["cert_auth_type"] = "bitmsg"
|
||||
user_content["cert_user_id"] = "noone@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertEqual(rules["max_size"], 15000)
|
||||
|
||||
# Banned user
|
||||
user_content["cert_auth_type"] = "web"
|
||||
user_content["cert_user_id"] = "bad@zeroid.bit"
|
||||
rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content)
|
||||
self.assertFalse(rules)
|
||||
|
||||
def testUserContentCert(self):
|
||||
from Site import Site
|
||||
from cStringIO import StringIO
|
||||
import json
|
||||
# user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C"
|
||||
user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A"
|
||||
# cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet"
|
||||
cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA"
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
# user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json")
|
||||
# site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] = user_content # Add to content manager
|
||||
# Check if the user file is loaded
|
||||
self.assertTrue("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents)
|
||||
user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"]
|
||||
cert_content = site.content_manager.contents["data/users/content.json"]
|
||||
# Override cert signer
|
||||
cert_content["user_contents"]["cert_signers"]["zeroid.bit"] = ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]
|
||||
|
||||
# Valid cert providers
|
||||
rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)
|
||||
self.assertEqual(rules["cert_signers"], {"zeroid.bit": ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]})
|
||||
|
||||
# Add cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"].split("@")[0]), cert_priv
|
||||
)
|
||||
|
||||
# Verify cert
|
||||
self.assertTrue(site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content))
|
||||
self.assertFalse(site.content_manager.verifyCert("data/users/badaddress/content.json", user_content))
|
||||
|
||||
# Sign user content
|
||||
signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
|
||||
# Test user cert
|
||||
self.assertTrue(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
# Test banned user
|
||||
site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][user_content["cert_user_id"]] = False
|
||||
self.assertFalse(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
# Test invalid cert
|
||||
user_content["cert_sign"] = CryptBitcoin.sign(
|
||||
"badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv
|
||||
)
|
||||
signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False)
|
||||
self.assertFalse(site.content_manager.verifyFile(
|
||||
"data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json",
|
||||
StringIO(json.dumps(signed_content)), ignore_same=False
|
||||
))
|
||||
|
||||
def testClone(self):
|
||||
from Site import Site
|
||||
import shutil
|
||||
|
||||
site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") # Privatekey: 5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv
|
||||
self.assertEqual(site.storage.directory, "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
|
||||
# Remove old files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertFalse(os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json"))
|
||||
|
||||
# Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc
|
||||
new_site = site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1)
|
||||
|
||||
# Check if clone was successful
|
||||
self.assertEqual(new_site.address, "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
self.assertTrue(new_site.storage.isFile("content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("index.html"))
|
||||
self.assertTrue(new_site.storage.isFile("data/users/content.json"))
|
||||
self.assertTrue(new_site.storage.isFile("data/zeroblog.db"))
|
||||
|
||||
# Test re-cloning (updating)
|
||||
|
||||
# Changes in non-data files should be overwritten
|
||||
new_site.storage.write("index.html", "this will be overwritten")
|
||||
self.assertEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
# Changes in data file should be kept after re-cloning
|
||||
changed_contentjson = new_site.storage.loadJson("content.json")
|
||||
changed_contentjson["description"] = "Update Description Test"
|
||||
new_site.storage.writeJson("content.json", changed_contentjson)
|
||||
|
||||
changed_data = new_site.storage.loadJson("data/data.json")
|
||||
changed_data["title"] = "UpdateTest"
|
||||
new_site.storage.writeJson("data/data.json", changed_data)
|
||||
|
||||
# Re-clone the site
|
||||
site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
self.assertEqual(new_site.storage.loadJson("data/data.json")["title"], "UpdateTest")
|
||||
self.assertEqual(new_site.storage.loadJson("content.json")["description"], "Update Description Test")
|
||||
self.assertNotEqual(new_site.storage.read("index.html"), "this will be overwritten")
|
||||
|
||||
# Delete created files
|
||||
if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"):
|
||||
new_site.storage.closeDb()
|
||||
shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL")
|
||||
|
||||
def testUserNewsite(self):
|
||||
from User import UserManager
|
||||
user = UserManager.user_manager.get()
|
||||
user.sites = {} # Reset user data
|
||||
self.assertEqual(user.master_address, "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc")
|
||||
self.assertEqual(
|
||||
user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc"),
|
||||
1458664252141532163166741013621928587528255888800826689784628722366466547364755811L
|
||||
)
|
||||
|
||||
# Re-generate privatekey based on address_index
|
||||
address, address_index, site_data = user.getNewSiteData()
|
||||
self.assertEqual(CryptBitcoin.hdPrivatekey(user.master_seed, address_index), site_data["privatekey"])
|
||||
|
||||
user.sites = {} # Reset user data
|
||||
|
||||
# Site address and auth address is different
|
||||
self.assertNotEqual(user.getSiteData(address)["auth_address"], address)
|
||||
# Re-generate auth_privatekey for site
|
||||
self.assertEqual(user.getSiteData(address)["auth_privatekey"], site_data["auth_privatekey"])
|
||||
|
||||
def testSslCert(self):
|
||||
from Crypt import CryptConnection
|
||||
# Remove old certs
|
||||
if os.path.isfile("%s/cert-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
if os.path.isfile("%s/key-rsa.pem" % config.data_dir):
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
CryptConnection.manager.loadCerts()
|
||||
|
||||
self.assertIn("tls-rsa", CryptConnection.manager.crypt_supported)
|
||||
self.assertEqual(CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]), "tls-rsa")
|
||||
self.assertTrue(os.path.isfile("%s/cert-rsa.pem" % config.data_dir))
|
||||
self.assertTrue(os.path.isfile("%s/key-rsa.pem" % config.data_dir))
|
||||
|
||||
# Remove created files
|
||||
os.unlink("%s/cert-rsa.pem" % config.data_dir)
|
||||
os.unlink("%s/key-rsa.pem" % config.data_dir)
|
||||
|
||||
def testConfigParse(self):
|
||||
import Config
|
||||
config_test = Config.Config("zeronet.py".split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
self.assertFalse(config_test.debug)
|
||||
self.assertFalse(config_test.debug_socket)
|
||||
|
||||
config_test = Config.Config("zeronet.py --debug --debug_socket --ui_password hello".split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
self.assertTrue(config_test.debug)
|
||||
self.assertTrue(config_test.debug_socket)
|
||||
|
||||
args = "zeronet.py --unknown_arg --debug --debug_socket --ui_restrict 127.0.0.1 1.2.3.4 "
|
||||
args += "--another_unknown argument --use_openssl False siteSign address privatekey --inner_path users/content.json"
|
||||
config_test = Config.Config(args.split(" "))
|
||||
config_test.parse(silent=True, parse_config=False)
|
||||
self.assertTrue(config_test.debug)
|
||||
self.assertIn("1.2.3.4", config_test.ui_restrict)
|
||||
self.assertFalse(config_test.use_openssl)
|
||||
self.assertEqual(config_test.inner_path, "users/content.json")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import logging
|
||||
logging.getLogger().setLevel(level=logging.CRITICAL)
|
||||
unittest.main(verbosity=2)
|
||||
# unittest.main(verbosity=2, defaultTest="TestCase.testConfigParse")
|
|
@ -298,7 +298,7 @@ class UiRequest(object):
|
|||
# Check wrapper nonce
|
||||
content_type = self.getContentType(path)
|
||||
if "htm" in content_type: # Valid nonce must present to render html files
|
||||
wrapper_nonce = self.get["wrapper_nonce"]
|
||||
wrapper_nonce = self.get.get("wrapper_nonce")
|
||||
if wrapper_nonce not in self.server.wrapper_nonces:
|
||||
return self.error403("Wrapper nonce error. Please reload the page.")
|
||||
self.server.wrapper_nonces.remove(self.get["wrapper_nonce"])
|
||||
|
|
Loading…
Reference in a new issue