version 0.2.4, peerPing and peerGetFile commands, old content update bugfix, new network code and protocol, connection share between sites, connection reuse, dont retry bad file more than 3 times in 20 min, multi threaded include file download, shuffle peers before publish, simple internal stats page, dont retry on failed peers, more than 10 peers publish bugfix

This commit is contained in:
HelloZeroNet 2015-02-23 23:33:31 +01:00
parent 531bf68ddd
commit 31d4609a3b
18 changed files with 790 additions and 130 deletions

View file

@ -0,0 +1,234 @@
import logging, socket, time
from cStringIO import StringIO
import gevent, msgpack
from Config import config
from Debug import Debug
try:
import zmq.green as zmq
except:
zmq = None
class Connection:
def __init__(self, server, ip, port, sock=None):
self.sock = sock
self.ip = ip
self.port = port
self.peer_id = None # Bittorrent style peer id (not used yet)
self.id = server.last_connection_id
self.protocol = "?"
server.last_connection_id += 1
self.server = server
self.log = logging.getLogger(str(self))
self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here
self.req_id = 0 # Last request id
self.handshake = None # Handshake info got from peer
self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received
self.closed = False
self.zmq_sock = None # Zeromq sock if outgoing connection
self.zmq_queue = [] # Messages queued to send
self.zmq_working = False # Zmq currently working, just add to queue
self.forward_thread = None # Zmq forwarder thread
self.waiting_requests = {} # Waiting sent requests
if not sock: self.connect() # Not an incoming connection, connect to peer
def __str__(self):
return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
def __repr__(self):
return "<%s>" % self.__str__()
# Open connection to peer and wait for handshake
def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, self.port))
# Detect protocol
self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
gevent.spawn(self.messageLoop)
return self.event_handshake.get() # Wait for handshake
# Handle incoming connection
def handleIncomingConnection(self, sock):
firstchar = sock.recv(1) # Find out if pure socket or zeromq
if firstchar == "\xff": # Backward compatiblity: forward data to zmq
if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ")
self.protocol = "zeromq"
self.log.name = str(self)
self.event_handshake.set(self.protocol)
if self.server.zmq_running:
zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
zmq_sock.connect(("127.0.0.1", self.server.zmq_port))
zmq_sock.send(firstchar)
self.forward_thread = gevent.spawn(self.server.forward, self, zmq_sock, sock)
self.server.forward(self, sock, zmq_sock)
self.close() # Forward ended close connection
else:
self.config.debug("ZeroMQ Server not running, exiting!")
else: # Normal socket
self.messageLoop(firstchar)
# Message loop for connection
def messageLoop(self, firstchar=None):
sock = self.sock
if not firstchar: firstchar = sock.recv(1)
if firstchar == "\xff": # Backward compatibility to zmq
self.sock.close() # Close normal socket
if zmq:
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
self.protocol = "zeromq"
self.log.name = str(self)
self.event_handshake.set(self.protocol) # Mark handshake as done
try:
context = zmq.Context()
zmq_sock = context.socket(zmq.REQ)
zmq_sock.hwm = 1
zmq_sock.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive
zmq_sock.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send
zmq_sock.setsockopt(zmq.LINGER, 500) # Wait for zmq_sock close
zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port))
self.zmq_sock = zmq_sock
except Exception, err:
self.log.debug("Socket error: %s" % Debug.formatException(err))
else:
return False # No zeromq connection supported
else: # Normal socket
self.protocol = "v2"
self.log.name = str(self)
self.event_handshake.set(self.protocol) # Mark handshake as done
unpacker = self.unpacker
unpacker.feed(firstchar) # Feed the first char we already requested
try:
while True:
buff = sock.recv(16*1024)
if not buff: break # Connection closed
unpacker.feed(buff)
for message in unpacker:
self.handleMessage(message)
except Exception, err:
self.log.debug("Socket error: %s" % Debug.formatException(err))
self.close() # MessageLoop ended, close connection
# Read one line (not used)
def recvLine(self):
sock = self.sock
data = sock.recv(16*1024)
if not data: return
if not data.endswith("\n"): # Multipart, read until \n
buff = StringIO()
buff.write(data)
while not data.endswith("\n"):
data = sock.recv(16*1024)
if not data: break
buff.write(data)
return buff.getvalue().strip("\n")
return data.strip("\n")
# My handshake info
def handshakeInfo(self):
return {
"version": config.version,
"protocol": "v2",
"peer_id": self.server.peer_id,
"fileserver_port": config.fileserver_port
}
# Handle incoming message
def handleMessage(self, message):
if message.get("cmd") == "response": # New style response
if message["to"] in self.waiting_requests:
self.waiting_requests[message["to"]].set(message) # Set the response to event
del self.waiting_requests[message["to"]]
elif message["to"] == 0: # Other peers handshake
if config.debug_socket: self.log.debug("Got handshake response: %s" % message)
self.handshake = message
self.port = message["fileserver_port"] # Set peer fileserver port
else:
self.log.debug("Unknown response: %s" % message)
elif message.get("cmd"): # Handhsake request
if message["cmd"] == "handshake":
self.handshake = message["params"]
self.port = self.handshake["fileserver_port"] # Set peer fileserver port
if config.debug_socket: self.log.debug("Handshake request: %s" % message)
data = self.handshakeInfo()
data["cmd"] = "response"
data["to"] = message["req_id"]
self.send(data)
else:
self.server.handleRequest(self, message)
else: # Old style response, no req_id definied
if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys())
last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
self.waiting_requests[last_req_id].set(message)
del self.waiting_requests[last_req_id] # Remove from waiting request
# Send data to connection
def send(self, data):
if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd"))
if self.protocol == "zeromq":
if self.zmq_sock: # Outgoing connection
self.zmq_queue.append(data)
if self.zmq_working:
self.log.debug("ZeroMQ already working...")
return
while self.zmq_queue:
self.zmq_working = True
data = self.zmq_queue.pop(0)
self.zmq_sock.send(msgpack.packb(data))
self.handleMessage(msgpack.unpackb(self.zmq_sock.recv()))
self.zmq_working = False
else: # Incoming request
self.server.zmq_sock.send(msgpack.packb(data))
else: # Normal connection
self.sock.sendall(msgpack.packb(data))
# Create and send a request to peer
def request(self, cmd, params={}):
self.req_id += 1
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
event = gevent.event.AsyncResult() # Create new event for response
self.waiting_requests[self.req_id] = event
self.send(data) # Send request
res = event.get() # Wait until event solves
return res
# Close connection
def close(self):
if self.closed: return False # Already closed
self.closed = True
if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s..." % len(self.waiting_requests))
for request in self.waiting_requests.values(): # Mark pending requests failed
request.set(False)
self.waiting_requests = {}
self.server.removeConnection(self) # Remove connection from server registry
try:
if self.forward_thread:
self.forward_thread.kill(exception=Debug.Notify("Closing connection"))
if self.zmq_sock:
self.zmq_sock.close()
if self.sock:
self.sock.shutdown(gevent.socket.SHUT_WR)
self.sock.close()
except Exception, err:
if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err))

View file

@ -0,0 +1,136 @@
import time, socket, msgpack
from cStringIO import StringIO
print "Connecting..."
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
print "1 Threaded: Send, receive 10000 ping request...",
s = time.time()
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
req = sock.recv(16*1024)
print time.time()-s, repr(req), time.time()-s
print "1 Threaded: Send, receive, decode 10000 ping request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
unpacker.feed(sock.recv(16*1024))
for req in unpacker:
reqs += 1
print "Found:", req, "x", reqs, time.time()-s
print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(1000):
sock.sendall(msgpack.packb({"cmd": "Ping"}))
unpacker.feed(sock.recv(16*1024))
for req in unpacker:
reqs += 1
sock.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
print "Found:", req, "x", reqs, time.time()-s
print "1 Threaded: Request, receive, decode 10000 x 10k data request...",
s = time.time()
unpacker = msgpack.Unpacker()
reqs = 0
for i in range(10000):
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
"""buff = StringIO()
data = sock.recv(16*1024)
buff.write(data)
if not data:
break
while not data.endswith("\n"):
data = sock.recv(16*1024)
if not data: break
buff.write(data)
req = msgpack.unpackb(buff.getvalue().strip("\n"))
reqs += 1"""
req_found = False
while not req_found:
buff = sock.recv(16*1024)
unpacker.feed(buff)
for req in unpacker:
reqs += 1
req_found = True
break # Only process one request
print "Found:", len(req["res"]), "x", reqs, time.time()-s
print "10 Threaded: Request, receive, decode 10000 x 10k data request...",
import gevent
s = time.time()
reqs = 0
req = None
def requester():
global reqs, req
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 1234))
unpacker = msgpack.Unpacker()
for i in range(1000):
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
req_found = False
while not req_found:
buff = sock.recv(16*1024)
unpacker.feed(buff)
for req in unpacker:
reqs += 1
req_found = True
break # Only process one request
threads = []
for i in range(10):
threads.append(gevent.spawn(requester))
gevent.joinall(threads)
print "Found:", len(req["res"]), "x", reqs, time.time()-s
print "1 Threaded: ZeroMQ Send, receive 1000 ping request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1234')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Ping"}))
req = zmq_sock.recv(16*1024)
print "Found:", req, time.time()-s
print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1234')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
print "Found:", len(req["res"]), time.time()-s
print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...",
s = time.time()
import zmq.green as zmq
c = zmq.Context()
zmq_sock = c.socket(zmq.REQ)
zmq_sock.connect('tcp://127.0.0.1:1233')
for i in range(1000):
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
print "Found:", len(req["res"]), time.time()-s

View file

@ -0,0 +1,231 @@
from gevent.server import StreamServer
from gevent.pool import Pool
import socket, os, logging, random, string
import gevent, msgpack
import cStringIO as StringIO
from Debug import Debug
from Connection import Connection
from Config import config
class ConnectionServer:
def __init__(self, ip=None, port=None, request_handler=None):
self.ip = ip
self.port = port
self.last_connection_id = 1 # Connection id incrementer
self.log = logging.getLogger(__name__)
self.connections = [] # Connections
self.ips = {} # Connection by ip
self.peer_ids = {} # Connections by peer_ids
self.running = True
self.zmq_running = False
self.zmq_last_connection = None # Last incoming message client
self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid
if port: # Listen server on a port
self.zmq_port = port-1
self.pool = Pool(1000) # do not accept more than 1000 connections
self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100)
if request_handler: self.handleRequest = request_handler
gevent.spawn(self.zmqServer) # Start ZeroMQ server for backward compatibility
def start(self):
self.running = True
try:
self.log.debug("Binding to: %s:%s" % (self.ip, self.port))
self.stream_server.serve_forever() # Start normal connection server
except Exception, err:
self.log.info("StreamServer bind error, must be running already: %s" % err)
def stop(self):
self.running = False
self.stream_server.stop()
def handleIncomingConnection(self, sock, addr):
ip, port = addr
connection = Connection(self, ip, port, sock)
self.connections.append(connection)
self.ips[ip] = connection
connection.handleIncomingConnection(sock)
def connect(self, ip=None, port=None, peer_id=None):
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
return self.peer_ids.get(peer_id)
if ip in self.ips: # Find connection by ip
return self.ips[ip]
# No connection found yet
try:
connection = Connection(self, ip, port)
self.ips[ip] = connection
self.connections.append(connection)
except Exception, err:
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
raise err
return connection
def removeConnection(self, connection):
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
del self.ips[connection.ip]
if connection in self.connections:
self.connections.remove(connection)
if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry
del self.peer_ids[connection.peer_id]
def zmqServer(self):
self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port)
try:
import zmq.green as zmq
context = zmq.Context()
self.zmq_sock = context.socket(zmq.REP)
self.zmq_sock.bind("tcp://127.0.0.1:%s" % self.zmq_port)
self.zmq_sock.hwm = 1
self.zmq_sock.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive
self.zmq_sock.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send
self.zmq_running = True
except Exception, err:
self.log.debug("ZeroMQ start error: %s" % Debug.formatException(err))
return False
while True:
try:
data = self.zmq_sock.recv()
if not data: break
message = msgpack.unpackb(data)
self.zmq_last_connection.handleMessage(message)
except Exception, err:
self.log.debug("ZMQ Server error: %s" % Debug.formatException(err))
self.zmq_sock.send(msgpack.packb({"error": "%s" % err}, use_bin_type=True))
# Forward incoming data to other socket
def forward(self, connection, source, dest):
data = True
try:
while data:
data = source.recv(16*1024)
self.zmq_last_connection = connection
if data:
dest.sendall(data)
else:
source.shutdown(socket.SHUT_RD)
dest.shutdown(socket.SHUT_WR)
except Exception, err:
self.log.debug("%s ZMQ forward error: %s" % (connection.ip, Debug.formatException(err)))
connection.close()
# -- TESTING --
def testCreateServer():
global server
server = ConnectionServer("127.0.0.1", 1234, testRequestHandler)
server.start()
def testRequestHandler(connection, req):
print req
if req["cmd"] == "Bigdata":
connection.send({"res": "HelloWorld"*1024})
else:
connection.send({"res": "pong"})
def testClient(num):
time.sleep(1)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("localhost", 1234))
for i in range(10):
print "[C%s] send..." % num
s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
print "[C%s] recv..." % num
print "[C%s] %s" % (num, repr(s.recv(1024)))
time.sleep(1)
def testSlowClient(num):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("localhost", 1234))
for i in range(1):
print "[C%s] send..." % num
s.sendall(msgpack.packb({"cmd": "Bigdata"}))
print "[C%s] recv..." % num
gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
while 1:
data = s.recv(1000)
if not data: break
print "[C%s] %s" % (num, data)
time.sleep(1)
#s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
def testZmqClient(num):
import zmq.green as zmq
c = zmq.Context(1)
for i in range(10):
s = c.socket(zmq.REQ)
s.connect('tcp://127.0.0.1:1234')
print "[Z%s] send..." % num
s.send(msgpack.packb({"cmd": "[Z] Ping %s" % i}))
print "[Z%s] recv..." % num
print "[Z%s] %s" % (num, s.recv(1024))
s.close()
time.sleep(1)
def testZmqSlowClient(num):
import zmq.green as zmq
c = zmq.Context(1)
s = c.socket(zmq.REQ)
for i in range(1):
s.connect('tcp://127.0.0.1:1234')
print "[Z%s] send..." % num
s.send(msgpack.packb({"cmd": "Bigdata"}))
print "[Z%s] recv..." % num
#gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
while 1:
data = s.recv(1024*1024)
if not data: break
print "[Z%s] %s" % (num, data)
time.sleep(1)
s.send(msgpack.packb({"cmd": "[Z] Ping"}))
def testConnection():
global server
connection = server.connect("127.0.0.1", 1234)
connection.send({"res": "Sending: Hello!"})
print connection
def greenletsNum():
from greenlet import greenlet
import gc
while 1:
print len([ob for ob in gc.get_objects() if isinstance(ob, greenlet)])
time.sleep(1)
if __name__ == "__main__":
from gevent import monkey; monkey.patch_all(thread=False)
import sys, time
logging.getLogger().setLevel(logging.DEBUG)
gevent.spawn(testZmqClient, 1)
gevent.spawn(greenletsNum)
#gevent.spawn(testClient, 1)
#gevent.spawn_later(1, testConnection)
print "Running server..."
server = None
testCreateServer()

View file

@ -0,0 +1,2 @@
from ConnectionServer import ConnectionServer
from Connection import Connection