count peer connection error, remove offline peers, content sign modification time fix, tracker order fix, reset peer hash_failed on download start, avoid util package name conflict
This commit is contained in:
parent
6424c82887
commit
3f974e0bc7
5 changed files with 36 additions and 15 deletions
|
@ -7,13 +7,15 @@ context = zmq.Context()
|
||||||
|
|
||||||
# Communicate remote peers
|
# Communicate remote peers
|
||||||
class Peer:
|
class Peer:
|
||||||
def __init__(self, ip, port):
|
def __init__(self, ip, port, site):
|
||||||
self.ip = ip
|
self.ip = ip
|
||||||
self.port = port
|
self.port = port
|
||||||
|
self.site = site
|
||||||
self.socket = None
|
self.socket = None
|
||||||
self.last_found = None
|
self.last_found = None
|
||||||
self.added = time.time()
|
self.added = time.time()
|
||||||
|
|
||||||
|
self.connection_error = 0
|
||||||
self.hash_failed = 0
|
self.hash_failed = 0
|
||||||
self.download_bytes = 0
|
self.download_bytes = 0
|
||||||
self.download_time = 0
|
self.download_time = 0
|
||||||
|
@ -28,11 +30,6 @@ class Peer:
|
||||||
self.socket.connect('tcp://%s:%s' % (self.ip, self.port))
|
self.socket.connect('tcp://%s:%s' % (self.ip, self.port))
|
||||||
|
|
||||||
|
|
||||||
# Done working with peer
|
|
||||||
def disconnect(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# Found a peer on tracker
|
# Found a peer on tracker
|
||||||
def found(self):
|
def found(self):
|
||||||
self.last_found = time.time()
|
self.last_found = time.time()
|
||||||
|
@ -45,9 +42,12 @@ class Peer:
|
||||||
self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True))
|
self.socket.send(msgpack.packb({"cmd": cmd, "params": params}, use_bin_type=True))
|
||||||
response = msgpack.unpackb(self.socket.recv())
|
response = msgpack.unpackb(self.socket.recv())
|
||||||
if "error" in response:
|
if "error" in response:
|
||||||
self.log.error("%s %s error: %s" % (cmd, params, response["error"]))
|
self.log.debug("%s %s error: %s" % (cmd, params, response["error"]))
|
||||||
|
else: # Successful request, reset connection error num
|
||||||
|
self.connection_error = 0
|
||||||
return response
|
return response
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
|
self.onConnectionError()
|
||||||
self.log.error("%s" % err)
|
self.log.error("%s" % err)
|
||||||
if config.debug:
|
if config.debug:
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -65,7 +65,7 @@ class Peer:
|
||||||
s = time.time()
|
s = time.time()
|
||||||
while 1: # Read in 512k parts
|
while 1: # Read in 512k parts
|
||||||
back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location
|
back = self.sendCmd("getFile", {"site": site, "inner_path": inner_path, "location": location}) # Get file content from last location
|
||||||
if "body" not in back: # Error
|
if not back or "body" not in back: # Error
|
||||||
return False
|
return False
|
||||||
|
|
||||||
buff.write(back["body"])
|
buff.write(back["body"])
|
||||||
|
@ -82,3 +82,24 @@ class Peer:
|
||||||
# Send a ping request
|
# Send a ping request
|
||||||
def ping(self):
|
def ping(self):
|
||||||
return self.sendCmd("ping")
|
return self.sendCmd("ping")
|
||||||
|
|
||||||
|
|
||||||
|
# Stop and remove from site
|
||||||
|
def remove(self):
|
||||||
|
self.log.debug("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
||||||
|
del(self.site.peers[self.key])
|
||||||
|
self.socket.close()
|
||||||
|
|
||||||
|
|
||||||
|
# - EVENTS -
|
||||||
|
|
||||||
|
# On connection error
|
||||||
|
def onConnectionError(self):
|
||||||
|
self.connection_error += 1
|
||||||
|
if self.connection_error > 5: # Dead peer
|
||||||
|
self.remove()
|
||||||
|
|
||||||
|
|
||||||
|
# Done working with peer
|
||||||
|
def onWorkerDone(self):
|
||||||
|
pass
|
||||||
|
|
|
@ -211,7 +211,7 @@ class Site:
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
else: # New peer
|
else: # New peer
|
||||||
peer = Peer(ip, port)
|
peer = Peer(ip, port, self)
|
||||||
self.peers[key] = peer
|
self.peers[key] = peer
|
||||||
return peer
|
return peer
|
||||||
|
|
||||||
|
@ -403,12 +403,11 @@ class Site:
|
||||||
|
|
||||||
# Generate new content.json
|
# Generate new content.json
|
||||||
self.log.info("Adding timestamp and sha1sums to new content.json...")
|
self.log.info("Adding timestamp and sha1sums to new content.json...")
|
||||||
import datetime, time
|
|
||||||
|
|
||||||
content = self.content.copy() # Create a copy of current content.json
|
content = self.content.copy() # Create a copy of current content.json
|
||||||
content["address"] = self.address # Add files sha1 hash
|
content["address"] = self.address # Add files sha1 hash
|
||||||
content["files"] = hashed_files # Add files sha1 hash
|
content["files"] = hashed_files # Add files sha1 hash
|
||||||
content["modified"] = time.mktime(datetime.datetime.utcnow().utctimetuple()) # Add timestamp
|
content["modified"] = time.time() # Add timestamp
|
||||||
del(content["sign"]) # Delete old site
|
del(content["sign"]) # Delete old site
|
||||||
|
|
||||||
# Signing content
|
# Signing content
|
||||||
|
|
|
@ -2,9 +2,9 @@ import json, logging, time, re, os
|
||||||
import gevent
|
import gevent
|
||||||
|
|
||||||
TRACKERS = [
|
TRACKERS = [
|
||||||
|
("udp", "sugoi.pomf.se", 2710), # Retry 3 times
|
||||||
|
("udp", "sugoi.pomf.se", 2710),
|
||||||
("udp", "sugoi.pomf.se", 2710),
|
("udp", "sugoi.pomf.se", 2710),
|
||||||
("udp", "open.demonii.com", 1337), # Retry 3 times
|
|
||||||
("udp", "open.demonii.com", 1337),
|
|
||||||
("udp", "open.demonii.com", 1337),
|
("udp", "open.demonii.com", 1337),
|
||||||
("udp", "bigfoot1942.sektori.org", 6969),
|
("udp", "bigfoot1942.sektori.org", 6969),
|
||||||
("udp", "tracker.coppersurfer.tk", 80),
|
("udp", "tracker.coppersurfer.tk", 80),
|
||||||
|
|
|
@ -13,6 +13,7 @@ class Worker:
|
||||||
|
|
||||||
# Downloader thread
|
# Downloader thread
|
||||||
def downloader(self):
|
def downloader(self):
|
||||||
|
self.peer.hash_failed = 0 # Reset hash error counter
|
||||||
while self.running:
|
while self.running:
|
||||||
# Try to pickup free file download task
|
# Try to pickup free file download task
|
||||||
task = self.manager.getTask(self.peer)
|
task = self.manager.getTask(self.peer)
|
||||||
|
@ -53,8 +54,8 @@ class Worker:
|
||||||
task["workers_num"] -= 1
|
task["workers_num"] -= 1
|
||||||
self.manager.log.error("%s: Hash failed: %s" % (self.key, task["inner_path"]))
|
self.manager.log.error("%s: Hash failed: %s" % (self.key, task["inner_path"]))
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
self.peer.onWorkerDone()
|
||||||
self.running = False
|
self.running = False
|
||||||
self.peer.disconnect()
|
|
||||||
self.manager.removeWorker(self)
|
self.manager.removeWorker(self)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import os, sys
|
import os, sys
|
||||||
sys.path.append(os.path.dirname(__file__)) # Imports relative to main.py
|
sys.path.insert(0, os.path.dirname(__file__)) # Imports relative to main.py
|
||||||
|
|
||||||
# Create necessary files and dirs
|
# Create necessary files and dirs
|
||||||
if not os.path.isdir("log"): os.mkdir("log")
|
if not os.path.isdir("log"): os.mkdir("log")
|
||||||
|
|
Loading…
Reference in a new issue