diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 456c36f0..c0b74c31 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -153,6 +153,7 @@ class Connection(object): self.updateName() self.connected = True buff_len = 0 + req_len = 0 self.unpacker = msgpack.fallback.Unpacker() # Due memory problems of C version try: @@ -167,6 +168,7 @@ class Connection(object): self.incomplete_buff_recv += 1 self.bytes_recv += buff_len self.server.bytes_recv += buff_len + req_len += buff_len if not self.unpacker: self.unpacker = msgpack.fallback.Unpacker() @@ -174,7 +176,19 @@ class Connection(object): buff = None for message in self.unpacker: try: + # Stats self.incomplete_buff_recv = 0 + stat_key = message.get("cmd", "unknown") + if stat_key == "response" and "to" in message: + cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"] + stat_key = "response: %s" % cmd_sent + self.server.stat_recv[stat_key]["bytes"] += req_len + self.server.stat_recv[stat_key]["num"] += 1 + if "stream_bytes" in message: + self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"] + req_len = 0 + + # Handle message if "stream_bytes" in message: self.handleStream(message) else: @@ -227,7 +241,7 @@ class Connection(object): self.log("End stream %s" % message["to"]) self.incomplete_buff_recv = 0 - self.waiting_requests[message["to"]].set(message) # Set the response to event + self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event del self.waiting_streams[message["to"]] del self.waiting_requests[message["to"]] @@ -306,7 +320,7 @@ class Connection(object): if self.last_send_time and len(self.waiting_requests) == 1: ping = time.time() - self.last_send_time self.last_ping_delay = ping - self.waiting_requests[message["to"]].set(message) # Set the response to event + self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event del self.waiting_requests[message["to"]] elif message["to"] == 0: # Other peers handshake ping = time.time() - self.start_time @@ -329,7 +343,7 @@ class Connection(object): self.setHandshake(message) else: self.log("Unknown response: %s" % message) - elif cmd: # Handhsake request + elif cmd: if cmd == "handshake": self.handleHandshake(message) else: @@ -338,7 +352,7 @@ class Connection(object): self.log("Unknown message, waiting: %s" % self.waiting_requests.keys()) if self.waiting_requests: last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true - self.waiting_requests[last_req_id].set(message) + self.waiting_requests[last_req_id]["evt"].set(message) del self.waiting_requests[last_req_id] # Remove from waiting request # Incoming handshake set request @@ -380,17 +394,25 @@ class Connection(object): return False try: + stat_key = message.get("cmd", "unknown") + if stat_key == "response": + stat_key = "response: %s" % self.last_cmd_recv + + self.server.stat_sent[stat_key]["num"] += 1 if streaming: with self.send_lock: bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall) message = None self.bytes_sent += bytes_sent self.server.bytes_sent += bytes_sent + self.server.stat_sent[stat_key]["bytes"] += bytes_sent + message = None else: data = msgpack.packb(message) - message = None self.bytes_sent += len(data) self.server.bytes_sent += len(data) + self.server.stat_sent[stat_key]["bytes"] += len(data) + message = None with self.send_lock: self.sock.sendall(data) except Exception, err: @@ -414,13 +436,15 @@ class Connection(object): break self.bytes_sent += read_bytes self.server.bytes_sent += read_bytes + self.server.stat_sent["raw_file"]["num"] += 1 + self.server.stat_sent["raw_file"]["bytes"] += bytes_sent return True # Create and send a request to peer def request(self, cmd, params={}, stream_to=None): # Last command sent more than 10 sec ago, timeout if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10: - self.close("Request %s timeout: %.3fs" % (self.last_cmd, time.time() - self.last_send_time)) + self.close("Request %s timeout: %.3fs" % (self.last_cmd_sent, time.time() - self.last_send_time)) return False self.last_req_time = time.time() @@ -428,7 +452,7 @@ class Connection(object): self.req_id += 1 data = {"cmd": cmd, "req_id": self.req_id, "params": params} event = gevent.event.AsyncResult() # Create new event for response - self.waiting_requests[self.req_id] = event + self.waiting_requests[self.req_id] = {"evt": event, "cmd": cmd} if stream_to: self.waiting_streams[self.req_id] = stream_to self.send(data) # Send request @@ -463,7 +487,7 @@ class Connection(object): (reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv) ) for request in self.waiting_requests.values(): # Mark pending requests failed - request.set(False) + request["evt"].set(False) self.waiting_requests = {} self.waiting_streams = {} self.sites = 0 diff --git a/src/Connection/ConnectionServer.py b/src/Connection/ConnectionServer.py index db921f8c..fe81a5a9 100644 --- a/src/Connection/ConnectionServer.py +++ b/src/Connection/ConnectionServer.py @@ -6,6 +6,7 @@ import gevent import msgpack from gevent.server import StreamServer from gevent.pool import Pool +from collections import defaultdict from Debug import Debug from Connection import Connection @@ -39,6 +40,8 @@ class ConnectionServer: self.running = True self.thread_checker = gevent.spawn(self.checkConnections) + self.stat_recv = defaultdict(lambda: defaultdict(int)) + self.stat_sent = defaultdict(lambda: defaultdict(int)) self.bytes_recv = 0 self.bytes_sent = 0