From f10233a0a600c187f053db8d29563de1034e8b10 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Tue, 27 Feb 2018 02:46:26 +0100 Subject: [PATCH] New msgpack compatible stream handling --- src/Connection/Connection.py | 126 ++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 55 deletions(-) diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index e7c43de3..d6778690 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -163,7 +163,7 @@ class Connection(object): buff_len = 0 req_len = 0 - self.unpacker = msgpack.fallback.Unpacker() # Due memory problems of C version + unpacker_bytes = 0 try: while not self.closed: buff = self.sock.recv(64 * 1024) @@ -180,75 +180,86 @@ class Connection(object): if not self.unpacker: self.unpacker = msgpack.fallback.Unpacker() - self.unpacker.feed(buff) - buff = None - for message in self.unpacker: - try: - # Stats - self.incomplete_buff_recv = 0 - stat_key = message.get("cmd", "unknown") - if stat_key == "response" and "to" in message: - cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"] - stat_key = "response: %s" % cmd_sent - self.server.stat_recv[stat_key]["bytes"] += req_len - self.server.stat_recv[stat_key]["num"] += 1 - if "stream_bytes" in message: - self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"] - req_len = 0 + unpacker_bytes = 0 - # Handle message - if "stream_bytes" in message: - self.handleStream(message) - else: - self.handleMessage(message) - except TypeError: - raise Exception("Invalid message type: %s" % type(message)) + self.unpacker.feed(buff) + unpacker_bytes += buff_len + + for message in self.unpacker: + if not type(message) is dict: + raise Exception( + "Invalid message type: %s, content: %r, buffer: %r" % + (type(message), message, buff[0:16]) + ) + + # Stats + self.incomplete_buff_recv = 0 + stat_key = message.get("cmd", "unknown") + if stat_key == "response" and "to" in message: + cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"] + stat_key = "response: %s" % cmd_sent + if stat_key == "update": + stat_key = "update: %s" % message["params"]["site"] + self.server.stat_recv[stat_key]["bytes"] += req_len + self.server.stat_recv[stat_key]["num"] += 1 + if "stream_bytes" in message: + self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"] + req_len = 0 + + # Handle message + if "stream_bytes" in message: + buff_left = self.handleStream(message, self.unpacker, buff, unpacker_bytes) + self.unpacker = msgpack.fallback.Unpacker() + self.unpacker.feed(buff_left) + unpacker_bytes = len(buff_left) + if config.debug_socket: + self.log("Start new unpacker with buff_left: %r" % buff_left) + break + else: + self.handleMessage(message) message = None - except Exception, err: + except Exception as err: if not self.closed: self.log("Socket error: %s" % Debug.formatException(err)) - self.close("MessageLoop ended") # MessageLoop ended, close connection + self.server.stat_recv["error: %s" % err]["bytes"] += req_len + self.server.stat_recv["error: %s" % err]["num"] += 1 + self.close("MessageLoop ended (closed: %s)" % self.closed) # MessageLoop ended, close connection # Stream socket directly to a file - def handleStream(self, message): - - read_bytes = message["stream_bytes"] # Bytes left we have to read from socket - # Check if the unpacker has something left in buffer - if hasattr(self.unpacker, "_buffer"): # New version of msgpack - bytes_buffer_left = len(self.unpacker._buffer) - self.unpacker.tell() - else: - bytes_buffer_left = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o - - extradata_len = min(bytes_buffer_left, read_bytes) - if extradata_len: - buff = self.unpacker.read_bytes(extradata_len) - # Get rid of extra data from buffer - if hasattr(self.unpacker, "_consume"): - self.unpacker._consume() - else: - self.unpacker._fb_consume() - else: - buff = "" - + def handleStream(self, message, unpacker, buff, unpacker_bytes): + stream_bytes_left = message["stream_bytes"] file = self.waiting_streams[message["to"]] - if buff: - read_bytes -= len(buff) - file.write(buff) + + if "tell" in dir(unpacker): + unpacker_unprocessed_bytes = unpacker_bytes - unpacker.tell() + else: + unpacker_unprocessed_bytes = unpacker._fb_buf_n - unpacker._fb_buf_o + + if unpacker_unprocessed_bytes: # Found stream bytes in unpacker + unpacker_stream_bytes = min(unpacker_unprocessed_bytes, stream_bytes_left) + buff_stream_start = len(buff) - unpacker_unprocessed_bytes + file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes]) + stream_bytes_left -= unpacker_stream_bytes + else: + unpacker_stream_bytes = 0 if config.debug_socket: - self.log("Starting stream %s: %s bytes (%s from unpacker)" % (message["to"], message["stream_bytes"], len(buff))) + self.log( + "Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" % + (message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unpacker_unprocessed_bytes) + ) try: while 1: - if read_bytes <= 0: + if stream_bytes_left <= 0: break - buff = self.sock.recv(min(64 * 1024, read_bytes)) - if not buff: + stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left)) + if not stream_buff: break - buff_len = len(buff) - read_bytes -= buff_len - file.write(buff) + buff_len = len(stream_buff) + stream_bytes_left -= buff_len + file.write(stream_buff) # Statistics self.last_recv_time = time.time() @@ -266,6 +277,11 @@ class Connection(object): del self.waiting_streams[message["to"]] del self.waiting_requests[message["to"]] + if unpacker_stream_bytes: + return buff[buff_stream_start + unpacker_stream_bytes:] + else: + return "" + # My handshake info def getHandshakeInfo(self): # No TLS for onion connections