From e00537ba57d84736a9e58623735497774cea13f5 Mon Sep 17 00:00:00 2001 From: HelloZeroNet Date: Sun, 20 Sep 2015 00:27:54 +0200 Subject: [PATCH] Rev423, Rewrite and reorganize test using pytest, New PhantomJS based browser tests, Fix html requests error without wrapper nonce, Indent json files with 1 space --- src/Config.py | 2 +- src/Site/SiteStorage.py | 2 +- src/Test/BenchmarkConnection.py | 140 ---------- src/Test/TestConfig.py | 31 +++ src/Test/TestContent.py | 68 +++++ src/Test/TestCryptBitcoin.py | 44 ++++ src/Test/TestCryptConnection.py | 27 ++ src/Test/TestDb.py | 55 ++++ src/Test/TestSite.py | 54 ++++ src/Test/TestUser.py | 23 ++ src/Test/TestUserContent.py | 118 +++++++++ src/Test/TestWeb.py | 77 ++++++ src/Test/conftest.py | 71 +++++ src/Test/pytest.ini | 4 + src/Test/test.py | 441 -------------------------------- src/Ui/UiRequest.py | 2 +- 16 files changed, 575 insertions(+), 584 deletions(-) delete mode 100644 src/Test/BenchmarkConnection.py create mode 100644 src/Test/TestConfig.py create mode 100644 src/Test/TestContent.py create mode 100644 src/Test/TestCryptBitcoin.py create mode 100644 src/Test/TestCryptConnection.py create mode 100644 src/Test/TestDb.py create mode 100644 src/Test/TestSite.py create mode 100644 src/Test/TestUser.py create mode 100644 src/Test/TestUserContent.py create mode 100644 src/Test/TestWeb.py create mode 100644 src/Test/conftest.py create mode 100644 src/Test/pytest.ini delete mode 100644 src/Test/test.py diff --git a/src/Config.py b/src/Config.py index 835e85f3..e3873ee9 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.2" - self.rev = 420 + self.rev = 423 self.argv = argv self.action = None self.createParser() diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index 2d7f0193..23516d90 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -178,7 +178,7 @@ class SiteStorage: # Write formatted json file def writeJson(self, inner_path, data): - content = json.dumps(data, indent=2, sort_keys=True) + content = json.dumps(data, indent=1, sort_keys=True) # Make it a little more compact by removing unnecessary white space def compact_list(match): diff --git a/src/Test/BenchmarkConnection.py b/src/Test/BenchmarkConnection.py deleted file mode 100644 index 28eae013..00000000 --- a/src/Test/BenchmarkConnection.py +++ /dev/null @@ -1,140 +0,0 @@ -import time -import socket -import msgpack - - -print "Connecting..." -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -sock.connect(("localhost", 1234)) - - -print "1 Threaded: Send, receive 10000 ping request...", -s = time.time() -for i in range(10000): - sock.sendall(msgpack.packb({"cmd": "Ping"})) - req = sock.recv(16 * 1024) -print time.time() - s, repr(req), time.time() - s - - -print "1 Threaded: Send, receive, decode 10000 ping request...", -s = time.time() -unpacker = msgpack.Unpacker() -reqs = 0 -for i in range(10000): - sock.sendall(msgpack.packb({"cmd": "Ping"})) - unpacker.feed(sock.recv(16 * 1024)) - for req in unpacker: - reqs += 1 -print "Found:", req, "x", reqs, time.time() - s - - -print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...", -s = time.time() -unpacker = msgpack.Unpacker() -reqs = 0 -for i in range(1000): - sock.sendall(msgpack.packb({"cmd": "Ping"})) - unpacker.feed(sock.recv(16 * 1024)) - for req in unpacker: - reqs += 1 - sock.close() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("localhost", 1234)) -print "Found:", req, "x", reqs, time.time() - s - - -print "1 Threaded: Request, receive, decode 10000 x 10k data request...", -s = time.time() -unpacker = msgpack.Unpacker() -reqs = 0 -for i in range(10000): - sock.sendall(msgpack.packb({"cmd": "Bigdata"})) - - """buff = StringIO() - data = sock.recv(16*1024) - buff.write(data) - if not data: - break - while not data.endswith("\n"): - data = sock.recv(16*1024) - if not data: break - buff.write(data) - req = msgpack.unpackb(buff.getvalue().strip("\n")) - reqs += 1""" - - req_found = False - while not req_found: - buff = sock.recv(16 * 1024) - unpacker.feed(buff) - for req in unpacker: - reqs += 1 - req_found = True - break # Only process one request -print "Found:", len(req["res"]), "x", reqs, time.time() - s - - -print "10 Threaded: Request, receive, decode 10000 x 10k data request...", -import gevent -s = time.time() -reqs = 0 -req = None - - -def requester(): - global reqs, req - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("localhost", 1234)) - unpacker = msgpack.Unpacker() - for i in range(1000): - sock.sendall(msgpack.packb({"cmd": "Bigdata"})) - - req_found = False - while not req_found: - buff = sock.recv(16 * 1024) - unpacker.feed(buff) - for req in unpacker: - reqs += 1 - req_found = True - break # Only process one request - -threads = [] -for i in range(10): - threads.append(gevent.spawn(requester)) -gevent.joinall(threads) -print "Found:", len(req["res"]), "x", reqs, time.time() - s - - -print "1 Threaded: ZeroMQ Send, receive 1000 ping request...", -s = time.time() -import zmq.green as zmq -c = zmq.Context() -zmq_sock = c.socket(zmq.REQ) -zmq_sock.connect('tcp://127.0.0.1:1234') -for i in range(1000): - zmq_sock.send(msgpack.packb({"cmd": "Ping"})) - req = zmq_sock.recv(16 * 1024) -print "Found:", req, time.time() - s - - -print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...", -s = time.time() -import zmq.green as zmq -c = zmq.Context() -zmq_sock = c.socket(zmq.REQ) -zmq_sock.connect('tcp://127.0.0.1:1234') -for i in range(1000): - zmq_sock.send(msgpack.packb({"cmd": "Bigdata"})) - req = msgpack.unpackb(zmq_sock.recv(1024 * 1024)) -print "Found:", len(req["res"]), time.time() - s - - -print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...", -s = time.time() -import zmq.green as zmq -c = zmq.Context() -zmq_sock = c.socket(zmq.REQ) -zmq_sock.connect('tcp://127.0.0.1:1233') -for i in range(1000): - zmq_sock.send(msgpack.packb({"cmd": "Bigdata"})) - req = msgpack.unpackb(zmq_sock.recv(1024 * 1024)) -print "Found:", len(req["res"]), time.time() - s diff --git a/src/Test/TestConfig.py b/src/Test/TestConfig.py new file mode 100644 index 00000000..d33f0161 --- /dev/null +++ b/src/Test/TestConfig.py @@ -0,0 +1,31 @@ +import pytest + +import Config + + +@pytest.mark.usefixtures("resetSettings") +class TestUser: + def testParse(self): + # Defaults + config_test = Config.Config("zeronet.py".split(" ")) + config_test.parse(silent=True, parse_config=False) + assert not config_test.debug + assert not config_test.debug_socket + + # Test parse command line with unknown parameters (ui_password) + config_test = Config.Config("zeronet.py --debug --debug_socket --ui_password hello".split(" ")) + config_test.parse(silent=True, parse_config=False) + assert config_test.debug + assert config_test.debug_socket + with pytest.raises(AttributeError): + config_test.ui_password + + # More complex test + args = "zeronet.py --unknown_arg --debug --debug_socket --ui_restrict 127.0.0.1 1.2.3.4 " + args += "--another_unknown argument --use_openssl False siteSign address privatekey --inner_path users/content.json" + config_test = Config.Config(args.split(" ")) + config_test.parse(silent=True, parse_config=False) + assert config_test.debug + assert "1.2.3.4" in config_test.ui_restrict + assert not config_test.use_openssl + assert config_test.inner_path == "users/content.json" diff --git a/src/Test/TestContent.py b/src/Test/TestContent.py new file mode 100644 index 00000000..5067b414 --- /dev/null +++ b/src/Test/TestContent.py @@ -0,0 +1,68 @@ +import json +from cStringIO import StringIO + +import pytest + + +@pytest.mark.usefixtures("resetSettings") +class TestContent: + def testIncludes(self, site): + # Rules defined in parent content.json + rules = site.content_manager.getRules("data/test_include/content.json") + + assert rules["signers"] == ["15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo"] # Valid signer + assert rules["user_name"] == "test" # Extra data + assert rules["max_size"] == 20000 # Max size of files + assert not rules["includes_allowed"] # Don't allow more includes + assert rules["files_allowed"] == "data.json" # Allowed file pattern + + # Valid signers for "data/test_include/content.json" + valid_signers = site.content_manager.getValidSigners("data/test_include/content.json") + assert "15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo" in valid_signers # Extra valid signer defined in parent content.json + assert "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT" in valid_signers # The site itself + assert len(valid_signers) == 2 # No more + + # Valid signers for "data/users/content.json" + valid_signers = site.content_manager.getValidSigners("data/users/content.json") + assert "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f" in valid_signers # Extra valid signer defined in parent content.json + assert "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT" in valid_signers # The site itself + assert len(valid_signers) == 2 + + # Valid signers for root content.json + assert site.content_manager.getValidSigners("content.json") == ["1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"] + + def testSizelimit(self, site): + # Data validation + data_dict = { + "files": { + "data.json": { + "sha512": "369d4e780cc80504285f13774ca327fe725eed2d813aad229e62356b07365906", + "size": 505 + } + }, + "modified": 1431451896.656619, + "signs": { + "15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo": + "G2QC+ZIozPQQ/XiOEOMzfekOP8ipi+rKaTy/R/3MnDf98mLIhSSA8927FW6D/ZyP7HHuII2y9d0zbAk+rr8ksQM=" + } + } + data = StringIO(json.dumps(data_dict)) + + # Normal data + assert site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False) + + # Too large + data_dict["files"]["data.json"]["size"] = 200000 # Emulate 2MB sized data.json + data = StringIO(json.dumps(data_dict)) + assert not site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False) + data_dict["files"]["data.json"]["size"] = 505 # Reset + + # Not allowed file + data_dict["files"]["notallowed.exe"] = data_dict["files"]["data.json"] + data = StringIO(json.dumps(data_dict)) + assert not site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False) + del data_dict["files"]["notallowed.exe"] # Reset + + # Should work again + data = StringIO(json.dumps(data_dict)) + assert site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False) diff --git a/src/Test/TestCryptBitcoin.py b/src/Test/TestCryptBitcoin.py new file mode 100644 index 00000000..dad17d0d --- /dev/null +++ b/src/Test/TestCryptBitcoin.py @@ -0,0 +1,44 @@ +from Crypt import CryptBitcoin + + +class TestCryptBitcoin: + def testSignOld(self): + privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf" + privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf" + + # Get address by privatekey + address = CryptBitcoin.privatekeyToAddress(privatekey) + assert address == "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez" + + address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad) + assert not address_bad == "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez" + + # Text signing + sign = CryptBitcoin.signOld("hello", privatekey) + assert CryptBitcoin.verify("hello", address, sign) # Original text + assert not CryptBitcoin.verify("not hello", address, sign) # Different text + + # Signed by bad privatekey + sign_bad = CryptBitcoin.signOld("hello", privatekey_bad) + assert not CryptBitcoin.verify("hello", address, sign_bad) + + def testSign(self): + privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C" + privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC" + + # Get address by privatekey + address = CryptBitcoin.privatekeyToAddress(privatekey) + assert address == "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz" + + address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad) + assert address_bad != "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz" + + # Text signing + sign = CryptBitcoin.sign("hello", privatekey) + + assert CryptBitcoin.verify("hello", address, sign) + assert not CryptBitcoin.verify("not hello", address, sign) + + # Signed by bad privatekey + sign_bad = CryptBitcoin.sign("hello", privatekey_bad) + assert not CryptBitcoin.verify("hello", address, sign_bad) diff --git a/src/Test/TestCryptConnection.py b/src/Test/TestCryptConnection.py new file mode 100644 index 00000000..4027f047 --- /dev/null +++ b/src/Test/TestCryptConnection.py @@ -0,0 +1,27 @@ +import os + +from Config import config +from Crypt import CryptConnection + + +class TestCryptConnection: + def testSslCert(self): + # Remove old certs + if os.path.isfile("%s/cert-rsa.pem" % config.data_dir): + os.unlink("%s/cert-rsa.pem" % config.data_dir) + if os.path.isfile("%s/key-rsa.pem" % config.data_dir): + os.unlink("%s/key-rsa.pem" % config.data_dir) + + # Generate certs + CryptConnection.manager.loadCerts() + + assert "tls-rsa" in CryptConnection.manager.crypt_supported + assert CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]) == "tls-rsa" # It should choose the known crypt + + # Check openssl cert generation + assert os.path.isfile("%s/cert-rsa.pem" % config.data_dir) + assert os.path.isfile("%s/key-rsa.pem" % config.data_dir) + + # Remove created files + os.unlink("%s/cert-rsa.pem" % config.data_dir) + os.unlink("%s/key-rsa.pem" % config.data_dir) diff --git a/src/Test/TestDb.py b/src/Test/TestDb.py new file mode 100644 index 00000000..00856619 --- /dev/null +++ b/src/Test/TestDb.py @@ -0,0 +1,55 @@ +import os + +from Config import config +from Db import Db + + +class TestDb: + def testCheckTables(self): + db_path = "%s/zeronet.db" % config.data_dir + schema = { + "db_name": "TestDb", + "db_file": "%s/zeronet.db" % config.data_dir, + "map": { + "data.json": { + "to_table": { + "test": "test" + } + } + }, + "tables": { + "test": { + "cols": [ + ["test_id", "INTEGER"], + ["title", "TEXT"], + ], + "indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"], + "schema_changed": 1426195822 + } + } + } + + if os.path.isfile(db_path): + os.unlink(db_path) + db = Db(schema, db_path) + db.checkTables() + db.close() + + # Verify tables + assert os.path.isfile(db_path) + db = Db(schema, db_path) + + tables = [row["name"] for row in db.execute("SELECT name FROM sqlite_master WHERE type='table'")] + assert "keyvalue" in tables # To store simple key -> value + assert "json" in tables # Json file path registry + assert "test" in tables # The table defined in dbschema.json + + # Verify test table + cols = [col["name"] for col in db.execute("PRAGMA table_info(test)")] + assert "test_id" in cols + assert "title" in cols + + db.close() + + # Cleanup + os.unlink(db_path) diff --git a/src/Test/TestSite.py b/src/Test/TestSite.py new file mode 100644 index 00000000..06e482d2 --- /dev/null +++ b/src/Test/TestSite.py @@ -0,0 +1,54 @@ +import shutil +import os + +import pytest + + +@pytest.mark.usefixtures("resetSettings") +class TestSite: + def testClone(self, site): + assert site.storage.directory == "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT" + + # Remove old files + if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"): + shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") + assert not os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json") + + # Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc + new_site = site.clone( + "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1 + ) + + # Check if clone was successful + assert new_site.address == "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL" + assert new_site.storage.isFile("content.json") + assert new_site.storage.isFile("index.html") + assert new_site.storage.isFile("data/users/content.json") + assert new_site.storage.isFile("data/zeroblog.db") + + # Test re-cloning (updating) + + # Changes in non-data files should be overwritten + new_site.storage.write("index.html", "this will be overwritten") + assert new_site.storage.read("index.html"), "this will be overwritten" + + # Changes in data file should be kept after re-cloning + changed_contentjson = new_site.storage.loadJson("content.json") + changed_contentjson["description"] = "Update Description Test" + new_site.storage.writeJson("content.json", changed_contentjson) + + changed_data = new_site.storage.loadJson("data/data.json") + changed_data["title"] = "UpdateTest" + new_site.storage.writeJson("data/data.json", changed_data) + + # Re-clone the site + site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") + + assert new_site.storage.loadJson("data/data.json")["title"] == "UpdateTest" + assert new_site.storage.loadJson("content.json")["description"] == "Update Description Test" + assert new_site.storage.read("index.html") != "this will be overwritten" + + # Delete created files + if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"): + new_site.storage.closeDb() + shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") diff --git a/src/Test/TestUser.py b/src/Test/TestUser.py new file mode 100644 index 00000000..f3cfe3bf --- /dev/null +++ b/src/Test/TestUser.py @@ -0,0 +1,23 @@ +import pytest + +from Crypt import CryptBitcoin + + +@pytest.mark.usefixtures("resetSettings") +class TestUser: + def testNewsite(self, user): + user.sites = {} # Reset user data + assert user.master_address == "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc" + address_index = 1458664252141532163166741013621928587528255888800826689784628722366466547364755811L + assert user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc") == address_index + + # Re-generate privatekey based on address_index + address, address_index, site_data = user.getNewSiteData() + assert CryptBitcoin.hdPrivatekey(user.master_seed, address_index) == site_data["privatekey"] + + user.sites = {} # Reset user data + + # Site address and auth address is different + assert user.getSiteData(address)["auth_address"] != address + # Re-generate auth_privatekey for site + assert user.getSiteData(address)["auth_privatekey"] == site_data["auth_privatekey"] diff --git a/src/Test/TestUserContent.py b/src/Test/TestUserContent.py new file mode 100644 index 00000000..238a65e1 --- /dev/null +++ b/src/Test/TestUserContent.py @@ -0,0 +1,118 @@ +import json +from cStringIO import StringIO + +import pytest + +from Crypt import CryptBitcoin + + +@pytest.mark.usefixtures("resetSettings") +class TestUserContent: + def testSigners(self, site): + # File info for not existing user file + file_info = site.content_manager.getFileInfo("data/users/notexist/data.json") + assert file_info["content_inner_path"] == "data/users/notexist/content.json" + valid_signers = site.content_manager.getValidSigners("data/users/notexist/content.json") + assert valid_signers == ["notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"] + + # File info for exsitsing user file + valid_signers = site.content_manager.getValidSigners("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json") + assert '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT' in valid_signers # The site address + assert '14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet' in valid_signers # Admin user definied in data/users/content.json + assert '1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C' in valid_signers # The user itself + assert len(valid_signers) == 3 # No more valid signers + + def testRules(self, site): + # We going to manipulate it this test rules based on data/users/content.json + user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json") + + # Known user + user_content["cert_auth_type"] = "web" + user_content["cert_user_id"] = "nofish@zeroid.bit" + rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + assert rules["max_size"] == 100000 + + # Unknown user + user_content["cert_auth_type"] = "web" + user_content["cert_user_id"] = "noone@zeroid.bit" + rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + assert rules["max_size"] == 10000 + + # User with more size limit based on auth type + user_content["cert_auth_type"] = "bitmsg" + user_content["cert_user_id"] = "noone@zeroid.bit" + rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + assert rules["max_size"] == 15000 + + # Banned user + user_content["cert_auth_type"] = "web" + user_content["cert_user_id"] = "bad@zeroid.bit" + rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + assert rules is False + + def testCert(self, site): + # user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" + user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A" + # cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet" + cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA" + + # Check if the user file is loaded + assert "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents + user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] + rules_content = site.content_manager.contents["data/users/content.json"] + + # Override valid cert signers for the test + rules_content["user_contents"]["cert_signers"]["zeroid.bit"] = [ + "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", + "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz" + ] + + # Check valid cert signers + rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + assert rules["cert_signers"] == {"zeroid.bit": [ + "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", + "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz" + ]} + + # Sign a valid cert + user_content["cert_sign"] = CryptBitcoin.sign("1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % ( + user_content["cert_auth_type"], + user_content["cert_user_id"].split("@")[0] + ), cert_priv) + + # Verify cert + assert site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) + + # Verify if the cert is valid for other address + assert not site.content_manager.verifyCert("data/users/badaddress/content.json", user_content) + + # Sign user content + signed_content = site.content_manager.sign( + "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False + ) + + # Test user cert + assert site.content_manager.verifyFile( + "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", + StringIO(json.dumps(signed_content)), ignore_same=False + ) + + # Test banned user + cert_user_id = user_content["cert_user_id"] # My username + site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][cert_user_id] = False + assert not site.content_manager.verifyFile( + "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", + StringIO(json.dumps(signed_content)), ignore_same=False + ) + + # Test invalid cert + user_content["cert_sign"] = CryptBitcoin.sign( + "badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv + ) + signed_content = site.content_manager.sign( + "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False + ) + assert not site.content_manager.verifyFile( + "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", + StringIO(json.dumps(signed_content)), ignore_same=False + ) diff --git a/src/Test/TestWeb.py b/src/Test/TestWeb.py new file mode 100644 index 00000000..4960ca33 --- /dev/null +++ b/src/Test/TestWeb.py @@ -0,0 +1,77 @@ +import urllib + +import pytest + +try: + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support.expected_conditions import staleness_of + from selenium.common.exceptions import NoSuchElementException +except: + pass + + +class WaitForPageLoad(object): + def __init__(self, browser): + self.browser = browser + + def __enter__(self): + self.old_page = self.browser.find_element_by_tag_name('html') + + def __exit__(self, *args): + WebDriverWait(self.browser, 20).until(staleness_of(self.old_page)) + + +@pytest.mark.usefixtures("resetSettings") +class TestWeb: + def testFileSecurity(self, site_url): + assert "Forbidden" in urllib.urlopen("%s/media/./sites.json" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/media/../config.py" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json" % site_url).read() + assert "Forbidden" in urllib.urlopen("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py" % site_url).read() + + def testHomepage(self, browser, site_url): + browser.get("%s" % site_url) + assert browser.title == "ZeroHello - ZeroNet" + + def testLinkSecurity(self, browser, site_url): + browser.get("%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/test/security.html" % site_url) + assert browser.title == "ZeroHello - ZeroNet" + assert browser.current_url == "%s/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/test/security.html" % site_url + + # Switch to inner frame + browser.switch_to.frame(browser.find_element_by_id("inner-iframe")) + assert "wrapper_nonce" in browser.current_url + browser.switch_to.default_content() + + # Clicking on links without target + browser.switch_to.frame(browser.find_element_by_id("inner-iframe")) + with WaitForPageLoad(browser): + browser.find_element_by_id("link_to_current").click() + assert "wrapper_nonce" not in browser.current_url # The browser object back to default content + assert "Forbidden" not in browser.page_source + # Check if we have frame inside frame + browser.switch_to.frame(browser.find_element_by_id("inner-iframe")) + with pytest.raises(NoSuchElementException): + assert not browser.find_element_by_id("inner-iframe") + browser.switch_to.default_content() + + # Clicking on link with target=_top + browser.switch_to.frame(browser.find_element_by_id("inner-iframe")) + with WaitForPageLoad(browser): + browser.find_element_by_id("link_to_top").click() + assert "wrapper_nonce" not in browser.current_url # The browser object back to default content + assert "Forbidden" not in browser.page_source + browser.switch_to.default_content() + + # Try to escape from inner_frame + browser.switch_to.frame(browser.find_element_by_id("inner-iframe")) + assert "wrapper_nonce" in browser.current_url # Make sure we are inside of the inner-iframe + with WaitForPageLoad(browser): + browser.execute_script("window.top.location = window.location") + assert "wrapper_nonce" in browser.current_url # We try to use nonce-ed html without iframe + assert "Forbidden" in browser.page_source # Only allow to use nonce once-time + browser.switch_to.default_content() diff --git a/src/Test/conftest.py b/src/Test/conftest.py new file mode 100644 index 00000000..5c222fef --- /dev/null +++ b/src/Test/conftest.py @@ -0,0 +1,71 @@ +import os +import sys +import urllib + +import pytest + +# Config +if sys.platform == "win32": + PHANTOMJS_PATH = "tools/phantomjs/bin/phantomjs.exe" +else: + PHANTOMJS_PATH = "phantomjs" +SITE_URL = "http://127.0.0.1:43110" + +# Imports relative to src dir +sys.path.append( + os.path.abspath(os.path.dirname(__file__)+"/..") +) +from Config import config +config.argv = ["none"] # Dont pass any argv to config parser +config.parse() +config.data_dir = "src/Test/testdata" # Use test data for unittests + +from Site import Site +from User import UserManager + +@pytest.fixture(scope="session") +def resetSettings(request): + os.chdir(os.path.abspath(os.path.dirname(__file__)+"/../..")) # Set working dir + open("%s/sites.json" % config.data_dir, "w").write("{}") + open("%s/users.json" % config.data_dir, "w").write(""" + { + "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": { + "certs": {}, + "master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a", + "sites": {} + } + } + """) + def cleanup(): + os.unlink("%s/sites.json" % config.data_dir) + os.unlink("%s/users.json" % config.data_dir) + request.addfinalizer(cleanup) + +@pytest.fixture(scope="session") +def site(): + site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") + return site + + +@pytest.fixture(scope="session") +def user(): + user = UserManager.user_manager.get() + return user + +@pytest.fixture(scope="session") +def browser(): + try: + from selenium import webdriver + browser = webdriver.PhantomJS(executable_path=PHANTOMJS_PATH, service_log_path=os.path.devnull) + browser.set_window_size(1400, 1000) + except Exception, err: + raise pytest.skip("Test requires selenium + phantomjs: %s" % err) + return browser + +@pytest.fixture(scope="session") +def site_url(): + try: + urllib.urlopen(SITE_URL).read() + except Exception, err: + raise pytest.skip("Test requires zeronet client running: %s" % err) + return SITE_URL diff --git a/src/Test/pytest.ini b/src/Test/pytest.ini new file mode 100644 index 00000000..6e9e4712 --- /dev/null +++ b/src/Test/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +python_files = Test*.py +addopts = -rsxX -v + diff --git a/src/Test/test.py b/src/Test/test.py deleted file mode 100644 index 383f6702..00000000 --- a/src/Test/test.py +++ /dev/null @@ -1,441 +0,0 @@ -import sys -import os -import unittest -import urllib -import time -sys.path.append(os.path.abspath("src")) # Imports relative to src dir - -from Config import config -config.parse() -config.data_dir = "src/Test/testdata" # Use test data for unittests - -from Crypt import CryptBitcoin - - -class TestCase(unittest.TestCase): - def setUp(self): - if not os.path.isfile("%s/sites.json" % config.data_dir): - open("%s/sites.json" % config.data_dir, "w").write("{}") - if not os.path.isfile("%s/users.json" % config.data_dir): - open("%s/users.json" % config.data_dir, "w").write(""" - { - "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": { - "certs": {}, - "master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a", - "sites": {} - } - } - """) - - def tearDown(self): - if os.path.isfile("src/Test/testdata/users.json"): - os.unlink("src/Test/testdata/users.json") - if os.path.isfile("src/Test/testdata/sites.json"): - os.unlink("src/Test/testdata/sites.json") - - def testMediaRoute(self): - try: - urllib.urlopen("http://127.0.0.1:43110").read() - except Exception, err: - raise unittest.SkipTest(err) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/./sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/../config.py").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/media/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/..//sites.json").read()) - self.assertIn("Forbidden", urllib.urlopen("http://127.0.0.1:43110/1EU1tbG9oC1A8jz2ouVwGZyQ5asrNsE4Vr/../../zeronet.py").read()) - - def testBitcoinSignOld(self): - s = time.time() - privatekey = "23DKQpDz7bXM7w5KN5Wnmz7bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf" - privatekey_bad = "23DKQpDz7bXM7w5KN5Wnmz6bwRNqNHcdQjb2WwrdB1QtTf5gM3pFdf" - - address = CryptBitcoin.privatekeyToAddress(privatekey) - self.assertEqual(address, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez") - - address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad) - self.assertNotEqual(address_bad, "12vTsjscg4hYPewUL2onma5pgQmWPMs3ez") - - sign = CryptBitcoin.signOld("hello", privatekey) - - self.assertTrue(CryptBitcoin.verify("hello", address, sign)) - self.assertFalse(CryptBitcoin.verify("not hello", address, sign)) - - sign_bad = CryptBitcoin.signOld("hello", privatekey_bad) - self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad)) - - print "Taken: %.3fs, " % (time.time() - s), - - def testBitcoinSign(self): - s = time.time() - privatekey = "5K9S6dVpufGnroRgFrT6wsKiz2mJRYsC73eWDmajaHserAp3F1C" - privatekey_bad = "5Jbm9rrusXyApAoM8YoM4Rja337zMMoBUMRJ1uijiguU2aZRnwC" - - address = CryptBitcoin.privatekeyToAddress(privatekey) - self.assertEqual(address, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz") - - address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad) - self.assertNotEqual(address_bad, "1MpDMxFeDUkiHohxx9tbGLeEGEuR4ZNsJz") - - sign = CryptBitcoin.sign("hello", privatekey) - - self.assertTrue(CryptBitcoin.verify("hello", address, sign)) - self.assertFalse(CryptBitcoin.verify("not hello", address, sign)) - - sign_bad = CryptBitcoin.sign("hello", privatekey_bad) - self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad)) - - print "Taken: %.3fs, " % (time.time() - s), - - def testBitcoinSignCompressed(self): - raise unittest.SkipTest("Not supported yet") - s = time.time() - privatekey = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKbZAgWdpFo1UETZSKdgHaNN2J" - privatekey_bad = "Kwg4YXhL5gsNwarFWtzTKuUiwAhKsZAgWdpFo1UETZSKdgHaNN2J" - - address = CryptBitcoin.privatekeyToAddress(privatekey) - self.assertEqual(address, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f") - - address_bad = CryptBitcoin.privatekeyToAddress(privatekey_bad) - self.assertNotEqual(address_bad, "1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f") - - sign = CryptBitcoin.sign("hello", privatekey) - - self.assertTrue(CryptBitcoin.verify("hello", address, sign)) - self.assertFalse(CryptBitcoin.verify("not hello", address, sign)) - - sign_bad = CryptBitcoin.sign("hello", privatekey_bad) - self.assertFalse(CryptBitcoin.verify("hello", address, sign_bad)) - - print "Taken: %.3fs, " % (time.time() - s), - - def testTrackers(self): - raise unittest.SkipTest("Notyet") - from Site import SiteManager - from lib.subtl.subtl import UdpTrackerClient - import hashlib - - ok = 0 - for protocol, ip, port in SiteManager.TRACKERS: - address = "test" - if protocol == "udp": - tracker = UdpTrackerClient(ip, port) - peers = None - try: - tracker.connect() - tracker.poll_once() - tracker.announce(info_hash=hashlib.sha1(address).hexdigest(), num_want=5) - back = tracker.poll_once() - peers = back["response"]["peers"] - except Exception, err: - peers = None - print "Tracker error: %s://%s:%s %s" % (protocol, ip, port, err) - if peers is not None: - ok += 1 - - self.assertEqual(ok, len(SiteManager.TRACKERS)) - - def testDb(self): - from Db import Db - for db_path in [os.path.abspath("%s/test/zeronet.db" % config.data_dir), "%s/test/zeronet.db" % config.data_dir]: - print "Creating db using %s..." % db_path, - schema = { - "db_name": "TestDb", - "db_file": "%s/test/zeronet.db" % config.data_dir, - "map": { - "data.json": { - "to_table": { - "test": "test" - } - } - }, - "tables": { - "test": { - "cols": [ - ["test_id", "INTEGER"], - ["title", "TEXT"], - ], - "indexes": ["CREATE UNIQUE INDEX test_id ON test(test_id)"], - "schema_changed": 1426195822 - } - } - } - - if os.path.isfile("%s/test/zeronet.db" % config.data_dir): - os.unlink("%s/test/zeronet.db" % config.data_dir) - db = Db(schema, "%s/test/zeronet.db" % config.data_dir) - db.checkTables() - db.close() - - # Cleanup - os.unlink("%s/test/zeronet.db" % config.data_dir) - os.rmdir("%s/test/" % config.data_dir) - - def testContentManagerIncludes(self): - from Site import Site - from cStringIO import StringIO - import json - - site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") - # Include info - rules = site.content_manager.getRules("data/test_include/content.json") - self.assertEqual(rules["signers"], ['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo']) - self.assertEqual(rules["user_name"], 'test') - self.assertEqual(rules["max_size"], 20000) - self.assertEqual(rules["includes_allowed"], False) - self.assertEqual(rules["files_allowed"], 'data.json') - # Valid signers - self.assertEqual( - site.content_manager.getValidSigners("data/test_include/content.json"), - ['15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'] - ) - self.assertEqual( - site.content_manager.getValidSigners("data/users/content.json"), - ['1LSxsKfC9S9TVXGGNSM3vPHjyW82jgCX5f', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT'] - ) - self.assertEqual(site.content_manager.getValidSigners("content.json"), ['1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']) - - # Data validation - data_dict = { - "files": { - "data.json": { - "sha512": "369d4e780cc80504285f13774ca327fe725eed2d813aad229e62356b07365906", - "size": 505 - } - }, - "modified": 1431451896.656619, - "signs": { - "15ik6LeBWnACWfaika1xqGapRZ1zh3JpCo": "G2QC+ZIozPQQ/XiOEOMzfekOP8ipi+rKaTy/R/3MnDf98mLIhSSA8927FW6D/ZyP7HHuII2y9d0zbAk+rr8ksQM=" - } - } - # Normal data - data = StringIO(json.dumps(data_dict)) - self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True) - # Too large - data_dict["files"]["data.json"]["size"] = 200000 # Emulate 2MB sized data.json - data = StringIO(json.dumps(data_dict)) - self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False) - data_dict["files"]["data.json"]["size"] = 505 # Reset - # Not allowed file - data_dict["files"]["data.html"] = data_dict["files"]["data.json"] - data = StringIO(json.dumps(data_dict)) - self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), False) - del data_dict["files"]["data.html"] # Reset - # Should work again - data = StringIO(json.dumps(data_dict)) - self.assertEqual(site.content_manager.verifyFile("data/test_include/content.json", data, ignore_same=False), True) - - def testUserContentRules(self): - from Site import Site - - site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") - user_content = site.storage.loadJson("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json") - - # File info for not exist file - self.assertEqual(site.content_manager.getFileInfo("data/users/notexist/data.json")["content_inner_path"], "data/users/notexist/content.json") - self.assertEqual(site.content_manager.getValidSigners("data/users/notexist/data.json"), ["notexist", "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"]) - - # File info for exsitsing file - file_info = site.content_manager.getFileInfo("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/data.json") - valid_signers = site.content_manager.getValidSigners(file_info["content_inner_path"], user_content) - self.assertEqual(valid_signers, ['14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet', '1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q', '1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT']) - - # Known user - user_content["cert_auth_type"] = "web" - user_content["cert_user_id"] = "nofish@zeroid.bit" - rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content) - self.assertEqual(rules["max_size"], 100000) - - # Unknown user - user_content["cert_auth_type"] = "web" - user_content["cert_user_id"] = "noone@zeroid.bit" - rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content) - self.assertEqual(rules["max_size"], 10000) - - # User with more size limit by auth type - user_content["cert_auth_type"] = "bitmsg" - user_content["cert_user_id"] = "noone@zeroid.bit" - rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content) - self.assertEqual(rules["max_size"], 15000) - - # Banned user - user_content["cert_auth_type"] = "web" - user_content["cert_user_id"] = "bad@zeroid.bit" - rules = site.content_manager.getRules("data/users/1C5sgvWaSgfaTpV5kjBCnCiKtENNMYo69q/content.json", user_content) - self.assertFalse(rules) - - def testUserContentCert(self): - from Site import Site - from cStringIO import StringIO - import json - # user_addr = "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C" - user_priv = "5Kk7FSA63FC2ViKmKLuBxk9gQkaQ5713hKq8LmFAf4cVeXh6K6A" - # cert_addr = "14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet" - cert_priv = "5JusJDSjHaMHwUjDT3o6eQ54pA6poo8La5fAgn1wNc3iK59jxjA" - - site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") - # user_content = site.storage.loadJson("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json") - # site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] = user_content # Add to content manager - # Check if the user file is loaded - self.assertTrue("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json" in site.content_manager.contents) - user_content = site.content_manager.contents["data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json"] - cert_content = site.content_manager.contents["data/users/content.json"] - # Override cert signer - cert_content["user_contents"]["cert_signers"]["zeroid.bit"] = ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"] - - # Valid cert providers - rules = site.content_manager.getRules("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content) - self.assertEqual(rules["cert_signers"], {"zeroid.bit": ["14wgQ4VDDZNoRMFF4yCDuTrBSHmYhL3bet", "1iD5ZQJMNXu43w1qLB8sfdHVKppVMduGz"]}) - - # Add cert - user_content["cert_sign"] = CryptBitcoin.sign( - "1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"].split("@")[0]), cert_priv - ) - - # Verify cert - self.assertTrue(site.content_manager.verifyCert("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_content)) - self.assertFalse(site.content_manager.verifyCert("data/users/badaddress/content.json", user_content)) - - # Sign user content - signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False) - - # Test user cert - self.assertTrue(site.content_manager.verifyFile( - "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", - StringIO(json.dumps(signed_content)), ignore_same=False - )) - - # Test banned user - site.content_manager.contents["data/users/content.json"]["user_contents"]["permissions"][user_content["cert_user_id"]] = False - self.assertFalse(site.content_manager.verifyFile( - "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", - StringIO(json.dumps(signed_content)), ignore_same=False - )) - - # Test invalid cert - user_content["cert_sign"] = CryptBitcoin.sign( - "badaddress#%s/%s" % (user_content["cert_auth_type"], user_content["cert_user_id"]), cert_priv - ) - signed_content = site.content_manager.sign("data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", user_priv, filewrite=False) - self.assertFalse(site.content_manager.verifyFile( - "data/users/1J6UrZMkarjVg5ax9W4qThir3BFUikbW6C/content.json", - StringIO(json.dumps(signed_content)), ignore_same=False - )) - - def testClone(self): - from Site import Site - import shutil - - site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") # Privatekey: 5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv - self.assertEqual(site.storage.directory, "src/Test/testdata/1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT") - - # Remove old files - if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"): - shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") - self.assertFalse(os.path.isfile("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL/content.json")) - - # Clone 1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT to 15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc - new_site = site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL", "5JU2p5h3R7B1WrbaEdEDNZR7YHqRLGcjNcqwqVQzX2H4SuNe2ee", address_index=1) - - # Check if clone was successful - self.assertEqual(new_site.address, "159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") - self.assertTrue(new_site.storage.isFile("content.json")) - self.assertTrue(new_site.storage.isFile("index.html")) - self.assertTrue(new_site.storage.isFile("data/users/content.json")) - self.assertTrue(new_site.storage.isFile("data/zeroblog.db")) - - # Test re-cloning (updating) - - # Changes in non-data files should be overwritten - new_site.storage.write("index.html", "this will be overwritten") - self.assertEqual(new_site.storage.read("index.html"), "this will be overwritten") - - # Changes in data file should be kept after re-cloning - changed_contentjson = new_site.storage.loadJson("content.json") - changed_contentjson["description"] = "Update Description Test" - new_site.storage.writeJson("content.json", changed_contentjson) - - changed_data = new_site.storage.loadJson("data/data.json") - changed_data["title"] = "UpdateTest" - new_site.storage.writeJson("data/data.json", changed_data) - - # Re-clone the site - site.clone("159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") - - self.assertEqual(new_site.storage.loadJson("data/data.json")["title"], "UpdateTest") - self.assertEqual(new_site.storage.loadJson("content.json")["description"], "Update Description Test") - self.assertNotEqual(new_site.storage.read("index.html"), "this will be overwritten") - - # Delete created files - if os.path.isdir("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL"): - new_site.storage.closeDb() - shutil.rmtree("src/Test/testdata/159EGD5srUsMP97UpcLy8AtKQbQLK2AbbL") - - def testUserNewsite(self): - from User import UserManager - user = UserManager.user_manager.get() - user.sites = {} # Reset user data - self.assertEqual(user.master_address, "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc") - self.assertEqual( - user.getAddressAuthIndex("15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc"), - 1458664252141532163166741013621928587528255888800826689784628722366466547364755811L - ) - - # Re-generate privatekey based on address_index - address, address_index, site_data = user.getNewSiteData() - self.assertEqual(CryptBitcoin.hdPrivatekey(user.master_seed, address_index), site_data["privatekey"]) - - user.sites = {} # Reset user data - - # Site address and auth address is different - self.assertNotEqual(user.getSiteData(address)["auth_address"], address) - # Re-generate auth_privatekey for site - self.assertEqual(user.getSiteData(address)["auth_privatekey"], site_data["auth_privatekey"]) - - def testSslCert(self): - from Crypt import CryptConnection - # Remove old certs - if os.path.isfile("%s/cert-rsa.pem" % config.data_dir): - os.unlink("%s/cert-rsa.pem" % config.data_dir) - if os.path.isfile("%s/key-rsa.pem" % config.data_dir): - os.unlink("%s/key-rsa.pem" % config.data_dir) - - CryptConnection.manager.loadCerts() - - self.assertIn("tls-rsa", CryptConnection.manager.crypt_supported) - self.assertEqual(CryptConnection.manager.selectCrypt(["tls-rsa", "unknown"]), "tls-rsa") - self.assertTrue(os.path.isfile("%s/cert-rsa.pem" % config.data_dir)) - self.assertTrue(os.path.isfile("%s/key-rsa.pem" % config.data_dir)) - - # Remove created files - os.unlink("%s/cert-rsa.pem" % config.data_dir) - os.unlink("%s/key-rsa.pem" % config.data_dir) - - def testConfigParse(self): - import Config - config_test = Config.Config("zeronet.py".split(" ")) - config_test.parse(silent=True, parse_config=False) - self.assertFalse(config_test.debug) - self.assertFalse(config_test.debug_socket) - - config_test = Config.Config("zeronet.py --debug --debug_socket --ui_password hello".split(" ")) - config_test.parse(silent=True, parse_config=False) - self.assertTrue(config_test.debug) - self.assertTrue(config_test.debug_socket) - - args = "zeronet.py --unknown_arg --debug --debug_socket --ui_restrict 127.0.0.1 1.2.3.4 " - args += "--another_unknown argument --use_openssl False siteSign address privatekey --inner_path users/content.json" - config_test = Config.Config(args.split(" ")) - config_test.parse(silent=True, parse_config=False) - self.assertTrue(config_test.debug) - self.assertIn("1.2.3.4", config_test.ui_restrict) - self.assertFalse(config_test.use_openssl) - self.assertEqual(config_test.inner_path, "users/content.json") - -if __name__ == "__main__": - import logging - logging.getLogger().setLevel(level=logging.CRITICAL) - unittest.main(verbosity=2) - # unittest.main(verbosity=2, defaultTest="TestCase.testConfigParse") diff --git a/src/Ui/UiRequest.py b/src/Ui/UiRequest.py index 9e2986c4..54757cb1 100644 --- a/src/Ui/UiRequest.py +++ b/src/Ui/UiRequest.py @@ -298,7 +298,7 @@ class UiRequest(object): # Check wrapper nonce content_type = self.getContentType(path) if "htm" in content_type: # Valid nonce must present to render html files - wrapper_nonce = self.get["wrapper_nonce"] + wrapper_nonce = self.get.get("wrapper_nonce") if wrapper_nonce not in self.server.wrapper_nonces: return self.error403("Wrapper nonce error. Please reload the page.") self.server.wrapper_nonces.remove(self.get["wrapper_nonce"])