New msgpack compatible stream handling
This commit is contained in:
parent
4b9455e84d
commit
f10233a0a6
1 changed files with 71 additions and 55 deletions
|
@ -163,7 +163,7 @@ class Connection(object):
|
|||
buff_len = 0
|
||||
req_len = 0
|
||||
|
||||
self.unpacker = msgpack.fallback.Unpacker() # Due memory problems of C version
|
||||
unpacker_bytes = 0
|
||||
try:
|
||||
while not self.closed:
|
||||
buff = self.sock.recv(64 * 1024)
|
||||
|
@ -180,75 +180,86 @@ class Connection(object):
|
|||
|
||||
if not self.unpacker:
|
||||
self.unpacker = msgpack.fallback.Unpacker()
|
||||
self.unpacker.feed(buff)
|
||||
buff = None
|
||||
for message in self.unpacker:
|
||||
try:
|
||||
# Stats
|
||||
self.incomplete_buff_recv = 0
|
||||
stat_key = message.get("cmd", "unknown")
|
||||
if stat_key == "response" and "to" in message:
|
||||
cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
|
||||
stat_key = "response: %s" % cmd_sent
|
||||
self.server.stat_recv[stat_key]["bytes"] += req_len
|
||||
self.server.stat_recv[stat_key]["num"] += 1
|
||||
if "stream_bytes" in message:
|
||||
self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
|
||||
req_len = 0
|
||||
unpacker_bytes = 0
|
||||
|
||||
# Handle message
|
||||
if "stream_bytes" in message:
|
||||
self.handleStream(message)
|
||||
else:
|
||||
self.handleMessage(message)
|
||||
except TypeError:
|
||||
raise Exception("Invalid message type: %s" % type(message))
|
||||
self.unpacker.feed(buff)
|
||||
unpacker_bytes += buff_len
|
||||
|
||||
for message in self.unpacker:
|
||||
if not type(message) is dict:
|
||||
raise Exception(
|
||||
"Invalid message type: %s, content: %r, buffer: %r" %
|
||||
(type(message), message, buff[0:16])
|
||||
)
|
||||
|
||||
# Stats
|
||||
self.incomplete_buff_recv = 0
|
||||
stat_key = message.get("cmd", "unknown")
|
||||
if stat_key == "response" and "to" in message:
|
||||
cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
|
||||
stat_key = "response: %s" % cmd_sent
|
||||
if stat_key == "update":
|
||||
stat_key = "update: %s" % message["params"]["site"]
|
||||
self.server.stat_recv[stat_key]["bytes"] += req_len
|
||||
self.server.stat_recv[stat_key]["num"] += 1
|
||||
if "stream_bytes" in message:
|
||||
self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
|
||||
req_len = 0
|
||||
|
||||
# Handle message
|
||||
if "stream_bytes" in message:
|
||||
buff_left = self.handleStream(message, self.unpacker, buff, unpacker_bytes)
|
||||
self.unpacker = msgpack.fallback.Unpacker()
|
||||
self.unpacker.feed(buff_left)
|
||||
unpacker_bytes = len(buff_left)
|
||||
if config.debug_socket:
|
||||
self.log("Start new unpacker with buff_left: %r" % buff_left)
|
||||
break
|
||||
else:
|
||||
self.handleMessage(message)
|
||||
|
||||
message = None
|
||||
except Exception, err:
|
||||
except Exception as err:
|
||||
if not self.closed:
|
||||
self.log("Socket error: %s" % Debug.formatException(err))
|
||||
self.close("MessageLoop ended") # MessageLoop ended, close connection
|
||||
self.server.stat_recv["error: %s" % err]["bytes"] += req_len
|
||||
self.server.stat_recv["error: %s" % err]["num"] += 1
|
||||
self.close("MessageLoop ended (closed: %s)" % self.closed) # MessageLoop ended, close connection
|
||||
|
||||
# Stream socket directly to a file
|
||||
def handleStream(self, message):
|
||||
|
||||
read_bytes = message["stream_bytes"] # Bytes left we have to read from socket
|
||||
# Check if the unpacker has something left in buffer
|
||||
if hasattr(self.unpacker, "_buffer"): # New version of msgpack
|
||||
bytes_buffer_left = len(self.unpacker._buffer) - self.unpacker.tell()
|
||||
else:
|
||||
bytes_buffer_left = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o
|
||||
|
||||
extradata_len = min(bytes_buffer_left, read_bytes)
|
||||
if extradata_len:
|
||||
buff = self.unpacker.read_bytes(extradata_len)
|
||||
# Get rid of extra data from buffer
|
||||
if hasattr(self.unpacker, "_consume"):
|
||||
self.unpacker._consume()
|
||||
else:
|
||||
self.unpacker._fb_consume()
|
||||
else:
|
||||
buff = ""
|
||||
|
||||
def handleStream(self, message, unpacker, buff, unpacker_bytes):
|
||||
stream_bytes_left = message["stream_bytes"]
|
||||
file = self.waiting_streams[message["to"]]
|
||||
if buff:
|
||||
read_bytes -= len(buff)
|
||||
file.write(buff)
|
||||
|
||||
if "tell" in dir(unpacker):
|
||||
unpacker_unprocessed_bytes = unpacker_bytes - unpacker.tell()
|
||||
else:
|
||||
unpacker_unprocessed_bytes = unpacker._fb_buf_n - unpacker._fb_buf_o
|
||||
|
||||
if unpacker_unprocessed_bytes: # Found stream bytes in unpacker
|
||||
unpacker_stream_bytes = min(unpacker_unprocessed_bytes, stream_bytes_left)
|
||||
buff_stream_start = len(buff) - unpacker_unprocessed_bytes
|
||||
file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes])
|
||||
stream_bytes_left -= unpacker_stream_bytes
|
||||
else:
|
||||
unpacker_stream_bytes = 0
|
||||
|
||||
if config.debug_socket:
|
||||
self.log("Starting stream %s: %s bytes (%s from unpacker)" % (message["to"], message["stream_bytes"], len(buff)))
|
||||
self.log(
|
||||
"Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" %
|
||||
(message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unpacker_unprocessed_bytes)
|
||||
)
|
||||
|
||||
try:
|
||||
while 1:
|
||||
if read_bytes <= 0:
|
||||
if stream_bytes_left <= 0:
|
||||
break
|
||||
buff = self.sock.recv(min(64 * 1024, read_bytes))
|
||||
if not buff:
|
||||
stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left))
|
||||
if not stream_buff:
|
||||
break
|
||||
buff_len = len(buff)
|
||||
read_bytes -= buff_len
|
||||
file.write(buff)
|
||||
buff_len = len(stream_buff)
|
||||
stream_bytes_left -= buff_len
|
||||
file.write(stream_buff)
|
||||
|
||||
# Statistics
|
||||
self.last_recv_time = time.time()
|
||||
|
@ -266,6 +277,11 @@ class Connection(object):
|
|||
del self.waiting_streams[message["to"]]
|
||||
del self.waiting_requests[message["to"]]
|
||||
|
||||
if unpacker_stream_bytes:
|
||||
return buff[buff_stream_start + unpacker_stream_bytes:]
|
||||
else:
|
||||
return ""
|
||||
|
||||
# My handshake info
|
||||
def getHandshakeInfo(self):
|
||||
# No TLS for onion connections
|
||||
|
|
Loading…
Reference in a new issue