diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py
index 91d3e4e1..2954c1e6 100644
--- a/src/Connection/ConnectionServer.py
+++ b/src/Connection/ConnectionServer.py
@@ -13,6 +13,7 @@ from Config import config
from Crypt import CryptConnection
from Crypt import CryptHash
from Tor import TorManager
+from util import UpnpPunch
class ConnectionServer:
@@ -73,6 +74,13 @@ class ConnectionServer:
self.log.info("StreamServer bind error, must be running already: %s" % err)
def stop(self):
+ self.log.debug('Closing port %d' % self.port)
+ if self.running:
+ try:
+ UpnpPunch.ask_to_close_port(self.port)
+ self.log.info('Closed port via upnp.')
+ except (UpnpPunch.UpnpError, UpnpPunch.IGDError), err:
+ self.log.info("Failed at attempt to use upnp to close port: %s" %err)
self.running = False
self.stream_server.stop()
diff --git a/src/File/FileServer.py b/src/File/FileServer.py
index 6f84a49c..91dfe104 100644
--- a/src/File/FileServer.py
+++ b/src/File/FileServer.py
@@ -69,13 +69,13 @@ class FileServer(ConnectionServer):
self.log.info("Trying to open port using UpnpPunch...")
try:
- upnp_punch = UpnpPunch.open_port(self.port, 'ZeroNet')
- upnp_punch = True
- except Exception, err:
- self.log.error("UpnpPunch run error: %s" % Debug.formatException(err))
- upnp_punch = False
+ UpnpPunch.ask_to_open_port(self.port, 'ZeroNet', retries=3)
+ except (UpnpPunch.UpnpError, UpnpPunch.IGDError) as err:
+ self.log.error("UpnpPunch run error: %s" %
+ Debug.formatException(err))
+ return False
- if upnp_punch and self.testOpenport(port)["result"] is True:
+ if self.testOpenport(port)["result"] is True:
return True
self.log.info("Upnp mapping failed :( Please forward port %s on your router to your ipaddress" % port)
diff --git a/src/Test/TestUpnpPunch.py b/src/Test/TestUpnpPunch.py
new file mode 100644
index 00000000..f77d7f8d
--- /dev/null
+++ b/src/Test/TestUpnpPunch.py
@@ -0,0 +1,274 @@
+import socket
+from urlparse import urlparse
+
+import pytest
+import mock
+
+from util import UpnpPunch as upnp
+
+
+@pytest.fixture
+def mock_socket():
+ mock_socket = mock.MagicMock()
+ mock_socket.recv = mock.MagicMock(return_value='Hello')
+ mock_socket.bind = mock.MagicMock()
+ mock_socket.send_to = mock.MagicMock()
+
+ return mock_socket
+
+
+@pytest.fixture
+def url_obj():
+ return urlparse('http://192.168.1.1/ctrlPoint.xml')
+
+
+@pytest.fixture(params=['WANPPPConnection', 'WANIPConnection'])
+def igd_profile(request):
+ return """
+ urn:schemas-upnp-org:service:{}:1
+ urn:upnp-org:serviceId:wanpppc:pppoa
+ /upnp/control/wanpppcpppoa
+ /upnp/event/wanpppcpppoa
+ /WANPPPConnection.xml
+""".format(request.param)
+
+
+@pytest.fixture
+def httplib_response():
+ class FakeResponse(object):
+ def __init__(self, status=200, body='OK'):
+ self.status = status
+ self.body = body
+
+ def read(self):
+ return self.body
+ return FakeResponse
+
+
+class TestUpnpPunch(object):
+ def test_perform_m_search(self, mock_socket):
+ local_ip = '127.0.0.1'
+
+ with mock.patch('util.UpnpPunch.socket.socket',
+ return_value=mock_socket):
+ result = upnp.perform_m_search(local_ip)
+ assert result == 'Hello'
+ assert local_ip == mock_socket.bind.call_args_list[0][0][0][0]
+ assert ('239.255.255.250',
+ 1900) == mock_socket.sendto.call_args_list[0][0][1]
+
+ def test_perform_m_search_socket_error(self, mock_socket):
+ mock_socket.recv.side_effect = socket.error('Timeout error')
+
+ with mock.patch('util.UpnpPunch.socket.socket',
+ return_value=mock_socket):
+ with pytest.raises(upnp.UpnpError):
+ upnp.perform_m_search('127.0.0.1')
+
+ def test_retrieve_location_from_ssdp(self, url_obj):
+ ctrl_location = url_obj.geturl()
+ parsed_location = urlparse(ctrl_location)
+ rsp = ('auth: gibberish\r\nlocation: {0}\r\n'
+ 'Content-Type: text/html\r\n\r\n').format(ctrl_location)
+ result = upnp._retrieve_location_from_ssdp(rsp)
+ assert result == parsed_location
+
+ def test_retrieve_location_from_ssdp_no_header(self):
+ rsp = 'auth: gibberish\r\nContent-Type: application/json\r\n\r\n'
+ with pytest.raises(upnp.IGDError):
+ upnp._retrieve_location_from_ssdp(rsp)
+
+ def test_retrieve_igd_profile(self, url_obj):
+ with mock.patch('urllib2.urlopen') as mock_urlopen:
+ upnp._retrieve_igd_profile(url_obj)
+ mock_urlopen.assert_called_with(url_obj.geturl(), timeout=5)
+
+ def test_retrieve_igd_profile_timeout(self, url_obj):
+ with mock.patch('urllib2.urlopen') as mock_urlopen:
+ mock_urlopen.side_effect = socket.error('Timeout error')
+ with pytest.raises(upnp.IGDError):
+ upnp._retrieve_igd_profile(url_obj)
+
+ def test_parse_igd_profile_service_type(self, igd_profile):
+ control_path, upnp_schema = upnp._parse_igd_profile(igd_profile)
+ assert control_path == '/upnp/control/wanpppcpppoa'
+ assert upnp_schema in ('WANPPPConnection', 'WANIPConnection',)
+
+ def test_parse_igd_profile_no_ctrlurl(self, igd_profile):
+ igd_profile = igd_profile.replace('controlURL', 'nope')
+ with pytest.raises(upnp.IGDError):
+ control_path, upnp_schema = upnp._parse_igd_profile(igd_profile)
+
+ def test_parse_igd_profile_no_schema(self, igd_profile):
+ igd_profile = igd_profile.replace('Connection', 'nope')
+ with pytest.raises(upnp.IGDError):
+ control_path, upnp_schema = upnp._parse_igd_profile(igd_profile)
+
+ def test_create_open_message_parsable(self):
+ from xml.parsers.expat import ExpatError
+ msg, _ = upnp._create_open_message('127.0.0.1', 8888)
+ try:
+ upnp.parseString(msg)
+ except ExpatError as e:
+ pytest.fail('Incorrect XML message: {}'.format(e))
+
+ def test_create_open_message_contains_right_stuff(self):
+ settings = {'description': 'test desc',
+ 'protocol': 'test proto',
+ 'upnp_schema': 'test schema'}
+ msg, fn_name = upnp._create_open_message('127.0.0.1', 8888, **settings)
+ assert fn_name == 'AddPortMapping'
+ assert '127.0.0.1' in msg
+ assert '8888' in msg
+ assert settings['description'] in msg
+ assert settings['protocol'] in msg
+ assert settings['upnp_schema'] in msg
+
+ def test_parse_for_errors_bad_rsp(self, httplib_response):
+ rsp = httplib_response(status=500)
+ with pytest.raises(upnp.IGDError) as exc:
+ upnp._parse_for_errors(rsp)
+ assert 'Unable to parse' in exc.value.message
+
+ def test_parse_for_errors_error(self, httplib_response):
+ soap_error = (''
+ '500'
+ 'Bad request'
+ '')
+ rsp = httplib_response(status=500, body=soap_error)
+ with pytest.raises(upnp.IGDError) as exc:
+ upnp._parse_for_errors(rsp)
+ assert 'SOAP request error' in exc.value.message
+
+ def test_parse_for_errors_good_rsp(self, httplib_response):
+ rsp = httplib_response(status=200)
+ assert rsp == upnp._parse_for_errors(rsp)
+
+ def test_send_requests_success(self):
+ with mock.patch(
+ 'util.UpnpPunch._send_soap_request') as mock_send_request:
+ mock_send_request.return_value = mock.MagicMock(status=200)
+ upnp._send_requests(['msg'], None, None, None)
+
+ assert mock_send_request.called
+
+ def test_send_requests_failed(self):
+ with mock.patch(
+ 'util.UpnpPunch._send_soap_request') as mock_send_request:
+ mock_send_request.return_value = mock.MagicMock(status=500)
+ with pytest.raises(upnp.UpnpError):
+ upnp._send_requests(['msg'], None, None, None)
+
+ assert mock_send_request.called
+
+ def test_collect_idg_data(self):
+ pass
+
+ @mock.patch('util.UpnpPunch._get_local_ips')
+ @mock.patch('util.UpnpPunch._collect_idg_data')
+ @mock.patch('util.UpnpPunch._send_requests')
+ def test_ask_to_open_port_success(self, mock_send_requests,
+ mock_collect_idg, mock_local_ips):
+ mock_collect_idg.return_value = {'upnp_schema': 'schema-yo'}
+ mock_local_ips.return_value = ['192.168.0.12']
+
+ result = upnp.ask_to_open_port(retries=5)
+
+ soap_msg = mock_send_requests.call_args[0][0][0][0]
+
+ assert result is None
+
+ assert mock_collect_idg.called
+ assert '192.168.0.12' in soap_msg
+ assert '15441' in soap_msg
+ assert 'schema-yo' in soap_msg
+
+ @mock.patch('util.UpnpPunch._get_local_ips')
+ @mock.patch('util.UpnpPunch._collect_idg_data')
+ @mock.patch('util.UpnpPunch._send_requests')
+ def test_ask_to_open_port_failure(self, mock_send_requests,
+ mock_collect_idg, mock_local_ips):
+ mock_local_ips.return_value = ['192.168.0.12']
+ mock_collect_idg.return_value = {'upnp_schema': 'schema-yo'}
+ mock_send_requests.side_effect = upnp.UpnpError()
+
+ with pytest.raises(upnp.UpnpError):
+ upnp.ask_to_open_port()
+
+ @mock.patch('util.UpnpPunch._collect_idg_data')
+ @mock.patch('util.UpnpPunch._send_requests')
+ def test_orchestrate_soap_request(self, mock_send_requests,
+ mock_collect_idg):
+ soap_mock = mock.MagicMock()
+ args = ['127.0.0.1', 31337, soap_mock, 'upnp-test', {'upnp_schema':
+ 'schema-yo'}]
+ mock_collect_idg.return_value = args[-1]
+
+ upnp._orchestrate_soap_request(*args[:-1])
+
+ assert mock_collect_idg.called
+ soap_mock.assert_called_with(
+ *args[:2] + ['upnp-test', 'UDP', 'schema-yo'])
+ assert mock_send_requests.called
+
+ @mock.patch('util.UpnpPunch._collect_idg_data')
+ @mock.patch('util.UpnpPunch._send_requests')
+ def test_orchestrate_soap_request_without_desc(self, mock_send_requests,
+ mock_collect_idg):
+ soap_mock = mock.MagicMock()
+ args = ['127.0.0.1', 31337, soap_mock, {'upnp_schema': 'schema-yo'}]
+ mock_collect_idg.return_value = args[-1]
+
+ upnp._orchestrate_soap_request(*args[:-1])
+
+ assert mock_collect_idg.called
+ soap_mock.assert_called_with(*args[:2] + [None, 'UDP', 'schema-yo'])
+ assert mock_send_requests.called
+
+ def test_create_close_message_parsable(self):
+ from xml.parsers.expat import ExpatError
+ msg, _ = upnp._create_close_message('127.0.0.1', 8888)
+ try:
+ upnp.parseString(msg)
+ except ExpatError as e:
+ pytest.fail('Incorrect XML message: {}'.format(e))
+
+ def test_create_close_message_contains_right_stuff(self):
+ settings = {'protocol': 'test proto',
+ 'upnp_schema': 'test schema'}
+ msg, fn_name = upnp._create_close_message('127.0.0.1', 8888, **
+ settings)
+ assert fn_name == 'DeletePortMapping'
+ assert '8888' in msg
+ assert settings['protocol'] in msg
+ assert settings['upnp_schema'] in msg
+
+ @mock.patch('util.UpnpPunch._get_local_ips')
+ @mock.patch('util.UpnpPunch._orchestrate_soap_request')
+ def test_communicate_with_igd_success(self, mock_orchestrate,
+ mock_get_local_ips):
+ mock_get_local_ips.return_value = ['192.168.0.12']
+ upnp._communicate_with_igd()
+ assert mock_get_local_ips.called
+ assert mock_orchestrate.called
+
+ @mock.patch('util.UpnpPunch._get_local_ips')
+ @mock.patch('util.UpnpPunch._orchestrate_soap_request')
+ def test_communicate_with_igd_succeed_despite_single_failure(
+ self, mock_orchestrate, mock_get_local_ips):
+ mock_get_local_ips.return_value = ['192.168.0.12']
+ mock_orchestrate.side_effect = [upnp.UpnpError, None]
+ upnp._communicate_with_igd(retries=2)
+ assert mock_get_local_ips.called
+ assert mock_orchestrate.called
+
+ @mock.patch('util.UpnpPunch._get_local_ips')
+ @mock.patch('util.UpnpPunch._orchestrate_soap_request')
+ def test_communicate_with_igd_total_failure(self, mock_orchestrate,
+ mock_get_local_ips):
+ mock_get_local_ips.return_value = ['192.168.0.12']
+ mock_orchestrate.side_effect = [upnp.UpnpError, upnp.IGDError]
+ with pytest.raises(upnp.UpnpError):
+ upnp._communicate_with_igd(retries=2)
+ assert mock_get_local_ips.called
+ assert mock_orchestrate.called
diff --git a/src/util/UpnpPunch.py b/src/util/UpnpPunch.py
index 01471ee7..b595e7bb 100644
--- a/src/util/UpnpPunch.py
+++ b/src/util/UpnpPunch.py
@@ -5,18 +5,30 @@ import logging
from urlparse import urlparse
from xml.dom.minidom import parseString
-import gevent
from gevent import socket
-# Relevant UPnP spec: http://www.upnp.org/specs/gw/UPnP-gw-WANIPConnection-v1-Service.pdf
+# Relevant UPnP spec:
+# http://www.upnp.org/specs/gw/UPnP-gw-WANIPConnection-v1-Service.pdf
# General TODOs:
# Handle 0 or >1 IGDs
-remove_whitespace = re.compile(r'>\s*<')
+
+class UpnpError(Exception):
+ pass
-def _m_search_ssdp(local_ip):
+class IGDError(UpnpError):
+ """
+ Signifies a problem with the IGD.
+ """
+ pass
+
+
+REMOVE_WHITESPACE = re.compile(r'>\s*<')
+
+
+def perform_m_search(local_ip):
"""
Broadcast a UDP SSDP M-SEARCH packet and return response.
"""
@@ -43,10 +55,8 @@ def _m_search_ssdp(local_ip):
try:
return sock.recv(2048)
- except socket.error, err:
- # no reply from IGD, possibly no IGD on LAN
- logging.debug("UDP SSDP M-SEARCH send error using ip %s: %s" % (local_ip, err))
- return False
+ except socket.error:
+ raise UpnpError("No reply from IGD using {} as IP".format(local_ip))
def _retrieve_location_from_ssdp(response):
@@ -54,24 +64,28 @@ def _retrieve_location_from_ssdp(response):
Parse raw HTTP response to retrieve the UPnP location header
and return a ParseResult object.
"""
- parsed = re.findall(r'(?P.*?): (?P.*?)\r\n', response)
- location_header = filter(lambda x: x[0].lower() == 'location', parsed)
+ parsed_headers = re.findall(r'(?P.*?): (?P.*?)\r\n', response)
+ header_locations = [header[1]
+ for header in parsed_headers
+ if header[0].lower() == 'location']
- if not len(location_header):
- # no location header returned :(
- return False
+ if len(header_locations) < 1:
+ raise IGDError('IGD response does not contain a "location" header.')
- return urlparse(location_header[0][1])
+ return urlparse(header_locations[0])
def _retrieve_igd_profile(url):
"""
Retrieve the device's UPnP profile.
"""
- return urllib2.urlopen(url.geturl()).read()
+ try:
+ return urllib2.urlopen(url.geturl(), timeout=5).read()
+ except socket.error:
+ raise IGDError('IGD profile query timed out')
-def _node_val(node):
+def _get_first_child_data(node):
"""
Get the text value of the first child text node of a node.
"""
@@ -82,34 +96,65 @@ def _parse_igd_profile(profile_xml):
"""
Traverse the profile xml DOM looking for either
WANIPConnection or WANPPPConnection and return
- the value found as well as the 'controlURL'.
+ the 'controlURL' and the service xml schema.
"""
dom = parseString(profile_xml)
service_types = dom.getElementsByTagName('serviceType')
for service in service_types:
- if _node_val(service).find('WANIPConnection') > 0 or \
- _node_val(service).find('WANPPPConnection') > 0:
- control_url = service.parentNode.getElementsByTagName(
- 'controlURL'
- )[0].childNodes[0].data
- upnp_schema = _node_val(service).split(':')[-2]
- return control_url, upnp_schema
-
- return False
+ if _get_first_child_data(service).find('WANIPConnection') > 0 or \
+ _get_first_child_data(service).find('WANPPPConnection') > 0:
+ try:
+ control_url = _get_first_child_data(
+ service.parentNode.getElementsByTagName('controlURL')[0])
+ upnp_schema = _get_first_child_data(service).split(':')[-2]
+ return control_url, upnp_schema
+ except IndexError:
+ # Pass the error because any error here should raise the
+ # that's specified outside the for loop.
+ pass
+ raise IGDError(
+ 'Could not find a control url or UPNP schema in IGD response.')
-def _get_local_ip():
+# add description
+def _get_local_ips():
+ local_ips = []
+
+ # get local ip using UDP and a broadcast address
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
- # not using because gevents getaddrinfo doesn't like that
+ # Not using because gevents getaddrinfo doesn't like that
# using port 1 as per hobbldygoop's comment about port 0 not working on osx:
# https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
s.connect(('239.255.255.250', 1))
- return s.getsockname()[0]
+ local_ips.append(s.getsockname()[0])
+
+ # Get ip by using UDP and a normal address (google dns ip)
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect(('8.8.8.8', 0))
+ local_ips.append(s.getsockname()[0])
+ except:
+ pass
+
+ # Get ip by '' hostname . Not supported on all platforms.
+ try:
+ local_ips += socket.gethostbyname_ex('')[2]
+ except:
+ pass
+
+ # Delete duplicates
+ local_ips = list(set(local_ips))
+
+ logging.debug("Found local ips: %s" % local_ips)
+ return local_ips
-def _create_soap_message(local_ip, port, description="UPnPPunch", protocol="TCP",
+def _create_open_message(local_ip,
+ port,
+ description="UPnPPunch",
+ protocol="TCP",
upnp_schema='WANIPConnection'):
"""
Build a SOAP AddPortMapping message.
@@ -134,46 +179,67 @@ def _create_soap_message(local_ip, port, description="UPnPPunch", protocol="TCP"
host_ip=local_ip,
description=description,
upnp_schema=upnp_schema)
- return remove_whitespace.sub('><', soap_message)
+ return (REMOVE_WHITESPACE.sub('><', soap_message), 'AddPortMapping')
+
+
+def _create_close_message(local_ip,
+ port,
+ description=None,
+ protocol='TCP',
+ upnp_schema='WANIPConnection'):
+ soap_message = """
+
+
+
+
+ {port}
+ {protocol}
+
+
+""".format(port=port,
+ protocol=protocol,
+ upnp_schema=upnp_schema)
+ return (REMOVE_WHITESPACE.sub('><', soap_message), 'DeletePortMapping')
def _parse_for_errors(soap_response):
- if soap_response.status == 500:
+ logging.debug(soap_response.status)
+ if soap_response.status >= 400:
response_data = soap_response.read()
+ logging.debug(response_data)
try:
err_dom = parseString(response_data)
- err_code = _node_val(err_dom.getElementsByTagName('errorCode')[0])
- err_msg = _node_val(
+ err_code = _get_first_child_data(err_dom.getElementsByTagName(
+ 'errorCode')[0])
+ err_msg = _get_first_child_data(
err_dom.getElementsByTagName('errorDescription')[0]
)
- except Exception, err:
- logging.error("Unable to parse SOAP error: {0}, response: {1}".format(err, response_data))
- return False
-
- logging.error('SOAP request error: {0} - {1}'.format(err_code, err_msg))
- raise Exception(
+ except Exception as err:
+ raise IGDError(
+ 'Unable to parse SOAP error: {0}. Got: "{1}"'.format(
+ err, response_data))
+ raise IGDError(
'SOAP request error: {0} - {1}'.format(err_code, err_msg)
)
-
- return False
- else:
- return True
+ return soap_response
-def _send_soap_request(location, upnp_schema, control_url, soap_message):
+def _send_soap_request(location, upnp_schema, control_path, soap_fn,
+ soap_message):
"""
Send out SOAP request to UPnP device and return a response.
"""
headers = {
'SOAPAction': (
'"urn:schemas-upnp-org:service:{schema}:'
- '1#AddPortMapping"'.format(schema=upnp_schema)
+ '1#{fn_name}"'.format(schema=upnp_schema, fn_name=soap_fn)
),
'Content-Type': 'text/xml'
}
- logging.debug("Sending UPnP request to {0}:{1}...".format(location.hostname, location.port))
+ logging.debug("Sending UPnP request to {0}:{1}...".format(
+ location.hostname, location.port))
conn = httplib.HTTPConnection(location.hostname, location.port)
- conn.request('POST', control_url, soap_message, headers)
+ conn.request('POST', control_path, soap_message, headers)
response = conn.getresponse()
conn.close()
@@ -181,64 +247,83 @@ def _send_soap_request(location, upnp_schema, control_url, soap_message):
return _parse_for_errors(response)
-def open_port(port=15441, desc="UpnpPunch"):
+def _collect_idg_data(ip_addr):
+ idg_data = {}
+ idg_response = perform_m_search(ip_addr)
+ idg_data['location'] = _retrieve_location_from_ssdp(idg_response)
+ idg_data['control_path'], idg_data['upnp_schema'] = _parse_igd_profile(
+ _retrieve_igd_profile(idg_data['location']))
+ return idg_data
+
+
+def _send_requests(messages, location, upnp_schema, control_path):
+ responses = [_send_soap_request(location, upnp_schema, control_path,
+ message_tup[1], message_tup[0])
+ for message_tup in messages]
+
+ if all(rsp.status == 200 for rsp in responses):
+ return
+ raise UpnpError('Sending requests using UPnP failed.')
+
+
+def _orchestrate_soap_request(ip, port, msg_fn, desc=None):
+ logging.debug("Trying using local ip: %s" % ip)
+ idg_data = _collect_idg_data(ip)
+
+ soap_messages = [
+ msg_fn(ip, port, desc, proto, idg_data['upnp_schema'])
+ for proto in ['TCP', 'UDP']
+ ]
+
+ _send_requests(soap_messages, **idg_data)
+
+
+def _communicate_with_igd(port=15441,
+ desc="UpnpPunch",
+ retries=3,
+ fn=_create_open_message):
"""
- Attempt to forward a port using UPnP.
+ Manage sending a message generated by 'fn'.
"""
- local_ips = [_get_local_ip()]
- try:
- local_ips += socket.gethostbyname_ex('')[2] # Get ip by '' hostname not supported on all platform
- except:
- pass
-
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(('8.8.8.8', 0)) # Using google dns route
- local_ips.append(s.getsockname()[0])
- except:
- pass
-
- local_ips = list(set(local_ips)) # Delete duplicates
- logging.debug("Found local ips: %s" % local_ips)
- local_ips = local_ips * 3 # Retry every ip 3 times
+ # Retry every ip 'retries' times
+ local_ips = _get_local_ips() * retries
+ success = False
for local_ip in local_ips:
- logging.debug("Trying using local ip: %s" % local_ip)
- idg_response = _m_search_ssdp(local_ip)
-
- if not idg_response:
- logging.debug("No IGD response")
+ try:
+ _orchestrate_soap_request(local_ip, port, fn, desc)
+ success = True
+ break
+ except (UpnpError, IGDError) as e:
+ logging.debug('Upnp request using "{0}" failed: {1}'.format(
+ local_ip, e))
+ success = False
continue
- location = _retrieve_location_from_ssdp(idg_response)
+ if not success:
+ raise UpnpError(
+ 'Failed to communicate with igd using port {0} on local machine after {1} tries.'.format(
+ port, retries))
- if not location:
- logging.debug("No location")
- continue
- parsed = _parse_igd_profile(
- _retrieve_igd_profile(location)
- )
+def ask_to_open_port(port=15441, desc="UpnpPunch", retries=3):
+ logging.debug("Trying to open port %d." % port)
+ _communicate_with_igd(port=port,
+ desc=desc,
+ retries=retries,
+ fn=_create_open_message)
- if not parsed:
- logging.debug("IGD parse error using location %s" % repr(location))
- continue
- control_url, upnp_schema = parsed
+def ask_to_close_port(port=15441, desc="UpnpPunch", retries=3):
+ logging.debug("Trying to close port %d." % port)
+ # retries=1 because multiple successes cause 500 response and failure
+ _communicate_with_igd(port=port,
+ desc=desc,
+ retries=1,
+ fn=_create_close_message)
- soap_messages = [_create_soap_message(local_ip, port, desc, proto, upnp_schema)
- for proto in ['TCP', 'UDP']]
- requests = [gevent.spawn(
- _send_soap_request, location, upnp_schema, control_url, message
- ) for message in soap_messages]
-
- gevent.joinall(requests, timeout=3)
-
- if all([request.value for request in requests]):
- return True
- return False
if __name__ == "__main__":
from gevent import monkey
@@ -247,5 +332,5 @@ if __name__ == "__main__":
s = time.time()
logging.getLogger().setLevel(logging.DEBUG)
- print open_port(15441, "ZeroNet")
+ print ask_to_open_port(15441, "ZeroNet", retries=3)
print "Done in", time.time()-s