Merge branch 'master' of github.com:HelloZeroNet/ZeroNet
This commit is contained in:
commit
6a409a000a
18 changed files with 790 additions and 130 deletions
|
@ -3,7 +3,7 @@ import ConfigParser
|
|||
|
||||
class Config(object):
|
||||
def __init__(self):
|
||||
self.version = "0.2.3"
|
||||
self.version = "0.2.4"
|
||||
self.parser = self.createArguments()
|
||||
argv = sys.argv[:] # Copy command line arguments
|
||||
argv = self.parseConfig(argv) # Add arguments from config file
|
||||
|
@ -52,6 +52,19 @@ class Config(object):
|
|||
action = subparsers.add_parser("siteVerify", help='Verify site files using sha512: address')
|
||||
action.add_argument('address', help='Site to verify')
|
||||
|
||||
# PeerPing
|
||||
action = subparsers.add_parser("peerPing", help='Send Ping command to peer')
|
||||
action.add_argument('peer_ip', help='Peer ip')
|
||||
action.add_argument('peer_port', help='Peer port')
|
||||
|
||||
# PeerGetFile
|
||||
action = subparsers.add_parser("peerGetFile", help='Request and print a file content from peer')
|
||||
action.add_argument('peer_ip', help='Peer ip')
|
||||
action.add_argument('peer_port', help='Peer port')
|
||||
action.add_argument('site', help='Site address')
|
||||
action.add_argument('filename', help='File name to request')
|
||||
|
||||
|
||||
|
||||
# Config parameters
|
||||
parser.add_argument('--debug', help='Debug mode', action='store_true')
|
||||
|
|
234
src/Connection/Connection.py
Normal file
234
src/Connection/Connection.py
Normal file
|
@ -0,0 +1,234 @@
|
|||
import logging, socket, time
|
||||
from cStringIO import StringIO
|
||||
import gevent, msgpack
|
||||
from Config import config
|
||||
from Debug import Debug
|
||||
try:
|
||||
import zmq.green as zmq
|
||||
except:
|
||||
zmq = None
|
||||
|
||||
class Connection:
|
||||
def __init__(self, server, ip, port, sock=None):
|
||||
self.sock = sock
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.peer_id = None # Bittorrent style peer id (not used yet)
|
||||
self.id = server.last_connection_id
|
||||
self.protocol = "?"
|
||||
server.last_connection_id += 1
|
||||
|
||||
self.server = server
|
||||
self.log = logging.getLogger(str(self))
|
||||
self.unpacker = msgpack.Unpacker() # Stream incoming socket messages here
|
||||
self.req_id = 0 # Last request id
|
||||
self.handshake = None # Handshake info got from peer
|
||||
self.event_handshake = gevent.event.AsyncResult() # Solves on handshake received
|
||||
self.closed = False
|
||||
|
||||
self.zmq_sock = None # Zeromq sock if outgoing connection
|
||||
self.zmq_queue = [] # Messages queued to send
|
||||
self.zmq_working = False # Zmq currently working, just add to queue
|
||||
self.forward_thread = None # Zmq forwarder thread
|
||||
|
||||
self.waiting_requests = {} # Waiting sent requests
|
||||
if not sock: self.connect() # Not an incoming connection, connect to peer
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s>" % self.__str__()
|
||||
|
||||
|
||||
# Open connection to peer and wait for handshake
|
||||
def connect(self):
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.connect((self.ip, self.port))
|
||||
# Detect protocol
|
||||
self.send({"cmd": "handshake", "req_id": 0, "params": self.handshakeInfo()})
|
||||
gevent.spawn(self.messageLoop)
|
||||
return self.event_handshake.get() # Wait for handshake
|
||||
|
||||
|
||||
|
||||
# Handle incoming connection
|
||||
def handleIncomingConnection(self, sock):
|
||||
firstchar = sock.recv(1) # Find out if pure socket or zeromq
|
||||
if firstchar == "\xff": # Backward compatiblity: forward data to zmq
|
||||
if config.debug_socket: self.log.debug("Fallback incoming connection to ZeroMQ")
|
||||
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol)
|
||||
|
||||
if self.server.zmq_running:
|
||||
zmq_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
zmq_sock.connect(("127.0.0.1", self.server.zmq_port))
|
||||
zmq_sock.send(firstchar)
|
||||
|
||||
self.forward_thread = gevent.spawn(self.server.forward, self, zmq_sock, sock)
|
||||
self.server.forward(self, sock, zmq_sock)
|
||||
self.close() # Forward ended close connection
|
||||
else:
|
||||
self.config.debug("ZeroMQ Server not running, exiting!")
|
||||
else: # Normal socket
|
||||
self.messageLoop(firstchar)
|
||||
|
||||
|
||||
# Message loop for connection
|
||||
def messageLoop(self, firstchar=None):
|
||||
sock = self.sock
|
||||
if not firstchar: firstchar = sock.recv(1)
|
||||
if firstchar == "\xff": # Backward compatibility to zmq
|
||||
self.sock.close() # Close normal socket
|
||||
if zmq:
|
||||
if config.debug_socket: self.log.debug("Connecting as ZeroMQ")
|
||||
self.protocol = "zeromq"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol) # Mark handshake as done
|
||||
|
||||
try:
|
||||
context = zmq.Context()
|
||||
zmq_sock = context.socket(zmq.REQ)
|
||||
zmq_sock.hwm = 1
|
||||
zmq_sock.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive
|
||||
zmq_sock.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send
|
||||
zmq_sock.setsockopt(zmq.LINGER, 500) # Wait for zmq_sock close
|
||||
zmq_sock.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||
self.zmq_sock = zmq_sock
|
||||
except Exception, err:
|
||||
self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
else:
|
||||
return False # No zeromq connection supported
|
||||
else: # Normal socket
|
||||
self.protocol = "v2"
|
||||
self.log.name = str(self)
|
||||
self.event_handshake.set(self.protocol) # Mark handshake as done
|
||||
|
||||
unpacker = self.unpacker
|
||||
unpacker.feed(firstchar) # Feed the first char we already requested
|
||||
try:
|
||||
while True:
|
||||
buff = sock.recv(16*1024)
|
||||
if not buff: break # Connection closed
|
||||
unpacker.feed(buff)
|
||||
for message in unpacker:
|
||||
self.handleMessage(message)
|
||||
except Exception, err:
|
||||
self.log.debug("Socket error: %s" % Debug.formatException(err))
|
||||
self.close() # MessageLoop ended, close connection
|
||||
|
||||
|
||||
# Read one line (not used)
|
||||
def recvLine(self):
|
||||
sock = self.sock
|
||||
data = sock.recv(16*1024)
|
||||
if not data: return
|
||||
if not data.endswith("\n"): # Multipart, read until \n
|
||||
buff = StringIO()
|
||||
buff.write(data)
|
||||
while not data.endswith("\n"):
|
||||
data = sock.recv(16*1024)
|
||||
if not data: break
|
||||
buff.write(data)
|
||||
return buff.getvalue().strip("\n")
|
||||
|
||||
return data.strip("\n")
|
||||
|
||||
|
||||
# My handshake info
|
||||
def handshakeInfo(self):
|
||||
return {
|
||||
"version": config.version,
|
||||
"protocol": "v2",
|
||||
"peer_id": self.server.peer_id,
|
||||
"fileserver_port": config.fileserver_port
|
||||
}
|
||||
|
||||
|
||||
# Handle incoming message
|
||||
def handleMessage(self, message):
|
||||
if message.get("cmd") == "response": # New style response
|
||||
if message["to"] in self.waiting_requests:
|
||||
self.waiting_requests[message["to"]].set(message) # Set the response to event
|
||||
del self.waiting_requests[message["to"]]
|
||||
elif message["to"] == 0: # Other peers handshake
|
||||
if config.debug_socket: self.log.debug("Got handshake response: %s" % message)
|
||||
self.handshake = message
|
||||
self.port = message["fileserver_port"] # Set peer fileserver port
|
||||
else:
|
||||
self.log.debug("Unknown response: %s" % message)
|
||||
elif message.get("cmd"): # Handhsake request
|
||||
if message["cmd"] == "handshake":
|
||||
self.handshake = message["params"]
|
||||
self.port = self.handshake["fileserver_port"] # Set peer fileserver port
|
||||
if config.debug_socket: self.log.debug("Handshake request: %s" % message)
|
||||
data = self.handshakeInfo()
|
||||
data["cmd"] = "response"
|
||||
data["to"] = message["req_id"]
|
||||
self.send(data)
|
||||
else:
|
||||
self.server.handleRequest(self, message)
|
||||
else: # Old style response, no req_id definied
|
||||
if config.debug_socket: self.log.debug("Old style response, waiting: %s" % self.waiting_requests.keys())
|
||||
last_req_id = min(self.waiting_requests.keys()) # Get the oldest waiting request and set it true
|
||||
self.waiting_requests[last_req_id].set(message)
|
||||
del self.waiting_requests[last_req_id] # Remove from waiting request
|
||||
|
||||
|
||||
|
||||
# Send data to connection
|
||||
def send(self, data):
|
||||
if config.debug_socket: self.log.debug("Send: %s" % data.get("cmd"))
|
||||
if self.protocol == "zeromq":
|
||||
if self.zmq_sock: # Outgoing connection
|
||||
self.zmq_queue.append(data)
|
||||
if self.zmq_working:
|
||||
self.log.debug("ZeroMQ already working...")
|
||||
return
|
||||
while self.zmq_queue:
|
||||
self.zmq_working = True
|
||||
data = self.zmq_queue.pop(0)
|
||||
self.zmq_sock.send(msgpack.packb(data))
|
||||
self.handleMessage(msgpack.unpackb(self.zmq_sock.recv()))
|
||||
self.zmq_working = False
|
||||
|
||||
else: # Incoming request
|
||||
self.server.zmq_sock.send(msgpack.packb(data))
|
||||
else: # Normal connection
|
||||
self.sock.sendall(msgpack.packb(data))
|
||||
|
||||
|
||||
# Create and send a request to peer
|
||||
def request(self, cmd, params={}):
|
||||
self.req_id += 1
|
||||
data = {"cmd": cmd, "req_id": self.req_id, "params": params}
|
||||
event = gevent.event.AsyncResult() # Create new event for response
|
||||
self.waiting_requests[self.req_id] = event
|
||||
self.send(data) # Send request
|
||||
res = event.get() # Wait until event solves
|
||||
|
||||
return res
|
||||
|
||||
|
||||
# Close connection
|
||||
def close(self):
|
||||
if self.closed: return False # Already closed
|
||||
self.closed = True
|
||||
if config.debug_socket: self.log.debug("Closing connection, waiting_requests: %s..." % len(self.waiting_requests))
|
||||
for request in self.waiting_requests.values(): # Mark pending requests failed
|
||||
request.set(False)
|
||||
self.waiting_requests = {}
|
||||
self.server.removeConnection(self) # Remove connection from server registry
|
||||
try:
|
||||
if self.forward_thread:
|
||||
self.forward_thread.kill(exception=Debug.Notify("Closing connection"))
|
||||
if self.zmq_sock:
|
||||
self.zmq_sock.close()
|
||||
if self.sock:
|
||||
self.sock.shutdown(gevent.socket.SHUT_WR)
|
||||
self.sock.close()
|
||||
except Exception, err:
|
||||
if config.debug_socket: self.log.debug("Close error: %s" % Debug.formatException(err))
|
136
src/Connection/ConnectionBenchmark.py
Normal file
136
src/Connection/ConnectionBenchmark.py
Normal file
|
@ -0,0 +1,136 @@
|
|||
import time, socket, msgpack
|
||||
from cStringIO import StringIO
|
||||
|
||||
print "Connecting..."
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive 10000 ping request...",
|
||||
s = time.time()
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
req = sock.recv(16*1024)
|
||||
print time.time()-s, repr(req), time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive, decode 10000 ping request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
unpacker.feed(sock.recv(16*1024))
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
print "Found:", req, "x", reqs, time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: Send, receive, decode, reconnect 1000 ping request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(1000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Ping"}))
|
||||
unpacker.feed(sock.recv(16*1024))
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
sock.close()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
print "Found:", req, "x", reqs, time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: Request, receive, decode 10000 x 10k data request...",
|
||||
s = time.time()
|
||||
unpacker = msgpack.Unpacker()
|
||||
reqs = 0
|
||||
for i in range(10000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
|
||||
|
||||
"""buff = StringIO()
|
||||
data = sock.recv(16*1024)
|
||||
buff.write(data)
|
||||
if not data:
|
||||
break
|
||||
while not data.endswith("\n"):
|
||||
data = sock.recv(16*1024)
|
||||
if not data: break
|
||||
buff.write(data)
|
||||
req = msgpack.unpackb(buff.getvalue().strip("\n"))
|
||||
reqs += 1"""
|
||||
|
||||
req_found = False
|
||||
while not req_found:
|
||||
buff = sock.recv(16*1024)
|
||||
unpacker.feed(buff)
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
req_found = True
|
||||
break # Only process one request
|
||||
print "Found:", len(req["res"]), "x", reqs, time.time()-s
|
||||
|
||||
|
||||
print "10 Threaded: Request, receive, decode 10000 x 10k data request...",
|
||||
import gevent
|
||||
s = time.time()
|
||||
reqs = 0
|
||||
req = None
|
||||
def requester():
|
||||
global reqs, req
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", 1234))
|
||||
unpacker = msgpack.Unpacker()
|
||||
for i in range(1000):
|
||||
sock.sendall(msgpack.packb({"cmd": "Bigdata"}))
|
||||
|
||||
req_found = False
|
||||
while not req_found:
|
||||
buff = sock.recv(16*1024)
|
||||
unpacker.feed(buff)
|
||||
for req in unpacker:
|
||||
reqs += 1
|
||||
req_found = True
|
||||
break # Only process one request
|
||||
|
||||
threads = []
|
||||
for i in range(10):
|
||||
threads.append(gevent.spawn(requester))
|
||||
gevent.joinall(threads)
|
||||
print "Found:", len(req["res"]), "x", reqs, time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: ZeroMQ Send, receive 1000 ping request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1234')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Ping"}))
|
||||
req = zmq_sock.recv(16*1024)
|
||||
print "Found:", req, time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: ZeroMQ Send, receive 1000 x 10k data request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1234')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
|
||||
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
|
||||
print "Found:", len(req["res"]), time.time()-s
|
||||
|
||||
|
||||
print "1 Threaded: direct ZeroMQ Send, receive 1000 x 10k data request...",
|
||||
s = time.time()
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context()
|
||||
zmq_sock = c.socket(zmq.REQ)
|
||||
zmq_sock.connect('tcp://127.0.0.1:1233')
|
||||
for i in range(1000):
|
||||
zmq_sock.send(msgpack.packb({"cmd": "Bigdata"}))
|
||||
req = msgpack.unpackb(zmq_sock.recv(1024*1024))
|
||||
print "Found:", len(req["res"]), time.time()-s
|
231
src/Connection/ConnectionServer.py
Normal file
231
src/Connection/ConnectionServer.py
Normal file
|
@ -0,0 +1,231 @@
|
|||
from gevent.server import StreamServer
|
||||
from gevent.pool import Pool
|
||||
import socket, os, logging, random, string
|
||||
import gevent, msgpack
|
||||
import cStringIO as StringIO
|
||||
from Debug import Debug
|
||||
from Connection import Connection
|
||||
from Config import config
|
||||
|
||||
|
||||
class ConnectionServer:
|
||||
def __init__(self, ip=None, port=None, request_handler=None):
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.last_connection_id = 1 # Connection id incrementer
|
||||
self.log = logging.getLogger(__name__)
|
||||
|
||||
self.connections = [] # Connections
|
||||
self.ips = {} # Connection by ip
|
||||
self.peer_ids = {} # Connections by peer_ids
|
||||
|
||||
self.running = True
|
||||
self.zmq_running = False
|
||||
self.zmq_last_connection = None # Last incoming message client
|
||||
|
||||
self.peer_id = "-ZN0"+config.version.replace(".", "")+"-"+''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(12)) # Bittorrent style peerid
|
||||
|
||||
if port: # Listen server on a port
|
||||
self.zmq_port = port-1
|
||||
self.pool = Pool(1000) # do not accept more than 1000 connections
|
||||
self.stream_server = StreamServer((ip.replace("*", ""), port), self.handleIncomingConnection, spawn=self.pool, backlog=100)
|
||||
if request_handler: self.handleRequest = request_handler
|
||||
gevent.spawn(self.zmqServer) # Start ZeroMQ server for backward compatibility
|
||||
|
||||
|
||||
|
||||
def start(self):
|
||||
self.running = True
|
||||
try:
|
||||
self.log.debug("Binding to: %s:%s" % (self.ip, self.port))
|
||||
self.stream_server.serve_forever() # Start normal connection server
|
||||
except Exception, err:
|
||||
self.log.info("StreamServer bind error, must be running already: %s" % err)
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
self.stream_server.stop()
|
||||
|
||||
|
||||
def handleIncomingConnection(self, sock, addr):
|
||||
ip, port = addr
|
||||
connection = Connection(self, ip, port, sock)
|
||||
self.connections.append(connection)
|
||||
self.ips[ip] = connection
|
||||
connection.handleIncomingConnection(sock)
|
||||
|
||||
|
||||
|
||||
def connect(self, ip=None, port=None, peer_id=None):
|
||||
if peer_id and peer_id in self.peer_ids: # Find connection by peer id
|
||||
return self.peer_ids.get(peer_id)
|
||||
if ip in self.ips: # Find connection by ip
|
||||
return self.ips[ip]
|
||||
# No connection found yet
|
||||
try:
|
||||
connection = Connection(self, ip, port)
|
||||
self.ips[ip] = connection
|
||||
self.connections.append(connection)
|
||||
except Exception, err:
|
||||
self.log.debug("%s Connect error: %s" % (ip, Debug.formatException(err)))
|
||||
raise err
|
||||
return connection
|
||||
|
||||
|
||||
|
||||
def removeConnection(self, connection):
|
||||
if self.ips.get(connection.ip) == connection: # Delete if same as in registry
|
||||
del self.ips[connection.ip]
|
||||
if connection in self.connections:
|
||||
self.connections.remove(connection)
|
||||
if connection.peer_id and self.peer_ids.get(connection.peer_id) == connection: # Delete if same as in registry
|
||||
del self.peer_ids[connection.peer_id]
|
||||
|
||||
|
||||
def zmqServer(self):
|
||||
self.log.debug("Starting ZeroMQ on: tcp://127.0.0.1:%s..." % self.zmq_port)
|
||||
try:
|
||||
import zmq.green as zmq
|
||||
context = zmq.Context()
|
||||
self.zmq_sock = context.socket(zmq.REP)
|
||||
self.zmq_sock.bind("tcp://127.0.0.1:%s" % self.zmq_port)
|
||||
self.zmq_sock.hwm = 1
|
||||
self.zmq_sock.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive
|
||||
self.zmq_sock.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send
|
||||
self.zmq_running = True
|
||||
except Exception, err:
|
||||
self.log.debug("ZeroMQ start error: %s" % Debug.formatException(err))
|
||||
return False
|
||||
|
||||
while True:
|
||||
try:
|
||||
data = self.zmq_sock.recv()
|
||||
if not data: break
|
||||
message = msgpack.unpackb(data)
|
||||
self.zmq_last_connection.handleMessage(message)
|
||||
except Exception, err:
|
||||
self.log.debug("ZMQ Server error: %s" % Debug.formatException(err))
|
||||
self.zmq_sock.send(msgpack.packb({"error": "%s" % err}, use_bin_type=True))
|
||||
|
||||
|
||||
# Forward incoming data to other socket
|
||||
def forward(self, connection, source, dest):
|
||||
data = True
|
||||
try:
|
||||
while data:
|
||||
data = source.recv(16*1024)
|
||||
self.zmq_last_connection = connection
|
||||
if data:
|
||||
dest.sendall(data)
|
||||
else:
|
||||
source.shutdown(socket.SHUT_RD)
|
||||
dest.shutdown(socket.SHUT_WR)
|
||||
except Exception, err:
|
||||
self.log.debug("%s ZMQ forward error: %s" % (connection.ip, Debug.formatException(err)))
|
||||
connection.close()
|
||||
|
||||
|
||||
# -- TESTING --
|
||||
|
||||
def testCreateServer():
|
||||
global server
|
||||
server = ConnectionServer("127.0.0.1", 1234, testRequestHandler)
|
||||
server.start()
|
||||
|
||||
|
||||
def testRequestHandler(connection, req):
|
||||
print req
|
||||
if req["cmd"] == "Bigdata":
|
||||
connection.send({"res": "HelloWorld"*1024})
|
||||
else:
|
||||
connection.send({"res": "pong"})
|
||||
|
||||
|
||||
def testClient(num):
|
||||
time.sleep(1)
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect(("localhost", 1234))
|
||||
for i in range(10):
|
||||
print "[C%s] send..." % num
|
||||
s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
|
||||
print "[C%s] recv..." % num
|
||||
print "[C%s] %s" % (num, repr(s.recv(1024)))
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def testSlowClient(num):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect(("localhost", 1234))
|
||||
for i in range(1):
|
||||
print "[C%s] send..." % num
|
||||
s.sendall(msgpack.packb({"cmd": "Bigdata"}))
|
||||
print "[C%s] recv..." % num
|
||||
gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
|
||||
while 1:
|
||||
data = s.recv(1000)
|
||||
if not data: break
|
||||
print "[C%s] %s" % (num, data)
|
||||
time.sleep(1)
|
||||
#s.sendall(msgpack.packb({"cmd": "[C] Ping"}))
|
||||
|
||||
|
||||
def testZmqClient(num):
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context(1)
|
||||
for i in range(10):
|
||||
s = c.socket(zmq.REQ)
|
||||
s.connect('tcp://127.0.0.1:1234')
|
||||
print "[Z%s] send..." % num
|
||||
s.send(msgpack.packb({"cmd": "[Z] Ping %s" % i}))
|
||||
print "[Z%s] recv..." % num
|
||||
print "[Z%s] %s" % (num, s.recv(1024))
|
||||
s.close()
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def testZmqSlowClient(num):
|
||||
import zmq.green as zmq
|
||||
c = zmq.Context(1)
|
||||
s = c.socket(zmq.REQ)
|
||||
for i in range(1):
|
||||
s.connect('tcp://127.0.0.1:1234')
|
||||
print "[Z%s] send..." % num
|
||||
s.send(msgpack.packb({"cmd": "Bigdata"}))
|
||||
print "[Z%s] recv..." % num
|
||||
#gevent.spawn_later(1, lambda s: s.send(msgpack.packb({"cmd": "[Z] Ping"})), s)
|
||||
while 1:
|
||||
data = s.recv(1024*1024)
|
||||
if not data: break
|
||||
print "[Z%s] %s" % (num, data)
|
||||
time.sleep(1)
|
||||
s.send(msgpack.packb({"cmd": "[Z] Ping"}))
|
||||
|
||||
|
||||
def testConnection():
|
||||
global server
|
||||
connection = server.connect("127.0.0.1", 1234)
|
||||
connection.send({"res": "Sending: Hello!"})
|
||||
print connection
|
||||
|
||||
|
||||
def greenletsNum():
|
||||
from greenlet import greenlet
|
||||
import gc
|
||||
while 1:
|
||||
print len([ob for ob in gc.get_objects() if isinstance(ob, greenlet)])
|
||||
time.sleep(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
from gevent import monkey; monkey.patch_all(thread=False)
|
||||
import sys, time
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
gevent.spawn(testZmqClient, 1)
|
||||
gevent.spawn(greenletsNum)
|
||||
#gevent.spawn(testClient, 1)
|
||||
#gevent.spawn_later(1, testConnection)
|
||||
print "Running server..."
|
||||
server = None
|
||||
testCreateServer()
|
||||
|
2
src/Connection/__init__.py
Normal file
2
src/Connection/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from ConnectionServer import ConnectionServer
|
||||
from Connection import Connection
|
|
@ -1,4 +1,4 @@
|
|||
import json, time, re, os
|
||||
import json, time, re, os, gevent
|
||||
from Debug import Debug
|
||||
from Crypt import CryptHash
|
||||
from Config import config
|
||||
|
@ -42,7 +42,7 @@ class ContentManager:
|
|||
|
||||
new_hash = info[hash_type]
|
||||
if old_content and old_content["files"].get(relative_path): # We have the file in the old content
|
||||
old_hash = old_content["files"][relative_path][hash_type]
|
||||
old_hash = old_content["files"][relative_path].get(hash_type)
|
||||
else: # The file is not in the old content
|
||||
old_hash = None
|
||||
if old_hash != new_hash: changed.append(content_dir+relative_path)
|
||||
|
@ -293,6 +293,7 @@ class ContentManager:
|
|||
return None
|
||||
elif old_content["modified"] > new_content["modified"]: # We have newer
|
||||
self.log.debug("We have newer %s (Our: %s, Sent: %s)" % (inner_path, old_content["modified"], new_content["modified"]))
|
||||
gevent.spawn(self.site.publish, inner_path=inner_path) # Try to fix the broken peers
|
||||
return False
|
||||
if new_content["modified"] > time.time()+60*60*24: # Content modified in the far future (allow 1 day window)
|
||||
self.log.error("%s modify is in the future!" % inner_path)
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import os, msgpack, shutil, gevent
|
||||
from Site import SiteManager
|
||||
from cStringIO import StringIO
|
||||
from Debug import Debug
|
||||
from Config import config
|
||||
|
@ -8,21 +7,30 @@ FILE_BUFF = 1024*512
|
|||
|
||||
# Request from me
|
||||
class FileRequest:
|
||||
def __init__(self, server = None):
|
||||
if server:
|
||||
self.server = server
|
||||
self.log = server.log
|
||||
self.sites = SiteManager.list()
|
||||
def __init__(self, server, connection):
|
||||
self.server = server
|
||||
self.connection = connection
|
||||
|
||||
self.req_id = None
|
||||
self.sites = self.server.sites
|
||||
self.log = server.log
|
||||
|
||||
|
||||
def send(self, msg):
|
||||
self.connection.send(msg)
|
||||
|
||||
|
||||
def response(self, msg):
|
||||
if not isinstance(msg, dict): # If msg not a dict create a {"body": msg}
|
||||
msg = {"body": msg}
|
||||
self.server.socket.send(msgpack.packb(msg, use_bin_type=True))
|
||||
msg["cmd"] = "response"
|
||||
msg["to"] = self.req_id
|
||||
self.send(msg)
|
||||
|
||||
|
||||
# Route file requests
|
||||
def route(self, cmd, params):
|
||||
def route(self, cmd, req_id, params):
|
||||
self.req_id = req_id
|
||||
if cmd == "getFile":
|
||||
self.actionGetFile(params)
|
||||
elif cmd == "update":
|
||||
|
@ -37,7 +45,7 @@ class FileRequest:
|
|||
def actionUpdate(self, params):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.send({"error": "Unknown site"})
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
if site.settings["own"] and params["inner_path"].endswith("content.json"):
|
||||
self.log.debug("Someone trying to push a file to own site %s, reload local %s first" % (site.address, params["inner_path"]))
|
||||
|
@ -61,7 +69,7 @@ class FileRequest:
|
|||
lambda: site.downloadContent(params["inner_path"], peer=peer)
|
||||
) # Load new content file and download changed files in new thread
|
||||
|
||||
self.send({"ok": "Thanks, file %s updated!" % params["inner_path"]})
|
||||
self.response({"ok": "Thanks, file %s updated!" % params["inner_path"]})
|
||||
|
||||
elif valid == None: # Not changed
|
||||
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
|
||||
|
@ -70,18 +78,18 @@ class FileRequest:
|
|||
for task in site.worker_manager.tasks: # New peer add to every ongoing task
|
||||
if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked
|
||||
|
||||
self.send({"ok": "File not changed"})
|
||||
self.response({"ok": "File not changed"})
|
||||
|
||||
else: # Invalid sign or sha1 hash
|
||||
self.log.debug("Update for %s is invalid" % params["inner_path"])
|
||||
self.send({"error": "File invalid"})
|
||||
self.response({"error": "File invalid"})
|
||||
|
||||
|
||||
# Send file content request
|
||||
def actionGetFile(self, params):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.send({"error": "Unknown site"})
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
try:
|
||||
file_path = site.getPath(params["inner_path"])
|
||||
|
@ -93,18 +101,18 @@ class FileRequest:
|
|||
back["location"] = file.tell()
|
||||
back["size"] = os.fstat(file.fileno()).st_size
|
||||
if config.debug_socket: self.log.debug("Sending file %s from position %s to %s" % (file_path, params["location"], back["location"]))
|
||||
self.send(back)
|
||||
self.response(back)
|
||||
if config.debug_socket: self.log.debug("File %s sent" % file_path)
|
||||
except Exception, err:
|
||||
self.send({"error": "File read error: %s" % Debug.formatException(err)})
|
||||
self.response({"error": "File read error: %s" % Debug.formatException(err)})
|
||||
return False
|
||||
|
||||
|
||||
# Send a simple Pong! answer
|
||||
def actionPing(self):
|
||||
self.send("Pong!")
|
||||
self.response("Pong!")
|
||||
|
||||
|
||||
# Unknown command
|
||||
def actionUnknown(self, cmd, params):
|
||||
self.send({"error": "Unknown command: %s" % cmd})
|
||||
self.response({"error": "Unknown command: %s" % cmd})
|
||||
|
|
|
@ -5,30 +5,28 @@ from Config import config
|
|||
from FileRequest import FileRequest
|
||||
from Site import SiteManager
|
||||
from Debug import Debug
|
||||
from Connection import ConnectionServer
|
||||
|
||||
|
||||
class FileServer:
|
||||
class FileServer(ConnectionServer):
|
||||
def __init__(self):
|
||||
self.ip = config.fileserver_ip
|
||||
self.port = config.fileserver_port
|
||||
self.log = logging.getLogger(__name__)
|
||||
ConnectionServer.__init__(self, config.fileserver_ip, config.fileserver_port, self.handleRequest)
|
||||
if config.ip_external: # Ip external definied in arguments
|
||||
self.port_opened = True
|
||||
SiteManager.peer_blacklist.append((config.ip_external, self.port)) # Add myself to peer blacklist
|
||||
else:
|
||||
self.port_opened = None # Is file server opened on router
|
||||
self.sites = SiteManager.list()
|
||||
self.running = True
|
||||
|
||||
|
||||
# Handle request to fileserver
|
||||
def handleRequest(self, msg):
|
||||
if "params" in msg:
|
||||
self.log.debug("FileRequest: %s %s %s" % (msg["cmd"], msg["params"].get("site"), msg["params"].get("inner_path")))
|
||||
def handleRequest(self, connection, message):
|
||||
if "params" in message:
|
||||
self.log.debug("FileRequest: %s %s %s" % (message["cmd"], message["params"].get("site"), message["params"].get("inner_path")))
|
||||
else:
|
||||
self.log.debug("FileRequest: %s" % msg["cmd"])
|
||||
req = FileRequest(self)
|
||||
req.route(msg["cmd"], msg.get("params"))
|
||||
self.log.debug("FileRequest: %s" % req["cmd"])
|
||||
req = FileRequest(self, connection)
|
||||
req.route(message["cmd"], message.get("req_id"), message.get("params"))
|
||||
|
||||
|
||||
# Reload the FileRequest class to prevent restarts in debug mode
|
||||
|
@ -124,13 +122,15 @@ class FileServer:
|
|||
time.sleep(2) # Prevent too quick request
|
||||
|
||||
|
||||
# Announce sites every 10 min
|
||||
# Announce sites every 20 min
|
||||
def announceSites(self):
|
||||
while 1:
|
||||
time.sleep(20*60) # Announce sites every 20 min
|
||||
for address, site in self.sites.items():
|
||||
if site.settings["serving"]:
|
||||
site.announce() # Announce site to tracker
|
||||
for inner_path in site.bad_files: # Reset bad file retry counter
|
||||
site.bad_files[inner_path] = 0
|
||||
time.sleep(2) # Prevent too quick request
|
||||
|
||||
|
||||
|
@ -155,40 +155,14 @@ class FileServer:
|
|||
from Debug import DebugReloader
|
||||
DebugReloader(self.reload)
|
||||
|
||||
self.context = zmq.Context()
|
||||
socket = self.context.socket(zmq.REP)
|
||||
self.socket = socket
|
||||
self.socket.setsockopt(zmq.RCVTIMEO, 5000) # Wait for data receive
|
||||
self.socket.setsockopt(zmq.SNDTIMEO, 50000) # Wait for data send
|
||||
self.log.info("Binding to tcp://%s:%s" % (self.ip, self.port))
|
||||
try:
|
||||
self.socket.bind('tcp://%s:%s' % (self.ip, self.port))
|
||||
except Exception, err:
|
||||
self.log.error("Can't bind, FileServer must be running already")
|
||||
return
|
||||
if check_sites: # Open port, Update sites, Check files integrity
|
||||
gevent.spawn(self.checkSites)
|
||||
|
||||
thread_announce_sites = gevent.spawn(self.announceSites)
|
||||
thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
ret = {}
|
||||
req = msgpack.unpackb(socket.recv())
|
||||
self.handleRequest(req)
|
||||
except Exception, err:
|
||||
self.log.error(err)
|
||||
if self.running: self.socket.send(msgpack.packb({"error": "%s" % Debug.formatException(err)}, use_bin_type=True))
|
||||
if config.debug: # Raise exception
|
||||
import sys
|
||||
sys.modules["src.main"].DebugHook.handleError()
|
||||
thread_wakeup_watcher.kill(exception=Debug.Notify("Stopping FileServer"))
|
||||
thread_announce_sites.kill(exception=Debug.Notify("Stopping FileServer"))
|
||||
ConnectionServer.start(self)
|
||||
|
||||
# thread_wakeup_watcher.kill(exception=Debug.Notify("Stopping FileServer"))
|
||||
# thread_announce_sites.kill(exception=Debug.Notify("Stopping FileServer"))
|
||||
self.log.debug("Stopped.")
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
self.socket.close()
|
||||
|
||||
|
|
|
@ -1,21 +1,20 @@
|
|||
import os, logging, gevent, time, msgpack
|
||||
import os, logging, gevent, time, msgpack, sys
|
||||
import zmq.green as zmq
|
||||
from cStringIO import StringIO
|
||||
from Config import config
|
||||
from Debug import Debug
|
||||
|
||||
context = zmq.Context()
|
||||
|
||||
# Communicate remote peers
|
||||
class Peer:
|
||||
def __init__(self, ip, port, site):
|
||||
def __init__(self, ip, port, site=None):
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.site = site
|
||||
self.key = "%s:%s" % (ip, port)
|
||||
self.log = None
|
||||
self.connection_server = sys.modules["src.main"].file_server
|
||||
|
||||
self.socket = None
|
||||
self.connection = None
|
||||
self.last_found = None # Time of last found in the torrent tracker
|
||||
self.last_response = None # Time of last successfull response from peer
|
||||
self.last_ping = None # Last response time for ping
|
||||
|
@ -29,19 +28,22 @@ class Peer:
|
|||
|
||||
# Connect to host
|
||||
def connect(self):
|
||||
if self.connection: self.connection.close()
|
||||
self.connection = None
|
||||
if not self.log: self.log = logging.getLogger("Peer:%s:%s" % (self.ip, self.port))
|
||||
if self.socket: self.socket.close()
|
||||
|
||||
self.socket = context.socket(zmq.REQ)
|
||||
self.socket.setsockopt(zmq.RCVTIMEO, 50000) # Wait for data arrive
|
||||
self.socket.setsockopt(zmq.SNDTIMEO, 5000) # Wait for data send
|
||||
self.socket.setsockopt(zmq.LINGER, 500) # Wait for socket close
|
||||
# self.socket.setsockopt(zmq.TCP_KEEPALIVE, 1) # Enable keepalive
|
||||
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 4*60) # Send after 4 minute idle
|
||||
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 15) # Wait 15 sec to response
|
||||
# self.socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 4) # 4 Probes
|
||||
self.socket.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||
self.log.debug("Connecting...")
|
||||
try:
|
||||
self.connection = self.connection_server.connect(self.ip, self.port)
|
||||
except Exception, err:
|
||||
self.log.debug("Connecting error: %s" % Debug.formatException(err))
|
||||
self.onConnectionError()
|
||||
|
||||
def __str__(self):
|
||||
return "Peer %-12s" % self.ip
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s>" % self.__str__()
|
||||
|
||||
# Found a peer on tracker
|
||||
def found(self):
|
||||
|
@ -49,18 +51,20 @@ class Peer:
|
|||
|
||||
|
||||
# Send a command to peer
|
||||
def sendCmd(self, cmd, params = {}):
|
||||
if not self.socket: self.connect()
|
||||
def request(self, cmd, params = {}):
|
||||
if not self.connection or self.connection.closed:
|
||||
self.connect()
|
||||
if not self.connection: return None # Connection failed
|
||||
|
||||
if cmd != "ping" and self.last_response and time.time() - self.last_response > 20*60: # If last response if older than 20 minute, ping first to see if still alive
|
||||
if not self.ping(): return None
|
||||
|
||||
for retry in range(1,3): # Retry 3 times
|
||||
if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path")))
|
||||
#if config.debug_socket: self.log.debug("sendCmd: %s %s" % (cmd, params.get("inner_path")))
|
||||
try:
|
||||
self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True))
|
||||
if config.debug_socket: self.log.debug("Sent command: %s" % cmd)
|
||||
response = msgpack.unpackb(self.socket.recv())
|
||||
if config.debug_socket: self.log.debug("Got response to: %s" % cmd)
|
||||
response = self.connection.request(cmd, params)
|
||||
if not response: raise Exception("Send error")
|
||||
#if config.debug_socket: self.log.debug("Got response to: %s" % cmd)
|
||||
if "error" in response:
|
||||
self.log.debug("%s error: %s" % (cmd, response["error"]))
|
||||
self.onConnectionError()
|
||||
|
@ -69,13 +73,14 @@ class Peer:
|
|||
self.last_response = time.time()
|
||||
return response
|
||||
except Exception, err:
|
||||
self.onConnectionError()
|
||||
self.log.debug("%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry))
|
||||
time.sleep(1*retry)
|
||||
self.connect()
|
||||
if type(err).__name__ == "Notify" and err.message == "Worker stopped": # Greenlet kill by worker
|
||||
self.log.debug("Peer worker got killed, aborting cmd: %s" % cmd)
|
||||
if type(err).__name__ == "Notify": # Greenlet kill by worker
|
||||
self.log.debug("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
|
||||
break
|
||||
else:
|
||||
self.onConnectionError()
|
||||
self.log.debug("%s (connection_error: %s, hash_failed: %s, retry: %s)" % (Debug.formatException(err), self.connection_error, self.hash_failed, retry))
|
||||
time.sleep(1*retry)
|
||||
self.connect()
|
||||
return None # Failed after 4 retry
|
||||
|
||||
|
||||
|
@ -85,7 +90,7 @@ class Peer:
|
|||
buff = StringIO()
|
||||
s = time.time()
|
||||
while 1: # Read in 512k parts
|
||||
back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location
|
||||
back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location
|
||||
if not back or "body" not in back: # Error
|
||||
return False
|
||||
|
||||
|
@ -106,7 +111,8 @@ class Peer:
|
|||
for retry in range(1,3): # Retry 3 times
|
||||
s = time.time()
|
||||
with gevent.Timeout(10.0, False): # 10 sec timeout, dont raise exception
|
||||
response = self.sendCmd("ping")
|
||||
response = self.request("ping")
|
||||
|
||||
if response and "body" in response and response["body"] == "Pong!":
|
||||
response_time = time.time()-s
|
||||
break # All fine, exit from for loop
|
||||
|
@ -126,7 +132,8 @@ class Peer:
|
|||
def remove(self):
|
||||
self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
||||
if self.key in self.site.peers: del(self.site.peers[self.key])
|
||||
self.socket.close()
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
|
||||
|
||||
# - EVENTS -
|
||||
|
|
|
@ -27,7 +27,7 @@ class Site:
|
|||
self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
|
||||
self.last_announce = 0 # Last announce time to tracker
|
||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||
self.bad_files = {} # SHA512 check failed files, need to redownload
|
||||
self.bad_files = {} # SHA512 check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||
self.content_updated = None # Content.js update time
|
||||
self.last_downloads = [] # Files downloaded in run of self.download()
|
||||
self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
|
||||
|
@ -115,28 +115,39 @@ class Site:
|
|||
changed = self.content_manager.loadContent(inner_path, load_includes=False)
|
||||
|
||||
# Start download files
|
||||
evts = []
|
||||
file_threads = []
|
||||
if download_files:
|
||||
for file_relative_path in self.content_manager.contents[inner_path].get("files", {}).keys():
|
||||
file_inner_path = content_inner_dir+file_relative_path
|
||||
res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer) # No waiting for finish, return the event
|
||||
if res != True: # Need downloading
|
||||
self.last_downloads.append(file_inner_path)
|
||||
evts.append(res) # Append evt
|
||||
file_threads.append(res) # Append evt
|
||||
|
||||
# Wait for includes download
|
||||
include_threads = []
|
||||
for file_relative_path in self.content_manager.contents[inner_path].get("includes", {}).keys():
|
||||
file_inner_path = content_inner_dir+file_relative_path
|
||||
self.downloadContent(file_inner_path, download_files=download_files, peer=peer)
|
||||
include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer)
|
||||
include_threads.append(include_thread)
|
||||
|
||||
self.log.debug("%s: Downloading %s includes..." % (inner_path, len(include_threads)))
|
||||
gevent.joinall(include_threads)
|
||||
self.log.debug("%s: Includes downloaded" % inner_path)
|
||||
self.log.debug("%s: Downloading %s files..." % (inner_path, len(evts)))
|
||||
gevent.joinall(evts)
|
||||
|
||||
self.log.debug("%s: Downloading %s files..." % (inner_path, len(file_threads)))
|
||||
gevent.joinall(file_threads)
|
||||
self.log.debug("%s: All file downloaded in %.2fs" % (inner_path, time.time()-s))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Return bad files with less than 3 retry
|
||||
def getReachableBadFiles(self):
|
||||
if not self.bad_files: return False
|
||||
return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3]
|
||||
|
||||
|
||||
# Download all files of the site
|
||||
@util.Noparallel(blocking=False)
|
||||
def download(self, check_size=False):
|
||||
|
@ -163,7 +174,7 @@ class Site:
|
|||
changed = self.content_manager.loadContent("content.json")
|
||||
if changed:
|
||||
for changed_file in changed:
|
||||
self.bad_files[changed_file] = True
|
||||
self.bad_files[changed_file] = self.bad_files.get(changed_file, 0)+1
|
||||
if not self.settings["own"]: self.checkFiles(quick_check=True) # Quick check files based on file size
|
||||
if self.bad_files:
|
||||
self.download()
|
||||
|
@ -178,16 +189,19 @@ class Site:
|
|||
if not peers or len(published) >= limit: break # All peers done, or published engouht
|
||||
peer = peers.pop(0)
|
||||
result = {"exception": "Timeout"}
|
||||
try:
|
||||
with gevent.Timeout(timeout, False):
|
||||
result = peer.sendCmd("update", {
|
||||
"site": self.address,
|
||||
"inner_path": inner_path,
|
||||
"body": open(self.getPath(inner_path), "rb").read(),
|
||||
"peer": (config.ip_external, config.fileserver_port)
|
||||
})
|
||||
except Exception, err:
|
||||
result = {"exception": Debug.formatException(err)}
|
||||
|
||||
for retry in range(2):
|
||||
try:
|
||||
with gevent.Timeout(timeout, False):
|
||||
result = peer.request("update", {
|
||||
"site": self.address,
|
||||
"inner_path": inner_path,
|
||||
"body": open(self.getPath(inner_path), "rb").read(),
|
||||
"peer": (config.ip_external, config.fileserver_port)
|
||||
})
|
||||
if result: break
|
||||
except Exception, err:
|
||||
result = {"exception": Debug.formatException(err)}
|
||||
|
||||
if result and "ok" in result:
|
||||
published.append(peer)
|
||||
|
@ -202,6 +216,8 @@ class Site:
|
|||
published = [] # Successfuly published (Peer)
|
||||
publishers = [] # Publisher threads
|
||||
peers = self.peers.values()
|
||||
|
||||
random.shuffle(peers)
|
||||
for i in range(limit):
|
||||
publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit)
|
||||
publishers.append(publisher)
|
||||
|
@ -303,7 +319,7 @@ class Site:
|
|||
bad_files = self.verifyFiles(quick_check)
|
||||
if bad_files:
|
||||
for bad_file in bad_files:
|
||||
self.bad_files[bad_file] = True
|
||||
self.bad_files[bad_file] = self.bad_files.get("bad_file", 0)+1
|
||||
|
||||
|
||||
def deleteFiles(self):
|
||||
|
@ -387,6 +403,8 @@ class Site:
|
|||
if inner_path == "content.json":
|
||||
self.content_updated = False
|
||||
self.log.error("Can't update content.json")
|
||||
if inner_path in self.bad_files:
|
||||
self.bad_files[inner_path] = self.bad_files.get(inner_path, 0)+1
|
||||
|
||||
self.updateWebsocket(file_failed=inner_path)
|
||||
|
||||
|
|
|
@ -46,6 +46,8 @@ class UiRequest:
|
|||
return self.actionDebug()
|
||||
elif path == "/Console" and config.debug:
|
||||
return self.actionConsole()
|
||||
elif path == "/Stats":
|
||||
return self.actionStats()
|
||||
# Test
|
||||
elif path == "/Test/Websocket":
|
||||
return self.actionFile("Data/temp/ws_test.html")
|
||||
|
@ -114,7 +116,7 @@ class UiRequest:
|
|||
if not inner_path: inner_path = "index.html" # If inner path defaults to index.html
|
||||
|
||||
site = self.server.sites.get(match.group("site"))
|
||||
if site and site.content_manager.contents.get("content.json") and (not site.bad_files or site.settings["own"]): # Its downloaded or own
|
||||
if site and site.content_manager.contents.get("content.json") and (not site.getReachableBadFiles() or site.settings["own"]): # Its downloaded or own
|
||||
title = site.content_manager.contents["content.json"]["title"]
|
||||
else:
|
||||
title = "Loading %s..." % match.group("site")
|
||||
|
@ -268,10 +270,30 @@ class UiRequest:
|
|||
|
||||
# Just raise an error to get console
|
||||
def actionConsole(self):
|
||||
import sys
|
||||
sites = self.server.sites
|
||||
main = sys.modules["src.main"]
|
||||
raise Exception("Here is your console")
|
||||
|
||||
|
||||
def actionStats(self):
|
||||
import gc, sys
|
||||
from greenlet import greenlet
|
||||
greenlets = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
|
||||
self.sendHeader()
|
||||
main = sys.modules["src.main"]
|
||||
|
||||
yield "<pre>"
|
||||
yield "Connections (%s):<br>" % len(main.file_server.connections)
|
||||
for connection in main.file_server.connections:
|
||||
yield "%s: %s %s<br>" % (connection.protocol, connection.ip, connection.zmq_sock)
|
||||
|
||||
yield "Greenlets (%s):<br>" % len(greenlets)
|
||||
for thread in greenlets:
|
||||
yield " - %s<br>" % cgi.escape(repr(thread))
|
||||
yield "</pre>"
|
||||
|
||||
|
||||
# - Tests -
|
||||
|
||||
def actionTestStream(self):
|
||||
|
|
|
@ -94,11 +94,12 @@ class UiServer:
|
|||
browser = webbrowser.get(config.open_browser)
|
||||
browser.open("http://%s:%s" % (config.ui_ip, config.ui_port), new=2)
|
||||
|
||||
self.server = WSGIServer((self.ip, self.port), handler, handler_class=UiWSGIHandler, log=self.log)
|
||||
self.server = WSGIServer((self.ip.replace("*", ""), self.port), handler, handler_class=UiWSGIHandler, log=self.log)
|
||||
self.server.sockets = {}
|
||||
self.server.serve_forever()
|
||||
self.log.debug("Stopped.")
|
||||
|
||||
|
||||
def stop(self):
|
||||
# Close WS sockets
|
||||
for client in self.server.clients.values():
|
||||
|
|
|
@ -53,7 +53,8 @@ class Worker:
|
|||
self.manager.doneTask(task)
|
||||
self.task = None
|
||||
else: # Hash failed
|
||||
self.manager.log.debug("%s: Hash failed: %s" % (self.key, task["inner_path"]))
|
||||
self.manager.log.debug("%s: Hash failed: %s, failed peers: %s" % (self.key, task["inner_path"], len(task["failed"])))
|
||||
task["failed"].append(self.key)
|
||||
self.task = None
|
||||
self.peer.hash_failed += 1
|
||||
if self.peer.hash_failed >= 3: # Broken peer
|
||||
|
|
|
@ -20,8 +20,8 @@ class WorkerManager:
|
|||
time.sleep(15) # Check every 15 sec
|
||||
|
||||
# Clean up workers
|
||||
if not self.tasks and self.workers: # No task but workers still running
|
||||
for worker in self.workers.values(): worker.stop()
|
||||
for worker in self.workers.values():
|
||||
if worker.task and worker.task["done"]: worker.stop() # Stop workers with task done
|
||||
|
||||
if not self.tasks: continue
|
||||
tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
|
||||
|
@ -38,7 +38,7 @@ class WorkerManager:
|
|||
elif (task["time_started"] and time.time() >= task["time_started"]+15) or not self.workers: # Task started more than 15 sec ago or no workers
|
||||
self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"])
|
||||
task["site"].announce() # Find more peers
|
||||
if task["peers"]: # Release the peer olck
|
||||
if task["peers"]: # Release the peer lock
|
||||
self.log.debug("Task peer lock release: %s" % task["inner_path"])
|
||||
task["peers"] = []
|
||||
self.startWorkers()
|
||||
|
@ -62,6 +62,7 @@ class WorkerManager:
|
|||
self.tasks.sort(key=self.taskSorter, reverse=True) # Sort tasks by priority and worker numbers
|
||||
for task in self.tasks: # Find a task
|
||||
if task["peers"] and peer not in task["peers"]: continue # This peer not allowed to pick this task
|
||||
if peer.key in task["failed"]: continue # Peer already tried to solve this, but failed
|
||||
return task
|
||||
|
||||
|
||||
|
@ -85,9 +86,9 @@ class WorkerManager:
|
|||
|
||||
# Start workers to process tasks
|
||||
def startWorkers(self, peers=None):
|
||||
if len(self.workers) >= MAX_WORKERS and not peers: return False # Workers number already maxed
|
||||
if len(self.workers) >= MAX_WORKERS and not peers: return False # Workers number already maxed and no starting peers definied
|
||||
if not self.tasks: return False # No task for workers
|
||||
peers = self.site.peers.values()
|
||||
if not peers: peers = self.site.peers.values() # No peers definied, use any from site
|
||||
random.shuffle(peers)
|
||||
for peer in peers: # One worker for every peer
|
||||
if peers and peer not in peers: continue # If peers definied and peer not valid
|
||||
|
@ -139,7 +140,7 @@ class WorkerManager:
|
|||
peers = [peer] # Only download from this peer
|
||||
else:
|
||||
peers = None
|
||||
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority}
|
||||
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": []}
|
||||
self.tasks.append(task)
|
||||
self.log.debug("New task: %s, peer lock: %s, priority: %s" % (task["inner_path"], peers, priority))
|
||||
self.startWorkers(peers)
|
||||
|
@ -168,5 +169,5 @@ class WorkerManager:
|
|||
self.tasks.remove(task) # Remove from queue
|
||||
self.site.onFileDone(task["inner_path"])
|
||||
task["evt"].set(True)
|
||||
if not self.tasks: self.site.onComplete() # No more task trigger site compelte
|
||||
if not self.tasks: self.site.onComplete() # No more task trigger site complete
|
||||
|
||||
|
|
26
src/main.py
26
src/main.py
|
@ -150,8 +150,10 @@ def siteNeedFile(address, inner_path):
|
|||
|
||||
|
||||
def sitePublish(address, peer_ip=None, peer_port=15441, inner_path="content.json"):
|
||||
global file_server
|
||||
from Site import Site
|
||||
from File import FileServer # We need fileserver to handle incoming file requests
|
||||
|
||||
logging.info("Creating FileServer....")
|
||||
file_server = FileServer()
|
||||
file_server_thread = gevent.spawn(file_server.start, check_sites=False) # Dont check every site integrity
|
||||
|
@ -184,10 +186,15 @@ def cryptoPrivatekeyToAddress(privatekey=None):
|
|||
|
||||
# Peer
|
||||
|
||||
def peerPing(ip, port):
|
||||
def peerPing(peer_ip, peer_port):
|
||||
logging.info("Opening a simple connection server")
|
||||
global file_server
|
||||
from Connection import ConnectionServer
|
||||
file_server = ConnectionServer("127.0.0.1", 1234)
|
||||
|
||||
from Peer import Peer
|
||||
logging.info("Pinging 5 times peer: %s:%s..." % (ip, port))
|
||||
peer = Peer(ip, port)
|
||||
logging.info("Pinging 5 times peer: %s:%s..." % (peer_ip, peer_port))
|
||||
peer = Peer(peer_ip, peer_port)
|
||||
for i in range(5):
|
||||
s = time.time()
|
||||
print peer.ping(),
|
||||
|
@ -195,12 +202,15 @@ def peerPing(ip, port):
|
|||
time.sleep(1)
|
||||
|
||||
|
||||
def peerGetFile(ip, port, site, filename=None):
|
||||
def peerGetFile(peer_ip, peer_port, site, filename):
|
||||
logging.info("Opening a simple connection server")
|
||||
global file_server
|
||||
from Connection import ConnectionServer
|
||||
file_server = ConnectionServer()
|
||||
|
||||
from Peer import Peer
|
||||
if not site: site = config.homepage
|
||||
if not filename: filename = "content.json"
|
||||
logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, ip, port))
|
||||
peer = Peer(ip, port)
|
||||
logging.info("Getting %s/%s from peer: %s:%s..." % (site, filename, peer_ip, peer_port))
|
||||
peer = Peer(peer_ip, peer_port)
|
||||
s = time.time()
|
||||
print peer.getFile(site, filename).read()
|
||||
print "Response time: %.3fs" % (time.time()-s)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
class Event(list):
|
||||
def __call__(self, *args, **kwargs):
|
||||
for f in self[:]:
|
||||
if "once" in dir(f):
|
||||
if "once" in dir(f) and f in self:
|
||||
self.remove(f)
|
||||
f(*args, **kwargs)
|
||||
|
||||
|
|
Binary file not shown.
|
@ -35,3 +35,4 @@ def main():
|
|||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
|
Loading…
Reference in a new issue