From 6840b80c7bf47b85c328a2ff887ce90aca6dd81f Mon Sep 17 00:00:00 2001 From: str4d Date: Sat, 30 Jul 2016 15:50:07 +1200 Subject: [PATCH] Implement I2P connection manager Requires i2p.socket --- src/Config.py | 3 + src/I2P/I2PManager.py | 142 ++++++++++++++++++++++++++++++++++++++++++ src/I2P/__init__.py | 1 + src/Test/TestI2P.py | 36 +++++++++++ src/Test/conftest.py | 14 +++++ 5 files changed, 196 insertions(+) create mode 100644 src/I2P/I2PManager.py create mode 100644 src/I2P/__init__.py create mode 100644 src/Test/TestI2P.py diff --git a/src/Config.py b/src/Config.py index 1d4e2d7f..6f165e77 100644 --- a/src/Config.py +++ b/src/Config.py @@ -171,6 +171,9 @@ class Config(object): self.parser.add_argument('--tor_controller', help='Tor controller address', metavar='ip:port', default='127.0.0.1:9051') self.parser.add_argument('--tor_proxy', help='Tor proxy address', metavar='ip:port', default='127.0.0.1:9050') + self.parser.add_argument('--i2p', help='enable: Use only for I2P peers, always: Use I2P for every connection', choices=["disable", "enable", "always"], default='enable') + self.parser.add_argument('--i2p_sam', help='I2P SAM API address', metavar='ip:port', default='127.0.0.1:7656') + self.parser.add_argument('--version', action='version', version='ZeroNet %s r%s' % (self.version, self.rev)) return self.parser diff --git a/src/I2P/I2PManager.py b/src/I2P/I2PManager.py new file mode 100644 index 00000000..0c4a0374 --- /dev/null +++ b/src/I2P/I2PManager.py @@ -0,0 +1,142 @@ +import logging + +from gevent.coros import RLock +from gevent.server import StreamServer +from gevent.pool import Pool +from i2p import socket +from i2p.datatypes import Destination + +from Config import config +from Site import SiteManager +from Debug import Debug + + +class I2PManager: + def __init__(self, fileserver_handler=None): + self.dest_conns = {} # Destination: SAM connection + self.dest_servs = {} # Destination: StreamServer + self.site_dests = {} # Site address: Destination + self.log = logging.getLogger("I2PManager") + self.start_dests = None + self.lock = RLock() + + if config.i2p == "disable": + self.enabled = False + self.start_dests = False + self.status = "Disabled" + else: + self.enabled = True + self.status = "Waiting" + + if fileserver_handler: + self.fileserver_handler = fileserver_handler + else: + self.fileserver_handler = lambda self, sock, addr: None + + self.sam_ip, self.sam_port = config.i2p_sam.split(":") + self.sam_port = int(self.sam_port) + + # Test SAM port + if config.i2p != "disable": + try: + assert self.connect(), "No connection" + self.log.debug("I2P SAM port %s check ok" % config.i2p_sam) + except Exception, err: + self.log.debug("I2P SAM port %s check error: %s" % (config.i2p_sam, err)) + self.enabled = False + + def connect(self): + if not self.enabled: + return False + self.site_dests = {} + self.dest_conns = {} + self.dest_servs = {} + + self.log.debug("Connecting to %s:%s" % (self.sam_ip, self.sam_port)) + with self.lock: + try: + socket.checkAPIConnection((self.sam_ip, self.sam_port)) + self.status = u"Connected" + return True + except Exception, err: + self.status = u"Error (%s)" % err + self.log.error("I2P SAM connect error: %s" % Debug.formatException(err)) + self.enabled = False + return False + + def disconnect(self): + for server in self.dest_servs: + server.stop() + self.dest_conns = {} + self.dest_servs = {} + + def startDests(self): + if self.enabled: + self.log.debug("Start Destinations") + self.start_dests = True + + def addDest(self, site_address=None): + sock = socket.socket(socket.AF_I2P, socket.SOCK_STREAM, + samaddr=(self.sam_ip, self.sam_port)) + try: + sock.setblocking(0) + sock.bind(None, site_address) # Transient Destination, tied to site address + sock.listen() + server = StreamServer( + sock, self.fileserver_handler, spawn=Pool(1000) + ) + server.start() + dest = sock.getsockname() + self.dest_conns[dest] = sock + self.dest_servs[dest] = server + self.status = u"OK (%s Destinations running)" % len(self.dest_conns) + SiteManager.peer_blacklist.append((dest.base64()+".i2p", 0)) + return dest + except Exception, err: + self.status = u"SESSION CREATE error (%s)" % err + self.log.error("I2P SESSION CREATE error: %s" % Debug.formatException(err)) + return False + + def delDest(self, dest): + if dest in self.dest_servs: + self.dest_servs[dest].stop() + del self.dest_conns[dest] + del self.dest_servs[dest] + self.status = "OK (%s Destinations running)" % len(self.dest_conns) + return True + else: + self.status = u"Tried to delete non-existent Destination" + self.log.error("I2P error: Tried to delete non-existent") + self.disconnect() + return False + + def getDest(self, site_address): + with self.lock: + if not self.enabled: + return None + if self.start_dests: # Different Destination for every site + dest = self.site_dests.get(site_address) + else: # Same Destination for every site + dest = self.site_dests.get("global") + site_address = "global" + if not dest: + self.site_dests[site_address] = self.addDest(site_address) + dest = self.site_dests[site_address] + self.log.debug("Created new Destination for %s: %s" % (site_address, dest)) + return dest + + def getPrivateDest(self, addr): + dest = addr if isinstance(addr, Destination) else getDest(addr) + return self.dest_conns[dest].getPrivateDest() + + def createSocket(self, site_address, dest, port): + if not self.enabled: + return False + if dest.endswith(".i2p") and not dest.endswith(".b32.i2p"): + dest = Destination(raw=dest[:-4], b64=True) + self.log.debug("Creating new socket to %s:%s" % + (dest.base32() if isinstance(dest, Destination) else dest, port)) + sock = socket.socket(socket.AF_I2P, socket.SOCK_STREAM, + samaddr=(self.sam_ip, self.sam_port)) + sock.connect((dest, int(port)), site_address) + return sock diff --git a/src/I2P/__init__.py b/src/I2P/__init__.py new file mode 100644 index 00000000..2a1e8091 --- /dev/null +++ b/src/I2P/__init__.py @@ -0,0 +1 @@ +from I2PManager import I2PManager diff --git a/src/Test/TestI2P.py b/src/Test/TestI2P.py new file mode 100644 index 00000000..02eef10a --- /dev/null +++ b/src/Test/TestI2P.py @@ -0,0 +1,36 @@ +import pytest +import time + +# stats.i2p +TEST_B64 = 'Okd5sN9hFWx-sr0HH8EFaxkeIMi6PC5eGTcjM1KB7uQ0ffCUJ2nVKzcsKZFHQc7pLONjOs2LmG5H-2SheVH504EfLZnoB7vxoamhOMENnDABkIRGGoRisc5AcJXQ759LraLRdiGSR0WTHQ0O1TU0hAz7vAv3SOaDp9OwNDr9u902qFzzTKjUTG5vMTayjTkLo2kOwi6NVchDeEj9M7mjj5ySgySbD48QpzBgcqw1R27oIoHQmjgbtbmV2sBL-2Tpyh3lRe1Vip0-K0Sf4D-Zv78MzSh8ibdxNcZACmZiVODpgMj2ejWJHxAEz41RsfBpazPV0d38Mfg4wzaS95R5hBBo6SdAM4h5vcZ5ESRiheLxJbW0vBpLRd4mNvtKOrcEtyCvtvsP3FpA-6IKVswyZpHgr3wn6ndDHiVCiLAQZws4MsIUE1nkfxKpKtAnFZtPrrB8eh7QO9CkH2JBhj7bG0ED6mV5~X5iqi52UpsZ8gnjZTgyG5pOF8RcFrk86kHxAAAA' + +@pytest.mark.usefixtures("resetSettings") +@pytest.mark.usefixtures("resetTempSettings") +class TestI2P: + def testAddDest(self, i2p_manager): + # Add + dest = i2p_manager.addDest() + assert dest + assert dest in i2p_manager.dest_conns + + # Delete + assert i2p_manager.delDest(dest) + assert dest not in i2p_manager.dest_conns + + def testSignDest(self, i2p_manager): + dest = i2p_manager.addDest() + + # Sign + sign = i2p_manager.getPrivateDest(dest).sign("hello") + assert len(sign) == dest.signature_size() + + # Verify + assert dest.verify("hello", sign) + assert not dest.verify("not hello", sign) + + # Delete + i2p_manager.delDest(dest) + + def testSiteDest(self, i2p_manager): + assert i2p_manager.getDest("address1") != i2p_manager.getDest("address2") + assert i2p_manager.getDest("address1") == i2p_manager.getDest("address1") diff --git a/src/Test/conftest.py b/src/Test/conftest.py index 380b407f..12c3b1e0 100644 --- a/src/Test/conftest.py +++ b/src/Test/conftest.py @@ -37,6 +37,7 @@ config.data_dir = "src/Test/testdata" # Use test data for unittests config.debug_socket = True # Use test data for unittests config.verbose = True # Use test data for unittests config.tor = "disabled" # Don't start Tor client +config.i2p = "disable" # Don't start I2P client config.trackers = [] os.chdir(os.path.abspath(os.path.dirname(__file__) + "/../..")) # Set working dir @@ -58,6 +59,7 @@ from Connection import ConnectionServer from Crypt import CryptConnection from Ui import UiWebsocket from Tor import TorManager +from I2P import I2PManager from Content import ContentDb from util import RateLimit @@ -226,3 +228,15 @@ def tor_manager(): except Exception, err: raise pytest.skip("Test requires Tor with ControlPort: %s, %s" % (config.tor_controller, err)) return tor_manager + + +@pytest.fixture(scope="session") +def i2p_manager(): + try: + i2p_manager = I2PManager() + i2p_manager.enabled = True + assert i2p_manager.connect(), "No connection" + i2p_manager.startDests() + except Exception, err: + raise pytest.skip("Test requires I2P with SAM port: %s, %s" % (config.i2p_sam, err)) + return i2p_manager