diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py index 38ea924f..3de71fc8 100644 --- a/plugins/Stats/StatsPlugin.py +++ b/plugins/Stats/StatsPlugin.py @@ -86,7 +86,7 @@ class UiRequestPlugin(object): len(main.file_server.connections), main.file_server.last_connection_id ) yield "" - yield "" + yield "" yield "" for connection in main.file_server.connections: if "cipher" in dir(connection.sock): @@ -102,6 +102,7 @@ class UiRequestPlugin(object): ("%s", (connection.crypt, cipher)), ("%6.3f", connection.last_ping_delay), ("%s", connection.incomplete_buff_recv), + ("%s", connection.bad_actions), ("since", max(connection.last_send_time, connection.last_recv_time)), ("since", connection.start_time), ("%.3f", connection.last_sent_time - connection.last_send_time), diff --git a/src/Config.py b/src/Config.py index 9eacdb63..ae85be1a 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.6" - self.rev = 989 + self.rev = 1015 self.argv = argv self.action = None self.config_file = "zeronet.conf" @@ -119,6 +119,7 @@ class Config(object): self.parser.add_argument('--verbose', help='More detailed logging', action='store_true') self.parser.add_argument('--debug', help='Debug mode', action='store_true') self.parser.add_argument('--debug_socket', help='Debug socket connections', action='store_true') + self.parser.add_argument('--debug_gevent', help='Debug gevent functions', action='store_true') self.parser.add_argument('--batch', help="Batch mode (No interactive input for commands)", action='store_true') @@ -150,7 +151,7 @@ class Config(object): type='bool', choices=[True, False], default=True) self.parser.add_argument('--keep_ssl_cert', help='Disable new SSL cert generation on startup', action='store_true') self.parser.add_argument('--max_files_opened', help='Change maximum opened files allowed by OS to this value on startup', - default=1024, type=int, metavar='limit') + default=2048, type=int, metavar='limit') self.parser.add_argument('--use_tempfiles', help='Use temporary files when downloading (experimental)', type='bool', choices=[True, False], default=False) self.parser.add_argument('--stream_downloads', help='Stream download directly to files (experimental)', diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index de67ee06..1a7b54b1 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -17,7 +17,7 @@ class Connection(object): "sock", "sock_wrapped", "ip", "port", "cert_pin", "site_lock", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", - "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests", "waiting_streams" + "last_ping_delay", "last_req_time", "last_cmd", "bad_actions", "name", "updateName", "waiting_requests", "waiting_streams" ) def __init__(self, server, ip, port, sock=None, site_lock=None): @@ -56,6 +56,7 @@ class Connection(object): self.last_ping_delay = None self.last_req_time = 0 self.last_cmd = None + self.bad_actions = 0 self.name = None self.updateName() @@ -75,6 +76,12 @@ class Connection(object): def log(self, text): self.server.log.debug("%s > %s" % (self.name, text)) + def badAction(self, weight=1): + self.bad_actions += weight + + def goodAction(self): + self.bad_actions = 0 + # Open connection to peer and wait for handshake def connect(self): self.log("Connecting...") diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index 23f90482..6412ad30 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -164,7 +164,9 @@ class ConnectionServer: self.connections.remove(connection) def checkConnections(self): + run_i = 0 while self.running: + run_i += 1 time.sleep(60) # Check every minute self.ip_incoming = {} # Reset connected ips counter self.broken_ssl_peer_ids = {} # Reset broken ssl peerids count @@ -205,3 +207,11 @@ class ConnectionServer: elif idle > 60 and connection.protocol == "?": # No connection after 1 min connection.log("[Cleanup] Connect timeout: %s" % idle) connection.close() + + elif idle < 60 and connection.bad_actions > 40: + connection.log("[Cleanup] Too many bad actions: %s" % connection.bad_actions) + connection.close() + + elif run_i % 30 == 0: + # Reset bad action counter every 30 min + connection.bad_actions = 0 diff --git a/src/Db/Db.py b/src/Db/Db.py index e6c64bec..e5f0b06e 100644 --- a/src/Db/Db.py +++ b/src/Db/Db.py @@ -7,6 +7,7 @@ import os import gevent from DbCursor import DbCursor +from Config import config opened_dbs = [] @@ -45,7 +46,7 @@ class Db(object): def connect(self): if self not in opened_dbs: opened_dbs.append(self) - + s = time.time() self.log.debug("Connecting to %s (sqlite version: %s)..." % (self.db_path, sqlite3.version)) if not os.path.isdir(self.db_dir): # Directory not exist yet os.makedirs(self.db_dir) @@ -53,6 +54,8 @@ class Db(object): if not os.path.isfile(self.db_path): self.log.debug("Db file not exist yet: %s" % self.db_path) self.conn = sqlite3.connect(self.db_path) + if config.verbose: + self.log.debug("Connected to Db in %.3fs" % (time.time()-s)) self.conn.row_factory = sqlite3.Row self.conn.isolation_level = None self.cur = self.getCursor() @@ -62,6 +65,8 @@ class Db(object): self.cur.execute("PRAGMA synchronous = OFF") if self.foreign_keys: self.execute("PRAGMA foreign_keys = ON") + if config.verbose: + self.log.debug("Db is ready to use in %.3fs" % (time.time()-s)) # Execute query using dbcursor @@ -312,4 +317,4 @@ if __name__ == "__main__": cur.execute("COMMIT") print "Done in %.3fs" % (time.time() - s) for query, stats in sorted(dbjson.query_stats.items()): - print "-", query, stats + print "-", query, stats \ No newline at end of file diff --git a/src/Debug/Debug.py b/src/Debug/Debug.py index f3b54fd7..050adf4c 100644 --- a/src/Debug/Debug.py +++ b/src/Debug/Debug.py @@ -1,6 +1,7 @@ import sys import os import traceback +from Config import config # Non fatal exception @@ -28,6 +29,21 @@ def formatException(err=None, format="text"): else: return "%s: %s in %s" % (exc_type.__name__, err, " > ".join(tb)) +# Test if gevent eventloop blocks +if config.debug_gevent: + import logging + import gevent + import time + def testBlock(): + logging.debug("Gevent block checker started") + last_time = time.time() + while 1: + time.sleep(1) + if time.time()-last_time > 1.1: + logging.debug("Gevent block detected: %s" % (time.time()-last_time-1)) + last_time = time.time() + gevent.spawn(testBlock) + if __name__ == "__main__": try: diff --git a/src/Debug/DebugHook.py b/src/Debug/DebugHook.py index 1bb8e1e4..cfd97252 100644 --- a/src/Debug/DebugHook.py +++ b/src/Debug/DebugHook.py @@ -8,6 +8,13 @@ from Config import config last_error = None +def shutdown(): + try: + gevent.spawn(sys.modules["main"].file_server.stop) + gevent.spawn(sys.modules["main"].ui_server.stop) + except Exception, err: + print "Proper shutdown error: %s" % err + sys.exit(0) # Store last error, ignore notify, allow manual error logging def handleError(*args): @@ -19,6 +26,8 @@ def handleError(*args): silent = False if args[0].__name__ != "Notify": last_error = args + if args[0].__name__ == "KeyboardInterrupt": + shutdown() if not silent and args[0].__name__ != "Notify": logging.exception("Unhandled exception") sys.__excepthook__(*args) @@ -26,6 +35,8 @@ def handleError(*args): # Ignore notify errors def handleErrorNotify(*args): + if args[0].__name__ == "KeyboardInterrupt": + shutdown() if args[0].__name__ != "Notify": logging.exception("Unhandled exception") sys.__excepthook__(*args) diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py index 19b68ae2..68163f7b 100644 --- a/src/File/FileRequest.py +++ b/src/File/FileRequest.py @@ -57,14 +57,16 @@ class FileRequest(object): if "site" in params and self.connection.site_lock and self.connection.site_lock not in (params["site"], "global"): self.response({"error": "Invalid site"}) self.log.error("Site lock violation: %s != %s" % (self.connection.site_lock != params["site"])) + self.connection.badAction(5) return False if cmd == "update": event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"]) if not RateLimit.isAllowed(event): # There was already an update for this file in the last 10 second + time.sleep(5) self.response({"ok": "File update queued"}) # If called more than once within 10 sec only keep the last update - RateLimit.callAsync(event, 10, self.actionUpdate, params) + RateLimit.callAsync(event, max(self.connection.bad_actions, 10), self.actionUpdate, params) else: func_name = "action" + cmd[0].upper() + cmd[1:] func = getattr(self, func_name, None) @@ -81,8 +83,8 @@ class FileRequest(object): return False if site.settings["own"] and params["inner_path"].endswith("content.json"): self.log.debug( - "Someone trying to push a file to own site %s, reload local %s first" % - (site.address, params["inner_path"]) + "%s pushing a file to own site %s, reloading local %s first" % + (self.connection.ip, site.address, params["inner_path"]) ) changed, deleted = site.content_manager.loadContent(params["inner_path"], add_bad_files=False) if changed or deleted: # Content.json changed locally @@ -107,6 +109,7 @@ class FileRequest(object): ) self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]}) + self.connection.goodAction() elif valid is None: # Not changed if params.get("peer"): @@ -125,10 +128,12 @@ class FileRequest(object): site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) self.response({"ok": "File not changed"}) + self.connection.badAction() else: # Invalid sign or sha1 hash self.log.debug("Update for %s is invalid" % params["inner_path"]) self.response({"error": "File invalid"}) + self.connection.badAction(5) # Send file content request def actionGetFile(self, params): @@ -302,6 +307,7 @@ class FileRequest(object): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving self.response({"error": "Unknown site"}) + self.connection.badAction(5) return False found = site.worker_manager.findOptionalHashIds(params["hash_ids"]) @@ -340,6 +346,7 @@ class FileRequest(object): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving self.response({"error": "Unknown site"}) + self.connection.badAction(5) return False peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection) # Add or get peer @@ -375,3 +382,4 @@ class FileRequest(object): # Unknown command def actionUnknown(self, cmd, params): self.response({"error": "Unknown command: %s" % cmd}) + self.connection.badAction(5) diff --git a/src/Site/Site.py b/src/Site/Site.py index 9580bbe8..8dbb4c48 100644 --- a/src/Site/Site.py +++ b/src/Site/Site.py @@ -362,8 +362,8 @@ class Site(object): threads = 5 if limit == "default": if len(self.peers) > 50: - limit = 4 - threads = 4 + limit = 3 + threads = 3 else: limit = 5 diff --git a/src/Site/SiteStorage.py b/src/Site/SiteStorage.py index 8e1fad75..e6ca58a4 100644 --- a/src/Site/SiteStorage.py +++ b/src/Site/SiteStorage.py @@ -262,11 +262,15 @@ class SiteStorage: # Verify all files sha512sum using content.json def verifyFiles(self, quick_check=False, add_optional=False, add_changed=True): bad_files = [] + i = 0 if not self.site.content_manager.contents.get("content.json"): # No content.json, download it first self.site.needFile("content.json", update=True) # Force update to fix corrupt file self.site.content_manager.loadContent() # Reload content.json for content_inner_path, content in self.site.content_manager.contents.items(): + i += 1 + if i % 100 == 0: + time.sleep(0.0001) # Context switch to avoid gevent hangs if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file self.log.debug("[MISSING] %s" % content_inner_path) bad_files.append(content_inner_path) @@ -324,6 +328,7 @@ class SiteStorage: (content_inner_path, len(content["files"]), quick_check, bad_files, optional_added, optional_removed) ) + time.sleep(0.0001) # Context switch to avoid gevent hangs return bad_files # Check and try to fix site files integrity diff --git a/src/Ui/media/Wrapper.coffee b/src/Ui/media/Wrapper.coffee index b675d946..ab613eab 100644 --- a/src/Ui/media/Wrapper.coffee +++ b/src/Ui/media/Wrapper.coffee @@ -70,7 +70,8 @@ class Wrapper # Incoming message from inner frame onMessageInner: (e) => - if not window.postmessage_nonce_security and @opener == null # Test opener + # No nonce security enabled, test if window opener present + if not window.postmessage_nonce_security and @opener == null if window.opener @log "Opener present", window.opener @displayOpenerDialog() @@ -79,14 +80,17 @@ class Wrapper @opener = false message = e.data + # Invalid message (probably not for us) if not message.cmd return false + # Test nonce security to avoid third-party messages if window.postmessage_nonce_security and message.wrapper_nonce != window.wrapper_nonce @log "Message nonce error:", message.wrapper_nonce, '!=', window.wrapper_nonce @actionNotification({"params": ["error", "Message wrapper_nonce error, please report!"]}) window.removeEventListener("message", @onMessageInner) return + cmd = message.cmd if cmd == "innerReady" @inner_ready = true diff --git a/src/Ui/template/wrapper.html b/src/Ui/template/wrapper.html index 6dbb0c2f..54f2fcc5 100644 --- a/src/Ui/template/wrapper.html +++ b/src/Ui/template/wrapper.html @@ -34,7 +34,7 @@ if (window.opener && window.stop) window.stop()
0
- +
@@ -55,7 +55,7 @@ if (window.opener && window.stop) window.stop() - +
id proto type ip open crypt pingbuff idle open delay out in last sentbuff bad idle open delay out in last sentwaiting version peerid