Implement announces to I2P BitTorrent trackers

Protocol details: https://geti2p.net/en/docs/applications/bittorrent
This commit is contained in:
str4d 2016-08-01 02:17:02 +12:00
parent 5e57411c58
commit feebcd0662
2 changed files with 75 additions and 8 deletions

View file

@ -3,6 +3,9 @@ import logging
from gevent.coros import RLock from gevent.coros import RLock
from gevent.server import StreamServer from gevent.server import StreamServer
from gevent.pool import Pool from gevent.pool import Pool
from httplib import HTTPConnection
import urllib2
from i2p import socket from i2p import socket
from i2p.datatypes import Destination from i2p.datatypes import Destination
@ -11,6 +14,29 @@ from Site import SiteManager
from Debug import Debug from Debug import Debug
class I2PHTTPConnection(HTTPConnection):
def __init__(self, i2p_manager, site_address, *args, **kwargs):
HTTPConnection.__init__(self, *args, **kwargs)
self.i2p_manager = i2p_manager
self.site_address = site_address
self._create_connection = self._create_i2p_connection
def _create_i2p_connection(self, address, timeout=60,
source_address=None):
return self.i2p_manager.createSocket(self.site_address, *address)
class I2PHTTPHandler(urllib2.HTTPHandler):
def __init__(self, i2p_manager, site_address, *args, **kwargs):
urllib2.HTTPHandler.__init__(self, *args, **kwargs)
self.i2p_manager = i2p_manager
self.site_address = site_address
def http_open(self, req):
return self.do_open(self._createI2PHTTPConnection, req)
def _createI2PHTTPConnection(self, *args, **kwargs):
return I2PHTTPConnection(self.i2p_manager, self.site_address, *args, **kwargs)
class I2PManager: class I2PManager:
def __init__(self, fileserver_handler=None): def __init__(self, fileserver_handler=None):
self.dest_conns = {} # Destination: SAM connection self.dest_conns = {} # Destination: SAM connection
@ -140,3 +166,11 @@ class I2PManager:
samaddr=(self.sam_ip, self.sam_port)) samaddr=(self.sam_ip, self.sam_port))
sock.connect((dest, int(port)), site_address) sock.connect((dest, int(port)), site_address)
return sock return sock
def lookup(self, name):
return socket.lookup(name, (self.sam_ip, self.sam_port))
def urlopen(self, site_address, url, timeout):
handler = I2PHTTPHandler(self, site_address)
opener = urllib2.build_opener(handler)
return opener.open(url, timeout=50)

View file

@ -672,6 +672,11 @@ class Site(object):
# Gather peers from tracker # Gather peers from tracker
# Return: Complete time or False on error # Return: Complete time or False on error
def announceTracker(self, tracker_protocol, tracker_address, fileserver_port=0, add_types=[], my_peer_id="", mode="start"): def announceTracker(self, tracker_protocol, tracker_address, fileserver_port=0, add_types=[], my_peer_id="", mode="start"):
is_i2p = ".i2p" in tracker_address
i2p_manager = self.connection_server.i2p_manager
if is_i2p and not (i2p_manager and i2p_manager.enabled):
return False
s = time.time() s = time.time()
if "ip4" not in add_types: if "ip4" not in add_types:
fileserver_port = 0 fileserver_port = 0
@ -698,11 +703,17 @@ class Site(object):
'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30, 'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30,
'event': 'started' 'event': 'started'
} }
if is_i2p:
params['ip'] = i2p_manager.getDest(self.address).base64()
req = None req = None
try: try:
url = "http://" + tracker_address + "?" + urllib.urlencode(params) url = "http://" + tracker_address + "?" + urllib.urlencode(params)
timeout = 60 if is_i2p else 30
# Load url # Load url
with gevent.Timeout(30, False): # Make sure of timeout with gevent.Timeout(timeout, False): # Make sure of timeout
if is_i2p:
req = i2p_manager.urlopen(self.address, url, timeout=50)
else:
req = urllib2.urlopen(url, timeout=25) req = urllib2.urlopen(url, timeout=25)
response = req.read() response = req.read()
req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions
@ -714,13 +725,31 @@ class Site(object):
# Decode peers # Decode peers
peer_data = bencode.decode(response)["peers"] peer_data = bencode.decode(response)["peers"]
response = None response = None
peer_count = len(peer_data) / 6
peers = [] peers = []
if isinstance(peer_data, str):
# Compact response
peer_length = 32 if is_i2p else 6
peer_count = len(peer_data) / peer_length
for peer_offset in xrange(peer_count): for peer_offset in xrange(peer_count):
off = 6 * peer_offset off = peer_length * peer_offset
peer = peer_data[off:off + 6] peer = peer_data[off:off + peer_length]
if is_i2p:
# TODO measure whether non-compact is faster than compact+lookup
try:
dest = i2p_manager.lookup(peer+".b32.i2p")
peers.append({"addr": dest.base64()+".i2p", "port": 6881})
except Exception:
pass
else:
addr, port = struct.unpack('!LH', peer) addr, port = struct.unpack('!LH', peer)
peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port}) peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port})
else:
# Non-compact response
for peer in peer_data:
if is_i2p:
peers.append({"addr": peer["ip"]+".i2p", "port": peer["port"]})
else:
peers.append({"addr": peer["ip"], "port": peer["port"]})
except Exception, err: except Exception, err:
self.log.debug("Http tracker %s error: %s" % (url, err)) self.log.debug("Http tracker %s error: %s" % (url, err))
if req: if req:
@ -755,6 +784,8 @@ class Site(object):
trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")] trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")]
if self.connection_server and not self.connection_server.tor_manager.enabled: if self.connection_server and not self.connection_server.tor_manager.enabled:
trackers = [tracker for tracker in trackers if ".onion" not in tracker] trackers = [tracker for tracker in trackers if ".onion" not in tracker]
if self.connection_server and not self.connection_server.i2p_manager.enabled:
trackers = [tracker for tracker in trackers if ".i2p" not in tracker]
if mode == "update" or mode == "more": # Only announce on one tracker, increment the queried tracker id if mode == "update" or mode == "more": # Only announce on one tracker, increment the queried tracker id
self.last_tracker_id += 1 self.last_tracker_id += 1
@ -772,6 +803,8 @@ class Site(object):
add_types.append("ip4") add_types.append("ip4")
if self.connection_server.tor_manager.enabled and self.connection_server.tor_manager.start_onions: if self.connection_server.tor_manager.enabled and self.connection_server.tor_manager.start_onions:
add_types.append("onion") add_types.append("onion")
if self.connection_server.i2p_manager.enabled and self.connection_server.i2p_manager.start_dests:
add_types.append("i2p")
else: else:
my_peer_id = "" my_peer_id = ""