Refactor and move announcer code to separate class

This commit is contained in:
shortcutme 2018-04-28 21:48:21 +02:00
parent f3a809a495
commit 2f71c2b784
No known key found for this signature in database
GPG key ID: 5B63BAE6CB9613AE
2 changed files with 327 additions and 205 deletions

View file

@ -1,15 +1,10 @@
import os
import json
import logging
import hashlib
import re
import time
import random
import sys
import struct
import socket
import urllib
import urllib2
import hashlib
import collections
@ -17,8 +12,6 @@ import gevent
import gevent.pool
import util
from lib import bencode
from lib.subtl.subtl import UdpTrackerClient
from Config import config
from Peer import Peer
from Worker import WorkerManager
@ -29,6 +22,8 @@ from Crypt import CryptHash
from util import helper
from util import Diff
from Plugin import PluginManager
from Connection import ConnectionServer
from SiteAnnouncer import SiteAnnouncer
import SiteManager
@ -47,7 +42,6 @@ class Site(object):
self.peers_recent = collections.deque(maxlen=100)
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
self.time_announce = 0 # Last announce time to tracker
self.last_tracker_id = random.randint(0, 10) # Last announced tracker id
self.worker_manager = WorkerManager(self) # Handle site download from other peers
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
self.content_updated = None # Content.js update time
@ -65,6 +59,9 @@ class Site(object):
else:
self.log.debug("Creating connection server") # remove
self.connection_server = ConnectionServer()
self.announcer = SiteAnnouncer(self) # Announce and get peer list from other nodes
if not self.settings.get("auth_key"): # To auth user in site (Obsolete, will be removed)
self.settings["auth_key"] = CryptHash.random()
self.log.debug("New auth key: %s" % self.settings["auth_key"])
@ -777,203 +774,8 @@ class Site(object):
peer.found(source)
return peer
# Gather peer from connected peers
@util.Noparallel(blocking=False)
def announcePex(self, query_num=2, need_num=5):
peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected] # Connected peers
if len(peers) == 0: # Small number of connected peers for this site, connect to any
self.log.debug("Small number of peers detected...query all of peers using pex")
peers = self.peers.values()
need_num = 10
random.shuffle(peers)
done = 0
added = 0
for peer in peers:
res = peer.pex(need_num=need_num)
if type(res) == int: # We have result
done += 1
added += res
if res:
self.worker_manager.onPeers()
self.updateWebsocket(peers_added=res)
if done == query_num:
break
self.log.debug("Pex result: from %s peers got %s new peers." % (done, added))
# Gather peers from tracker
# Return: Complete time or False on error
def announceTracker(self, tracker_protocol, tracker_address, fileserver_port=0, add_types=[], my_peer_id="", mode="start"):
s = time.time()
if mode == "update":
num_want = 10
else:
num_want = 30
if "ip4" not in add_types:
fileserver_port = 0
if tracker_protocol == "udp": # Udp tracker
if config.disable_udp:
return False # No udp supported
ip, port = tracker_address.split(":")
tracker = UdpTrackerClient(ip, int(port))
tracker.peer_port = fileserver_port
try:
tracker.connect()
tracker.poll_once()
tracker.announce(info_hash=hashlib.sha1(self.address).hexdigest(), num_want=num_want, left=431102370)
back = tracker.poll_once()
if back and type(back) is dict:
peers = back["response"]["peers"]
else:
raise Exception("No response")
except Exception, err:
self.log.warning("Tracker error: udp://%s:%s (%s)" % (ip, port, err))
return False
elif tracker_protocol == "http": # Http tracker
params = {
'info_hash': hashlib.sha1(self.address).digest(),
'peer_id': my_peer_id, 'port': fileserver_port,
'uploaded': 0, 'downloaded': 0, 'left': 431102370, 'compact': 1, 'numwant': num_want,
'event': 'started'
}
req = None
try:
url = "http://" + tracker_address + "?" + urllib.urlencode(params)
# Load url
with gevent.Timeout(30, False): # Make sure of timeout
req = urllib2.urlopen(url, timeout=25)
response = req.read()
req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions
req.close()
req = None
if not response:
self.log.warning("Tracker error: http://%s (No response)" % tracker_address)
return False
# Decode peers
peer_data = bencode.decode(response)["peers"]
response = None
peer_count = len(peer_data) / 6
peers = []
for peer_offset in xrange(peer_count):
off = 6 * peer_offset
peer = peer_data[off:off + 6]
addr, port = struct.unpack('!LH', peer)
peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port})
except Exception, err:
self.log.warning("Tracker error: http://%s (%s)" % (tracker_address, err))
if req:
req.close()
req = None
return False
else:
peers = []
# Adding peers
added = 0
for peer in peers:
if peer["port"] == 1: # Some trackers does not accept port 0, so we send port 1 as not-connectable
peer["port"] = 0
if not peer["port"]:
continue # Dont add peers with port 0
if self.addPeer(peer["addr"], peer["port"], source="tracker"):
added += 1
if added:
self.worker_manager.onPeers()
self.updateWebsocket(peers_added=added)
self.log.debug(
"Tracker result: %s://%s (found %s peers, new: %s, total: %s)" %
(tracker_protocol, tracker_address, len(peers), added, len(self.peers))
)
return time.time() - s
# Add myself and get other peers from tracker
def announce(self, force=False, mode="start", pex=True):
if time.time() < self.time_announce + 30 and not force:
return # No reannouncing within 30 secs
self.time_announce = time.time()
trackers = config.trackers
# Filter trackers based on supported networks
if config.disable_udp:
trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")]
if self.connection_server and self.connection_server.tor_manager and not self.connection_server.tor_manager.enabled:
trackers = [tracker for tracker in trackers if ".onion" not in tracker]
if trackers and (mode == "update" or mode == "more"): # Only announce on one tracker, increment the queried tracker id
self.last_tracker_id += 1
self.last_tracker_id = self.last_tracker_id % len(trackers)
trackers = [trackers[self.last_tracker_id]] # We only going to use this one
errors = []
slow = []
add_types = []
if self.connection_server:
my_peer_id = self.connection_server.peer_id
# Type of addresses they can reach me
if self.connection_server.port_opened:
add_types.append("ip4")
if self.connection_server.tor_manager and self.connection_server.tor_manager.start_onions:
add_types.append("onion")
else:
my_peer_id = ""
s = time.time()
announced = 0
threads = []
fileserver_port = config.fileserver_port
for tracker in trackers: # Start announce threads
tracker_protocol, tracker_address = tracker.split("://")
thread = gevent.spawn(
self.announceTracker, tracker_protocol, tracker_address, fileserver_port, add_types, my_peer_id, mode
)
threads.append(thread)
thread.tracker_address = tracker_address
thread.tracker_protocol = tracker_protocol
gevent.joinall(threads, timeout=10) # Wait for announce finish
for thread in threads:
if thread.value:
if thread.value > 1:
slow.append("%.2fs %s://%s" % (thread.value, thread.tracker_protocol, thread.tracker_address))
announced += 1
else:
if thread.ready():
errors.append("%s://%s" % (thread.tracker_protocol, thread.tracker_address))
else: # Still running
slow.append("10s+ %s://%s" % (thread.tracker_protocol, thread.tracker_address))
# Save peers num
self.settings["peers"] = len(self.peers)
if len(errors) < len(threads): # Less errors than total tracker nums
if announced == 1:
announced_to = trackers[0]
else:
announced_to = "%s trackers" % announced
if config.verbose:
self.log.debug(
"Announced types %s in mode %s to %s in %.3fs, errors: %s, slow: %s" %
(add_types, mode, announced_to, time.time() - s, errors, slow)
)
else:
if mode != "update":
self.log.error("Announce to %s trackers in %.3fs, failed" % (announced, time.time() - s))
if pex:
if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]:
# If no connected peer yet then wait for connections
gevent.spawn_later(3, self.announcePex, need_num=10) # Spawn 3 secs later
else: # Else announce immediately
if mode == "more": # Need more peers
self.announcePex(need_num=10)
else:
self.announcePex()
def announce(self, *args, **kwargs):
self.announcer.announce(*args, **kwargs)
# Keep connections to get the updates
def needConnections(self, num=6, check_site_on_reconnect=False):

320
src/Site/SiteAnnouncer.py Normal file
View file

@ -0,0 +1,320 @@
import random
import time
import hashlib
import urllib
import urllib2
import struct
import socket
from lib import bencode
from lib.subtl.subtl import UdpTrackerClient
from lib.PySocks import socks
from lib.PySocks import sockshandler
import gevent
from Plugin import PluginManager
from Config import config
import util
from Debug import Debug
class AnnounceError(Exception):
pass
@PluginManager.acceptPlugins
class SiteAnnouncer(object):
def __init__(self, site):
self.site = site
self.stats = {}
self.fileserver_port = config.fileserver_port
self.peer_id = self.site.connection_server.peer_id
self.last_tracker_id = random.randint(0, 10)
self.time_last_announce = 0
def getSupportedTrackers(self):
trackers = config.trackers
if config.disable_udp or config.trackers_proxy != "disable":
trackers = [tracker for tracker in trackers if not tracker.startswith("udp://")]
if not self.site.connection_server.tor_manager.enabled:
trackers = [tracker for tracker in trackers if ".onion" not in tracker]
return trackers
def getAnnouncingTrackers(self, mode):
trackers = self.getSupportedTrackers()
if trackers and (mode == "update" or mode == "more"): # Only announce on one tracker, increment the queried tracker id
self.last_tracker_id += 1
self.last_tracker_id = self.last_tracker_id % len(trackers)
trackers_announcing = [trackers[self.last_tracker_id]] # We only going to use this one
else:
trackers_announcing = trackers
return trackers_announcing
def getOpenedServiceTypes(self):
back = []
# Type of addresses they can reach me
if self.site.connection_server.port_opened and config.trackers_proxy == "disable":
back.append("ip4")
if self.site.connection_server.tor_manager.start_onions:
back.append("onion")
return back
@util.Noparallel(blocking=False)
def announce(self, force=False, mode="start", pex=True):
if time.time() < self.time_last_announce + 30 and not force:
return # No reannouncing within 30 secs
self.fileserver_port = config.fileserver_port
self.time_last_announce = time.time()
trackers = self.getAnnouncingTrackers(mode)
self.site.log.debug("Tracker announcing, trackers: %s" % trackers)
errors = []
slow = []
s = time.time()
threads = []
num_announced = 0
for tracker in trackers: # Start announce threads
thread = gevent.spawn(self.announceTracker, tracker, mode=mode)
threads.append(thread)
thread.tracker = tracker
time.sleep(0.01)
self.updateWebsocket(trackers="announcing")
gevent.joinall(threads, timeout=20) # Wait for announce finish
for thread in threads:
if thread.value is not False:
if thread.value > 1.0: # Takes more than 1 second to announce
slow.append("%.2fs %s" % (thread.value, thread.tracker))
num_announced += 1
else:
if thread.ready():
errors.append(thread.tracker)
else: # Still running
slow.append("30s+ %s" % thread.tracker)
# Save peers num
self.site.settings["peers"] = len(self.site.peers)
if len(errors) < len(threads): # At least one tracker finished
if len(trackers) == 1:
announced_to = trackers[0]
else:
announced_to = "%s/%s trackers" % (num_announced, len(threads))
if config.verbose or 1 == 1: # remove
self.site.log.debug(
"Announced in mode %s to %s in %.3fs, errors: %s, slow: %s" %
(mode, announced_to, time.time() - s, errors, slow)
)
else:
if len(threads) > 1:
self.site.log.error("Announce to %s trackers in %.3fs, failed" % (num_announced, time.time() - s))
self.updateWebsocket(trackers="announced")
if pex:
self.updateWebsocket(pex="announcing")
if mode == "more": # Need more peers
self.announcePex(need_num=10)
else:
self.announcePex()
self.updateWebsocket(pex="announced")
def getTrackerHandler(self, protocol):
if protocol == "udp":
handler = self.announceTrackerUdp
elif protocol == "http":
handler = self.announceTrackerHttp
else:
handler = None
return handler
def announceTracker(self, tracker, mode="start", num_want=10):
s = time.time()
protocol, address = tracker.split("://")
if tracker not in self.stats:
self.stats[tracker] = {"status": "", "num_request": 0, "num_success": 0, "num_error": 0, "time_request": 0}
self.stats[tracker]["status"] = "announcing"
self.stats[tracker]["time_status"] = time.time()
self.stats[tracker]["num_request"] += 1
self.site.log.debug("Tracker announcing to %s (mode: %s)" % (tracker, mode))
if mode == "update":
num_want = 10
else:
num_want = 30
handler = self.getTrackerHandler(protocol)
error = None
try:
if handler:
peers = handler(address, mode=mode, num_want=num_want)
else:
raise AnnounceError("Unknown protocol: %s" % protocol)
except Exception, err:
self.site.log.warning("Tracker %s announce failed: %s" % (tracker, err))
error = err
if error:
self.stats[tracker]["status"] = "error"
self.stats[tracker]["time_status"] = time.time()
self.stats[tracker]["last_error"] = str(err)
self.stats[tracker]["num_error"] += 1
self.updateWebsocket(tracker="error")
return False
self.stats[tracker]["status"] = "announced"
self.stats[tracker]["time_status"] = time.time()
self.stats[tracker]["num_success"] += 1
self.updateWebsocket(tracker="success")
if peers is None: # No peers returned
return time.time() - s
# Adding peers
added = 0
for peer in peers:
if peer["port"] == 1: # Some trackers does not accept port 0, so we send port 1 as not-connectable
peer["port"] = 0
if not peer["port"]:
continue # Dont add peers with port 0
if self.site.addPeer(peer["addr"], peer["port"], source="tracker"):
added += 1
if added:
self.site.worker_manager.onPeers()
self.site.updateWebsocket(peers_added=added)
self.site.log.debug(
"Tracker result: %s://%s (found %s peers, new: %s, total: %s)" %
(protocol, address, len(peers), added, len(self.site.peers))
)
return time.time() - s
def announceTrackerUdp(self, tracker_address, mode="start", num_want=10):
s = time.time()
if config.disable_udp:
raise AnnounceError("Udp disabled by config")
if config.trackers_proxy != "disable":
raise AnnounceError("Udp trackers not available with proxies")
ip, port = tracker_address.split(":")
tracker = UdpTrackerClient(ip, int(port))
if "ipv4" in self.getOpenedServiceTypes():
tracker.peer_port = self.fileserver_port
else:
tracker.peer_port = 0
tracker.connect()
tracker.poll_once()
tracker.announce(info_hash=hashlib.sha1(self.site.address).hexdigest(), num_want=num_want, left=431102370)
back = tracker.poll_once()
if not back:
raise AnnounceError("No response after %.0fs" % (time.time() - s))
elif type(back) is dict and "response" in back:
peers = back["response"]["peers"]
else:
raise AnnounceError("Invalid response: %r" % back)
return peers
def httpRequest(self, url):
if config.trackers_proxy == "tor":
tor_manager = self.site.connection_server.tor_manager
handler = sockshandler.SocksiPyHandler(socks.SOCKS5, tor_manager.proxy_ip, tor_manager.proxy_port)
opener = urllib2.build_opener(handler)
return opener.open(url, timeout=50)
else:
return urllib2.urlopen(url, timeout=25)
def announceTrackerHttp(self, tracker_address, mode="start", num_want=10):
if "ipv4" in self.getOpenedServiceTypes():
port = self.fileserver_port
else:
port = 0
params = {
'info_hash': hashlib.sha1(self.site.address).digest(),
'peer_id': self.peer_id, 'port': port,
'uploaded': 0, 'downloaded': 0, 'left': 431102370, 'compact': 1, 'numwant': num_want,
'event': 'started'
}
url = "http://" + tracker_address + "?" + urllib.urlencode(params)
s = time.time()
response = None
# Load url
if config.tor == "always" or config.trackers_proxy != "disable":
timeout = 60
else:
timeout = 30
with gevent.Timeout(timeout, False): # Make sure of timeout
req = self.httpRequest(url)
response = req.read()
req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions
req.close()
req = None
if not response:
raise AnnounceError("No response after %.0fs" % (time.time() - s))
# Decode peers
try:
peer_data = bencode.decode(response)["peers"]
response = None
peer_count = len(peer_data) / 6
peers = []
for peer_offset in xrange(peer_count):
off = 6 * peer_offset
peer = peer_data[off:off + 6]
addr, port = struct.unpack('!LH', peer)
peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port})
except Exception as err:
raise AnnounceError("Invalid response: %r (%s)" % (response, err))
return peers
@util.Noparallel(blocking=False)
def announcePex(self, query_num=2, need_num=5):
peers = self.site.getConnectedPeers()
if len(peers) == 0: # Wait 3s for connections
time.sleep(3)
peers = self.site.getConnectedPeers()
if len(peers) == 0: # Small number of connected peers for this site, connect to any
peers = self.site.peers.values()
need_num = 10
random.shuffle(peers)
done = 0
total_added = 0
for peer in peers:
num_added = peer.pex(need_num=need_num)
if num_added is not False:
done += 1
total_added += num_added
if num_added:
self.site.worker_manager.onPeers()
self.site.updateWebsocket(peers_added=num_added)
if done == query_num:
break
self.site.log.debug("Pex result: from %s peers got %s new peers." % (done, total_added))
def updateWebsocket(self, **kwargs):
if kwargs:
param = {"event": kwargs.items()[0]}
else:
param = None
for ws in self.site.websockets:
ws.event("announcerChanged", self.site, param)