Unify handling Stream and Get file requests
This commit is contained in:
parent
d6d9e911fe
commit
8ae9b5261e
1 changed files with 40 additions and 55 deletions
|
@ -187,34 +187,59 @@ class FileRequest(object):
|
|||
self.response({"error": "File invalid: %s" % err})
|
||||
self.connection.badAction(5)
|
||||
|
||||
def isReadable(self, site, inner_path, file, pos):
|
||||
return True
|
||||
|
||||
# Send file content request
|
||||
def actionGetFile(self, params):
|
||||
def handleGetFile(self, params, streaming=False):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
try:
|
||||
file_path = site.storage.getPath(params["inner_path"])
|
||||
with StreamingMsgpack.FilePart(file_path, "rb") as file:
|
||||
if streaming:
|
||||
file_obj = site.storage.open(params["inner_path"])
|
||||
else:
|
||||
file_obj = StreamingMsgpack.FilePart(file_path, "rb")
|
||||
|
||||
with file_obj as file:
|
||||
file.seek(params["location"])
|
||||
file.read_bytes = FILE_BUFF
|
||||
read_bytes = params.get("read_bytes", FILE_BUFF)
|
||||
file_size = os.fstat(file.fileno()).st_size
|
||||
|
||||
if file_size > read_bytes: # Check if file is readable at current position (for big files)
|
||||
if not self.isReadable(site, params["inner_path"], file, params["location"]):
|
||||
raise RequestError("File not readable at position: %s" % params["location"])
|
||||
|
||||
if not streaming:
|
||||
file.read_bytes = read_bytes
|
||||
|
||||
if params.get("file_size") and params["file_size"] != file_size:
|
||||
self.connection.badAction(5)
|
||||
self.connection.badAction(2)
|
||||
raise RequestError("File size does not match: %sB != %sB" % (params["file_size"], file_size))
|
||||
|
||||
if params["location"] > file_size:
|
||||
self.connection.badAction(5)
|
||||
raise RequestError("Bad file location")
|
||||
|
||||
back = {
|
||||
"body": file,
|
||||
"size": file_size,
|
||||
"location": min(file.tell() + FILE_BUFF, file_size)
|
||||
}
|
||||
self.response(back, streaming=True)
|
||||
if streaming:
|
||||
back = {
|
||||
"size": file_size,
|
||||
"location": min(file.tell() + read_bytes, file_size),
|
||||
"stream_bytes": min(read_bytes, file_size - params["location"])
|
||||
}
|
||||
self.response(back)
|
||||
self.sendRawfile(file, read_bytes=read_bytes)
|
||||
else:
|
||||
back = {
|
||||
"body": file,
|
||||
"size": file_size,
|
||||
"location": min(file.tell() + file.read_bytes, file_size)
|
||||
}
|
||||
self.response(back, streaming=True)
|
||||
|
||||
bytes_sent = min(FILE_BUFF, file_size - params["location"]) # Number of bytes we going to send
|
||||
bytes_sent = min(read_bytes, file_size - params["location"]) # Number of bytes we going to send
|
||||
site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent
|
||||
if config.debug_socket:
|
||||
self.log.debug("File %s at position %s sent %s bytes" % (file_path, params["location"], bytes_sent))
|
||||
|
@ -235,51 +260,11 @@ class FileRequest(object):
|
|||
self.response({"error": "File read error"})
|
||||
return False
|
||||
|
||||
# New-style file streaming out of Msgpack context
|
||||
def actionGetFile(self, params):
|
||||
return self.handleGetFile(params)
|
||||
|
||||
def actionStreamFile(self, params):
|
||||
site = self.sites.get(params["site"])
|
||||
if not site or not site.settings["serving"]: # Site unknown or not serving
|
||||
self.response({"error": "Unknown site"})
|
||||
return False
|
||||
try:
|
||||
if config.debug_socket:
|
||||
self.log.debug("Opening file: %s" % params["inner_path"])
|
||||
with site.storage.open(params["inner_path"]) as file:
|
||||
file.seek(params["location"])
|
||||
file_size = os.fstat(file.fileno()).st_size
|
||||
stream_bytes = min(FILE_BUFF, file_size - params["location"])
|
||||
if stream_bytes < 0:
|
||||
self.connection.badAction(5)
|
||||
raise RequestError("Bad file location")
|
||||
|
||||
back = {
|
||||
"size": file_size,
|
||||
"location": min(file.tell() + FILE_BUFF, file_size),
|
||||
"stream_bytes": stream_bytes
|
||||
}
|
||||
if config.debug_socket:
|
||||
self.log.debug(
|
||||
"Sending file %s from position %s to %s" %
|
||||
(params["inner_path"], params["location"], back["location"])
|
||||
)
|
||||
self.response(back)
|
||||
self.sendRawfile(file, read_bytes=FILE_BUFF)
|
||||
|
||||
site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + stream_bytes
|
||||
if config.debug_socket:
|
||||
self.log.debug("File %s at position %s sent %s bytes" % (params["inner_path"], params["location"], stream_bytes))
|
||||
|
||||
# Add peer to site if not added before
|
||||
connected_peer = site.addPeer(self.connection.ip, self.connection.port)
|
||||
if connected_peer: # Just added
|
||||
connected_peer.connect(self.connection) # Assign current connection to peer
|
||||
|
||||
return {"bytes_sent": stream_bytes, "file_size": file_size, "location": params["location"]}
|
||||
|
||||
except Exception, err:
|
||||
self.log.debug("GetFile read error: %s" % Debug.formatException(err))
|
||||
self.response({"error": "File read error"})
|
||||
return False
|
||||
return self.handleGetFile(params, streaming=True)
|
||||
|
||||
# Peer exchange request
|
||||
def actionPex(self, params):
|
||||
|
|
Loading…
Reference in a new issue