
A bigfile is an optional file larger than 10MB, so the default value of --autodownload_bigfile_size_limit should be at least 10MB.

import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.lock
import gevent.event  # AsyncResult is used in WorkerManagerPlugin.addTask below

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
import util
from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config


if "upload_nonces" not in locals():
    upload_nonces = {}
@PluginManager.registerTo("UiRequest")
|
|
class UiRequestPlugin(object):
|
|
def isCorsAllowed(self, path):
|
|
if path == "/ZeroNet-Internal/BigfileUpload":
|
|
return True
|
|
else:
|
|
return super(UiRequestPlugin, self).isCorsAllowed(path)
|
|
|
|
@helper.encodeResponse
|
|
def actionBigfileUpload(self):
|
|
nonce = self.get.get("upload_nonce")
|
|
if nonce not in upload_nonces:
|
|
return self.error403("Upload nonce error.")
|
|
|
|
upload_info = upload_nonces[nonce]
|
|
del upload_nonces[nonce]
|
|
|
|
self.sendHeader(200, "text/html", noscript=True, extra_headers={
|
|
"Access-Control-Allow-Origin": "null",
|
|
"Access-Control-Allow-Credentials": "true"
|
|
})
|
|
|
|
self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers
|
|
|
|
site = upload_info["site"]
|
|
inner_path = upload_info["inner_path"]
|
|
|
|
with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
|
|
merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
|
|
self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
|
|
)
|
|
|
|
if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split
|
|
hash = binascii.hexlify(piecemap_info["sha512_pieces"][0].encode())
|
|
hash_id = site.content_manager.hashfield.getHashId(hash)
|
|
site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
|
|
|
|
else: # Big file
|
|
file_name = helper.getFilename(inner_path)
|
|
site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))
|
|
|
|
# Find piecemap and file relative path to content.json
|
|
file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
|
|
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
|
|
piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
|
|
file_relative_path = inner_path[len(content_inner_path_dir):]
|
|
|
|
# Add file to content.json
|
|
if site.storage.isFile(file_info["content_inner_path"]):
|
|
content = site.storage.loadJson(file_info["content_inner_path"])
|
|
else:
|
|
content = {}
|
|
if "files_optional" not in content:
|
|
content["files_optional"] = {}
|
|
|
|
content["files_optional"][file_relative_path] = {
|
|
"sha512": merkle_root,
|
|
"size": upload_info["size"],
|
|
"piecemap": piecemap_relative_path,
|
|
"piece_size": piece_size
|
|
}
|
|
|
|
merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
|
|
site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
|
|
site.storage.writeJson(file_info["content_inner_path"], content)
|
|
|
|
site.content_manager.contents.loadItem(file_info["content_inner_path"]) # reload cache
|
|
|
|
return json.dumps({
|
|
"merkle_root": merkle_root,
|
|
"piece_num": len(piecemap_info["sha512_pieces"]),
|
|
"piece_size": piece_size,
|
|
"inner_path": inner_path
|
|
})
|
|
|
|
def readMultipartHeaders(self, wsgi_input):
|
|
for i in range(100):
|
|
line = wsgi_input.readline()
|
|
if line == "\r\n":
|
|
break
|
|
return i
|
|
|
|
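
    # Illustrative note: the upload body arrives as multipart/form-data, so before
    # the raw file bytes there is a framing block roughly like the hypothetical
    # sample below. readMultipartHeaders() above consumes everything up to and
    # including the blank line, leaving only the file payload for hashBigfile().
    #
    #   ------WebKitFormBoundaryAbc123\r\n
    #   Content-Disposition: form-data; name="file"; filename="video.mp4"\r\n
    #   Content-Type: application/octet-stream\r\n
    #   \r\n
    #   <raw file bytes follow>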

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
@PluginManager.registerTo("UiWebsocket")
|
|
class UiWebsocketPlugin(object):
|
|
def actionBigfileUploadInit(self, to, inner_path, size):
|
|
valid_signers = self.site.content_manager.getValidSigners(inner_path)
|
|
auth_address = self.user.getAuthAddress(self.site.address)
|
|
if not self.site.settings["own"] and auth_address not in valid_signers:
|
|
self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
|
|
return self.response(to, {"error": "Forbidden, you can only modify your own files"})
|
|
|
|
nonce = CryptHash.random()
|
|
piece_size = 1024 * 1024
|
|
inner_path = self.site.content_manager.sanitizePath(inner_path)
|
|
file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)
|
|
|
|
content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
|
|
file_relative_path = inner_path[len(content_inner_path_dir):]
|
|
|
|
upload_nonces[nonce] = {
|
|
"added": time.time(),
|
|
"site": self.site,
|
|
"inner_path": inner_path,
|
|
"websocket_client": self,
|
|
"size": size,
|
|
"piece_size": piece_size,
|
|
"piecemap": inner_path + ".piecemap.msgpack"
|
|
}
|
|
return {
|
|
"url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
|
|
"piece_size": piece_size,
|
|
"inner_path": inner_path,
|
|
"file_relative_path": file_relative_path
|
|
}
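
    # Illustrative client flow for the two endpoints above (a sketch, not plugin
    # code; ws_client and postMultipart are assumed helpers). The websocket command
    # hands out a single-use nonce URL, then the browser POSTs the file there:
    #
    #   init = ws_client.cmd("bigfileUploadInit", ["data/video.mp4", file_size])
    #   # -> {"url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=...",
    #   #     "piece_size": 1048576, "inner_path": "data/video.mp4",
    #   #     "file_relative_path": "data/video.mp4"}
    #   result = postMultipart(init["url"], file_obj)  # served by actionBigfileUpload()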

    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete the .piecemap.msgpack file if it exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                    part = ""
            if len(part) > 0:  # Hash the last, incomplete piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }
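
    # Minimal sketch of the scheme implemented by hashBigfile() above (comments
    # only, not executed): the file is cut into piece_size chunks, every chunk is
    # sha512t-hashed, and the chunk digests become the merkle tree leaves whose
    # root identifies the whole file in content.json.
    #
    #   mt = merkletools.MerkleTools()
    #   mt.hash_function = CryptHash.sha512t
    #   for pos in range(0, len(data), piece_size):      # data: bytes of the file
    #       piece_hash = CryptHash.sha512t()
    #       piece_hash.update(data[pos:pos + piece_size])
    #       mt.leaves.append(piece_hash.digest())
    #   mt.make_tree()
    #   merkle_root = mt.get_merkle_root()               # -> "sha512" in content.json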

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path).read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        piecemap = self.getPiecemap(inner_path)
        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)
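
    # Pieces are addressed as "inner_path|pos_from-pos_to" throughout the plugin.
    # An illustrative example for a 1MB piece size:
    #
    #   verifyFile("data/video.mp4|1048576-2097152", piece_file)
    #   # -> strips the range, computes piece_i = 1048576 // 1048576 = 1, and
    #   #    compares sha512t(piece) against piecemap["sha512_pieces"][1]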

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
@PluginManager.registerTo("SiteStorage")
|
|
class SiteStoragePlugin(object):
|
|
def __init__(self, *args, **kwargs):
|
|
super(SiteStoragePlugin, self).__init__(*args, **kwargs)
|
|
self.piecefields = collections.defaultdict(BigfilePiecefield)
|
|
if "piecefields" in self.site.settings.get("cache", {}):
|
|
for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
|
|
if piecefield_packed:
|
|
self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
|
|
self.site.settings["cache"]["piecefields"] = {}
|
|
|
|

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has a piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to a specific position by passing |{pos_from}-{pos_to} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create the dir if it doesn't exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)
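
    # Illustrative example of the range syntax handled above: writing the second
    # 1MB piece of a sparse file at its absolute offset (a sketch, assuming the
    # piece bytes were already verified):
    #
    #   site.storage.write("data/video.mp4|1048576-2097152", piece_bytes)
    #   # -> seeks to byte 1048576 of data/video.mp4 and writes piece_bytes there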

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or (file_info and "piecemap" not in file_info):  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path, "rb").read(128) == b"\0" * 128:  # Read as bytes: the file holds binary data
                    piece_data = "0"
                else:
                    piece_data = "1"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].fromstring("0" * piece_num)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)


class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
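

# Illustrative sketch (assumed helper, not used by the plugin): reading a bigfile
# end-to-end through the on-demand BigFile wrapper above. read() blocks while the
# pieces covering the requested range are fetched from peers.
def _exampleStreamBigfile(site, inner_path):
    big_file = site.storage.openBigfile(inner_path, prebuffer=2 * 1024 * 1024)
    if not big_file:  # Not a bigfile: fall back to a normal read
        return site.storage.read(inner_path)
    chunks = []
    with big_file:
        while True:
            chunk = big_file.read(64 * 1024)  # May return None if a piece request failed
            if not chunk:
                break
            chunks.append(chunk)
    return b"".join(chunks)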


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but make sure we don't actually have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            piece_i = int(pos / file_info["piece_size"])
            if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})
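
    # The two actions above form a symmetric exchange (illustrative summary):
    # each side packs {merkle_root: piecefield} with .pack() and the receiver
    # mirrors the answer into peer.piecefields with .unpack(), so the worker
    # manager can tell which peer claims which piece before requesting a range:
    #
    #   packed = site.storage.piecefields[sha512].pack()
    #   peer.piecefields[sha512].unpack(packed)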


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
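
    # Illustrative example of the range form accepted above (a sketch; the exact
    # getFile keyword handling comes from the host Peer class):
    #
    #   peer.getFile(site.address, "data/video.mp4|0-1048576")
    #   # -> forwarded as getFile(site.address, "data/video.mp4",
    #   #                         pos_from=0, pos_to=1048576)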


@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]  # Only a single piece counts against the size checks
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)
            file_size = file_info["size"]
            piece_size = file_info["piece_size"]

            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:  # Still needs downloading
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if the help distribute option is checked', default=10, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()
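

# Example invocations for the two flags above (values are illustrative):
#
#   zeronet.py --autodownload_bigfile_size_limit 50   # also auto-download bigfiles up to 50MB
#   zeronet.py --bigfile_size_limit 200               # never download bigfiles over 200MB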