diff --git a/plugins/Stats/StatsPlugin.py b/plugins/Stats/StatsPlugin.py
index 627bad88..52b64266 100644
--- a/plugins/Stats/StatsPlugin.py
+++ b/plugins/Stats/StatsPlugin.py
@@ -116,7 +116,7 @@ class UiRequestPlugin(object):
[hunk body unrecoverable: the yielded HTML for the "Sites" table header row (columns: address, connected, peers, content.json) was stripped when the diff was rendered; only the surrounding "# Sites" / "for site in self.server.sites.values(): yield self.formatTableRow([...])" context survives]
@@ -133,7 +133,15 @@ class UiRequestPlugin(object):
[hunk body unrecoverable: 8 added lines of yielded HTML for the "Sites" table rows, also stripped in rendering]
" diff --git a/plugins/disabled-UiPassword/UiPasswordPlugin.py b/plugins/disabled-UiPassword/UiPasswordPlugin.py index c375fac8..a0e42e81 100644 --- a/plugins/disabled-UiPassword/UiPasswordPlugin.py +++ b/plugins/disabled-UiPassword/UiPasswordPlugin.py @@ -67,6 +67,7 @@ class UiRequestPlugin(object): @classmethod def cleanup(cls): + cls.last_cleanup = time.time() for session_id, session in cls.sessions.items(): if session["keep"] and time.time() - session["added"] > 60 * 60 * 24 * 60: # Max 60days for keep sessions del(cls.sessions[session_id]) diff --git a/src/Config.py b/src/Config.py index ac3958ff..5da6c2c0 100644 --- a/src/Config.py +++ b/src/Config.py @@ -8,7 +8,7 @@ class Config(object): def __init__(self, argv): self.version = "0.3.1" - self.rev = 307 + self.rev = 324 self.argv = argv self.action = None self.createParser() @@ -85,8 +85,9 @@ class Config(object): action.add_argument('peer_port', help='Peer port') action.add_argument('site', help='Site address') action.add_argument('filename', help='File name to request') + action.add_argument('--benchmark', help='Request file 10x then displays the total time', action='store_true') - # PeerGetFile + # PeerCmd action = self.subparsers.add_parser("peerCmd", help='Request and print a file content from peer') action.add_argument('peer_ip', help='Peer ip') action.add_argument('peer_port', help='Peer port') @@ -125,6 +126,10 @@ class Config(object): self.parser.add_argument('--disable_encryption', help='Disable connection encryption', action='store_true') self.parser.add_argument('--disable_sslcompression', help='Disable SSL compression to save memory', type='bool', choices=[True, False], default=True) + self.parser.add_argument('--use_tempfiles', help='Use temporary files when downloading (experimental)', + type='bool', choices=[True, False], default=False) + self.parser.add_argument('--stream_downloads', help='Stream download directly to files (experimental)', + type='bool', choices=[True, False], default=False) self.parser.add_argument('--coffeescript_compiler', help='Coffeescript compiler for developing', default=coffeescript, metavar='executable_path') diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index fe683091..8c166d4c 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -15,7 +15,7 @@ class Connection(object): "sock", "sock_wrapped", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", - "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests" + "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests", "waiting_streams" ) def __init__(self, server, ip, port, sock=None): @@ -56,6 +56,7 @@ class Connection(object): self.updateName() self.waiting_requests = {} # Waiting sent requests + self.waiting_streams = {} # Waiting response file streams def updateName(self): self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol) @@ -116,18 +117,25 @@ class Connection(object): buff = self.sock.recv(16 * 1024) if not buff: break # Connection closed + + # Statistics self.last_recv_time = time.time() self.incomplete_buff_recv += 1 self.bytes_recv += len(buff) self.server.bytes_recv += len(buff) + if not self.unpacker: self.unpacker = msgpack.Unpacker() self.unpacker.feed(buff) + buff = 
diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py
index fe683091..8c166d4c 100644
--- a/src/Connection/Connection.py
+++ b/src/Connection/Connection.py
@@ -15,7 +15,7 @@ class Connection(object):
         "sock", "sock_wrapped", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id",
         "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time",
         "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent",
-        "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests"
+        "last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests", "waiting_streams"
     )
 
     def __init__(self, server, ip, port, sock=None):
@@ -56,6 +56,7 @@ class Connection(object):
         self.updateName()
 
         self.waiting_requests = {}  # Waiting sent requests
+        self.waiting_streams = {}  # Waiting response file streams
 
     def updateName(self):
         self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
@@ -116,18 +117,25 @@ class Connection(object):
                 buff = self.sock.recv(16 * 1024)
                 if not buff:
                     break  # Connection closed
+
+                # Statistics
                 self.last_recv_time = time.time()
                 self.incomplete_buff_recv += 1
                 self.bytes_recv += len(buff)
                 self.server.bytes_recv += len(buff)
+
                 if not self.unpacker:
                     self.unpacker = msgpack.Unpacker()
                 self.unpacker.feed(buff)
+                buff = None
                 for message in self.unpacker:
                     self.incomplete_buff_recv = 0
-                    self.handleMessage(message)
+                    if "stream_bytes" in message:
+                        self.handleStream(message)
+                    else:
+                        self.handleMessage(message)
+
                 message = None
-                buff = None
         except Exception, err:
             if not self.closed:
                 self.log("Socket error: %s" % Debug.formatException(err))
@@ -209,6 +217,46 @@ class Connection(object):
                 self.waiting_requests[last_req_id].set(message)
                 del self.waiting_requests[last_req_id]  # Remove from waiting request
 
+    # Stream socket directly to a file
+    def handleStream(self, message):
+        if config.debug_socket:
+            self.log("Starting stream %s: %s bytes" % (message["to"], message["stream_bytes"]))
+
+        read_bytes = message["stream_bytes"]  # Bytes left we have to read from socket
+        try:
+            buff = self.unpacker.read_bytes(min(16 * 1024, read_bytes))  # Check if the unpacker has something left in buffer
+        except Exception, err:
+            buff = ""
+        file = self.waiting_streams[message["to"]]
+        if buff:
+            read_bytes -= len(buff)
+            file.write(buff)
+
+        try:
+            while 1:
+                if read_bytes <= 0:
+                    break
+                buff = self.sock.recv(16 * 1024)
+                buff_len = len(buff)
+                read_bytes -= buff_len
+                file.write(buff)
+
+                # Statistics
+                self.last_recv_time = time.time()
+                self.incomplete_buff_recv += 1
+                self.bytes_recv += buff_len
+                self.server.bytes_recv += buff_len
+        except Exception, err:
+            self.log("Stream read error: %s" % Debug.formatException(err))
+
+        if config.debug_socket:
+            self.log("End stream %s" % message["to"])
+
+        self.incomplete_buff_recv = 0
+        self.waiting_requests[message["to"]].set(message)  # Set the response to event
+        del self.waiting_streams[message["to"]]
+        del self.waiting_requests[message["to"]]
+
     # Send data to connection
     def send(self, message, streaming=False):
         if config.debug_socket:
@@ -218,22 +266,43 @@ class Connection(object):
                 message.get("req_id"))
             )
         self.last_send_time = time.time()
-        if streaming:
-            bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
-            message = None
-            self.bytes_sent += bytes_sent
-            self.server.bytes_sent += bytes_sent
-        else:
-            data = msgpack.packb(message)
-            message = None
-            self.bytes_sent += len(data)
-            self.server.bytes_sent += len(data)
-            self.sock.sendall(data)
+        try:
+            if streaming:
+                bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
+                message = None
+                self.bytes_sent += bytes_sent
+                self.server.bytes_sent += bytes_sent
+            else:
+                data = msgpack.packb(message)
+                message = None
+                self.bytes_sent += len(data)
+                self.server.bytes_sent += len(data)
+                self.sock.sendall(data)
+        except Exception, err:
+            self.log("Send error: %s" % Debug.formatException(err))
+            self.close()
+            return False
         self.last_sent_time = time.time()
         return True
 
+    # Stream raw file to connection
+    def sendRawfile(self, file, read_bytes):
+        buff = 64 * 1024
+        bytes_left = read_bytes
+        while True:
+            self.last_send_time = time.time()
+            self.sock.sendall(
+                file.read(min(bytes_left, buff))
+            )
+            bytes_left -= buff
+            if bytes_left <= 0:
+                break
+        self.bytes_sent += read_bytes
+        self.server.bytes_sent += read_bytes
+        return True
+
     # Create and send a request to peer
-    def request(self, cmd, params={}):
+    def request(self, cmd, params={}, stream_to=None):
         # Last command sent more than 10 sec ago, timeout
         if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
             self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
@@ -246,6 +315,8 @@ class Connection(object):
         data = {"cmd": cmd, "req_id": self.req_id, "params": params}
         event = gevent.event.AsyncResult()  # Create new event for response
         self.waiting_requests[self.req_id] = event
+        if stream_to:
+            self.waiting_streams[self.req_id] = stream_to
         self.send(data)  # Send request
         res = event.get()  # Wait until event solves
         return res
@@ -280,6 +351,7 @@ class Connection(object):
         for request in self.waiting_requests.values():  # Mark pending requests failed
             request.set(False)
         self.waiting_requests = {}
+        self.waiting_streams = {}
         self.server.removeConnection(self)  # Remove connection from server registry
         try:
             if self.sock:
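The new handleStream()/sendRawfile() pair changes the wire format for large responses: the sender first emits a normal msgpack response that carries a "stream_bytes" field, then writes exactly that many raw file bytes on the same socket, outside of msgpack. Below is a minimal sketch of that framing; the helper names are illustrative (not ZeroNet's API), and a real receiver must keep calling recv() for any bytes that did not arrive with the header, which is what the loop in handleStream() does.

import msgpack

def pack_streamed_response(req_id, payload):
    # msgpack header announcing the raw bytes that follow it on the wire
    header = msgpack.packb({"to": req_id, "stream_bytes": len(payload)})
    return header + payload

def unpack_streamed_response(wire_bytes):
    unpacker = msgpack.Unpacker()
    unpacker.feed(wire_bytes)
    message = next(iter(unpacker))                       # decoded msgpack header
    body = unpacker.read_bytes(message["stream_bytes"])  # raw bytes left in the unpacker's buffer
    return message, body

if __name__ == "__main__":
    wire = pack_streamed_response(req_id=1, payload=b"x" * 1000)
    message, body = unpack_streamed_response(wire)
    assert message["stream_bytes"] == len(body) == 1000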
diff --git a/src/Connection/ConnectionBenchmark.py b/src/Connection/ConnectionBenchmark.py
deleted file mode 100644
index 5605398d..00000000
--- a/src/Connection/ConnectionBenchmark.py
+++ /dev/null
@@ -1,136 +0,0 @@
-import time, socket, msgpack
-from cStringIO import StringIO
-
-print "Connecting..."
-sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-sock.connect(("localhost", 1234))
-
-
-print "1 Threaded: Send, receive 10000 ping request...",
-s = time.time()
-for i in range(10000):
-    sock.sendall(msgpack.packb({"cmd": "Ping"}))
-    req = sock.recv(16*1024)
-print time.time()-s, repr(req), time.time()-s
-
-
-print "1 Threaded: Send, receive, decode 10000 ping request...",
-s = time.time()
-unpacker = msgpack.Unpacker()
-reqs = 0
-for i in range(10000):
-    sock.sendall(msgpack.packb({"cmd": "Ping"}))
-    unpacker.feed(sock.recv(16*1024))
-    for req in unpacker:
-        reqs += 1
-print "Found:", req, "x", reqs, time.time()-s
-
-
-print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...",
-s = time.time()
-unpacker = msgpack.Unpacker()
-reqs = 0
-for i in range(1000):
-    sock.sendall(msgpack.packb({"cmd": "Ping"}))
-    unpacker.feed(sock.recv(16*1024))
-    for req in unpacker:
-        reqs += 1
-    sock.close()
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("localhost", 1234))
-print "Found:", req, "x", reqs, time.time()-s
-
-
-print "1 Threaded: Request, receive, decode 10000 x 10k data request...",
-s = time.time()
-unpacker = msgpack.Unpacker()
-reqs = 0
-for i in range(10000):
-    sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
-
-    """buff = StringIO()
-    data = sock.recv(16*1024)
-    buff.write(data)
-    if not data:
-        break
-    while not data.endswith("\n"):
-        data = sock.recv(16*1024)
-        if not data: break
-        buff.write(data)
-    req = msgpack.unpackb(buff.getvalue().strip("\n"))
-    reqs += 1"""
-
-    req_found = False
-    while not req_found:
-        buff = sock.recv(16*1024)
-        unpacker.feed(buff)
-        for req in unpacker:
-            reqs += 1
-            req_found = True
-            break  # Only process one request
-print "Found:", len(req["res"]), "x", reqs, time.time()-s
-
-
-print "10 Threaded: Request, receive, decode 10000 x 10k data request...",
-import gevent
-s = time.time()
-reqs = 0
-req = None
-def requester():
-    global reqs, req
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("localhost", 1234))
-    unpacker = msgpack.Unpacker()
-    for i in range(1000):
-        sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
-
-        req_found = False
-        while not req_found:
-            buff = sock.recv(16*1024)
-            unpacker.feed(buff)
-            for req in unpacker:
-                reqs += 1
-                req_found = True
-                break  # Only process one request
-
-threads = []
-for i in range(10):
-    threads.append(gevent.spawn(requester))
-gevent.joinall(threads)
-print "Found:", len(req["res"]), "x", reqs, time.time()-s
-
-
-print "1 Threaded: ZeroMQ Send, receive 1000 ping request...",
-s = time.time()
-import zmq.green as zmq
-c = zmq.Context()
-zmq_sock = c.socket(zmq.REQ)
-zmq_sock.connect('tcp://127.0.0.1:1234')
-for i in range(1000):
-    zmq_sock.send(msgpack.packb({"cmd": "Ping"}))
-    req = zmq_sock.recv(16*1024)
-print "Found:", req, time.time()-s
-
-
-print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...",
-s = time.time()
-import zmq.green as zmq
-c = zmq.Context()
-zmq_sock = c.socket(zmq.REQ)
-zmq_sock.connect('tcp://127.0.0.1:1234')
-for i in range(1000):
-    zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
-    req = msgpack.unpackb(zmq_sock.recv(1024*1024))
-print "Found:", len(req["res"]), time.time()-s
-
-
-print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...",
-s = time.time()
-import zmq.green as zmq
-c = zmq.Context()
-zmq_sock = c.socket(zmq.REQ)
-zmq_sock.connect('tcp://127.0.0.1:1233')
-for i in range(1000):
-    zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
-    req = msgpack.unpackb(zmq_sock.recv(1024*1024))
-print "Found:", len(req["res"]), time.time()-s
\ No newline at end of file
diff --git a/src/File/FileRequest.py b/src/File/FileRequest.py
index b987e304..d147d18b 100644
--- a/src/File/FileRequest.py
+++ b/src/File/FileRequest.py
@@ -34,6 +34,10 @@ class FileRequest(object):
         if not self.connection.closed:
             self.connection.send(msg, streaming)
 
+    def sendRawfile(self, file, read_bytes):
+        if not self.connection.closed:
+            self.connection.sendRawfile(file, read_bytes)
+
     def response(self, msg, streaming=False):
         if self.responded:
             self.log.debug("Req id %s already responded" % self.req_id)
@@ -51,6 +55,8 @@ class FileRequest(object):
 
         if cmd == "getFile":
             self.actionGetFile(params)
+        elif cmd == "streamFile":
+            self.actionStreamFile(params)
         elif cmd == "update":
             event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
             if not RateLimit.isAllowed(event):  # There was already an update for this file in the last 10 second
@@ -157,6 +163,43 @@ class FileRequest(object):
             self.response({"error": "File read error: %s" % Debug.formatException(err)})
             return False
 
+    # New-style file streaming out of Msgpack context
+    def actionStreamFile(self, params):
+        site = self.sites.get(params["site"])
+        if not site or not site.settings["serving"]:  # Site unknown or not serving
+            self.response({"error": "Unknown site"})
+            return False
+        try:
+            if config.debug_socket:
+                self.log.debug("Opening file: %s" % params["inner_path"])
+            with site.storage.open(params["inner_path"]) as file:
+                file.seek(params["location"])
+                stream_bytes = min(FILE_BUFF, os.fstat(file.fileno()).st_size-params["location"])
+                back = {
+                    "size": os.fstat(file.fileno()).st_size,
+                    "location": min(file.tell() + FILE_BUFF, os.fstat(file.fileno()).st_size),
+                    "stream_bytes": stream_bytes
+                }
+                if config.debug_socket:
+                    self.log.debug(
+                        "Sending file %s from position %s to %s" %
+                        (params["inner_path"], params["location"], back["location"])
+                    )
+                self.response(back)
+                self.sendRawfile(file, read_bytes=FILE_BUFF)
+            if config.debug_socket:
+                self.log.debug("File %s sent" % params["inner_path"])
+
+            # Add peer to site if not added before
+            connected_peer = site.addPeer(self.connection.ip, self.connection.port)
+            if connected_peer:  # Just added
+                connected_peer.connect(self.connection)  # Assign current connection to peer
+
+        except Exception, err:
+            self.log.debug("GetFile read error: %s" % Debug.formatException(err))
+            self.response({"error": "File read error: %s" % Debug.formatException(err)})
+            return False
+
     # Peer exchange request
     def actionPex(self, params):
         site = self.sites.get(params["site"])
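For reference, a sketch of the message shapes actionStreamFile() works with. The field names are taken from the code above and from Connection.request(); the site address, sizes and req_id values are purely illustrative placeholders.

# Request sent by the downloading peer (see Peer.streamFile below):
request = {
    "cmd": "streamFile",
    "req_id": 1,
    "params": {
        "site": "1SiteAddressPlaceholderxxxxxxxxxx",  # placeholder address
        "inner_path": "content.json",
        "location": 0,  # byte offset to continue the download from
    },
}

# Response header built in actionStreamFile(); stream_bytes raw bytes follow it on the socket:
response = {
    "to": 1,               # req_id this answers
    "size": 1485,          # total file size (illustrative)
    "location": 1485,      # position after this chunk; equals "size" on the last chunk
    "stream_bytes": 1485,  # number of raw bytes streamed right after this header
}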
diff --git a/src/File/FileServer.py b/src/File/FileServer.py
index a56daa5d..30c6e9da 100644
--- a/src/File/FileServer.py
+++ b/src/File/FileServer.py
@@ -184,6 +184,8 @@ class FileServer(ConnectionServer):
             if site.bad_files:
                 site.retryBadFiles()
 
+            site.cleanupPeers()
+
             # In passive mode keep 5 active peer connection to get the updates
             if self.port_opened is False:
                 site.needConnections()
diff --git a/src/Peer/Peer.py b/src/Peer/Peer.py
index 7003ced0..6e70114c 100644
--- a/src/Peer/Peer.py
+++ b/src/Peer/Peer.py
@@ -7,12 +7,17 @@ import struct
 from cStringIO import StringIO
 
 from Debug import Debug
+from Config import config
+if config.use_tempfiles:
+    import tempfile
 
 
 # Communicate remote peers
 class Peer(object):
-    __slots__ = ("ip", "port", "site", "key", "connection_server", "connection", "last_found", "last_response",
-                 "last_ping", "added", "connection_error", "hash_failed", "download_bytes", "download_time")
+    __slots__ = (
+        "ip", "port", "site", "key", "connection_server", "connection", "last_found", "last_response",
+        "last_ping", "added", "connection_error", "hash_failed", "download_bytes", "download_time"
+    )
 
     def __init__(self, ip, port, site=None):
         self.ip = ip
@@ -22,7 +27,7 @@ class Peer(object):
         self.connection_server = sys.modules["main"].file_server
         self.connection = None
 
-        self.last_found = None  # Time of last found in the torrent tracker
+        self.last_found = time.time()  # Time of last found in the torrent tracker
         self.last_response = None  # Time of last successful response from peer
         self.last_ping = None  # Last response time for ping
         self.added = time.time()
@@ -85,7 +90,7 @@ class Peer(object):
         self.last_found = time.time()
 
     # Send a command to peer
-    def request(self, cmd, params={}):
+    def request(self, cmd, params={}, stream_to=None):
         if not self.connection or self.connection.closed:
             self.connect()
             if not self.connection:
@@ -94,7 +99,7 @@ class Peer(object):
 
         for retry in range(1, 3):  # Retry 3 times
             try:
-                response = self.connection.request(cmd, params)
+                response = self.connection.request(cmd, params, stream_to)
                 if not response:
                     raise Exception("Send error")
                 if "error" in response:
@@ -120,8 +125,16 @@ class Peer(object):
 
     # Get a file content from peer
     def getFile(self, site, inner_path):
+        # Use streamFile if client supports it
+        if config.stream_downloads and self.connection and self.connection.handshake["rev"] > 310:
+            return self.streamFile(site, inner_path)
+
         location = 0
-        buff = StringIO()
+        if config.use_tempfiles:
+            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
+        else:
+            buff = StringIO()
+
         s = time.time()
         while True:  # Read in 512k parts
             back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})
@@ -135,6 +148,33 @@ class Peer(object):
                 break
             else:
                 location = back["location"]
+
+        self.download_bytes += back["location"]
+        self.download_time += (time.time() - s)
+        buff.seek(0)
+        return buff
+
+    # Download file out of msgpack context to save memory and cpu
+    def streamFile(self, site, inner_path):
+        location = 0
+        if config.use_tempfiles:
+            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
+        else:
+            buff = StringIO()
+
+        s = time.time()
+        while True:  # Read in 512k parts
+            back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)
+
+            if not back:  # Error
+                self.log("Invalid response: %s" % back)
+                return False
+
+            if back["location"] == back["size"]:  # End of file
+                break
+            else:
+                location = back["location"]
+
         self.download_bytes += back["location"]
         self.download_time += (time.time() - s)
         buff.seek(0)
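A small standard-library illustration of why --use_tempfiles swaps the StringIO download buffer for tempfile.SpooledTemporaryFile: writes stay in memory up to max_size and then transparently spill over to a real temporary file on disk, which caps memory use for large downloads. The private _rolled attribute is inspected here only to show when the spill happens.

import tempfile

buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
buff.write(b"a" * (16 * 1024))   # exactly max_size: still buffered in memory
print(buff._rolled)              # False
buff.write(b"b" * 1024)          # exceeds max_size: rolled over to a disk file
print(buff._rolled)              # True
buff.seek(0)
print(len(buff.read()))          # 17408 -> reads back transparently either way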
diff --git a/src/Site/Site.py b/src/Site/Site.py
index 774a6596..85cd07b8 100644
--- a/src/Site/Site.py
+++ b/src/Site/Site.py
@@ -252,7 +252,6 @@ class Site:
 
     # Update site by redownload all content.json
     def redownloadContents(self):
-
         # Download all content.json again
         content_threads = []
         for inner_path in self.content_manager.contents.keys():
@@ -449,7 +448,7 @@ class Site:
             return False  # Ignore blacklist (eg. myself)
         key = "%s:%s" % (ip, port)
         if key in self.peers:  # Already has this ip
-            # self.peers[key].found()
+            self.peers[key].found()
             if return_peer:  # Always return peer
                 return self.peers[key]
             else:
@@ -651,17 +650,38 @@ class Site:
                 break  # Found requested number of peers
 
         if (not found and not ignore) or (need_num > 5 and need_num < 100 and len(found) < need_num):
-            # Return not that good peers: Not found any peer and the requester dont have any or cant give enought peer
+            # Return not that good peers: Not found any peer and the requester dont have any or cant give enough peer
            found = [peer for peer in peers if not peer.key.endswith(":0") and peer.key not in ignore][0:need_num - len(found)]
 
         return found
 
+    # Cleanup probably dead peers
+    def cleanupPeers(self):
+        peers = self.peers.values()
+        if len(peers) < 20:
+            return False
+        removed = 0
+
+        for peer in peers:
+            if peer.connection and peer.connection.connected:
+                continue
+            if peer.connection and not peer.connection.connected:
+                peer.connection = None  # Dead connection
+            if time.time() - peer.last_found > 60 * 60 * 4:  # Not found on tracker or via pex in last 4 hour
+                peer.remove()
+                removed += 1
+            if removed > 5:  # Don't remove too much at once
+                break
+
+        if removed:
+            self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
+
     # - Events -
 
     # Add event listeners
     def addEventListeners(self):
         self.onFileStart = util.Event()  # If WorkerManager added new task
-        self.onFileDone = util.Event()  # If WorkerManager successfuly downloaded a file
+        self.onFileDone = util.Event()  # If WorkerManager successfully downloaded a file
         self.onFileFail = util.Event()  # If WorkerManager failed to download a file
         self.onComplete = util.Event()  # All file finished
diff --git a/src/Worker/Worker.py b/src/Worker/Worker.py
index 033b71e7..09398e22 100644
--- a/src/Worker/Worker.py
+++ b/src/Worker/Worker.py
@@ -45,9 +45,9 @@ class Worker(object):
             try:
                 buff = self.peer.getFile(site.address, task["inner_path"])
             except Exception, err:
-                self.manager.log.debug("%s: getFile error: err" % (self.key, err))
+                self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
                 buff = None
-            if self.running is False:  # Worker no longer needed or got killed
+            if self.running is False or task["done"] is True:  # Worker no longer needed or got killed
                 self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
                 break
             if buff:  # Download ok
diff --git a/src/lib/opensslVerify/opensslVerify.py b/src/lib/opensslVerify/opensslVerify.py
index 8dc005a1..5fd9a76d 100644
--- a/src/lib/opensslVerify/opensslVerify.py
+++ b/src/lib/opensslVerify/opensslVerify.py
@@ -8,6 +8,7 @@
 
 import ctypes
 import ctypes.util
+import _ctypes
 import hashlib
 import base64
 import time
@@ -395,7 +396,6 @@ def ECDSA_SIG_recover_key_GFp(eckey, r, s, msg, msglen, recid, check):
 
 
 def closeLibrary():
-    import _ctypes
     if "FreeLibrary" in dir(_ctypes):
         _ctypes.FreeLibrary(ssl._lib._handle)
     else:
@@ -414,10 +414,12 @@ def getMessagePubkey(message, sig):
     mb = ctypes.create_string_buffer(size)
     ssl.i2o_ECPublicKey(eckey, ctypes.byref(ctypes.pointer(mb)))
     pub = mb.raw
+    """
     if time.time() - ssl.time_opened > 60 * 5:  # Reopen every 5 min
         logging.debug("Reopening OpenSSL...")
         closeLibrary()
         openLibrary()
+    """
 
     return pub
 
diff --git a/src/main.py b/src/main.py
index a65ea500..7d3598a5 100644
--- a/src/main.py
+++ b/src/main.py
@@ -263,7 +263,7 @@ class Actions(object):
             print "Response time: %.3fs (crypt: %s)" % (time.time() - s, peer.connection.crypt)
             time.sleep(1)
 
-    def peerGetFile(self, peer_ip, peer_port, site, filename):
+    def peerGetFile(self, peer_ip, peer_port, site, filename, benchmark=False):
         logging.info("Opening a simple connection server")
         global file_server
         from Connection import ConnectionServer
@@ -273,8 +273,13 @@ class Actions(object):
         logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, peer_ip, peer_port))
         peer = Peer(peer_ip, peer_port)
         s = time.time()
-        print peer.getFile(site, filename).read()
-        print "Response time: %.3fs" % (time.time() - s)
+        peer.getFile(site, filename)
+        if benchmark:
+            for i in range(10):
+                print peer.getFile(site, filename),
+            print "Response time: %.3fs" % (time.time() - s)
+            raw_input("Check memory")
+
 
     def peerCmd(self, peer_ip, peer_port, cmd, parameters):
         logging.info("Opening a simple connection server")
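Example invocations of the updated peerGetFile action; the peer address, port and site address below are placeholders, and zeronet.py is assumed to be the usual launcher that feeds argv to Config:

python zeronet.py peerGetFile 127.0.0.1 15441 1SiteAddressPlaceholderxxxxxxxxxx content.json
python zeronet.py peerGetFile 127.0.0.1 15441 1SiteAddressPlaceholderxxxxxxxxxx content.json --benchmark

With --benchmark the file is fetched ten more times, the total response time is printed, and the script then pauses on raw_input("Check memory") so memory usage can be inspected (e.g. with top) before it exits.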