BigFile lock seek and read until previous request finished, fix seek relative to bigfile end
This commit is contained in:
parent
f5ab2f63c0
commit
879eb6295d
2 changed files with 42 additions and 35 deletions
|
@ -3,10 +3,11 @@ import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import shutil
|
import shutil
|
||||||
import collections
|
import collections
|
||||||
import gevent
|
|
||||||
import math
|
import math
|
||||||
|
|
||||||
import msgpack
|
import msgpack
|
||||||
|
import gevent
|
||||||
|
import gevent.lock
|
||||||
|
|
||||||
from Plugin import PluginManager
|
from Plugin import PluginManager
|
||||||
from Debug import Debug
|
from Debug import Debug
|
||||||
|
@ -475,52 +476,58 @@ class BigFile(object):
|
||||||
|
|
||||||
self.piecefield = self.site.storage.piecefields[self.sha512]
|
self.piecefield = self.site.storage.piecefields[self.sha512]
|
||||||
self.f = open(file_path, "rb+")
|
self.f = open(file_path, "rb+")
|
||||||
|
self.read_lock = gevent.lock.Semaphore()
|
||||||
|
|
||||||
def read(self, buff=64 * 1024):
|
def read(self, buff=64 * 1024):
|
||||||
pos = self.f.tell()
|
with self.read_lock:
|
||||||
read_until = min(self.size, pos + buff)
|
pos = self.f.tell()
|
||||||
requests = []
|
read_until = min(self.size, pos + buff)
|
||||||
# Request all required blocks
|
requests = []
|
||||||
while 1:
|
# Request all required blocks
|
||||||
piece_i = pos / self.piece_size
|
|
||||||
if piece_i * self.piece_size >= read_until:
|
|
||||||
break
|
|
||||||
pos_from = piece_i * self.piece_size
|
|
||||||
pos_to = pos_from + self.piece_size
|
|
||||||
if not self.piecefield[piece_i]:
|
|
||||||
requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
|
|
||||||
pos += self.piece_size
|
|
||||||
|
|
||||||
if not all(requests):
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Request prebuffer
|
|
||||||
if self.prebuffer:
|
|
||||||
prebuffer_until = min(self.size, read_until + self.prebuffer)
|
|
||||||
priority = 3
|
|
||||||
while 1:
|
while 1:
|
||||||
piece_i = pos / self.piece_size
|
piece_i = pos / self.piece_size
|
||||||
if piece_i * self.piece_size >= prebuffer_until:
|
if piece_i * self.piece_size >= read_until:
|
||||||
break
|
break
|
||||||
pos_from = piece_i * self.piece_size
|
pos_from = piece_i * self.piece_size
|
||||||
pos_to = pos_from + self.piece_size
|
pos_to = pos_from + self.piece_size
|
||||||
if not self.piecefield[piece_i]:
|
if not self.piecefield[piece_i]:
|
||||||
self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
|
requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
|
||||||
priority -= 1
|
|
||||||
pos += self.piece_size
|
pos += self.piece_size
|
||||||
|
|
||||||
gevent.joinall(requests)
|
if not all(requests):
|
||||||
self.read_bytes += buff
|
return None
|
||||||
|
|
||||||
# Increase buffer for long reads
|
# Request prebuffer
|
||||||
if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
|
if self.prebuffer:
|
||||||
self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
|
prebuffer_until = min(self.size, read_until + self.prebuffer)
|
||||||
self.prebuffer = 5 * 1024 * 1024
|
priority = 3
|
||||||
|
while 1:
|
||||||
|
piece_i = pos / self.piece_size
|
||||||
|
if piece_i * self.piece_size >= prebuffer_until:
|
||||||
|
break
|
||||||
|
pos_from = piece_i * self.piece_size
|
||||||
|
pos_to = pos_from + self.piece_size
|
||||||
|
if not self.piecefield[piece_i]:
|
||||||
|
self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
|
||||||
|
priority -= 1
|
||||||
|
pos += self.piece_size
|
||||||
|
|
||||||
return self.f.read(buff)
|
gevent.joinall(requests)
|
||||||
|
self.read_bytes += buff
|
||||||
|
|
||||||
def seek(self, pos):
|
# Increase buffer for long reads
|
||||||
return self.f.seek(pos)
|
if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
|
||||||
|
self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
|
||||||
|
self.prebuffer = 5 * 1024 * 1024
|
||||||
|
|
||||||
|
return self.f.read(buff)
|
||||||
|
|
||||||
|
def seek(self, pos, whence=0):
|
||||||
|
with self.read_lock:
|
||||||
|
if whence == 2: # Relative from file end
|
||||||
|
pos = self.size + pos # Use the real size instead of size on the disk
|
||||||
|
whence = 0
|
||||||
|
return self.f.seek(pos, whence)
|
||||||
|
|
||||||
def tell(self):
|
def tell(self):
|
||||||
self.f.tell()
|
self.f.tell()
|
||||||
|
|
|
@ -10,7 +10,7 @@ class Config(object):
|
||||||
|
|
||||||
def __init__(self, argv):
|
def __init__(self, argv):
|
||||||
self.version = "0.6.3"
|
self.version = "0.6.3"
|
||||||
self.rev = 3546
|
self.rev = 3549
|
||||||
self.argv = argv
|
self.argv = argv
|
||||||
self.action = None
|
self.action = None
|
||||||
self.pending_changes = {}
|
self.pending_changes = {}
|
||||||
|
|
Loading…
Reference in a new issue