692 lines
28 KiB
Python
692 lines
28 KiB
Python
import time
|
|
import os
|
|
import subprocess
|
|
import shutil
|
|
import collections
|
|
import gevent
|
|
import math
|
|
|
|
import msgpack
|
|
|
|
from Plugin import PluginManager
|
|
from Debug import Debug
|
|
from Crypt import CryptHash
|
|
from lib import merkletools
|
|
from util import helper
|
|
import util
|
|
from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked
|
|
|
|
|
|
# We can only import plugin host classes after the plugins are loaded
|
|
@PluginManager.afterLoad
def importPluginnedClasses():
    """Publish ``VerifyError`` and ``config`` as globals of this module.

    Registered with ``PluginManager.afterLoad``; the imports are performed
    inside the function body instead of at the top of the file.
    """
    global config, VerifyError
    # Import order is kept as written: it may matter to the host application
    from Content.ContentManager import VerifyError
    from Config import config
|
|
|
|
# Create the nonce registry only when the name is not defined yet, so an
# already existing registry is left untouched.
try:
    upload_nonces
except NameError:
    upload_nonces = {}
|
|
|
|
|
|
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    """UiRequest extension that receives bigfile uploads and serves big files."""

    def isCorsAllowed(self, path):
        """Allow CORS for the bigfile upload endpoint, defer to the host class otherwise."""
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    def actionBigfileUpload(self):
        """Store an uploaded file, hash it piece by piece and register it in content.json.

        The request must carry an ``upload_nonce`` that is present in the
        module-level ``upload_nonces`` registry; the nonce is consumed here.

        Returns:
            dict with ``merkle_root``, ``piece_num``, ``piece_size`` and
            ``inner_path`` of the stored file, or the ``error403`` result when
            the nonce is unknown.
        """
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        # Every nonce can be used for one upload only
        upload_info = upload_nonces.pop(nonce)

        self.sendHeader(200, "text/html", noscript=True, extra_headers=[
            ("Access-Control-Allow-Origin", "null"),
            ("Access-Control-Allow-Credentials", "true")
        ])

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            piece_hash_hex = piecemap_info["sha512_pieces"][0].encode("hex")
            site.content_manager.optionalDownloaded(inner_path, piece_hash_hex, upload_info["size"], own=True)

        else:  # Big file
            file_name = helper.getFilename(inner_path)
            # Close the piecemap explicitly so it is fully written before it is referenced
            with site.storage.open(upload_info["piecemap"], "wb") as piecemap_file:
                msgpack.pack({file_name: piecemap_info}, piecemap_file)

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path = file_info["content_inner_path"]
            content_inner_path_dir = helper.getDirname(content_inner_path)
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(content_inner_path):
                content = site.storage.loadJson(content_inner_path)
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            site.content_manager.optionalDownloaded(inner_path, merkle_root, upload_info["size"], own=True)
            site.storage.writeJson(content_inner_path, content)

            site.content_manager.contents.loadItem(content_inner_path)  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        """Consume lines from ``wsgi_input`` up to and including the first blank ``\\r\\n`` line.

        At most 100 lines are read. Returns the index of the last line read.
        """
        for i in range(100):
            line = wsgi_input.readline()
            if line == "\r\n":
                break
        return i

    def actionFile(self, file_path, *args, **kwargs):
        """Serve files larger than 1MB through ``site.storage.openBigfile``."""
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            kwargs["file_obj"] = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
|
|
|
|
|
|
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    """UiWebsocket extension that hands out one-time bigfile upload URLs."""

    def actionBigfileUploadInit(self, to, inner_path, size):
        """Register an upload nonce for ``inner_path`` and answer with the upload URL.

        Args:
            to: Id of the websocket message being answered.
            inner_path: Target path of the upload inside the site.
            size: Declared size of the file in bytes.

        The response carries ``url``, ``piece_size``, ``inner_path`` and
        ``file_relative_path``; an ``error`` response is sent instead when the
        user is not a valid signer and the site is not owned.
        """
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        self.response(to, {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "pice_size": piece_size,  # Misspelled key kept for clients that already read it
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        })
|
|
|
|
|
|
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    """ContentManager extension that hashes and verifies big files piece by piece.

    Inner paths written as ``path|from-to`` address a byte range of ``path``.
    """

    def getFileInfo(self, inner_path, *args, **kwargs):
        """Return the file info of ``inner_path``, ignoring a ``|from-to`` suffix."""
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        # Parsed only so that a malformed range still fails here
        pos_from, pos_to = map(int, file_range.split("-"))
        return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

    def readFile(self, file_in, size, buff_size=1024 * 64):
        """Yield the content of ``file_in`` in chunks until ``size`` bytes were requested.

        Each read asks for at most ``buff_size`` bytes; iteration also stops
        as soon as a read returns nothing.
        """
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        """Hash ``file_in`` piece by piece and build the merkle tree of the pieces.

        Args:
            file_in: Readable file-like object.
            size: Number of bytes to read from ``file_in``.
            piece_size: Minimum number of bytes collected before a piece is closed.
            file_out: Optional file-like object that receives a copy of the
                data; it is closed before returning, also on error.

        Returns:
            ``(merkle_root, piece_size, {"sha512_pieces": [digest, ...]})``
        """
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes),
                            float(recv) / 1024 / 1024, float(size) / 1024 / 1024
                        ))

            # Close the trailing, shorter piece. Deciding this from the bytes
            # left over (and not from the last chunk read) avoids adding an
            # empty piece when the input ends exactly on a piece boundary.
            if piece_recv > 0:
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        return mt.get_merkle_root(), piece_size, {
            "sha512_pieces": piece_hashes
        }

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        """Create the content.json node of a file, with a piecemap for big optional files.

        Non-optional files and files under 1MB are handled by the host class,
        as are not yet registered files under 5MB.

        Returns:
            dict mapping relative paths (the file and, when newly created, its
            piecemap) to their content.json nodes.
        """
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            with self.site.storage.open(inner_path, "rb") as file_in:
                merkle_root, piece_size, piecemap_info = self.hashBigfile(file_in, file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                # The piecemap has to be closed before it gets hashed below
                with self.site.storage.open(piecemap_inner_path, "wb") as piecemap_file:
                    msgpack.pack({file_name: piecemap_info}, piecemap_file)
                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        self.optionalDownloaded(inner_path, hash, file_size, own=True)
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        """Load the piecemap entry of ``inner_path``, requesting the piecemap file first.

        The returned dict additionally carries the ``piece_size`` taken from
        the file info.
        """
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        with self.site.storage.open(piecemap_inner_path) as piecemap_file:
            piecemap = msgpack.unpack(piecemap_file)[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        """Check ``piece`` (starting at byte ``pos``) against the piecemap.

        Returns True on match, raises VerifyError otherwise.
        """
        piecemap = self.getPiecemap(inner_path)
        piece_i = pos // piecemap["piece_size"]  # Floor division: used as list index
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        """Verify a whole file via the host class, or a single piece for ``path|from-to``."""
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash, size=None, own=False):
        """Record a downloaded optional file; for ``path|from-to`` also mark the piece."""
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = pos_from // file_info["piece_size"]
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash in self.hashfield:
                size = 0

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash, size, own)

    def optionalRemove(self, inner_path, hash, size=None):
        """Remove an optional file; for files over 1MB also drop its piecefield and queued pieces."""
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            piece_prefix = inner_path + "|"
            for key in list(self.site.bad_files.keys()):  # Copy: entries are deleted while looping
                if key.startswith(piece_prefix):
                    del self.site.bad_files[key]
        return super(ContentManagerPlugin, self).optionalRemove(inner_path, hash, size)
|
|
|
|
|
|
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    """SiteStorage extension that keeps per-file piecefields and writes file ranges."""

    def __init__(self, *args, **kwargs):
        """Set up ``self.piecefields`` and restore it from the site's settings cache."""
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        """Create ``inner_path`` as a file of ``size`` bytes and forget its piecefield.

        On Windows the file is additionally flagged as sparse through ``fsutil``.
        """
        file_path = self.getPath(inner_path)
        with open(file_path, 'wb') as f:
            f.truncate(size)
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        """Write ``content`` to a file, or into a byte range when the path is ``path|from-to``.

        ``content`` is either a string or a file-like object with ``read``.
        """
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def openBigfile(self, inner_path, prebuffer=0):
        """Return a ``BigFile`` for ``inner_path``, or False when it is not a big file.

        A file without file info or without a ``piecemap`` entry is not a big
        file. When the file does not exist yet it is created as a sparse file
        with an all-zero piecefield.
        """
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                # Guess from the first bytes if the file was only pre-allocated
                with open(file_path, "rb") as peek_file:
                    looks_empty = peek_file.read(128) == b"\0" * 128
                piece_data = "0" if looks_empty else "1"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            # Nothing is downloaded yet: every piece is marked as missing
            self.piecefields[sha512].fromstring("0" * piece_num)
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
|
|
|
|
|
|
class BigFile(object):
    """File-like reader of a big file that requests missing pieces before reading them."""

    def __init__(self, site, inner_path, prebuffer=0):
        """Open ``inner_path`` of ``site`` for reading.

        Args:
            site: Site object owning the file.
            inner_path: Path of the file inside the site.
            prebuffer: Number of bytes after the read position whose pieces
                are requested ahead of time (0 disables it).
        """
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")

    def read(self, buff=64 * 1024):
        """Read up to ``buff`` bytes from the current position.

        Pieces overlapping the requested range that are not in the piecefield
        are requested and waited for. Returns None when any of those requests
        yields a false value.
        """
        pos = self.f.tell()
        read_until = pos + buff
        requests = []
        # Request all required blocks
        while 1:
            piece_i = pos // self.piece_size  # Floor division: used as piecefield index
            if piece_i * self.piece_size >= read_until:
                break
            pos_from = piece_i * self.piece_size
            pos_to = pos_from + self.piece_size
            if not self.piecefield[piece_i]:
                requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
            pos += self.piece_size

        if not all(requests):
            return None

        # Request prebuffer
        if self.prebuffer:
            prebuffer_until = min(self.size, read_until + self.prebuffer)
            priority = 3
            while 1:
                piece_i = pos // self.piece_size
                if piece_i * self.piece_size >= prebuffer_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                priority -= 1  # Pieces further ahead get a lower priority
                pos += self.piece_size

        gevent.joinall(requests)
        self.read_bytes += buff

        # Increase buffer for long reads
        if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
            self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
            self.prebuffer = 5 * 1024 * 1024

        return self.f.read(buff)

    def seek(self, pos):
        """Move the read position of the underlying file to ``pos``."""
        return self.f.seek(pos)

    def tell(self):
        """Return the current read position of the underlying file."""
        return self.f.tell()

    def close(self):
        """Close the underlying file."""
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
|
|
|
|
|
|
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    """WorkerManager extension that schedules piecemap and piece downloads of big files."""

    def addTask(self, inner_path, *args, **kwargs):
        """Add a download task; big files (file info with ``piecemap``) get special handling.

        For a big file the piecemap is requested when missing. A ``path|from-to``
        request becomes a piece task tagged with ``piece_i`` and ``sha512``; a
        request without range downloads nothing itself and returns the
        piecemap task or an already finished placeholder.
        """
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                # Size limit is compared in whole megabytes (floor division)
                size_mb = file_info["size"] // 1024 // 1024
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and size_mb <= config.autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = pos_from // file_info["piece_size"]
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    # Imported by name: "import gevent.event" here would make
                    # "gevent" a local of this function
                    from gevent.event import AsyncResult
                    fake_evt = AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        """Refuse peers whose piecefield does not contain the piece of a piece task."""
        if "piece_i" in task:
            # NOTE(review): peer.piecefields is created as a defaultdict elsewhere
            # in this file, so the lookup below inserts the key and the
            # "not in" branch looks unreachable - confirm the intended behavior.
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
|
|
|
|
|
|
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    """FileRequest extension that exchanges piecefields and guards reads of missing pieces."""

    def isReadable(self, site, inner_path, file, pos):
        """Return False when ``pos`` falls into a piece of a big file we do not have.

        The check is only made when the next 10 bytes of ``file`` are NUL
        bytes; files whose info has no ``piece_size`` are left to the host class.
        """
        # Peek into file
        if file.read(10) == "\0" * 10:
            # Looks empty, but make sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            # A regular file can start with NUL bytes too: it has no piece_size
            if file_info and "piece_size" in file_info:
                piece_i = pos // file_info["piece_size"]
                if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                    return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        """Answer with the packed piecefields of the site named in ``params["site"]``."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        """Replace the sending peer's piecefields with ``params["piecefields_packed"]``."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].iteritems():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})
|
|
|
|
|
|
@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    """Peer extension that tracks which pieces of big files the remote peer has."""

    def __getattr__(self, key):
        """Create ``piecefields`` / ``time_piecefields_updated`` on first access."""
        if key == "piecefields":
            fresh_piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            self.piecefields = fresh_piecefields
            return fresh_piecefields
        if key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return None
        return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        """Fetch the peer's piecefields with a ``getPiecefields`` request.

        Returns the new piecefields, or False when the peer's handshake
        revision is below 2190, when the last update is less than 60 seconds
        old (unless ``force``), or when the request fails.
        """
        connection = self.connection
        if connection and connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        updated_recently = self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60
        if updated_recently and not force:
            return False

        self.time_piecefields_updated = time.time()
        response = self.request("getPiecefields", {"site": self.site.address})
        if not response or "error" in response:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for file_sha512, packed in response["piecefields_packed"].iteritems():
                self.piecefields[file_sha512].unpack(packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        """Pass through to the host class unchanged."""
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        """Update the hashfield; on sites with big files refresh the piecefields alongside."""
        if not self.site.settings.get("has_bigfile"):
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

        piecefield_thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
        result = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
        piecefield_thread.join()
        return result

    def getFile(self, site, inner_path, *args, **kwargs):
        """Translate a ``path|from-to`` request into ``pos_from`` / ``pos_to`` arguments."""
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            range_start, range_end = map(int, file_range.split("-"))
            kwargs["pos_from"] = range_start
            kwargs["pos_to"] = range_end
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
|
|
|
|
|
|
@PluginManager.registerTo("Site")
class SitePlugin(object):
    """Site extension: piece-sized download checks, piecefield caching, ``|all`` downloads."""

    def isFileDownloadAllowed(self, inner_path, file_info):
        """Check the download limit against one piece instead of the whole big file."""
        if "piecemap" in file_info:
            # Work on a copy so the caller's dict keeps the real size
            piece_sized_info = file_info.copy()
            piece_sized_info["size"] = piece_sized_info["piece_size"]
            file_info = piece_sized_info
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        """Add the base64 encoded packed piecefields to the settings cache."""
        cache = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            encoded = {}
            for sha512, piecefield in self.storage.piecefields.iteritems():
                encoded[sha512] = piecefield.pack().encode("base64")
            cache["piecefields"] = encoded
        return cache

    def needFile(self, inner_path, *args, **kwargs):
        """Request a file; ``path|all`` requests every missing piece of a big file.

        The ``|all`` form waits for the piece requests and returns None.
        """
        if not inner_path.endswith("|all"):
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

        @util.Pooled(20)
        def pooledNeedBigfile(*args, **kwargs):
            return self.needFile(*args, **kwargs)

        inner_path = inner_path.replace("|all", "")
        file_info = self.needFileInfo(inner_path)
        file_size = file_info["size"]
        piece_size = file_info["piece_size"]

        piece_num = int(math.ceil(float(file_size) / piece_size))

        pending = []

        piecefield = self.storage.piecefields.get(file_info["sha512"])

        for piece_i in range(piece_num):
            piece_from = piece_i * piece_size
            piece_to = min(file_size, piece_from + piece_size)
            if piecefield and piecefield[piece_i]:
                continue  # Already have this piece
            res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False)
            if res is True or res is False:
                continue  # Nothing to wait for
            pending.append(res)
        gevent.joinall(pending)
|
|
|
|
|
|
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    """Config extension that registers the command line options of the Bigfile plugin."""

    def createArguments(self):
        """Add the "Bigfile plugin" argument group, then let the host class continue."""
        bigfile_group = self.parser.add_argument_group("Bigfile plugin")
        bigfile_group.add_argument(
            '--autodownload_bigfile_size_limit',
            help='Also download bigfiles until this limit if help distribute option is checked',
            default=1,
            metavar="MB",
            type=int
        )

        return super(ConfigPlugin, self).createArguments()
|