more logging on update requests, only add peer to already peer locked tasks, no traceback on peer send error, only allow 1 sec timeout when publishing, request 50 peers from tracker, peer limited startworkers, sitePublish to 20 peers
This commit is contained in:
parent
bcd0c0025a
commit
aae1022c79
6 changed files with 33 additions and 21 deletions
|
@ -72,9 +72,10 @@ Site:13DNDk..bhC2 Successfuly published to 3 peers
|
||||||
```
|
```
|
||||||
- That's it! You successfuly signed and published your modifications.
|
- That's it! You successfuly signed and published your modifications.
|
||||||
|
|
||||||
|
|
||||||
## If you want to help keep this project alive
|
## If you want to help keep this project alive
|
||||||
|
|
||||||
Bitcoin: 1QDhxQ6PraUZa21ET5fYUCPgdrwBomnFgX
|
Bitcoin: 1QDhxQ6PraUZa21ET5fYUCPgdrwBomnFgX
|
||||||
|
|
||||||
#### Thank you!
|
#### Thank you!
|
||||||
|
|
||||||
|
More info, help, changelog, zeronet sites: http://www.reddit.com/r/zeronet/
|
|
@ -43,6 +43,7 @@ class FileRequest:
|
||||||
buff = StringIO(params["body"])
|
buff = StringIO(params["body"])
|
||||||
valid = site.verifyFile(params["inner_path"], buff)
|
valid = site.verifyFile(params["inner_path"], buff)
|
||||||
if valid == True: # Valid and changed
|
if valid == True: # Valid and changed
|
||||||
|
self.log.debug("Update for %s looks valid, saving..." % params["inner_path"])
|
||||||
buff.seek(0)
|
buff.seek(0)
|
||||||
file = open(site.getPath(params["inner_path"]), "wb")
|
file = open(site.getPath(params["inner_path"]), "wb")
|
||||||
shutil.copyfileobj(buff, file) # Write buff to disk
|
shutil.copyfileobj(buff, file) # Write buff to disk
|
||||||
|
@ -60,12 +61,14 @@ class FileRequest:
|
||||||
|
|
||||||
elif valid == None: # Not changed
|
elif valid == None: # Not changed
|
||||||
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
|
peer = site.addPeer(*params["peer"], return_peer = True) # Add or get peer
|
||||||
|
self.log.debug("New peer for locked files: %s, tasks: %s" % (peer.key, len(site.worker_manager.tasks)) )
|
||||||
for task in site.worker_manager.tasks: # New peer add to every ongoing task
|
for task in site.worker_manager.tasks: # New peer add to every ongoing task
|
||||||
site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from peer
|
if task["peers"]: site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) # Download file from this peer too if its peer locked
|
||||||
|
|
||||||
self.send({"ok": "File file not changed"})
|
self.send({"ok": "File not changed"})
|
||||||
|
|
||||||
else: # Invalid sign or sha1 hash
|
else: # Invalid sign or sha1 hash
|
||||||
|
self.log.debug("Update for %s is invalid" % params["inner_path"])
|
||||||
self.send({"error": "File invalid"})
|
self.send({"error": "File invalid"})
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -52,9 +52,6 @@ class Peer:
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.onConnectionError()
|
self.onConnectionError()
|
||||||
self.log.error("%s" % err)
|
self.log.error("%s" % err)
|
||||||
if config.debug:
|
|
||||||
import traceback
|
|
||||||
traceback.print_exc()
|
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.connect()
|
self.connect()
|
||||||
|
|
|
@ -152,12 +152,12 @@ class Site:
|
||||||
|
|
||||||
# Update content.json on peers
|
# Update content.json on peers
|
||||||
def publish(self, limit=3):
|
def publish(self, limit=3):
|
||||||
self.log.info( "Publishing to %s/%s peers..." % (len(self.peers), limit) )
|
self.log.info( "Publishing to %s/%s peers..." % (limit, len(self.peers)) )
|
||||||
published = 0
|
published = 0
|
||||||
for key, peer in self.peers.items(): # Send update command to each peer
|
for key, peer in self.peers.items(): # Send update command to each peer
|
||||||
result = {"exception": "Timeout"}
|
result = {"exception": "Timeout"}
|
||||||
try:
|
try:
|
||||||
with gevent.Timeout(2, False): # 2 sec timeout
|
with gevent.Timeout(1, False): # 1 sec timeout
|
||||||
result = peer.sendCmd("update", {
|
result = peer.sendCmd("update", {
|
||||||
"site": self.address,
|
"site": self.address,
|
||||||
"inner_path": "content.json",
|
"inner_path": "content.json",
|
||||||
|
@ -229,7 +229,7 @@ class Site:
|
||||||
try:
|
try:
|
||||||
tracker.connect()
|
tracker.connect()
|
||||||
tracker.poll_once()
|
tracker.poll_once()
|
||||||
tracker.announce(info_hash=hashlib.sha1(self.address).hexdigest())
|
tracker.announce(info_hash=hashlib.sha1(self.address).hexdigest(), num_want=50)
|
||||||
back = tracker.poll_once()
|
back = tracker.poll_once()
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
self.log.error("Tracker error: %s" % err)
|
self.log.error("Tracker error: %s" % err)
|
||||||
|
|
|
@ -20,7 +20,7 @@ class WorkerManager:
|
||||||
if not self.tasks: continue
|
if not self.tasks: continue
|
||||||
tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
|
tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
if (task["time_started"] and time.time() >= task["time_started"]+60) or (time.time() >= task["time_added"]+60 and not self.workers): # Task taking too long time, kill it
|
if (task["time_started"] and time.time() >= task["time_started"]+60) or (time.time() >= task["time_added"]+60 and not self.workers): # Task taking too long time, or no peer after 60sec kill it
|
||||||
self.log.debug("Timeout, Cleaning up task: %s" % task)
|
self.log.debug("Timeout, Cleaning up task: %s" % task)
|
||||||
# Clean up workers
|
# Clean up workers
|
||||||
workers = self.findWorkers(task)
|
workers = self.findWorkers(task)
|
||||||
|
@ -62,17 +62,27 @@ class WorkerManager:
|
||||||
self.startWorkers()
|
self.startWorkers()
|
||||||
|
|
||||||
|
|
||||||
|
# Add new worker
|
||||||
|
def addWorker(self, peer):
|
||||||
|
key = peer.key
|
||||||
|
if key not in self.workers and len(self.workers) < MAX_WORKERS: # We dont have worker for that peer and workers num less than max
|
||||||
|
worker = Worker(self, peer)
|
||||||
|
self.workers[key] = worker
|
||||||
|
worker.key = key
|
||||||
|
worker.start()
|
||||||
|
return worker
|
||||||
|
else: # We have woker for this peer or its over the limit
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
# Start workers to process tasks
|
# Start workers to process tasks
|
||||||
def startWorkers(self):
|
def startWorkers(self, peers=None):
|
||||||
if len(self.workers) >= MAX_WORKERS: return False # Workers number already maxed
|
if len(self.workers) >= MAX_WORKERS: return False # Workers number already maxed
|
||||||
if not self.tasks: return False # No task for workers
|
if not self.tasks: return False # No task for workers
|
||||||
for key, peer in self.site.peers.iteritems(): # One worker for every peer
|
for key, peer in self.site.peers.iteritems(): # One worker for every peer
|
||||||
if key not in self.workers and len(self.workers) < MAX_WORKERS: # We dont have worker for that peer and workers num less than max
|
if peers and peer not in peers: continue # If peers definied and peer not valid
|
||||||
worker = Worker(self, peer)
|
worker = self.addWorker(peer)
|
||||||
self.workers[key] = worker
|
if worker: self.log.debug("Added worker: %s, workers: %s/%s" % (key, len(self.workers), MAX_WORKERS))
|
||||||
worker.key = key
|
|
||||||
worker.start()
|
|
||||||
self.log.debug("Added worker: %s, workers: %s/%s" % (key, len(self.workers), MAX_WORKERS))
|
|
||||||
|
|
||||||
|
|
||||||
# Find workers by task
|
# Find workers by task
|
||||||
|
@ -97,7 +107,8 @@ class WorkerManager:
|
||||||
if task: # Already has task for that file
|
if task: # Already has task for that file
|
||||||
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
if peer and task["peers"]: # This peer also has new version, add it to task possible peers
|
||||||
task["peers"].append(peer)
|
task["peers"].append(peer)
|
||||||
self.startWorkers()
|
self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
|
||||||
|
self.startWorkers([peer])
|
||||||
if priority:
|
if priority:
|
||||||
task["priority"] += priority # Boost on priority
|
task["priority"] += priority # Boost on priority
|
||||||
return task["evt"]
|
return task["evt"]
|
||||||
|
@ -109,8 +120,8 @@ class WorkerManager:
|
||||||
peers = None
|
peers = None
|
||||||
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority}
|
task = {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "time_added": time.time(), "time_started": None, "peers": peers, "priority": priority}
|
||||||
self.tasks.append(task)
|
self.tasks.append(task)
|
||||||
self.log.debug("New task: %s" % task)
|
self.log.debug("New task: %s, peer lock: %s" % (task, peers))
|
||||||
self.startWorkers()
|
self.startWorkers(peers)
|
||||||
return evt
|
return evt
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -148,7 +148,7 @@ def sitePublish(address):
|
||||||
site = file_server.sites[address]
|
site = file_server.sites[address]
|
||||||
site.settings["serving"] = True # Serving the site even if its disabled
|
site.settings["serving"] = True # Serving the site even if its disabled
|
||||||
site.announce() # Gather peers
|
site.announce() # Gather peers
|
||||||
site.publish(10) # Push to 10 peers
|
site.publish(20) # Push to 20 peers
|
||||||
logging.info("Serving files....")
|
logging.info("Serving files....")
|
||||||
gevent.joinall([file_server_thread])
|
gevent.joinall([file_server_thread])
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue