diff --git a/plugins/AnnounceLocal/AnnounceLocalPlugin.py b/plugins/AnnounceLocal/AnnounceLocalPlugin.py new file mode 100644 index 00000000..5031ff47 --- /dev/null +++ b/plugins/AnnounceLocal/AnnounceLocalPlugin.py @@ -0,0 +1,138 @@ +import time + +import gevent + +from Plugin import PluginManager +from Config import config +import BroadcastServer + + +@PluginManager.registerTo("Site") +class SitePlugin(object): + def announce(self, force=False, mode="start", *args, **kwargs): + local_announcer = self.connection_server.local_announcer + + if force or time.time() - local_announcer.last_discover > 5 * 50: + local_announcer.discover(force=force) + + return super(SitePlugin, self).announce(force=force, mode=mode, *args, **kwargs) + +class LocalAnnouncer(BroadcastServer.BroadcastServer): + def __init__(self, server, listen_port): + super(LocalAnnouncer, self).__init__("zeronet", listen_port=listen_port) + self.server = server + + self.sender_info["peer_id"] = self.server.peer_id + self.sender_info["port"] = self.server.port + self.sender_info["broadcast_port"] = listen_port + self.sender_info["rev"] = config.rev + + self.known_peers = {} + self.last_discover = 0 + + def discover(self, force=False): + self.log.debug("Sending discover request (force: %s)" % force) + if force: # Probably new site added, clean cache + self.known_peers = {} + + for peer_id, known_peer in self.known_peers.items(): + if time.time() - known_peer["found"] > 10 * 60: + del(self.known_peers[peer_id]) + self.log.debug("Timeout, removing from known_peers: %s" % peer_id) + self.broadcast({"cmd": "discoverRequest", "params": {}}, port=self.listen_port) + self.last_discover = time.time() + + def actionDiscoverRequest(self, sender, params): + back = { + "cmd": "discoverResponse", + "params": { + "sites_changed": self.server.site_manager.sites_changed + } + } + + if sender["peer_id"] not in self.known_peers: + self.log.debug("Got discover request from unknown peer %s, time to refresh known peers" % sender["ip"]) + self.discover() + + return back + + def actionDiscoverResponse(self, sender, params): + if sender["peer_id"] in self.known_peers: + self.known_peers[sender["peer_id"]]["found"] = time.time() + if params["sites_changed"] != self.known_peers.get(sender["peer_id"], {}).get("sites_changed"): + # Peer's site list changed, request the list of new sites + return {"cmd": "siteListRequest"} + else: + # Peer's site list is the same + for site in self.server.sites.values(): + peer = site.peers.get("%s:%s" % (sender["ip"], sender["port"])) + if peer: + peer.found("local") + + def actionSiteListRequest(self, sender, params): + back = [] + sites = self.server.sites.values() + + # Split adresses to group of 100 to avoid UDP size limit + site_groups = [sites[i:i + 100] for i in range(0, len(sites), 100)] + for site_group in site_groups: + res = {} + res["sites_changed"] = self.server.site_manager.sites_changed + res["sites"] = [site.address_hash for site in site_group] + back.append({"cmd": "siteListResponse", "params": res}) + return back + + def actionSiteListResponse(self, sender, params): + s = time.time() + peer_sites = set(params["sites"]) + num_found = 0 + added_sites = [] + for site in self.server.sites.values(): + if site.address_hash in peer_sites: + added = site.addPeer(sender["ip"], sender["port"], source="local") + num_found += 1 + if added: + site.worker_manager.onPeers() + site.updateWebsocket(peers_added=1) + added_sites.append(site) + + # Save sites changed value to avoid unnecessary site list download + if sender["peer_id"] not in self.known_peers: + self.known_peers[sender["peer_id"]] = {"added": time.time()} + + self.known_peers[sender["peer_id"]]["sites_changed"] = params["sites_changed"] + self.known_peers[sender["peer_id"]]["updated"] = time.time() + self.known_peers[sender["peer_id"]]["found"] = time.time() + + self.log.debug("Discover from %s response parsed in %.3fs, found: %s added: %s of %s" % (sender["ip"], time.time() - s, num_found, added_sites, len(peer_sites))) + + +@PluginManager.registerTo("FileServer") +class FileServerPlugin(object): + def __init__(self, *args, **kwargs): + res = super(FileServerPlugin, self).__init__(*args, **kwargs) + if config.broadcast_port and config.tor != "always" and not config.disable_udp: + self.local_announcer = LocalAnnouncer(self, config.broadcast_port) + else: + self.local_announcer = None + return res + + def start(self, *args, **kwargs): + if self.local_announcer: + gevent.spawn(self.local_announcer.start) + return super(FileServerPlugin, self).start(*args, **kwargs) + + def stop(self): + if self.local_announcer: + self.local_announcer.stop() + res = super(FileServerPlugin, self).stop() + return res + + +@PluginManager.registerTo("ConfigPlugin") +class ConfigPlugin(object): + def createArguments(self): + group = self.parser.add_argument_group("AnnounceLocal plugin") + group.add_argument('--broadcast_port', help='UDP broadcasting port for local peer discovery', default=1544, type=int) + + return super(ConfigPlugin, self).createArguments() diff --git a/plugins/AnnounceLocal/BroadcastServer.py b/plugins/AnnounceLocal/BroadcastServer.py new file mode 100644 index 00000000..9681860d --- /dev/null +++ b/plugins/AnnounceLocal/BroadcastServer.py @@ -0,0 +1,114 @@ +import socket +import logging +import time + +import msgpack + +from Debug import Debug +from util import UpnpPunch + + +class BroadcastServer(object): + def __init__(self, service_name, listen_port=1544, listen_ip=''): + self.log = logging.getLogger("BroadcastServer") + self.listen_port = listen_port + self.listen_ip = listen_ip + + self.running = False + self.sock = None + self.sender_info = {"service": service_name} + + def createBroadcastSocket(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + if hasattr(socket, 'SO_REUSEPORT'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + + for retry in range(3): + try: + sock.bind((self.listen_ip, self.listen_port)) + break + except Exception as err: + self.log.error("Socket bind error: %s, retry #%s" % (Debug.formatException(err), retry)) + time.sleep(0.1) + + return sock + + def start(self): # Listens for discover requests + self.sock = self.createBroadcastSocket() + self.log.debug("Started on port %s" % self.listen_port) + + self.running = True + + while self.running: + try: + data, addr = self.sock.recvfrom(8192) + except Exception as err: + self.log.error("Listener receive error: %s" % err) + continue + + try: + message = msgpack.unpackb(data) + response_addr, message = self.handleMessage(addr, message) + if message: + self.send(response_addr, message) + except Exception as err: + self.log.error("Handlemessage error: %s" % Debug.formatException(err)) + self.log.debug("Stopped") + + def stop(self): + self.log.debug("Stopping") + self.running = False + if self.sock: + self.sock.close() + + def send(self, addr, message): + if type(message) is not list: + message = [message] + + for message_part in message: + message_part["sender"] = self.sender_info + + self.log.debug("Send to %s: %s" % (addr, message_part["cmd"])) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(msgpack.packb(message_part), addr) + + def getMyIps(self): + return UpnpPunch._get_local_ips() + + def broadcast(self, message, port=None): + if not port: + port = self.listen_port + + my_ips = self.getMyIps() + addr = ("255.255.255.255", port) + + message["sender"] = self.sender_info + self.log.debug("Broadcast to ips %s on port %s: %s" % (my_ips, port, message["cmd"])) + + for my_ip in my_ips: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.bind((my_ip, 0)) + sock.sendto(msgpack.packb(message), addr) + + def handleMessage(self, addr, message): + self.log.debug("Got from %s: %s" % (addr, message["cmd"])) + cmd = message["cmd"] + params = message.get("params", {}) + sender = message["sender"] + sender["ip"] = addr[0] + + func_name = "action" + cmd[0].upper() + cmd[1:] + func = getattr(self, func_name, None) + + if sender["service"] != "zeronet" or sender["peer_id"] == self.sender_info["peer_id"]: + # Skip messages not for us or sent by us + message = None + elif func: + message = func(sender, params) + else: + self.log.debug("Unknown cmd: %s" % cmd) + message = None + + return (sender["ip"], sender["broadcast_port"]), message diff --git a/plugins/AnnounceLocal/Test/TestAnnounce.py b/plugins/AnnounceLocal/Test/TestAnnounce.py new file mode 100644 index 00000000..f972a8b5 --- /dev/null +++ b/plugins/AnnounceLocal/Test/TestAnnounce.py @@ -0,0 +1,105 @@ +import time +import copy + +import gevent +import pytest + +from AnnounceLocal import AnnounceLocalPlugin +from File import FileServer +from Test import Spy + +@pytest.fixture +def announcer(file_server, site): + file_server.sites[site.address] = site + file_server.local_announcer.listen_port = 1100 + file_server.local_announcer.sender_info["broadcast_port"] = 1100 + file_server.local_announcer.listen_ip = file_server.local_announcer.getMyIps()[0] + gevent.spawn(file_server.local_announcer.start) + time.sleep(0.3) + + assert file_server.local_announcer.running + return file_server.local_announcer + +@pytest.fixture +def announcer_remote(site_temp): + file_server_remote = FileServer("127.0.0.1", 1545) + file_server_remote.sites[site_temp.address] = site_temp + file_server_remote.local_announcer.listen_port = 1101 + file_server_remote.local_announcer.sender_info["broadcast_port"] = 1101 + file_server_remote.local_announcer.listen_ip = file_server_remote.local_announcer.getMyIps()[0] + gevent.spawn(file_server_remote.local_announcer.start) + time.sleep(0.3) + + assert file_server_remote.local_announcer.running + return file_server_remote.local_announcer + +@pytest.mark.usefixtures("resetSettings") +@pytest.mark.usefixtures("resetTempSettings") +class TestAnnounce: + def testSenderInfo(self, file_server): + # gevent.spawn(announcer.listen) + + sender_info = file_server.local_announcer.sender_info + assert sender_info["port"] > 0 + assert len(sender_info["peer_id"]) == 20 + assert sender_info["rev"] > 0 + + def testIgnoreSelfMessages(self, file_server, site): + file_server.sites[site.address] = site + announcer = file_server.local_announcer + + # No response to messages that has same peer_id as server + assert not announcer.handleMessage(("0.0.0.0", 123), {"cmd": "discoverRequest", "sender": announcer.sender_info, "params": {}})[1] + + # Response to messages with different peer id + sender_info = copy.copy(announcer.sender_info) + sender_info["peer_id"] += "-" + addr, res = announcer.handleMessage(("0.0.0.0", 123), {"cmd": "discoverRequest", "sender": sender_info, "params": {}}) + assert res["params"]["sites_changed"] > 0 + + def testDiscoverRequest(self, announcer, announcer_remote): + assert len(announcer_remote.known_peers) == 0 + with Spy.Spy(announcer_remote, "handleMessage") as responses: + announcer_remote.broadcast({"cmd": "discoverRequest", "params": {}}, port=announcer.listen_port) + time.sleep(0.1) + + response_cmds = [response[1]["cmd"] for response in responses] + assert response_cmds == ["discoverResponse", "siteListResponse"] + assert len(responses[-1][1]["params"]["sites"]) == 1 + + # It should only request siteList if sites_changed value is different from last response + with Spy.Spy(announcer_remote, "handleMessage") as responses: + announcer_remote.broadcast({"cmd": "discoverRequest", "params": {}}, port=announcer.listen_port) + time.sleep(0.1) + + response_cmds = [response[1]["cmd"] for response in responses] + assert response_cmds == ["discoverResponse"] + + def testPeerDiscover(self, announcer, announcer_remote, site): + assert announcer.server.peer_id != announcer_remote.server.peer_id + assert len(announcer.server.sites.values()[0].peers) == 0 + announcer.broadcast({"cmd": "discoverRequest"}, port=announcer_remote.listen_port) + time.sleep(0.1) + assert len(announcer.server.sites.values()[0].peers) == 1 + + def testRecentPeerList(self, announcer, announcer_remote, site): + assert len(site.peers_recent) == 0 + assert len(site.peers) == 0 + with Spy.Spy(announcer, "handleMessage") as responses: + announcer.broadcast({"cmd": "discoverRequest", "params": {}}, port=announcer_remote.listen_port) + time.sleep(0.1) + assert [response[1]["cmd"] for response in responses] == ["discoverResponse", "siteListResponse"] + assert len(site.peers_recent) == 1 + assert len(site.peers) == 1 + + # It should update peer without siteListResponse + last_time_found = site.peers.values()[0].time_found + site.peers_recent.clear() + with Spy.Spy(announcer, "handleMessage") as responses: + announcer.broadcast({"cmd": "discoverRequest", "params": {}}, port=announcer_remote.listen_port) + time.sleep(0.1) + assert [response[1]["cmd"] for response in responses] == ["discoverResponse"] + assert len(site.peers_recent) == 1 + assert site.peers.values()[0].time_found > last_time_found + + diff --git a/plugins/AnnounceLocal/Test/conftest.py b/plugins/AnnounceLocal/Test/conftest.py new file mode 100644 index 00000000..634e66e2 --- /dev/null +++ b/plugins/AnnounceLocal/Test/conftest.py @@ -0,0 +1 @@ +from src.Test.conftest import * diff --git a/plugins/AnnounceLocal/Test/pytest.ini b/plugins/AnnounceLocal/Test/pytest.ini new file mode 100644 index 00000000..d09210d1 --- /dev/null +++ b/plugins/AnnounceLocal/Test/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +python_files = Test*.py +addopts = -rsxX -v --durations=6 +markers = + webtest: mark a test as a webtest. \ No newline at end of file diff --git a/plugins/AnnounceLocal/__init__.py b/plugins/AnnounceLocal/__init__.py new file mode 100644 index 00000000..defe2412 --- /dev/null +++ b/plugins/AnnounceLocal/__init__.py @@ -0,0 +1 @@ +import AnnounceLocalPlugin \ No newline at end of file