Received and sent connection command statistics
This commit is contained in:
parent
24c1596048
commit
de360a8585
2 changed files with 35 additions and 8 deletions
|
@ -153,6 +153,7 @@ class Connection(object):
|
||||||
self.updateName()
|
self.updateName()
|
||||||
self.connected = True
|
self.connected = True
|
||||||
buff_len = 0
|
buff_len = 0
|
||||||
|
req_len = 0
|
||||||
|
|
||||||
self.unpacker = msgpack.fallback.Unpacker() # Due memory problems of C version
|
self.unpacker = msgpack.fallback.Unpacker() # Due memory problems of C version
|
||||||
try:
|
try:
|
||||||
|
@ -167,6 +168,7 @@ class Connection(object):
|
||||||
self.incomplete_buff_recv += 1
|
self.incomplete_buff_recv += 1
|
||||||
self.bytes_recv += buff_len
|
self.bytes_recv += buff_len
|
||||||
self.server.bytes_recv += buff_len
|
self.server.bytes_recv += buff_len
|
||||||
|
req_len += buff_len
|
||||||
|
|
||||||
if not self.unpacker:
|
if not self.unpacker:
|
||||||
self.unpacker = msgpack.fallback.Unpacker()
|
self.unpacker = msgpack.fallback.Unpacker()
|
||||||
|
@ -174,7 +176,19 @@ class Connection(object):
|
||||||
buff = None
|
buff = None
|
||||||
for message in self.unpacker:
|
for message in self.unpacker:
|
||||||
try:
|
try:
|
||||||
|
# Stats
|
||||||
self.incomplete_buff_recv = 0
|
self.incomplete_buff_recv = 0
|
||||||
|
stat_key = message.get("cmd", "unknown")
|
||||||
|
if stat_key == "response" and "to" in message:
|
||||||
|
cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
|
||||||
|
stat_key = "response: %s" % cmd_sent
|
||||||
|
self.server.stat_recv[stat_key]["bytes"] += req_len
|
||||||
|
self.server.stat_recv[stat_key]["num"] += 1
|
||||||
|
if "stream_bytes" in message:
|
||||||
|
self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
|
||||||
|
req_len = 0
|
||||||
|
|
||||||
|
# Handle message
|
||||||
if "stream_bytes" in message:
|
if "stream_bytes" in message:
|
||||||
self.handleStream(message)
|
self.handleStream(message)
|
||||||
else:
|
else:
|
||||||
|
@ -227,7 +241,7 @@ class Connection(object):
|
||||||
self.log("End stream %s" % message["to"])
|
self.log("End stream %s" % message["to"])
|
||||||
|
|
||||||
self.incomplete_buff_recv = 0
|
self.incomplete_buff_recv = 0
|
||||||
self.waiting_requests[message["to"]].set(message) # Set the response to event
|
self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event
|
||||||
del self.waiting_streams[message["to"]]
|
del self.waiting_streams[message["to"]]
|
||||||
del self.waiting_requests[message["to"]]
|
del self.waiting_requests[message["to"]]
|
||||||
|
|
||||||
|
@ -306,7 +320,7 @@ class Connection(object):
|
||||||
if self.last_send_time and len(self.waiting_requests) == 1:
|
if self.last_send_time and len(self.waiting_requests) == 1:
|
||||||
ping = time.time() - self.last_send_time
|
ping = time.time() - self.last_send_time
|
||||||
self.last_ping_delay = ping
|
self.last_ping_delay = ping
|
||||||
self.waiting_requests[message["to"]].set(message) # Set the response to event
|
self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event
|
||||||
del self.waiting_requests[message["to"]]
|
del self.waiting_requests[message["to"]]
|
||||||
elif message["to"] == 0: # Other peers handshake
|
elif message["to"] == 0: # Other peers handshake
|
||||||
ping = time.time() - self.start_time
|
ping = time.time() - self.start_time
|
||||||
|
@ -329,7 +343,7 @@ class Connection(object):
|
||||||
self.setHandshake(message)
|
self.setHandshake(message)
|
||||||
else:
|
else:
|
||||||
self.log("Unknown response: %s" % message)
|
self.log("Unknown response: %s" % message)
|
||||||
elif cmd: # Handhsake request
|
elif cmd:
|
||||||
if cmd == "handshake":
|
if cmd == "handshake":
|
||||||
self.handleHandshake(message)
|
self.handleHandshake(message)
|
||||||
else:
|
else:
|
||||||
|
@ -338,7 +352,7 @@ class Connection(object):
|
||||||
self.log("Unknown message, waiting: %s" % self.waiting_requests.keys())
|
self.log("Unknown message, waiting: %s" % self.waiting_requests.keys())
|
||||||
if self.waiting_requests:
|
if self.waiting_requests:
|
||||||
last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
|
last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
|
||||||
self.waiting_requests[last_req_id].set(message)
|
self.waiting_requests[last_req_id]["evt"].set(message)
|
||||||
del self.waiting_requests[last_req_id] # Remove from waiting request
|
del self.waiting_requests[last_req_id] # Remove from waiting request
|
||||||
|
|
||||||
# Incoming handshake set request
|
# Incoming handshake set request
|
||||||
|
@ -380,17 +394,25 @@ class Connection(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
stat_key = message.get("cmd", "unknown")
|
||||||
|
if stat_key == "response":
|
||||||
|
stat_key = "response: %s" % self.last_cmd_recv
|
||||||
|
|
||||||
|
self.server.stat_sent[stat_key]["num"] += 1
|
||||||
if streaming:
|
if streaming:
|
||||||
with self.send_lock:
|
with self.send_lock:
|
||||||
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
|
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
|
||||||
message = None
|
message = None
|
||||||
self.bytes_sent += bytes_sent
|
self.bytes_sent += bytes_sent
|
||||||
self.server.bytes_sent += bytes_sent
|
self.server.bytes_sent += bytes_sent
|
||||||
|
self.server.stat_sent[stat_key]["bytes"] += bytes_sent
|
||||||
|
message = None
|
||||||
else:
|
else:
|
||||||
data = msgpack.packb(message)
|
data = msgpack.packb(message)
|
||||||
message = None
|
|
||||||
self.bytes_sent += len(data)
|
self.bytes_sent += len(data)
|
||||||
self.server.bytes_sent += len(data)
|
self.server.bytes_sent += len(data)
|
||||||
|
self.server.stat_sent[stat_key]["bytes"] += len(data)
|
||||||
|
message = None
|
||||||
with self.send_lock:
|
with self.send_lock:
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
|
@ -414,13 +436,15 @@ class Connection(object):
|
||||||
break
|
break
|
||||||
self.bytes_sent += read_bytes
|
self.bytes_sent += read_bytes
|
||||||
self.server.bytes_sent += read_bytes
|
self.server.bytes_sent += read_bytes
|
||||||
|
self.server.stat_sent["raw_file"]["num"] += 1
|
||||||
|
self.server.stat_sent["raw_file"]["bytes"] += bytes_sent
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Create and send a request to peer
|
# Create and send a request to peer
|
||||||
def request(self, cmd, params={}, stream_to=None):
|
def request(self, cmd, params={}, stream_to=None):
|
||||||
# Last command sent more than 10 sec ago, timeout
|
# Last command sent more than 10 sec ago, timeout
|
||||||
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
|
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
|
||||||
self.close("Request %s timeout: %.3fs" % (self.last_cmd, time.time() - self.last_send_time))
|
self.close("Request %s timeout: %.3fs" % (self.last_cmd_sent, time.time() - self.last_send_time))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.last_req_time = time.time()
|
self.last_req_time = time.time()
|
||||||
|
@ -428,7 +452,7 @@ class Connection(object):
|
||||||
self.req_id += 1
|
self.req_id += 1
|
||||||
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
||||||
event = gevent.event.AsyncResult() # Create new event for response
|
event = gevent.event.AsyncResult() # Create new event for response
|
||||||
self.waiting_requests[self.req_id] = event
|
self.waiting_requests[self.req_id] = {"evt": event, "cmd": cmd}
|
||||||
if stream_to:
|
if stream_to:
|
||||||
self.waiting_streams[self.req_id] = stream_to
|
self.waiting_streams[self.req_id] = stream_to
|
||||||
self.send(data) # Send request
|
self.send(data) # Send request
|
||||||
|
@ -463,7 +487,7 @@ class Connection(object):
|
||||||
(reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
|
(reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
|
||||||
)
|
)
|
||||||
for request in self.waiting_requests.values(): # Mark pending requests failed
|
for request in self.waiting_requests.values(): # Mark pending requests failed
|
||||||
request.set(False)
|
request["evt"].set(False)
|
||||||
self.waiting_requests = {}
|
self.waiting_requests = {}
|
||||||
self.waiting_streams = {}
|
self.waiting_streams = {}
|
||||||
self.sites = 0
|
self.sites = 0
|
||||||
|
|
|
@ -6,6 +6,7 @@ import gevent
|
||||||
import msgpack
|
import msgpack
|
||||||
from gevent.server import StreamServer
|
from gevent.server import StreamServer
|
||||||
from gevent.pool import Pool
|
from gevent.pool import Pool
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
from Debug import Debug
|
from Debug import Debug
|
||||||
from Connection import Connection
|
from Connection import Connection
|
||||||
|
@ -39,6 +40,8 @@ class ConnectionServer:
|
||||||
self.running = True
|
self.running = True
|
||||||
self.thread_checker = gevent.spawn(self.checkConnections)
|
self.thread_checker = gevent.spawn(self.checkConnections)
|
||||||
|
|
||||||
|
self.stat_recv = defaultdict(lambda: defaultdict(int))
|
||||||
|
self.stat_sent = defaultdict(lambda: defaultdict(int))
|
||||||
self.bytes_recv = 0
|
self.bytes_recv = 0
|
||||||
self.bytes_sent = 0
|
self.bytes_sent = 0
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue