
version and revision are still accessible to users to see, but not the other nodes. this will make it harder to collect information for both informational, but also malicious purposes
635 lines
26 KiB
Python
635 lines
26 KiB
Python
import socket
|
|
import time
|
|
|
|
import gevent
|
|
try:
|
|
from gevent.coros import RLock
|
|
except:
|
|
from gevent.lock import RLock
|
|
|
|
from Config import config
|
|
from Debug import Debug
|
|
from util import Msgpack
|
|
from Crypt import CryptConnection
|
|
from util import helper
|
|
|
|
|
|
class Connection(object):
|
|
__slots__ = (
|
|
"sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type",
|
|
"handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection",
|
|
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
|
|
"last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams"
|
|
)
|
|
|
|
def __init__(self, server, ip, port, sock=None, target_onion=None, is_tracker_connection=False):
|
|
self.sock = sock
|
|
self.cert_pin = None
|
|
if "#" in ip:
|
|
ip, self.cert_pin = ip.split("#")
|
|
self.target_onion = target_onion # Requested onion adress
|
|
self.id = server.last_connection_id
|
|
server.last_connection_id += 1
|
|
self.protocol = "?"
|
|
self.type = "?"
|
|
self.ip_type = "?"
|
|
self.port = int(port)
|
|
self.setIp(ip)
|
|
|
|
if helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
|
|
self.is_private_ip = True
|
|
else:
|
|
self.is_private_ip = False
|
|
self.is_tracker_connection = is_tracker_connection
|
|
|
|
self.server = server
|
|
self.unpacker = None # Stream incoming socket messages here
|
|
self.unpacker_bytes = 0 # How many bytes the unpacker received
|
|
self.req_id = 0 # Last request id
|
|
self.handshake = {} # Handshake info got from peer
|
|
self.crypt = None # Connection encryption method
|
|
self.sock_wrapped = False # Socket wrapped to encryption
|
|
|
|
self.connected = False
|
|
self.event_connected = gevent.event.AsyncResult() # Solves on handshake received
|
|
self.closed = False
|
|
|
|
# Stats
|
|
self.start_time = time.time()
|
|
self.handshake_time = 0
|
|
self.last_recv_time = 0
|
|
self.last_message_time = 0
|
|
self.last_send_time = 0
|
|
self.last_sent_time = 0
|
|
self.incomplete_buff_recv = 0
|
|
self.bytes_recv = 0
|
|
self.bytes_sent = 0
|
|
self.last_ping_delay = None
|
|
self.last_req_time = 0
|
|
self.last_cmd_sent = None
|
|
self.last_cmd_recv = None
|
|
self.bad_actions = 0
|
|
self.sites = 0
|
|
self.cpu_time = 0.0
|
|
self.send_lock = RLock()
|
|
|
|
self.name = None
|
|
self.updateName()
|
|
|
|
self.waiting_requests = {} # Waiting sent requests
|
|
self.waiting_streams = {} # Waiting response file streams
|
|
|
|
def setIp(self, ip):
|
|
self.ip = ip
|
|
self.ip_type = helper.getIpType(ip)
|
|
self.updateName()
|
|
|
|
def createSocket(self):
|
|
if helper.getIpType(self.ip) == "ipv6" and not hasattr(socket, "socket_noproxy"):
|
|
# Create IPv6 connection as IPv4 when using proxy
|
|
return socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
|
else:
|
|
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def updateName(self):
|
|
self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def __repr__(self):
|
|
return "<%s>" % self.__str__()
|
|
|
|
def log(self, text):
|
|
self.server.log.debug("%s > %s" % (self.name, text))
|
|
|
|
def getValidSites(self):
|
|
return [key for key, val in self.server.tor_manager.site_onions.items() if val == self.target_onion]
|
|
|
|
def badAction(self, weight=1):
|
|
self.bad_actions += weight
|
|
if self.bad_actions > 40:
|
|
self.close("Too many bad actions")
|
|
elif self.bad_actions > 20:
|
|
time.sleep(5)
|
|
|
|
def goodAction(self):
|
|
self.bad_actions = 0
|
|
|
|
# Open connection to peer and wait for handshake
|
|
def connect(self):
|
|
self.type = "out"
|
|
if self.ip_type == "onion":
|
|
if not self.server.tor_manager or not self.server.tor_manager.enabled:
|
|
raise Exception("Can't connect to onion addresses, no Tor controller present")
|
|
self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
|
|
elif config.tor == "always" and helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
|
|
raise Exception("Can't connect to local IPs in Tor: always mode")
|
|
elif config.trackers_proxy != "disable" and config.tor != "always" and self.is_tracker_connection:
|
|
if config.trackers_proxy == "tor":
|
|
self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
|
|
else:
|
|
import socks
|
|
self.sock = socks.socksocket()
|
|
proxy_ip, proxy_port = config.trackers_proxy.split(":")
|
|
self.sock.set_proxy(socks.PROXY_TYPE_SOCKS5, proxy_ip, int(proxy_port))
|
|
else:
|
|
self.sock = self.createSocket()
|
|
|
|
if "TCP_NODELAY" in dir(socket):
|
|
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
|
timeout_before = self.sock.gettimeout()
|
|
self.sock.settimeout(30)
|
|
if self.ip_type == "ipv6" and not hasattr(self.sock, "proxy"):
|
|
sock_address = (self.ip, self.port, 1, 1)
|
|
else:
|
|
sock_address = (self.ip, self.port)
|
|
|
|
self.sock.connect(sock_address)
|
|
|
|
# Implicit SSL
|
|
should_encrypt = not self.ip_type == "onion" and self.ip not in self.server.broken_ssl_ips and self.ip not in config.ip_local
|
|
if self.cert_pin:
|
|
self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", cert_pin=self.cert_pin)
|
|
self.sock.do_handshake()
|
|
self.crypt = "tls-rsa"
|
|
self.sock_wrapped = True
|
|
elif should_encrypt and "tls-rsa" in CryptConnection.manager.crypt_supported:
|
|
try:
|
|
self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa")
|
|
self.sock.do_handshake()
|
|
self.crypt = "tls-rsa"
|
|
self.sock_wrapped = True
|
|
except Exception as err:
|
|
if not config.force_encryption:
|
|
self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
|
|
self.server.broken_ssl_ips[self.ip] = True
|
|
self.sock.close()
|
|
self.crypt = None
|
|
self.sock = self.createSocket()
|
|
self.sock.settimeout(30)
|
|
self.sock.connect(sock_address)
|
|
|
|
# Detect protocol
|
|
self.send({"cmd": "handshake", "req_id": 0, "params": self.getHandshakeInfo()})
|
|
event_connected = self.event_connected
|
|
gevent.spawn(self.messageLoop)
|
|
connect_res = event_connected.get() # Wait for handshake
|
|
self.sock.settimeout(timeout_before)
|
|
return connect_res
|
|
|
|
# Handle incoming connection
|
|
def handleIncomingConnection(self, sock):
|
|
self.log("Incoming connection...")
|
|
|
|
if "TCP_NODELAY" in dir(socket):
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
|
|
self.type = "in"
|
|
if self.ip not in config.ip_local: # Clearnet: Check implicit SSL
|
|
try:
|
|
first_byte = sock.recv(1, gevent.socket.MSG_PEEK)
|
|
if first_byte == b"\x16":
|
|
self.log("Crypt in connection using implicit SSL")
|
|
self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", True)
|
|
self.sock_wrapped = True
|
|
self.crypt = "tls-rsa"
|
|
except Exception as err:
|
|
self.log("Socket peek error: %s" % Debug.formatException(err))
|
|
self.messageLoop()
|
|
|
|
def getMsgpackUnpacker(self):
|
|
if self.handshake and self.handshake.get("use_bin_type"):
|
|
return Msgpack.getUnpacker(fallback=True, decode=False)
|
|
else: # Backward compatibility for <0.7.0
|
|
return Msgpack.getUnpacker(fallback=True, decode=True)
|
|
|
|
# Message loop for connection
|
|
def messageLoop(self):
|
|
if not self.sock:
|
|
self.log("Socket error: No socket found")
|
|
return False
|
|
self.protocol = "v2"
|
|
self.updateName()
|
|
self.connected = True
|
|
buff_len = 0
|
|
req_len = 0
|
|
self.unpacker_bytes = 0
|
|
|
|
try:
|
|
while not self.closed:
|
|
buff = self.sock.recv(64 * 1024)
|
|
if not buff:
|
|
break # Connection closed
|
|
buff_len = len(buff)
|
|
|
|
# Statistics
|
|
self.last_recv_time = time.time()
|
|
self.incomplete_buff_recv += 1
|
|
self.bytes_recv += buff_len
|
|
self.server.bytes_recv += buff_len
|
|
req_len += buff_len
|
|
|
|
if not self.unpacker:
|
|
self.unpacker = self.getMsgpackUnpacker()
|
|
self.unpacker_bytes = 0
|
|
|
|
self.unpacker.feed(buff)
|
|
self.unpacker_bytes += buff_len
|
|
|
|
while True:
|
|
try:
|
|
message = next(self.unpacker)
|
|
except StopIteration:
|
|
break
|
|
if not type(message) is dict:
|
|
if config.debug_socket:
|
|
self.log("Invalid message type: %s, content: %r, buffer: %r" % (type(message), message, buff[0:16]))
|
|
raise Exception("Invalid message type: %s" % type(message))
|
|
|
|
# Stats
|
|
self.incomplete_buff_recv = 0
|
|
stat_key = message.get("cmd", "unknown")
|
|
if stat_key == "response" and "to" in message:
|
|
cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
|
|
stat_key = "response: %s" % cmd_sent
|
|
if stat_key == "update":
|
|
stat_key = "update: %s" % message["params"]["site"]
|
|
self.server.stat_recv[stat_key]["bytes"] += req_len
|
|
self.server.stat_recv[stat_key]["num"] += 1
|
|
if "stream_bytes" in message:
|
|
self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
|
|
req_len = 0
|
|
|
|
# Handle message
|
|
if "stream_bytes" in message:
|
|
buff_left = self.handleStream(message, buff)
|
|
self.unpacker = self.getMsgpackUnpacker()
|
|
self.unpacker.feed(buff_left)
|
|
self.unpacker_bytes = len(buff_left)
|
|
if config.debug_socket:
|
|
self.log("Start new unpacker with buff_left: %r" % buff_left)
|
|
else:
|
|
self.handleMessage(message)
|
|
|
|
message = None
|
|
except Exception as err:
|
|
if not self.closed:
|
|
self.log("Socket error: %s" % Debug.formatException(err))
|
|
self.server.stat_recv["error: %s" % err]["bytes"] += req_len
|
|
self.server.stat_recv["error: %s" % err]["num"] += 1
|
|
self.close("MessageLoop ended (closed: %s)" % self.closed) # MessageLoop ended, close connection
|
|
|
|
def getUnpackerUnprocessedBytesNum(self):
|
|
if "tell" in dir(self.unpacker):
|
|
bytes_num = self.unpacker_bytes - self.unpacker.tell()
|
|
else:
|
|
bytes_num = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o
|
|
return bytes_num
|
|
|
|
# Stream socket directly to a file
|
|
def handleStream(self, message, buff):
|
|
stream_bytes_left = message["stream_bytes"]
|
|
file = self.waiting_streams[message["to"]]
|
|
|
|
unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()
|
|
|
|
if unprocessed_bytes_num: # Found stream bytes in unpacker
|
|
unpacker_stream_bytes = min(unprocessed_bytes_num, stream_bytes_left)
|
|
buff_stream_start = len(buff) - unprocessed_bytes_num
|
|
file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes])
|
|
stream_bytes_left -= unpacker_stream_bytes
|
|
else:
|
|
unpacker_stream_bytes = 0
|
|
|
|
if config.debug_socket:
|
|
self.log(
|
|
"Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" %
|
|
(message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unprocessed_bytes_num)
|
|
)
|
|
|
|
try:
|
|
while 1:
|
|
if stream_bytes_left <= 0:
|
|
break
|
|
stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left))
|
|
if not stream_buff:
|
|
break
|
|
buff_len = len(stream_buff)
|
|
stream_bytes_left -= buff_len
|
|
file.write(stream_buff)
|
|
|
|
# Statistics
|
|
self.last_recv_time = time.time()
|
|
self.incomplete_buff_recv += 1
|
|
self.bytes_recv += buff_len
|
|
self.server.bytes_recv += buff_len
|
|
except Exception as err:
|
|
self.log("Stream read error: %s" % Debug.formatException(err))
|
|
|
|
if config.debug_socket:
|
|
self.log("End stream %s, file pos: %s" % (message["to"], file.tell()))
|
|
|
|
self.incomplete_buff_recv = 0
|
|
self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event
|
|
del self.waiting_streams[message["to"]]
|
|
del self.waiting_requests[message["to"]]
|
|
|
|
if unpacker_stream_bytes:
|
|
return buff[buff_stream_start + unpacker_stream_bytes:]
|
|
else:
|
|
return b""
|
|
|
|
# My handshake info
|
|
def getHandshakeInfo(self):
|
|
# No TLS for onion connections
|
|
if self.ip_type == "onion":
|
|
crypt_supported = []
|
|
elif self.ip in self.server.broken_ssl_ips:
|
|
crypt_supported = []
|
|
else:
|
|
crypt_supported = CryptConnection.manager.crypt_supported
|
|
# No peer id for onion connections
|
|
if self.ip_type == "onion" or self.ip in config.ip_local:
|
|
peer_id = ""
|
|
else:
|
|
peer_id = self.server.peer_id
|
|
# Setup peer lock from requested onion address
|
|
if self.handshake and self.handshake.get("target_ip", "").endswith(".onion") and self.server.tor_manager.start_onions:
|
|
self.target_onion = self.handshake.get("target_ip").replace(".onion", "") # My onion address
|
|
if not self.server.tor_manager.site_onions.values():
|
|
self.server.log.warning("Unknown target onion address: %s" % self.target_onion)
|
|
|
|
handshake = {
|
|
"version": "conservancy",
|
|
"protocol": "v2",
|
|
"use_bin_type": True,
|
|
"peer_id": peer_id,
|
|
"fileserver_port": self.server.port,
|
|
"port_opened": self.server.port_opened.get(self.ip_type, None),
|
|
"target_ip": self.ip,
|
|
"rev": 8192,
|
|
"crypt_supported": crypt_supported,
|
|
"crypt": self.crypt,
|
|
"time": int(time.time())
|
|
}
|
|
if self.target_onion:
|
|
handshake["onion"] = self.target_onion
|
|
elif self.ip_type == "onion":
|
|
handshake["onion"] = self.server.tor_manager.getOnion("global")
|
|
|
|
if self.is_tracker_connection:
|
|
handshake["tracker_connection"] = True
|
|
|
|
if config.debug_socket:
|
|
self.log("My Handshake: %s" % handshake)
|
|
|
|
return handshake
|
|
|
|
def setHandshake(self, handshake):
|
|
if config.debug_socket:
|
|
self.log("Remote Handshake: %s" % handshake)
|
|
|
|
if handshake.get("peer_id") == self.server.peer_id and not handshake.get("tracker_connection") and not self.is_tracker_connection:
|
|
self.close("Same peer id, can't connect to myself")
|
|
self.server.peer_blacklist.append((handshake["target_ip"], handshake["fileserver_port"]))
|
|
return False
|
|
|
|
self.handshake = handshake
|
|
if handshake.get("port_opened", None) is False and "onion" not in handshake and not self.is_private_ip: # Not connectable
|
|
self.port = 0
|
|
else:
|
|
self.port = int(handshake["fileserver_port"]) # Set peer fileserver port
|
|
|
|
if handshake.get("use_bin_type") and self.unpacker:
|
|
unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()
|
|
self.log("Changing unpacker to bin type (unprocessed bytes: %s)" % unprocessed_bytes_num)
|
|
unprocessed_bytes = self.unpacker.read_bytes(unprocessed_bytes_num)
|
|
self.unpacker = self.getMsgpackUnpacker() # Create new unpacker for different msgpack type
|
|
self.unpacker_bytes = 0
|
|
if unprocessed_bytes:
|
|
self.unpacker.feed(unprocessed_bytes)
|
|
|
|
# Check if we can encrypt the connection
|
|
if handshake.get("crypt_supported") and self.ip not in self.server.broken_ssl_ips:
|
|
if type(handshake["crypt_supported"][0]) is bytes:
|
|
handshake["crypt_supported"] = [item.decode() for item in handshake["crypt_supported"]] # Backward compatibility
|
|
|
|
if self.ip_type == "onion" or self.ip in config.ip_local:
|
|
crypt = None
|
|
elif handshake.get("crypt"): # Recommended crypt by server
|
|
crypt = handshake["crypt"]
|
|
else: # Select the best supported on both sides
|
|
crypt = CryptConnection.manager.selectCrypt(handshake["crypt_supported"])
|
|
|
|
if crypt:
|
|
self.crypt = crypt
|
|
|
|
if self.type == "in" and handshake.get("onion") and not self.ip_type == "onion": # Set incoming connection's onion address
|
|
if self.server.ips.get(self.ip) == self:
|
|
del self.server.ips[self.ip]
|
|
self.setIp(handshake["onion"] + ".onion")
|
|
self.log("Changing ip to %s" % self.ip)
|
|
self.server.ips[self.ip] = self
|
|
self.updateName()
|
|
|
|
self.event_connected.set(True) # Mark handshake as done
|
|
self.event_connected = None
|
|
self.handshake_time = time.time()
|
|
|
|
# Handle incoming message
|
|
def handleMessage(self, message):
|
|
cmd = message["cmd"]
|
|
|
|
self.last_message_time = time.time()
|
|
self.last_cmd_recv = cmd
|
|
if cmd == "response": # New style response
|
|
if message["to"] in self.waiting_requests:
|
|
if self.last_send_time and len(self.waiting_requests) == 1:
|
|
ping = time.time() - self.last_send_time
|
|
self.last_ping_delay = ping
|
|
self.waiting_requests[message["to"]]["evt"].set(message) # Set the response to event
|
|
del self.waiting_requests[message["to"]]
|
|
elif message["to"] == 0: # Other peers handshake
|
|
ping = time.time() - self.start_time
|
|
if config.debug_socket:
|
|
self.log("Handshake response: %s, ping: %s" % (message, ping))
|
|
self.last_ping_delay = ping
|
|
# Server switched to crypt, lets do it also if not crypted already
|
|
if message.get("crypt") and not self.sock_wrapped:
|
|
self.crypt = message["crypt"]
|
|
server = (self.type == "in")
|
|
self.log("Crypt out connection using: %s (server side: %s, ping: %.3fs)..." % (self.crypt, server, ping))
|
|
self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
|
|
self.sock.do_handshake()
|
|
self.sock_wrapped = True
|
|
|
|
if not self.sock_wrapped and self.cert_pin:
|
|
self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
|
|
return
|
|
|
|
self.setHandshake(message)
|
|
else:
|
|
self.log("Unknown response: %s" % message)
|
|
elif cmd:
|
|
self.server.num_recv += 1
|
|
if cmd == "handshake":
|
|
self.handleHandshake(message)
|
|
else:
|
|
self.server.handleRequest(self, message)
|
|
|
|
# Incoming handshake set request
|
|
def handleHandshake(self, message):
|
|
self.setHandshake(message["params"])
|
|
data = self.getHandshakeInfo()
|
|
data["cmd"] = "response"
|
|
data["to"] = message["req_id"]
|
|
self.send(data) # Send response to handshake
|
|
# Sent crypt request to client
|
|
if self.crypt and not self.sock_wrapped:
|
|
server = (self.type == "in")
|
|
self.log("Crypt in connection using: %s (server side: %s)..." % (self.crypt, server))
|
|
try:
|
|
self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
|
|
self.sock_wrapped = True
|
|
except Exception as err:
|
|
if not config.force_encryption:
|
|
self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
|
|
self.server.broken_ssl_ips[self.ip] = True
|
|
self.close("Broken ssl")
|
|
|
|
if not self.sock_wrapped and self.cert_pin:
|
|
self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
|
|
|
|
# Send data to connection
|
|
def send(self, message, streaming=False):
|
|
self.last_send_time = time.time()
|
|
if config.debug_socket:
|
|
self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
|
|
message.get("cmd"), message.get("to"), streaming,
|
|
message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"),
|
|
message.get("req_id"))
|
|
)
|
|
|
|
if not self.sock:
|
|
self.log("Send error: missing socket")
|
|
return False
|
|
|
|
if not self.connected and message.get("cmd") != "handshake":
|
|
self.log("Wait for handshake before send request")
|
|
self.event_connected.get()
|
|
|
|
try:
|
|
stat_key = message.get("cmd", "unknown")
|
|
if stat_key == "response":
|
|
stat_key = "response: %s" % self.last_cmd_recv
|
|
else:
|
|
self.server.num_sent += 1
|
|
|
|
self.server.stat_sent[stat_key]["num"] += 1
|
|
if streaming:
|
|
with self.send_lock:
|
|
bytes_sent = Msgpack.stream(message, self.sock.sendall)
|
|
self.bytes_sent += bytes_sent
|
|
self.server.bytes_sent += bytes_sent
|
|
self.server.stat_sent[stat_key]["bytes"] += bytes_sent
|
|
message = None
|
|
else:
|
|
data = Msgpack.pack(message)
|
|
self.bytes_sent += len(data)
|
|
self.server.bytes_sent += len(data)
|
|
self.server.stat_sent[stat_key]["bytes"] += len(data)
|
|
message = None
|
|
with self.send_lock:
|
|
self.sock.sendall(data)
|
|
except Exception as err:
|
|
self.close("Send error: %s (cmd: %s)" % (err, stat_key))
|
|
return False
|
|
self.last_sent_time = time.time()
|
|
return True
|
|
|
|
# Stream file to connection without msgpacking
|
|
def sendRawfile(self, file, read_bytes):
|
|
buff = 64 * 1024
|
|
bytes_left = read_bytes
|
|
bytes_sent = 0
|
|
while True:
|
|
self.last_send_time = time.time()
|
|
data = file.read(min(bytes_left, buff))
|
|
bytes_sent += len(data)
|
|
with self.send_lock:
|
|
self.sock.sendall(data)
|
|
bytes_left -= buff
|
|
if bytes_left <= 0:
|
|
break
|
|
self.bytes_sent += bytes_sent
|
|
self.server.bytes_sent += bytes_sent
|
|
self.server.stat_sent["raw_file"]["num"] += 1
|
|
self.server.stat_sent["raw_file"]["bytes"] += bytes_sent
|
|
return True
|
|
|
|
# Create and send a request to peer
|
|
def request(self, cmd, params={}, stream_to=None):
|
|
# Last command sent more than 10 sec ago, timeout
|
|
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
|
|
self.close("Request %s timeout: %.3fs" % (self.last_cmd_sent, time.time() - self.last_send_time))
|
|
return False
|
|
|
|
self.last_req_time = time.time()
|
|
self.last_cmd_sent = cmd
|
|
self.req_id += 1
|
|
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
|
event = gevent.event.AsyncResult() # Create new event for response
|
|
self.waiting_requests[self.req_id] = {"evt": event, "cmd": cmd}
|
|
if stream_to:
|
|
self.waiting_streams[self.req_id] = stream_to
|
|
self.send(data) # Send request
|
|
res = event.get() # Wait until event solves
|
|
return res
|
|
|
|
def ping(self):
|
|
s = time.time()
|
|
response = None
|
|
with gevent.Timeout(10.0, False):
|
|
try:
|
|
response = self.request("ping")
|
|
except Exception as err:
|
|
self.log("Ping error: %s" % Debug.formatException(err))
|
|
if response and "body" in response and response["body"] == b"Pong!":
|
|
self.last_ping_delay = time.time() - s
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
# Close connection
|
|
def close(self, reason="Unknown"):
|
|
if self.closed:
|
|
return False # Already closed
|
|
self.closed = True
|
|
self.connected = False
|
|
if self.event_connected:
|
|
self.event_connected.set(False)
|
|
|
|
self.log(
|
|
"Closing connection: %s, waiting_requests: %s, sites: %s, buff: %s..." %
|
|
(reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
|
|
)
|
|
for request in self.waiting_requests.values(): # Mark pending requests failed
|
|
request["evt"].set(False)
|
|
self.waiting_requests = {}
|
|
self.waiting_streams = {}
|
|
self.sites = 0
|
|
self.server.removeConnection(self) # Remove connection from server registry
|
|
try:
|
|
if self.sock:
|
|
self.sock.shutdown(gevent.socket.SHUT_WR)
|
|
self.sock.close()
|
|
except Exception as err:
|
|
if config.debug_socket:
|
|
self.log("Close error: %s" % err)
|
|
|
|
# Little cleanup
|
|
self.sock = None
|
|
self.unpacker = None
|
|
self.event_connected = None
|