rev324, Store and display peers last found time, Fix UiPassword plugin cleanup bug, Experimental option to use tempfiles when downloading, Experimental option to stream files out of msgpack context, FileRequest streamFile command, Cleanup peers if not found for 4 hours, Don't reopen openssl in every 5min, Peer fileGet benchmark option

This commit is contained in:
HelloZeroNet 2015-07-25 13:38:58 +02:00
parent a93ca2c3b4
commit d331eea384
12 changed files with 233 additions and 171 deletions

View file

@ -116,7 +116,7 @@ class UiRequestPlugin(object):
# Sites
yield "<br><br><b>Sites</b>:"
yield "<table>"
yield "<tr><th>address</th> <th>connected</th> <th>peers</th> <th>content.json</th> </tr>"
yield "<tr><th>address</th> <th>connected</th> <th title='connected/good/total'>peers</th> <th>content.json</th> </tr>"
for site in self.server.sites.values():
yield self.formatTableRow([
(
@ -133,7 +133,15 @@ class UiRequestPlugin(object):
])
yield "<tr><td id='peers_%s' style='display: none; white-space: pre'>" % site.address
for key, peer in site.peers.items():
yield "(%s, err: %s) %22s -<br>" % (peer.connection, peer.connection_error, key)
if peer.last_found:
last_found = int(time.time()-peer.last_found)/60
else:
last_found = "--"
if peer.connection:
connection_id = peer.connection.id
else:
connection_id = None
yield "(#%s, err: %s, found: %s min ago) %22s -<br>" % (connection_id, peer.connection_error, last_found, key)
yield "<br></td></tr>"
yield "</table>"

View file

@ -67,6 +67,7 @@ class UiRequestPlugin(object):
@classmethod
def cleanup(cls):
cls.last_cleanup = time.time()
for session_id, session in cls.sessions.items():
if session["keep"] and time.time() - session["added"] > 60 * 60 * 24 * 60: # Max 60days for keep sessions
del(cls.sessions[session_id])

View file

@ -8,7 +8,7 @@ class Config(object):
def __init__(self, argv):
self.version = "0.3.1"
self.rev = 307
self.rev = 324
self.argv = argv
self.action = None
self.createParser()
@ -85,8 +85,9 @@ class Config(object):
action.add_argument('peer_port', help='Peer port')
action.add_argument('site', help='Site address')
action.add_argument('filename', help='File name to request')
action.add_argument('--benchmark', help='Request file 10x then displays the total time', action='store_true')
# PeerGetFile
# PeerCmd
action = self.subparsers.add_parser("peerCmd", help='Request and print a file content from peer')
action.add_argument('peer_ip', help='Peer ip')
action.add_argument('peer_port', help='Peer port')
@ -125,6 +126,10 @@ class Config(object):
self.parser.add_argument('--disable_encryption', help='Disable connection encryption', action='store_true')
self.parser.add_argument('--disable_sslcompression', help='Disable SSL compression to save memory',
type='bool', choices=[True, False], default=True)
self.parser.add_argument('--use_tempfiles', help='Use temporary files when downloading (experimental)',
type='bool', choices=[True, False], default=False)
self.parser.add_argument('--stream_downloads', help='Stream download directly to files (experimental)',
type='bool', choices=[True, False], default=False)
self.parser.add_argument('--coffeescript_compiler', help='Coffeescript compiler for developing', default=coffeescript,
metavar='executable_path')

View file

@ -15,7 +15,7 @@ class Connection(object):
"sock", "sock_wrapped", "ip", "port", "peer_id", "id", "protocol", "type", "server", "unpacker", "req_id",
"handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time",
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent",
"last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests"
"last_ping_delay", "last_req_time", "last_cmd", "name", "updateName", "waiting_requests", "waiting_streams"
)
def __init__(self, server, ip, port, sock=None):
@ -56,6 +56,7 @@ class Connection(object):
self.updateName()
self.waiting_requests = {} # Waiting sent requests
self.waiting_streams = {} # Waiting response file streams
def updateName(self):
self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
@ -116,18 +117,25 @@ class Connection(object):
buff = self.sock.recv(16 * 1024)
if not buff:
break # Connection closed
# Statistics
self.last_recv_time = time.time()
self.incomplete_buff_recv += 1
self.bytes_recv += len(buff)
self.server.bytes_recv += len(buff)
if not self.unpacker:
self.unpacker = msgpack.Unpacker()
self.unpacker.feed(buff)
buff = None
for message in self.unpacker:
self.incomplete_buff_recv = 0
self.handleMessage(message)
if "stream_bytes" in message:
self.handleStream(message)
else:
self.handleMessage(message)
message = None
buff = None
except Exception, err:
if not self.closed:
self.log("Socket error: %s" % Debug.formatException(err))
@ -209,6 +217,46 @@ class Connection(object):
self.waiting_requests[last_req_id].set(message)
del self.waiting_requests[last_req_id] # Remove from waiting request
# Stream socket directly to a file
def handleStream(self, message):
if config.debug_socket:
self.log("Starting stream %s: %s bytes" % (message["to"], message["stream_bytes"]))
read_bytes = message["stream_bytes"] # Bytes left we have to read from socket
try:
buff = self.unpacker.read_bytes(min(16 * 1024, read_bytes)) # Check if the unpacker has something left in buffer
except Exception, err:
buff = ""
file = self.waiting_streams[message["to"]]
if buff:
read_bytes -= len(buff)
file.write(buff)
try:
while 1:
if read_bytes <= 0:
break
buff = self.sock.recv(16 * 1024)
buff_len = len(buff)
read_bytes -= buff_len
file.write(buff)
# Statistics
self.last_recv_time = time.time()
self.incomplete_buff_recv += 1
self.bytes_recv += buff_len
self.server.bytes_recv += buff_len
except Exception, err:
self.log("Stream read error: %s" % Debug.formatException(err))
if config.debug_socket:
self.log("End stream %s" % message["to"])
self.incomplete_buff_recv = 0
self.waiting_requests[message["to"]].set(message) # Set the response to event
del self.waiting_streams[message["to"]]
del self.waiting_requests[message["to"]]
# Send data to connection
def send(self, message, streaming=False):
if config.debug_socket:
@ -218,22 +266,43 @@ class Connection(object):
message.get("req_id"))
)
self.last_send_time = time.time()
if streaming:
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
message = None
self.bytes_sent += bytes_sent
self.server.bytes_sent += bytes_sent
else:
data = msgpack.packb(message)
message = None
self.bytes_sent += len(data)
self.server.bytes_sent += len(data)
self.sock.sendall(data)
try:
if streaming:
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
message = None
self.bytes_sent += bytes_sent
self.server.bytes_sent += bytes_sent
else:
data = msgpack.packb(message)
message = None
self.bytes_sent += len(data)
self.server.bytes_sent += len(data)
self.sock.sendall(data)
except Exception, err:
self.log("Send errror: %s" % Debug.formatException(err))
self.close()
return False
self.last_sent_time = time.time()
return True
# Stream raw file to connection
def sendRawfile(self, file, read_bytes):
buff = 64 * 1024
bytes_left = read_bytes
while True:
self.last_send_time = time.time()
self.sock.sendall(
file.read(min(bytes_left, buff))
)
bytes_left -= buff
if bytes_left <= 0:
break
self.bytes_sent += read_bytes
self.server.bytes_sent += read_bytes
return True
# Create and send a request to peer
def request(self, cmd, params={}):
def request(self, cmd, params={}, stream_to=None):
# Last command sent more than 10 sec ago, timeout
if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
self.log("Request %s timeout: %s" % (self.last_cmd, time.time() - self.last_send_time))
@ -246,6 +315,8 @@ class Connection(object):
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
event = gevent.event.AsyncResult() # Create new event for response
self.waiting_requests[self.req_id] = event
if stream_to:
self.waiting_streams[self.req_id] = stream_to
self.send(data) # Send request
res = event.get() # Wait until event solves
return res
@ -280,6 +351,7 @@ class Connection(object):
for request in self.waiting_requests.values(): # Mark pending requests failed
request.set(False)
self.waiting_requests = {}
self.waiting_streams = {}
self.server.removeConnection(self) # Remove connection from server registry
try:
if self.sock:

View file

@ -1,136 +0,0 @@
import time, socket, msgpack
from cStringIO import StringIO
print "Connecting..."
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
print "1 Threaded: Send, receive 10000 ping request...",
s = time.time()
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
req = sock.recv(16*1024)
print time.time()-s, repr(req), time.time()-s
print "1 Threaded: Send, receive, decode 10000 ping request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
unpacker.feed(sock.recv(16*1024))
for req in unpacker:
reqs += 1
print "Found:", req, "x", reqs, time.time()-s
print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(1000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
unpacker.feed(sock.recv(16*1024))
for req in unpacker:
reqs += 1
sock.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
print "Found:", req, "x", reqs, time.time()-s
print "1 Threaded: Request, receive, decode 10000 x 10k data request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
"""buff = StringIO()
data = sock.recv(16*1024)
buff.write(data)
if not data:
break
while not data.endswith("\n"):
data = sock.recv(16*1024)
if not data: break
buff.write(data)
req = msgpack.unpackb(buff.getvalue().strip("\n"))
reqs += 1"""
req_found = False
while not req_found:
buff = sock.recv(16*1024)
unpacker.feed(buff)
for req in unpacker:
reqs += 1
req_found = True
break # Only process one request
print "Found:", len(req["res"]), "x", reqs, time.time()-s
print "10 Threaded: Request, receive, decode 10000 x 10k data request...",
import gevent
s = time.time()
reqs = 0
req = None
def requester():
global reqs, req
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
unpacker = msgpack.Unpacker()
for i in range(1000):
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
req_found = False
while not req_found:
buff = sock.recv(16*1024)
unpacker.feed(buff)
for req in unpacker:
reqs += 1
req_found = True
break # Only process one request
threads = []
for i in range(10):
threads.append(gevent.spawn(requester))
gevent.joinall(threads)
print "Found:", len(req["res"]), "x", reqs, time.time()-s
print "1 Threaded: ZeroMQ Send, receive 1000 ping request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1234')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Ping"}))
req = zmq_sock.recv(16*1024)
print "Found:", req, time.time()-s
print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1234')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
print "Found:", len(req["res"]), time.time()-s
print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1233')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
print "Found:", len(req["res"]), time.time()-s

View file

@ -34,6 +34,10 @@ class FileRequest(object):
if not self.connection.closed:
self.connection.send(msg, streaming)
def sendRawfile(self, file, read_bytes):
if not self.connection.closed:
self.connection.sendRawfile(file, read_bytes)
def response(self, msg, streaming=False):
if self.responded:
self.log.debug("Req id %s already responded" % self.req_id)
@ -51,6 +55,8 @@ class FileRequest(object):
if cmd == "getFile":
self.actionGetFile(params)
elif cmd == "streamFile":
self.actionStreamFile(params)
elif cmd == "update":
event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
if not RateLimit.isAllowed(event): # There was already an update for this file in the last 10 second
@ -157,6 +163,43 @@ class FileRequest(object):
self.response({"error": "File read error: %s" % Debug.formatException(err)})
return False
# New-style file streaming out of Msgpack context
def actionStreamFile(self, params):
site = self.sites.get(params["site"])
if not site or not site.settings["serving"]: # Site unknown or not serving
self.response({"error": "Unknown site"})
return False
try:
if config.debug_socket:
self.log.debug("Opening file: %s" % params["inner_path"])
with site.storage.open(params["inner_path"]) as file:
file.seek(params["location"])
stream_bytes = min(FILE_BUFF, os.fstat(file.fileno()).st_size-params["location"])
back = {
"size": os.fstat(file.fileno()).st_size,
"location": min(file.tell() + FILE_BUFF, os.fstat(file.fileno()).st_size),
"stream_bytes": stream_bytes
}
if config.debug_socket:
self.log.debug(
"Sending file %s from position %s to %s" %
(params["inner_path"], params["location"], back["location"])
)
self.response(back)
self.sendRawfile(file, read_bytes=FILE_BUFF)
if config.debug_socket:
self.log.debug("File %s sent" % params["inner_path"])
# Add peer to site if not added before
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
if connected_peer: # Just added
connected_peer.connect(self.connection) # Assign current connection to peer
except Exception, err:
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
self.response({"error": "File read error: %s" % Debug.formatException(err)})
return False
# Peer exchange request
def actionPex(self, params):
site = self.sites.get(params["site"])

View file

@ -184,6 +184,8 @@ class FileServer(ConnectionServer):
if site.bad_files:
site.retryBadFiles()
site.cleanupPeers()
# In passive mode keep 5 active peer connection to get the updates
if self.port_opened is False:
site.needConnections()

View file

@ -7,12 +7,17 @@ import struct
from cStringIO import StringIO
from Debug import Debug
from Config import config
if config.use_tempfiles:
import tempfile
# Communicate remote peers
class Peer(object):
__slots__ = ("ip", "port", "site", "key", "connection_server", "connection", "last_found", "last_response",
"last_ping", "added", "connection_error", "hash_failed", "download_bytes", "download_time")
__slots__ = (
"ip", "port", "site", "key", "connection_server", "connection", "last_found", "last_response",
"last_ping", "added", "connection_error", "hash_failed", "download_bytes", "download_time"
)
def __init__(self, ip, port, site=None):
self.ip = ip
@ -22,7 +27,7 @@ class Peer(object):
self.connection_server = sys.modules["main"].file_server
self.connection = None
self.last_found = None # Time of last found in the torrent tracker
self.last_found = time.time() # Time of last found in the torrent tracker
self.last_response = None # Time of last successful response from peer
self.last_ping = None # Last response time for ping
self.added = time.time()
@ -85,7 +90,7 @@ class Peer(object):
self.last_found = time.time()
# Send a command to peer
def request(self, cmd, params={}):
def request(self, cmd, params={}, stream_to=None):
if not self.connection or self.connection.closed:
self.connect()
if not self.connection:
@ -94,7 +99,7 @@ class Peer(object):
for retry in range(1, 3): # Retry 3 times
try:
response = self.connection.request(cmd, params)
response = self.connection.request(cmd, params, stream_to)
if not response:
raise Exception("Send error")
if "error" in response:
@ -120,8 +125,16 @@ class Peer(object):
# Get a file content from peer
def getFile(self, site, inner_path):
# Use streamFile if client supports it
if config.stream_downloads and self.connection and self.connection.handshake["rev"] > 310:
return self.streamFile(site, inner_path)
location = 0
buff = StringIO()
if config.use_tempfiles:
buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
else:
buff = StringIO()
s = time.time()
while True: # Read in 512k parts
back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})
@ -135,6 +148,33 @@ class Peer(object):
break
else:
location = back["location"]
self.download_bytes += back["location"]
self.download_time += (time.time() - s)
buff.seek(0)
return buff
# Download file out of msgpack context to save memory and cpu
def streamFile(self, site, inner_path):
location = 0
if config.use_tempfiles:
buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
else:
buff = StringIO()
s = time.time()
while True: # Read in 512k parts
back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)
if not back: # Error
self.log("Invalid response: %s" % back)
return False
if back["location"] == back["size"]: # End of file
break
else:
location = back["location"]
self.download_bytes += back["location"]
self.download_time += (time.time() - s)
buff.seek(0)

View file

@ -252,7 +252,6 @@ class Site:
# Update site by redownload all content.json
def redownloadContents(self):
# Download all content.json again
content_threads = []
for inner_path in self.content_manager.contents.keys():
@ -449,7 +448,7 @@ class Site:
return False # Ignore blacklist (eg. myself)
key = "%s:%s" % (ip, port)
if key in self.peers: # Already has this ip
# self.peers[key].found()
self.peers[key].found()
if return_peer: # Always return peer
return self.peers[key]
else:
@ -651,17 +650,38 @@ class Site:
break # Found requested number of peers
if (not found and not ignore) or (need_num > 5 and need_num < 100 and len(found) < need_num):
# Return not that good peers: Not found any peer and the requester dont have any or cant give enought peer
# Return not that good peers: Not found any peer and the requester dont have any or cant give enough peer
found = [peer for peer in peers if not peer.key.endswith(":0") and peer.key not in ignore][0:need_num - len(found)]
return found
# Cleanup probably dead peers
def cleanupPeers(self):
peers = self.peers.values()
if len(peers) < 20:
return False
removed = 0
for peer in peers:
if peer.connection and peer.connection.connected:
continue
if peer.connection and not peer.connection.connected:
peer.connection = None # Dead connection
if time.time() - peer.last_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour
peer.remove()
removed += 1
if removed > 5: # Don't remove too much at once
break
if removed:
self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
# - Events -
# Add event listeners
def addEventListeners(self):
self.onFileStart = util.Event() # If WorkerManager added new task
self.onFileDone = util.Event() # If WorkerManager successfuly downloaded a file
self.onFileDone = util.Event() # If WorkerManager successfully downloaded a file
self.onFileFail = util.Event() # If WorkerManager failed to download a file
self.onComplete = util.Event() # All file finished

View file

@ -45,9 +45,9 @@ class Worker(object):
try:
buff = self.peer.getFile(site.address, task["inner_path"])
except Exception, err:
self.manager.log.debug("%s: getFile error: err" % (self.key, err))
self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
buff = None
if self.running is False: # Worker no longer needed or got killed
if self.running is False or task["done"] is True: # Worker no longer needed or got killed
self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
break
if buff: # Download ok

View file

@ -8,6 +8,7 @@
import ctypes
import ctypes.util
import _ctypes
import hashlib
import base64
import time
@ -395,7 +396,6 @@ def ECDSA_SIG_recover_key_GFp(eckey, r, s, msg, msglen, recid, check):
def closeLibrary():
import _ctypes
if "FreeLibrary" in dir(_ctypes):
_ctypes.FreeLibrary(ssl._lib._handle)
else:
@ -414,10 +414,12 @@ def getMessagePubkey(message, sig):
mb = ctypes.create_string_buffer(size)
ssl.i2o_ECPublicKey(eckey, ctypes.byref(ctypes.pointer(mb)))
pub = mb.raw
"""
if time.time() - ssl.time_opened > 60 * 5: # Reopen every 5 min
logging.debug("Reopening OpenSSL...")
closeLibrary()
openLibrary()
"""
return pub

View file

@ -263,7 +263,7 @@ class Actions(object):
print "Response time: %.3fs (crypt: %s)" % (time.time() - s, peer.connection.crypt)
time.sleep(1)
def peerGetFile(self, peer_ip, peer_port, site, filename):
def peerGetFile(self, peer_ip, peer_port, site, filename, benchmark=False):
logging.info("Opening a simple connection server")
global file_server
from Connection import ConnectionServer
@ -273,8 +273,13 @@ class Actions(object):
logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, peer_ip, peer_port))
peer = Peer(peer_ip, peer_port)
s = time.time()
print peer.getFile(site, filename).read()
print "Response time: %.3fs" % (time.time() - s)
peer.getFile(site, filename)
if benchmark:
for i in range(10):
print peer.getFile(site, filename),
print "Response time: %.3fs" % (time.time() - s)
raw_input("Check memory")
def peerCmd(self, peer_ip, peer_port, cmd, parameters):
logging.info("Opening a simple connection server")