diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 1d105b5f..b7f828f2 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -2,8 +2,6 @@ import socket import time import gevent -import msgpack -import msgpack.fallback try: from gevent.coros import RLock except: @@ -11,17 +9,17 @@ except: from Config import config from Debug import Debug -from util import StreamingMsgpack +from util import Msgpack from Crypt import CryptConnection from util import helper class Connection(object): __slots__ = ( - "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "req_id", "ip_type", + "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type", "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection", "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock", - "last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "updateName", "waiting_requests", "waiting_streams" + "last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams" ) def __init__(self, server, ip, port, sock=None, target_onion=None, is_tracker_connection=False): @@ -46,6 +44,7 @@ class Connection(object): self.server = server self.unpacker = None # Stream incoming socket messages here + self.unpacker_bytes = 0 # How many bytes the unpacker received self.req_id = 0 # Last request id self.handshake = {} # Handshake info got from peer self.crypt = None # Connection encryption method @@ -102,7 +101,7 @@ class Connection(object): return "<%s>" % self.__str__() def log(self, text): - self.server.log.debug("%s > %s" % (self.name, text.decode("utf8", "ignore"))) + self.server.log.debug("%s > %s" % (self.name, text)) def getValidSites(self): return [key for key, val in self.server.tor_manager.site_onions.items() if val == self.target_onion] @@ -162,7 +161,7 @@ class Connection(object): self.sock.do_handshake() self.crypt = "tls-rsa" self.sock_wrapped = True - except Exception, err: + except Exception as err: if not config.force_encryption: self.log("Crypt connection error: %s, adding ip %s as broken ssl." % (err, self.ip)) self.server.broken_ssl_ips[self.ip] = True @@ -194,10 +193,16 @@ class Connection(object): self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", True) self.sock_wrapped = True self.crypt = "tls-rsa" - except Exception, err: + except Exception as err: self.log("Socket peek error: %s" % Debug.formatException(err)) self.messageLoop() + def getMsgpackUnpacker(self): + if self.handshake and self.handshake.get("use_bin_type"): + return Msgpack.getUnpacker(fallback=True, decode=False) + else: # Backward compatibility for <0.7.0 + return Msgpack.getUnpacker(fallback=True, decode=True) + # Message loop for connection def messageLoop(self): if not self.sock: @@ -208,7 +213,7 @@ class Connection(object): self.connected = True buff_len = 0 req_len = 0 - unpacker_bytes = 0 + self.unpacker_bytes = 0 try: while not self.closed: @@ -225,15 +230,15 @@ class Connection(object): req_len += buff_len if not self.unpacker: - self.unpacker = msgpack.fallback.Unpacker() - unpacker_bytes = 0 + self.unpacker = self.getMsgpackUnpacker() + self.unpacker_bytes = 0 self.unpacker.feed(buff) - unpacker_bytes += buff_len + self.unpacker_bytes += buff_len while True: try: - message = self.unpacker.next() + message = next(self.unpacker) except StopIteration: break if not type(message) is dict: @@ -257,10 +262,10 @@ class Connection(object): # Handle message if "stream_bytes" in message: - buff_left = self.handleStream(message, self.unpacker, buff, unpacker_bytes) - self.unpacker = msgpack.fallback.Unpacker() + buff_left = self.handleStream(message, buff) + self.unpacker = self.getMsgpackUnpacker() self.unpacker.feed(buff_left) - unpacker_bytes = len(buff_left) + self.unpacker_bytes = len(buff_left) if config.debug_socket: self.log("Start new unpacker with buff_left: %r" % buff_left) else: @@ -274,19 +279,23 @@ class Connection(object): self.server.stat_recv["error: %s" % err]["num"] += 1 self.close("MessageLoop ended (closed: %s)" % self.closed) # MessageLoop ended, close connection + def getUnpackerUnprocessedBytesNum(self): + if "tell" in dir(self.unpacker): + bytes_num = self.unpacker_bytes - self.unpacker.tell() + else: + bytes_num = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o + return bytes_num + # Stream socket directly to a file - def handleStream(self, message, unpacker, buff, unpacker_bytes): + def handleStream(self, message, buff): stream_bytes_left = message["stream_bytes"] file = self.waiting_streams[message["to"]] - if "tell" in dir(unpacker): - unpacker_unprocessed_bytes = unpacker_bytes - unpacker.tell() - else: - unpacker_unprocessed_bytes = unpacker._fb_buf_n - unpacker._fb_buf_o + unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum() - if unpacker_unprocessed_bytes: # Found stream bytes in unpacker - unpacker_stream_bytes = min(unpacker_unprocessed_bytes, stream_bytes_left) - buff_stream_start = len(buff) - unpacker_unprocessed_bytes + if unprocessed_bytes_num: # Found stream bytes in unpacker + unpacker_stream_bytes = min(unprocessed_bytes_num, stream_bytes_left) + buff_stream_start = len(buff) - unprocessed_bytes_num file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes]) stream_bytes_left -= unpacker_stream_bytes else: @@ -295,7 +304,7 @@ class Connection(object): if config.debug_socket: self.log( "Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" % - (message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unpacker_unprocessed_bytes) + (message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unprocessed_bytes_num) ) try: @@ -351,6 +360,7 @@ class Connection(object): handshake = { "version": config.version, "protocol": "v2", + "use_bin_type": True, "peer_id": peer_id, "fileserver_port": self.server.port, "port_opened": self.server.port_opened.get(self.ip_type, None), @@ -390,6 +400,9 @@ class Connection(object): # Check if we can encrypt the connection if handshake.get("crypt_supported") and self.ip not in self.server.broken_ssl_ips: + if type(handshake["crypt_supported"][0]) is bytes: + handshake["crypt_supported"] = [item.decode() for item in handshake["crypt_supported"]] # Backward compatibility + if self.ip_type == "onion" or self.ip in config.ip_local: crypt = None elif handshake.get("crypt"): # Recommended crypt by server @@ -513,13 +526,13 @@ class Connection(object): self.server.stat_sent[stat_key]["num"] += 1 if streaming: with self.send_lock: - bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall) + bytes_sent = Msgpack.stream(message, self.sock.sendall) self.bytes_sent += bytes_sent self.server.bytes_sent += bytes_sent self.server.stat_sent[stat_key]["bytes"] += bytes_sent message = None else: - data = msgpack.packb(message) + data = Msgpack.pack(message) self.bytes_sent += len(data) self.server.bytes_sent += len(data) self.server.stat_sent[stat_key]["bytes"] += len(data)