diff --git a/src/Peer/PeerPortchecker.py b/src/Peer/PeerPortchecker.py
index 6f487b5a..3c4daecf 100644
--- a/src/Peer/PeerPortchecker.py
+++ b/src/Peer/PeerPortchecker.py
@@ -9,6 +9,10 @@ from util import UpnpPunch
class PeerPortchecker(object):
+ checker_functions = {
+ "ipv4": ["checkIpfingerprints", "checkCanyouseeme"],
+ "ipv6": ["checkMyaddr", "checkIpv6scanner"]
+ }
def __init__(self, file_server):
self.log = logging.getLogger("PeerPortchecker")
self.upnp_port_opened = False
@@ -39,10 +43,7 @@ class PeerPortchecker(object):
return UpnpPunch.ask_to_close_port(port, protos=["TCP"])
def portCheck(self, port, ip_type="ipv4"):
- if ip_type == "ipv6":
- checker_functions = ["checkMyaddr", "checkIpv6scanner"]
- else:
- checker_functions = ["checkPortchecker", "checkCanyouseeme"]
+ checker_functions = self.checker_functions[ip_type]
for func_name in checker_functions:
func = getattr(self, func_name)
@@ -51,13 +52,13 @@ class PeerPortchecker(object):
res = func(port)
if res:
self.log.info(
- "Checking port %s (%s) using %s result: %s in %.3fs" %
+ "Checked port %s (%s) using %s result: %s in %.3fs" %
(port, ip_type, func_name, res, time.time() - s)
)
time.sleep(0.1)
if res["opened"] and not self.file_server.had_external_incoming:
res["opened"] = False
- self.log.warning("Port %s:%s, but no incoming connection" % (res["ip"], port))
+ self.log.warning("Port %s:%s looks opened, but no incoming connection" % (res["ip"], port))
break
except Exception as err:
self.log.warning(
@@ -87,41 +88,19 @@ class PeerPortchecker(object):
else:
raise Exception("Invalid response: %s" % message)
- def checkPortchecker(self, port):
- data = urllib.request.urlopen("https://portchecker.co", b"port=%s" % str(port).encode("ascii"), timeout=20.0).read().decode("utf8")
- message = re.match(r'.*
(.*?)
', data, re.DOTALL).group(1)
- message = re.sub(r"<.*?>", "", message.replace("
", " ").replace(" ", " ").strip()) # Strip http tags
+ def checkIpfingerprints(self, port):
+ data = self.requestUrl("https://www.ipfingerprints.com/portscan.php").read().decode("utf8")
+ ip = re.match(r'.*name="remoteHost".*?value="(.*?)"', data, re.DOTALL).group(1)
- match = re.match(r".*targetIP.*?value=\"(.*?)\"", data, re.DOTALL)
- if match:
- ip = match.group(1)
- else:
- raise Exception("Invalid response: %s" % message)
+ post_data = {
+ "remoteHost": ip, "start_port": port, "end_port": port,
+ "normalScan": "Yes", "scan_type": "connect2", "ping_type": "none"
+ }
+ message = self.requestUrl("https://www.ipfingerprints.com/scripts/getPortsInfo.php", post_data).read().decode("utf8")
if "open" in message:
return {"ip": ip, "opened": True}
- elif "closed" in message:
- return {"ip": ip, "opened": False}
- else:
- raise Exception("Invalid response: %s" % message)
-
- def checkSubnetonline(self, port):
- url = "https://www.subnetonline.com/pages/ipv6-network-tools/online-ipv6-port-scanner.php"
-
- data = self.requestUrl(url).read().decode("utf8")
-
- ip = re.match(r'.*Your IP is.*?name="host".*?value="(.*?)"', data, re.DOTALL).group(1)
- token = re.match(r'.*name="token".*?value="(.*?)"', data, re.DOTALL).group(1)
-
- post_data = {"host": ip, "port": port, "allow": "on", "token": token, "submit": "Scanning.."}
- data = self.requestUrl(url, post_data).read().decode("utf8")
-
- message = re.match(r".*(.*?)
", data, re.DOTALL).group(1)
- message = re.sub(r"<.*?>", "", message.replace("
", " ").replace(" ", " ").strip()) # Strip http tags
-
- if "online" in message:
- return {"ip": ip, "opened": True}
- elif "closed" in message:
+ elif "filtered" in message or "closed" in message:
return {"ip": ip, "opened": False}
else:
raise Exception("Invalid response: %s" % message)
@@ -165,9 +144,46 @@ class PeerPortchecker(object):
else:
raise Exception("Invalid response: %s" % message_text)
-if __name__ == "__main__":
- import time
- peer_portchecker = PeerPortchecker()
- for func_name in ["checkIpv6scanner", "checkMyaddr", "checkPortchecker", "checkCanyouseeme"]:
- s = time.time()
- print((func_name, getattr(peer_portchecker, func_name)(3894), "%.3fs" % (time.time() - s)))
+ def checkPortchecker(self, port): # Not working: Forbidden
+ data = self.requestUrl("https://portchecker.co").read().decode("utf8")
+ csrf = re.match(r'.*name="_csrf" value="(.*?)"', data, re.DOTALL).group(1)
+
+ data = self.requestUrl("https://portchecker.co", {"port": port, "_csrf": csrf}).read().decode("utf8")
+ message = re.match(r'.*(.*?)
', data, re.DOTALL).group(1)
+ message = re.sub(r"<.*?>", "", message.replace("
", " ").replace(" ", " ").strip()) # Strip http tags
+
+ match = re.match(r".*targetIP.*?value=\"(.*?)\"", data, re.DOTALL)
+ if match:
+ ip = match.group(1)
+ else:
+ raise Exception("Invalid response: %s" % message)
+
+ if "open" in message:
+ return {"ip": ip, "opened": True}
+ elif "closed" in message:
+ return {"ip": ip, "opened": False}
+ else:
+ raise Exception("Invalid response: %s" % message)
+
+ def checkSubnetonline(self, port): # Not working: Invalid response
+ url = "https://www.subnetonline.com/pages/ipv6-network-tools/online-ipv6-port-scanner.php"
+
+ data = self.requestUrl(url).read().decode("utf8")
+
+ ip = re.match(r'.*Your IP is.*?name="host".*?value="(.*?)"', data, re.DOTALL).group(1)
+ token = re.match(r'.*name="token".*?value="(.*?)"', data, re.DOTALL).group(1)
+
+ post_data = {"host": ip, "port": port, "allow": "on", "token": token, "submit": "Scanning.."}
+ data = self.requestUrl(url, post_data).read().decode("utf8")
+
+ print(post_data, data)
+
+ message = re.match(r".*(.*?)
", data, re.DOTALL).group(1)
+ message = re.sub(r"<.*?>", "", message.replace("
", " ").replace(" ", " ").strip()) # Strip http tags
+
+ if "online" in message:
+ return {"ip": ip, "opened": True}
+ elif "closed" in message:
+ return {"ip": ip, "opened": False}
+ else:
+ raise Exception("Invalid response: %s" % message)