diff --git a/src/I2P/I2PManager.py b/src/I2P/I2PManager.py index 0c4a0374..511e481f 100644 --- a/src/I2P/I2PManager.py +++ b/src/I2P/I2PManager.py @@ -3,6 +3,9 @@ import logging from gevent.coros import RLock from gevent.server import StreamServer from gevent.pool import Pool +from httplib import HTTPConnection +import urllib2 + from i2p import socket from i2p.datatypes import Destination @@ -11,6 +14,29 @@ from Site import SiteManager from Debug import Debug +class I2PHTTPConnection(HTTPConnection): + def __init__(self, i2p_manager, site_address, *args, **kwargs): + HTTPConnection.__init__(self, *args, **kwargs) + self.i2p_manager = i2p_manager + self.site_address = site_address + self._create_connection = self._create_i2p_connection + + def _create_i2p_connection(self, address, timeout=60, + source_address=None): + return self.i2p_manager.createSocket(self.site_address, *address) + +class I2PHTTPHandler(urllib2.HTTPHandler): + def __init__(self, i2p_manager, site_address, *args, **kwargs): + urllib2.HTTPHandler.__init__(self, *args, **kwargs) + self.i2p_manager = i2p_manager + self.site_address = site_address + + def http_open(self, req): + return self.do_open(self._createI2PHTTPConnection, req) + + def _createI2PHTTPConnection(self, *args, **kwargs): + return I2PHTTPConnection(self.i2p_manager, self.site_address, *args, **kwargs) + class I2PManager: def __init__(self, fileserver_handler=None): self.dest_conns = {} # Destination: SAM connection @@ -140,3 +166,11 @@ class I2PManager: samaddr=(self.sam_ip, self.sam_port)) sock.connect((dest, int(port)), site_address) return sock + + def lookup(self, name): + return socket.lookup(name, (self.sam_ip, self.sam_port)) + + def urlopen(self, site_address, url, timeout): + handler = I2PHTTPHandler(self, site_address) + opener = urllib2.build_opener(handler) + return opener.open(url, timeout=50) diff --git a/src/Site/Site.py b/src/Site/Site.py index f5f4ed98..604c8c69 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -672,6 +672,11 @@ class Site(object): # Gather peers from tracker # Return: Complete time or False on error def announceTracker(self, tracker_protocol, tracker_address, fileserver_port=0, add_types=[], my_peer_id="", mode="start"): + is_i2p = ".i2p" in tracker_address + i2p_manager = self.connection_server.i2p_manager + if is_i2p and not (i2p_manager and i2p_manager.enabled): + return False + s = time.time() if "ip4" not in add_types: fileserver_port = 0 @@ -698,12 +703,18 @@ class Site(object): 'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30, 'event': 'started' } + if is_i2p: + params['ip'] = i2p_manager.getDest(self.address).base64() req = None try: url = "http://" + tracker_address + "?" + urllib.urlencode(params) + timeout = 60 if is_i2p else 30 # Load url - with gevent.Timeout(30, False): # Make sure of timeout - req = urllib2.urlopen(url, timeout=25) + with gevent.Timeout(timeout, False): # Make sure of timeout + if is_i2p: + req = i2p_manager.urlopen(self.address, url, timeout=50) + else: + req = urllib2.urlopen(url, timeout=25) response = req.read() req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions req.close() @@ -714,13 +725,31 @@ class Site(object): # Decode peers peer_data = bencode.decode(response)["peers"] response = None - peer_count = len(peer_data) / 6 peers = [] - for peer_offset in xrange(peer_count): - off = 6 * peer_offset - peer = peer_data[off:off + 6] - addr, port = struct.unpack('!LH', peer) - peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port}) + if isinstance(peer_data, str): + # Compact response + peer_length = 32 if is_i2p else 6 + peer_count = len(peer_data) / peer_length + for peer_offset in xrange(peer_count): + off = peer_length * peer_offset + peer = peer_data[off:off + peer_length] + if is_i2p: + # TODO measure whether non-compact is faster than compact+lookup + try: + dest = i2p_manager.lookup(peer+".b32.i2p") + peers.append({"addr": dest.base64()+".i2p", "port": 6881}) + except Exception: + pass + else: + addr, port = struct.unpack('!LH', peer) + peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port}) + else: + # Non-compact response + for peer in peer_data: + if is_i2p: + peers.append({"addr": peer["ip"]+".i2p", "port": peer["port"]}) + else: + peers.append({"addr": peer["ip"], "port": peer["port"]}) except Exception, err: self.log.debug("Http tracker %s error: %s" % (url, err)) if req: @@ -755,6 +784,8 @@ class Site(object): trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")] if self.connection_server and not self.connection_server.tor_manager.enabled: trackers = [tracker for tracker in trackers if ".onion" not in tracker] + if self.connection_server and not self.connection_server.i2p_manager.enabled: + trackers = [tracker for tracker in trackers if ".i2p" not in tracker] if mode == "update" or mode == "more": # Only announce on one tracker, increment the queried tracker id self.last_tracker_id += 1 @@ -772,6 +803,8 @@ class Site(object): add_types.append("ip4") if self.connection_server.tor_manager.enabled and self.connection_server.tor_manager.start_onions: add_types.append("onion") + if self.connection_server.i2p_manager.enabled and self.connection_server.i2p_manager.start_dests: + add_types.append("i2p") else: my_peer_id = ""