diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index a0bd458b..0fe24a66 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -187,6 +187,49 @@ class Connection(object): self.log("Socket error: %s" % Debug.formatException(err)) self.close("MessageLoop ended") # MessageLoop ended, close connection + # Stream socket directly to a file + def handleStream(self, message): + + read_bytes = message["stream_bytes"] # Bytes left we have to read from socket + try: + buff = self.unpacker.read_bytes(min(16 * 1024, read_bytes)) # Check if the unpacker has something left in buffer + except Exception, err: + buff = "" + file = self.waiting_streams[message["to"]] + if buff: + read_bytes -= len(buff) + file.write(buff) + + if config.debug_socket: + self.log("Starting stream %s: %s bytes (%s from unpacker)" % (message["to"], message["stream_bytes"], len(buff))) + + try: + while 1: + if read_bytes <= 0: + break + buff = self.sock.recv(16 * 1024) + if not buff: + break + buff_len = len(buff) + read_bytes -= buff_len + file.write(buff) + + # Statistics + self.last_recv_time = time.time() + self.incomplete_buff_recv += 1 + self.bytes_recv += buff_len + self.server.bytes_recv += buff_len + except Exception, err: + self.log("Stream read error: %s" % Debug.formatException(err)) + + if config.debug_socket: + self.log("End stream %s" % message["to"]) + + self.incomplete_buff_recv = 0 + self.waiting_requests[message["to"]].set(message) # Set the response to event + del self.waiting_streams[message["to"]] + del self.waiting_requests[message["to"]] + # My handshake info def getHandshakeInfo(self): # No TLS for onion connections @@ -320,49 +363,6 @@ class Connection(object): if not self.sock_wrapped and self.cert_pin: self.close("Crypt connection error: Socket not encrypted, but certificate pin present") - # Stream socket directly to a file - def handleStream(self, message): - - read_bytes = message["stream_bytes"] # Bytes left we have to read from socket - try: - buff = self.unpacker.read_bytes(min(16 * 1024, read_bytes)) # Check if the unpacker has something left in buffer - except Exception, err: - buff = "" - file = self.waiting_streams[message["to"]] - if buff: - read_bytes -= len(buff) - file.write(buff) - - if config.debug_socket: - self.log("Starting stream %s: %s bytes (%s from unpacker)" % (message["to"], message["stream_bytes"], len(buff))) - - try: - while 1: - if read_bytes <= 0: - break - buff = self.sock.recv(16 * 1024) - if not buff: - break - buff_len = len(buff) - read_bytes -= buff_len - file.write(buff) - - # Statistics - self.last_recv_time = time.time() - self.incomplete_buff_recv += 1 - self.bytes_recv += buff_len - self.server.bytes_recv += buff_len - except Exception, err: - self.log("Stream read error: %s" % Debug.formatException(err)) - - if config.debug_socket: - self.log("End stream %s" % message["to"]) - - self.incomplete_buff_recv = 0 - self.waiting_requests[message["to"]].set(message) # Set the response to event - del self.waiting_streams[message["to"]] - del self.waiting_requests[message["to"]] - # Send data to connection def send(self, message, streaming=False): self.last_send_time = time.time()