Rev477, FindHashId command for optional files, Renaming some variables to make it more clear, Optional files downloading in WorkerManager, Test FindHash, Test Optional download, Test FindOptional
This commit is contained in:
parent
e8ce26587d
commit
a8dae8dd85
9 changed files with 348 additions and 93 deletions
|
@ -8,7 +8,7 @@ class Config(object):
|
|||
|
||||
def __init__(self, argv):
|
||||
self.version = "0.3.2"
|
||||
self.rev = 473
|
||||
self.rev = 477
|
||||
self.argv = argv
|
||||
self.action = None
|
||||
self.createParser()
|
||||
|
|
|
@ -67,6 +67,8 @@ class FileRequest(object):
|
|||
self.actionListModified(params)
|
||||
elif cmd == "getHashfield":
|
||||
self.actionGetHashfield(params)
|
||||
elif cmd == "findHashIds":
|
||||
self.actionFindHashIds(params)
|
||||
elif cmd == "ping":
|
||||
self.actionPing()
|
||||
else:
|
||||
|
@ -272,6 +274,17 @@ class FileRequest(object):
|
|||
|
||||
self.response({"hashfield_raw": site.content_manager.hashfield.tostring()})
|
||||
|
||||
def actionFindHashIds(self, params):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
|
||||
found = site.worker_manager.findOptionalHashIds(params["hash_ids"])
|
||||
back = {}
|
||||
for hash_id, peers in found.iteritems():
|
||||
back[hash_id] = [helper.packAddress(peer.ip, peer.port) for peer in peers]
|
||||
self.response({"peers": back})
|
||||
|
||||
# Send a simple Pong! answer
|
||||
def actionPing(self):
|
||||
|
|
125
src/Peer/Peer.py
125
src/Peer/Peer.py
|
@ -1,6 +1,5 @@
|
|||
import logging
|
||||
import time
|
||||
import array
|
||||
|
||||
import gevent
|
||||
|
||||
|
@ -17,8 +16,8 @@ if config.use_tempfiles:
|
|||
# Communicate with remote peers
|
||||
class Peer(object):
|
||||
__slots__ = (
|
||||
"ip", "port", "site", "key", "connection", "last_found", "last_response", "last_ping", "last_hashfield",
|
||||
"hashfield", "added", "connection_error", "hash_failed", "download_bytes", "download_time"
|
||||
"ip", "port", "site", "key", "connection", "time_found", "time_response", "time_hashfield", "time_added",
|
||||
"last_ping", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
|
||||
)
|
||||
|
||||
def __init__(self, ip, port, site=None):
|
||||
|
@ -29,11 +28,11 @@ class Peer(object):
|
|||
|
||||
self.connection = None
|
||||
self.hashfield = PeerHashfield() # Got optional files hash_id
|
||||
self.last_hashfield = 0 # Last time hashfiled downloaded
|
||||
self.last_found = time.time() # Time of last found in the torrent tracker
|
||||
self.last_response = None # Time of last successful response from peer
|
||||
self.time_hashfield = None # Last time hashfield downloaded
|
||||
self.time_found = time.time() # Time of last found in the torrent tracker
|
||||
self.time_response = None # Time of last successful response from peer
|
||||
self.time_added = time.time()
|
||||
self.last_ping = None # Last response time for ping
|
||||
self.added = time.time()
|
||||
|
||||
self.connection_error = 0 # Series of connection error
|
||||
self.hash_failed = 0 # Number of bad files from peer
|
||||
|
@ -86,7 +85,7 @@ class Peer(object):
|
|||
|
||||
# Found a peer on tracker
|
||||
def found(self):
|
||||
self.last_found = time.time()
|
||||
self.time_found = time.time()
|
||||
|
||||
# Send a command to peer
|
||||
def request(self, cmd, params={}, stream_to=None):
|
||||
|
@ -98,16 +97,16 @@ class Peer(object):
|
|||
|
||||
for retry in range(1, 3): # Retry 3 times
|
||||
try:
|
||||
response = self.connection.request(cmd, params, stream_to)
|
||||
if not response:
|
||||
res = self.connection.request(cmd, params, stream_to)
|
||||
if not res:
|
||||
raise Exception("Send error")
|
||||
if "error" in response:
|
||||
self.log("%s error: %s" % (cmd, response["error"]))
|
||||
if "error" in res:
|
||||
self.log("%s error: %s" % (cmd, res["error"]))
|
||||
self.onConnectionError()
|
||||
else: # Successful request, reset connection error num
|
||||
self.connection_error = 0
|
||||
self.last_response = time.time()
|
||||
return response
|
||||
self.time_response = time.time()
|
||||
return res
|
||||
except Exception, err:
|
||||
if type(err).__name__ == "Notify": # Greenlet killed by worker
|
||||
self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
|
||||
|
@ -136,21 +135,21 @@ class Peer(object):
|
|||
|
||||
s = time.time()
|
||||
while True: # Read in 512k parts
|
||||
back = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})
|
||||
res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location})
|
||||
|
||||
if not back or "body" not in back: # Error
|
||||
if not res or "body" not in res: # Error
|
||||
return False
|
||||
|
||||
buff.write(back["body"])
|
||||
back["body"] = None # Save memory
|
||||
if back["location"] == back["size"]: # End of file
|
||||
buff.write(res["body"])
|
||||
res["body"] = None # Save memory
|
||||
if res["location"] == res["size"]: # End of file
|
||||
break
|
||||
else:
|
||||
location = back["location"]
|
||||
location = res["location"]
|
||||
|
||||
self.download_bytes += back["location"]
|
||||
self.download_bytes += res["location"]
|
||||
self.download_time += (time.time() - s)
|
||||
self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"]
|
||||
self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
|
||||
buff.seek(0)
|
||||
return buff
|
||||
|
||||
|
@ -164,20 +163,20 @@ class Peer(object):
|
|||
|
||||
s = time.time()
|
||||
while True: # Read in 512k parts
|
||||
back = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)
|
||||
res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location}, stream_to=buff)
|
||||
|
||||
if not back: # Error
|
||||
self.log("Invalid response: %s" % back)
|
||||
if not res: # Error
|
||||
self.log("Invalid response: %s" % res)
|
||||
return False
|
||||
|
||||
if back["location"] == back["size"]: # End of file
|
||||
if res["location"] == res["size"]: # End of file
|
||||
break
|
||||
else:
|
||||
location = back["location"]
|
||||
location = res["location"]
|
||||
|
||||
self.download_bytes += back["location"]
|
||||
self.download_bytes += res["location"]
|
||||
self.download_time += (time.time() - s)
|
||||
self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + back["location"]
|
||||
self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + res["location"]
|
||||
buff.seek(0)
|
||||
return buff
|
||||
|
||||
|
@ -187,9 +186,9 @@ class Peer(object):
|
|||
for retry in range(1, 3): # Retry 3 times
|
||||
s = time.time()
|
||||
with gevent.Timeout(10.0, False): # 10 sec timeout, don't raise exception
|
||||
response = self.request("ping")
|
||||
res = self.request("ping")
|
||||
|
||||
if response and "body" in response and response["body"] == "Pong!":
|
||||
if res and "body" in res and res["body"] == "Pong!":
|
||||
response_time = time.time() - s
|
||||
break # All fine, exit from for loop
|
||||
# Timeout reached or bad response
|
||||
|
@ -210,11 +209,11 @@ class Peer(object):
|
|||
site = self.site # If no site defined request peers for this site
|
||||
# give him/her 5 connectible peers
|
||||
packed_peers = [peer.packMyAddress() for peer in self.site.getConnectablePeers(5)]
|
||||
response = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
|
||||
if not response or "error" in response:
|
||||
res = self.request("pex", {"site": site.address, "peers": packed_peers, "need": need_num})
|
||||
if not res or "error" in res:
|
||||
return False
|
||||
added = 0
|
||||
for peer in response.get("peers", []):
|
||||
for peer in res.get("peers", []):
|
||||
address = helper.unpackAddress(peer)
|
||||
if site.addPeer(*address):
|
||||
added += 1
|
||||
|
@ -222,21 +221,31 @@ class Peer(object):
|
|||
self.log("Added peers using pex: %s" % added)
|
||||
return added
|
||||
|
||||
# Request optional files hashfield from peer
|
||||
def getHashfield(self):
|
||||
self.last_hashfield = time.time()
|
||||
res = self.request("getHashfield", {"site": self.site.address})
|
||||
if res and "error" not in res:
|
||||
self.hashfield.replaceFromString(res["hashfield_raw"])
|
||||
return self.hashfield
|
||||
else:
|
||||
return False
|
||||
|
||||
# List modified files since the date
|
||||
# Return: {inner_path: modification date,...}
|
||||
def listModified(self, since):
|
||||
return self.request("listModified", {"since": since, "site": self.site.address})
|
||||
|
||||
def updateHashfield(self, force=False):
|
||||
# Don't update hashfield again in 15 min
|
||||
if self.time_hashfield and time.time() - self.time_hashfield > 60 * 15 and not force:
|
||||
return False
|
||||
|
||||
self.time_hashfield = time.time()
|
||||
res = self.request("getHashfield", {"site": self.site.address})
|
||||
if not res or "error" in res:
|
||||
return False
|
||||
self.hashfield.replaceFromString(res["hashfield_raw"])
|
||||
|
||||
return self.hashfield
|
||||
|
||||
# Return: {hash1: ["ip:port", "ip:port",...],...}
|
||||
def findHashIds(self, hash_ids):
|
||||
res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids})
|
||||
if not res or "error" in res:
|
||||
return False
|
||||
return {key: map(helper.unpackAddress, val) for key, val in res["peers"].iteritems()}
|
||||
|
||||
# Stop and remove from site
|
||||
def remove(self):
|
||||
self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
|
||||
|
@ -245,34 +254,6 @@ class Peer(object):
|
|||
if self.connection:
|
||||
self.connection.close()
|
||||
|
||||
# - HASHFIELD -
|
||||
|
||||
def updateHashfield(self, force=False):
|
||||
# Don't update hashfield again in 15 min
|
||||
if self.last_hashfield and time.time() - self.last_hashfield > 60 * 15 and not force:
|
||||
return False
|
||||
|
||||
response = self.request("getHashfield", {"site": self.site.address})
|
||||
if not response or "error" in response:
|
||||
return False
|
||||
self.last_hashfield = time.time()
|
||||
self.hashfield = response["hashfield"]
|
||||
|
||||
return self.hashfield
|
||||
|
||||
def setHashfield(self, hashfield_dump):
|
||||
self.hashfield.fromstring(hashfield_dump)
|
||||
|
||||
def hasHash(self, hash_id):
|
||||
return hash_id in self.hashfield
|
||||
|
||||
# Return: ["ip:port", "ip:port",...]
|
||||
def findHash(self, hash_id):
|
||||
response = self.request("findHash", {"site": self.site.address, "hash_id": hash_id})
|
||||
if not response or "error" in response:
|
||||
return False
|
||||
return [helper.unpackAddress(peer) for peer in response["peers"]]
|
||||
|
||||
# - EVENTS -
|
||||
|
||||
# On connection error
|
||||
|
|
|
@ -23,6 +23,13 @@ class PeerHashfield():
|
|||
else:
|
||||
return False
|
||||
|
||||
def appendHashId(self, hash_id):
|
||||
if hash_id not in self:
|
||||
self.append(hash_id)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def removeHash(self, hash):
|
||||
hash_id = int(hash[0:4], 16)
|
||||
if hash_id in self:
|
||||
|
@ -39,4 +46,4 @@ class PeerHashfield():
|
|||
|
||||
def replaceFromString(self, hashfield_raw):
|
||||
self.storage = self.createStoreage()
|
||||
self.fromstring(hashfield_raw)
|
||||
self.fromstring(hashfield_raw)
|
||||
|
|
|
@ -39,7 +39,7 @@ class Site:
|
|||
self.content = None # Load content.json
|
||||
self.peers = {} # Key: ip:port, Value: Peer.Peer
|
||||
self.peer_blacklist = SiteManager.peer_blacklist # Ignore these peers (e.g. myself)
|
||||
self.last_announce = 0 # Last announce time to tracker
|
||||
self.time_announce = 0 # Last announce time to tracker
|
||||
self.last_tracker_id = random.randint(0, 10) # Last announced tracker id
|
||||
self.worker_manager = WorkerManager(self) # Handle site download from other peers
|
||||
self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
|
||||
|
@ -589,9 +589,9 @@ class Site:
|
|||
|
||||
# Add myself and get other peers from tracker
|
||||
def announce(self, force=False, num=5, pex=True):
|
||||
if time.time() < self.last_announce + 30 and not force:
|
||||
if time.time() < self.time_announce + 30 and not force:
|
||||
return # No reannouncing within 30 secs
|
||||
self.last_announce = time.time()
|
||||
self.time_announce = time.time()
|
||||
|
||||
trackers = config.trackers
|
||||
if num == 1: # Only announce on one tracker, increment the queried tracker id
|
||||
|
@ -712,7 +712,7 @@ class Site:
|
|||
continue
|
||||
if peer.connection and not peer.connection.connected:
|
||||
peer.connection = None # Dead connection
|
||||
if time.time() - peer.last_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour
|
||||
if time.time() - peer.time_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour
|
||||
peer.remove()
|
||||
removed += 1
|
||||
if removed > 5: # Don't remove too much at once
|
||||
|
|
|
@ -8,7 +8,7 @@ from cStringIO import StringIO
|
|||
|
||||
@pytest.mark.usefixtures("resetSettings")
|
||||
@pytest.mark.usefixtures("resetTempSettings")
|
||||
class TestFileRequest:
|
||||
class TestPeer:
|
||||
def testPing(self, file_server, site, site_temp):
|
||||
file_server.ip_incoming = {} # Reset flood protection
|
||||
file_server.sites[site.address] = site
|
||||
|
@ -53,9 +53,13 @@ class TestFileRequest:
|
|||
|
||||
def testHashfield(self, site):
|
||||
sample_hash = site.content_manager.contents["content.json"]["files_optional"].values()[0]["sha512"]
|
||||
|
||||
assert not site.content_manager.hashfield
|
||||
|
||||
site.storage.verifyFiles(quick_check=True) # Find what optional files we have
|
||||
|
||||
# Check if hashfield has any files
|
||||
assert site.content_manager.hashfield
|
||||
assert len(site.content_manager.hashfield) > 0
|
||||
|
||||
# Check existing hash
|
||||
|
@ -89,8 +93,35 @@ class TestFileRequest:
|
|||
|
||||
# Testing hashfield sync
|
||||
assert len(peer_file_server.hashfield) == 0
|
||||
assert peer_file_server.getHashfield()
|
||||
assert peer_file_server.updateHashfield()
|
||||
assert len(peer_file_server.hashfield) > 0
|
||||
|
||||
connection.close()
|
||||
client.stop()
|
||||
|
||||
def testFindHash(self, file_server, site, site_temp):
|
||||
file_server.ip_incoming = {} # Reset flood protection
|
||||
file_server.sites[site.address] = site
|
||||
client = FileServer("127.0.0.1", 1545)
|
||||
client.sites[site_temp.address] = site_temp
|
||||
site_temp.connection_server = client
|
||||
|
||||
# Add file_server as peer to client
|
||||
peer_file_server = site_temp.addPeer("127.0.0.1", 1544)
|
||||
|
||||
assert peer_file_server.findHashIds([1234]) == {}
|
||||
|
||||
# Add fake peer with required hash
|
||||
fake_peer_1 = site.addPeer("1.2.3.4", 1544)
|
||||
fake_peer_1.hashfield.append(1234)
|
||||
fake_peer_2 = site.addPeer("1.2.3.5", 1545)
|
||||
fake_peer_2.hashfield.append(1234)
|
||||
fake_peer_2.hashfield.append(1235)
|
||||
fake_peer_3 = site.addPeer("1.2.3.6", 1546)
|
||||
fake_peer_3.hashfield.append(1235)
|
||||
fake_peer_3.hashfield.append(1236)
|
||||
|
||||
assert peer_file_server.findHashIds([1234, 1235]) == {
|
||||
1234: [('1.2.3.4', 1544), ('1.2.3.5', 1545)],
|
||||
1235: [('1.2.3.5', 1545), ('1.2.3.6', 1546)]
|
||||
}
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
import time
|
||||
|
||||
import pytest
|
||||
import mock
|
||||
import gevent
|
||||
|
||||
from Connection import ConnectionServer
|
||||
from Config import config
|
||||
from File import FileRequest
|
||||
from File import FileServer
|
||||
from Site import Site
|
||||
import Spy
|
||||
|
||||
|
||||
|
@ -11,7 +16,8 @@ import Spy
|
|||
@pytest.mark.usefixtures("resetSettings")
|
||||
class TestSiteDownload:
|
||||
def testDownload(self, file_server, site, site_temp):
|
||||
client = ConnectionServer("127.0.0.1", 1545)
|
||||
file_server.ip_incoming = {} # Reset flood protection
|
||||
|
||||
assert site.storage.directory == config.data_dir + "/" + site.address
|
||||
assert site_temp.storage.directory == config.data_dir + "-temp/" + site.address
|
||||
|
||||
|
@ -20,6 +26,7 @@ class TestSiteDownload:
|
|||
file_server.sites[site.address] = site
|
||||
|
||||
# Init client server
|
||||
client = ConnectionServer("127.0.0.1", 1545)
|
||||
site_temp.connection_server = client
|
||||
site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net
|
||||
|
||||
|
@ -48,7 +55,33 @@ class TestSiteDownload:
|
|||
assert len(site_temp.content_manager.contents) == len(site.content_manager.contents) - 1
|
||||
assert not bad_files
|
||||
|
||||
# Optional file
|
||||
[connection.close() for connection in file_server.connections]
|
||||
|
||||
|
||||
# Test when connected peer has the optional file
|
||||
def testOptionalDownload(self, file_server, site, site_temp):
|
||||
file_server.ip_incoming = {} # Reset flood protection
|
||||
|
||||
# Init source server
|
||||
site.connection_server = file_server
|
||||
file_server.sites[site.address] = site
|
||||
|
||||
# Init client server
|
||||
client = ConnectionServer("127.0.0.1", 1545)
|
||||
site_temp.connection_server = client
|
||||
site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net
|
||||
|
||||
site_temp.addPeer("127.0.0.1", 1544)
|
||||
|
||||
# Download site
|
||||
site_temp.download(blind_includes=True).join(timeout=5)
|
||||
|
||||
# Download optional data/optional.txt
|
||||
site.storage.verifyFiles(quick_check=True) # Find what optional files we have
|
||||
optional_file_info = site_temp.content_manager.getFileInfo("data/optional.txt")
|
||||
assert site.content_manager.hashfield.hasHash(optional_file_info["sha512"])
|
||||
assert not site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"])
|
||||
|
||||
assert not site_temp.storage.isFile("data/optional.txt")
|
||||
assert site.storage.isFile("data/optional.txt")
|
||||
site_temp.needFile("data/optional.txt")
|
||||
|
@ -59,6 +92,7 @@ class TestSiteDownload:
|
|||
optional_file_info = site_temp.content_manager.getFileInfo(
|
||||
"data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/peanut-butter-jelly-time.gif"
|
||||
)
|
||||
assert site.content_manager.hashfield.hasHash(optional_file_info["sha512"])
|
||||
assert not site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"])
|
||||
|
||||
site_temp.needFile("data/users/1CjfbrbwtP8Y2QjPy12vpTATkUT7oSiPQ9/peanut-butter-jelly-time.gif")
|
||||
|
@ -66,3 +100,46 @@ class TestSiteDownload:
|
|||
assert site_temp.content_manager.hashfield.hasHash(optional_file_info["sha512"])
|
||||
|
||||
assert site_temp.storage.deleteFiles()
|
||||
[connection.close() for connection in file_server.connections]
|
||||
|
||||
# Test when the connected peer does not have the file, so ask it if it knows someone who has it
|
||||
def testFindOptional(self, file_server, site, site_temp):
|
||||
file_server.ip_incoming = {} # Reset flood protection
|
||||
|
||||
# Init source server
|
||||
site.connection_server = file_server
|
||||
file_server.sites[site.address] = site
|
||||
|
||||
# Init full source server (has optional files)
|
||||
site_full = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
|
||||
file_server_full = FileServer("127.0.0.1", 1546)
|
||||
site_full.connection_server = file_server_full
|
||||
gevent.spawn(lambda: ConnectionServer.start(file_server_full))
|
||||
time.sleep(0) # Port opening
|
||||
file_server_full.sites[site_full.address] = site_full # Add site
|
||||
site_full.storage.verifyFiles(quick_check=True) # Check optional files
|
||||
site_full_peer = site.addPeer("127.0.0.1", 1546) # Add it to source server
|
||||
assert site_full_peer.updateHashfield() # Update hashfield
|
||||
|
||||
# Init client server
|
||||
site_temp.connection_server = ConnectionServer("127.0.0.1", 1545)
|
||||
site_temp.announce = mock.MagicMock(return_value=True) # Don't try to find peers from the net
|
||||
site_temp.addPeer("127.0.0.1", 1544) # Add source server
|
||||
|
||||
# Download normal files
|
||||
site_temp.download(blind_includes=True).join(timeout=5)
|
||||
|
||||
# Download optional data/optional.txt
|
||||
optional_file_info = site_temp.content_manager.getFileInfo("data/optional.txt")
|
||||
assert not site_temp.storage.isFile("data/optional.txt")
|
||||
assert not site.content_manager.hashfield.hasHash(optional_file_info["sha512"]) # Source server doesn't know it has the file
|
||||
assert site_full_peer.hashfield.hasHash(optional_file_info["sha512"]) # Source full peer on source server has the file
|
||||
assert site_full.content_manager.hashfield.hasHash(optional_file_info["sha512"]) # Source full server knows it has the file
|
||||
|
||||
with Spy.Spy(FileRequest, "route") as requests:
|
||||
site_temp.needFile("data/optional.txt")
|
||||
print requests
|
||||
|
||||
assert site_temp.storage.deleteFiles()
|
||||
file_server_full.stop()
|
||||
[connection.close() for connection in file_server.connections]
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
import time
|
||||
import logging
|
||||
import random
|
||||
import collections
|
||||
|
||||
import gevent
|
||||
|
||||
from Worker import Worker
|
||||
from util import helper
|
||||
import util
|
||||
|
||||
MAX_WORKERS = 10 # Max concurrent workers
|
||||
|
||||
|
@ -15,7 +18,7 @@ class WorkerManager:
|
|||
self.site = site
|
||||
self.workers = {} # Key: ip:port, Value: Worker.Worker
|
||||
self.tasks = []
|
||||
# {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
|
||||
# {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
|
||||
# "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
|
||||
self.started_task_num = 0 # Last added task num
|
||||
self.running = True
|
||||
|
@ -60,12 +63,19 @@ class WorkerManager:
|
|||
|
||||
elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
|
||||
# Task started more than 15 sec ago or no workers
|
||||
self.log.debug("Task taking more than 15 secs, find more peers: %s" % task["inner_path"])
|
||||
workers = self.findWorkers(task)
|
||||
self.log.debug(
|
||||
"Task taking more than 15 secs, workers: %s find more peers: %s" %
|
||||
(len(workers), task["inner_path"])
|
||||
)
|
||||
task["site"].announce(num=1) # Find more peers
|
||||
if task["peers"]: # Release the peer lock
|
||||
self.log.debug("Task peer lock release: %s" % task["inner_path"])
|
||||
task["peers"] = []
|
||||
self.startWorkers()
|
||||
if task["optional_hash_id"]:
|
||||
self.startFindOptional()
|
||||
else:
|
||||
if task["peers"]: # Release the peer lock
|
||||
self.log.debug("Task peer lock release: %s" % task["inner_path"])
|
||||
task["peers"] = []
|
||||
self.startWorkers()
|
||||
break # One reannounce per loop
|
||||
|
||||
self.log.debug("checkTasks stopped running")
|
||||
|
@ -90,7 +100,7 @@ class WorkerManager:
|
|||
priority += 1 # boost included content.json files priority a bit
|
||||
elif inner_path.endswith(".json"):
|
||||
priority += 2 # boost data json files priority more
|
||||
return priority - task["workers_num"]*5 # Prefer more priority and less workers
|
||||
return priority - task["workers_num"] * 5 # Prefer more priority and less workers
|
||||
|
||||
# Returns the next free or less worked task
|
||||
def getTask(self, peer):
|
||||
|
@ -100,6 +110,8 @@ class WorkerManager:
|
|||
continue # This peer not allowed to pick this task
|
||||
if peer in task["failed"]:
|
||||
continue # Peer already tried to solve this, but failed
|
||||
if task["optional_hash_id"] and task["peers"] is None:
|
||||
continue # No peers found yet for the optional task
|
||||
return task
|
||||
|
||||
# New peers added to site
|
||||
|
@ -135,6 +147,114 @@ class WorkerManager:
|
|||
if worker:
|
||||
self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), MAX_WORKERS))
|
||||
|
||||
# Find peers for optional hash in local hash tables and add to task peers
|
||||
def findOptionalTasks(self, optional_tasks):
|
||||
found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...}
|
||||
|
||||
for peer in self.site.peers.values():
|
||||
if not peer.hashfield:
|
||||
continue
|
||||
|
||||
for task in optional_tasks:
|
||||
optional_hash_id = task["optional_hash_id"]
|
||||
if optional_hash_id in peer.hashfield:
|
||||
found[optional_hash_id].append(peer)
|
||||
if task["peers"] and peer not in task["peers"]:
|
||||
task["peers"].append(peer)
|
||||
else:
|
||||
task["peers"] = [peer]
|
||||
|
||||
return found
|
||||
|
||||
# Find peers for optional hash ids in local hash tables
|
||||
def findOptionalHashIds(self, optional_hash_ids):
|
||||
found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...}
|
||||
|
||||
for peer in self.site.peers.values():
|
||||
if not peer.hashfield:
|
||||
continue
|
||||
for optional_hash_id in optional_hash_ids:
|
||||
if optional_hash_id in peer.hashfield:
|
||||
found[optional_hash_id].append(peer)
|
||||
|
||||
return found
|
||||
|
||||
# Add peers to tasks from found result
|
||||
def addOptionalPeers(self, found_ips):
|
||||
found = collections.defaultdict(list)
|
||||
for hash_id, peer_ips in found_ips.iteritems():
|
||||
task = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
|
||||
if task: # Found task, lets take the first
|
||||
task = task[0]
|
||||
for peer_ip in peer_ips:
|
||||
peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True)
|
||||
if not peer:
|
||||
continue
|
||||
if task["peers"] is None:
|
||||
task["peers"] = []
|
||||
if peer not in task["peers"]:
|
||||
task["peers"].append(peer)
|
||||
peer.hashfield.appendHashId(hash_id) # Peer has this file
|
||||
found[hash_id].append(peer)
|
||||
|
||||
return found
|
||||
|
||||
# Start find peers for optional files
|
||||
@util.Noparallel(blocking=False)
|
||||
def startFindOptional(self):
|
||||
time.sleep(0.01) # Wait for more file requests
|
||||
optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
|
||||
optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
|
||||
self.log.debug("Finding peers for optional files: %s" % optional_hash_ids)
|
||||
found = self.findOptionalTasks(optional_tasks)
|
||||
|
||||
if found:
|
||||
found_peers = set([peer for peers in found.values() for peer in peers])
|
||||
self.startWorkers(found_peers)
|
||||
|
||||
if len(found) < len(optional_hash_ids):
|
||||
self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||
|
||||
# Query hashfield from connected peers
|
||||
threads = []
|
||||
peers = self.site.getConnectedPeers()
|
||||
if not peers:
|
||||
peers = self.site.getConnectablePeers()
|
||||
for peer in peers:
|
||||
if not peer.time_hashfield:
|
||||
threads.append(gevent.spawn(peer.updateHashfield))
|
||||
gevent.joinall(threads, timeout=5)
|
||||
|
||||
found = self.findOptionalTasks(optional_tasks)
|
||||
|
||||
if found:
|
||||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||
self.startWorkers(found_peers)
|
||||
|
||||
if len(found) < len(optional_hash_ids):
|
||||
self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||
|
||||
# Try to query connected peers
|
||||
threads = []
|
||||
peers = self.site.getConnectedPeers()
|
||||
if not peers:
|
||||
peers = self.site.getConnectablePeers()
|
||||
|
||||
for peer in peers:
|
||||
threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
|
||||
|
||||
gevent.joinall(threads, timeout=5)
|
||||
|
||||
found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
|
||||
found = self.addOptionalPeers(found_ips)
|
||||
|
||||
if found:
|
||||
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
|
||||
self.startWorkers(found_peers)
|
||||
|
||||
if len(found) < len(optional_hash_ids):
|
||||
self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
|
||||
|
||||
# Stop all workers
|
||||
def stopWorkers(self):
|
||||
for worker in self.workers.values():
|
||||
|
@ -181,16 +301,27 @@ class WorkerManager:
|
|||
peers = [peer] # Only download from this peer
|
||||
else:
|
||||
peers = None
|
||||
file_info = self.site.content_manager.getFileInfo(inner_path)
|
||||
if file_info and file_info["optional"]:
|
||||
optional_hash_id = helper.toHashId(file_info["sha512"])
|
||||
else:
|
||||
optional_hash_id = None
|
||||
task = {
|
||||
"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
|
||||
"time_added": time.time(), "time_started": None, "peers": peers, "priority": priority, "failed": []
|
||||
"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": optional_hash_id,
|
||||
"time_added": time.time(), "time_started": None, "time_action": None, "peers": peers, "priority": priority, "failed": []
|
||||
}
|
||||
|
||||
self.tasks.append(task)
|
||||
|
||||
self.started_task_num += 1
|
||||
self.log.debug(
|
||||
"New task: %s, peer lock: %s, priority: %s, tasks: %s" %
|
||||
(task["inner_path"], peers, priority, self.started_task_num)
|
||||
"New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks: %s" %
|
||||
(task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
|
||||
)
|
||||
|
||||
if optional_hash_id:
|
||||
self.startFindOptional()
|
||||
|
||||
self.startWorkers(peers)
|
||||
return evt
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ import os
|
|||
import socket
|
||||
import struct
|
||||
import re
|
||||
import collections
|
||||
|
||||
|
||||
def atomicWrite(dest, content, mode="w"):
|
||||
|
@ -44,3 +45,17 @@ def getDirname(path):
|
|||
# Return: data/site/content.json -> content.json
|
||||
def getFilename(path):
|
||||
return re.sub("^.*/", "", path)
|
||||
|
||||
|
||||
# Convert hash to hashid for hashfield
|
||||
def toHashId(hash):
|
||||
return int(hash[0:4], 16)
|
||||
|
||||
|
||||
# Merge dict values
|
||||
def mergeDicts(dicts):
|
||||
back = collections.defaultdict(set)
|
||||
for d in dicts:
|
||||
for key, val in d.iteritems():
|
||||
back[key].update(val)
|
||||
return dict(back)
|
||||
|
|
Loading…
Reference in a new issue