Refactor worker, fix concurrent write errors

shortcutme 2019-12-17 14:42:33 +01:00
parent 0839fdfc5e
commit 4c31aae97b
2 changed files with 177 additions and 116 deletions
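
The fix for the concurrent write errors is visible in Worker.handleTask below: the old per-task "locked" boolean is replaced by a lazily created gevent.lock.Semaphore stored on the task (getTaskLock), and the verify-and-write step only runs while that lock is held. A worker that acquires the lock after another worker has already written the file sees verifyFile() return None ("same file") and marks the task done without writing again. A minimal standalone sketch of that locking pattern, assuming gevent is installed (the Task class and handle function are illustrative stand-ins, not ZeroNet APIs):

import gevent
import gevent.lock


class Task:
    def __init__(self, inner_path):
        self.inner_path = inner_path
        self.lock = None   # created lazily, like task["lock"] in this commit
        self.done = False


def get_task_lock(task):
    # Same idea as Worker.getTaskLock(): create the lock on first use
    if task.lock is None:
        task.lock = gevent.lock.Semaphore()
    return task.lock


def handle(task, worker_id):
    with get_task_lock(task):          # only one worker verifies/writes at a time
        if task.done:                  # another worker already wrote it, skip the write
            print("%s: %s already written, skipping" % (worker_id, task.inner_path))
            return
        gevent.sleep(0.1)              # stand-in for the blocking storage.write()
        task.done = True
        print("%s: wrote %s" % (worker_id, task.inner_path))


task = Task("content.json")
gevent.joinall([gevent.spawn(handle, task, "worker-%s" % i) for i in range(3)])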

src/Worker/Worker.py

@@ -1,9 +1,23 @@
 import time
 
 import gevent
+import gevent.lock
 
 from Debug import Debug
 from Config import config
+from Content.ContentManager import VerifyError
+
+
+class WorkerDownloadError(Exception):
+    pass
+
+
+class WorkerIOError(Exception):
+    pass
+
+
+class WorkerStop(Exception):
+    pass
 
 
 class Worker(object):
@@ -24,134 +38,181 @@ class Worker(object):
     def __repr__(self):
         return "<%s>" % self.__str__()
 
-    # Downloader thread
+    def waitForTask(self, task, timeout):  # Wait for other workers to finish the task
+        for sleep_i in range(1, timeout * 10):
+            time.sleep(0.1)
+            if task["done"] or task["workers_num"] == 0:
+                if config.verbose:
+                    self.manager.log.debug("%s: %s, picked task free after %ss sleep. (done: %s)" % (
+                        self.key, task["inner_path"], 0.1 * sleep_i, task["done"]
+                    ))
+                break
+
+            if sleep_i % 10 == 0:
+                workers = self.manager.findWorkers(task)
+                if not workers or not workers[0].peer.connection:
+                    break
+                worker_idle = time.time() - workers[0].peer.connection.last_recv_time
+                if worker_idle > 1:
+                    if config.verbose:
+                        self.manager.log.debug("%s: %s, worker %s seems idle, picked up task after %ss sleep. (done: %s)" % (
+                            self.key, task["inner_path"], workers[0].key, 0.1 * sleep_i, task["done"]
+                        ))
+                    break
+        return True
+
+    def pickTask(self):  # Find and select a new task for the worker
+        task = self.manager.getTask(self.peer)
+        if not task:  # No more task
+            time.sleep(0.1)  # Wait a bit for new tasks
+            task = self.manager.getTask(self.peer)
+            if not task:  # Still no task, stop it
+                stats = "downloaded files: %s, failed: %s" % (self.num_downloaded, self.num_failed)
+                self.manager.log.debug("%s: No task found, stopping (%s)" % (self.key, stats))
+                return False
+
+        if not task["time_started"]:
+            task["time_started"] = time.time()  # Task started now
+
+        if task["workers_num"] > 0:  # Wait a bit if someone already working on it
+            if task["peers"]:  # It's an update
+                timeout = 3
+            else:
+                timeout = 1
+
+            if task["size"] > 100 * 1024 * 1024:
+                timeout = timeout * 2
+
+            if config.verbose:
+                self.manager.log.debug("%s: Someone already working on %s (pri: %s), sleeping %s sec..." % (
+                    self.key, task["inner_path"], task["priority"], timeout
+                ))
+
+            self.waitForTask(task, timeout)
+        return task
+
+    def downloadTask(self, task):
+        try:
+            buff = self.peer.getFile(task["site"].address, task["inner_path"], task["size"])
+        except Exception as err:
+            self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
+            raise WorkerDownloadError(str(err))
+
+        if not buff:
+            raise WorkerDownloadError("No response")
+
+        return buff
+
+    def getTaskLock(self, task):
+        if task["lock"] is None:
+            task["lock"] = gevent.lock.Semaphore()
+        return task["lock"]
+
+    def writeTask(self, task, buff):
+        buff.seek(0)
+        try:
+            task["site"].storage.write(task["inner_path"], buff)
+        except Exception as err:
+            if type(err) == Debug.Notify:
+                self.manager.log.debug("%s: Write aborted: %s (%s)" % (self.key, task["inner_path"], err))
+            else:
+                self.manager.log.error("%s: Error writing: %s (%s)" % (self.key, task["inner_path"], err))
+            raise WorkerIOError(str(err))
+
+    def onTaskVerifyFail(self, task, error_message):
+        self.num_failed += 1
+        if self.manager.started_task_num < 50 or config.verbose:
+            self.manager.log.debug(
+                "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" %
+                (self.key, task["inner_path"], error_message, len(task["failed"]), task["workers_num"])
+            )
+        task["failed"].append(self.peer)
+        self.peer.hash_failed += 1
+        if self.peer.hash_failed >= max(len(self.manager.tasks), 3) or self.peer.connection_error > 10:
+            # Broken peer: More fails than tasks number but atleast 3
+            raise WorkerStop(
+                "Too many errors (hash failed: %s, connection error: %s)" %
+                (self.peer.hash_failed, self.peer.connection_error)
+            )
+
+    def handleTask(self, task):
+        download_err = write_err = False
+
+        write_lock = None
+        try:
+            buff = self.downloadTask(task)
+
+            if task["done"] is True:  # Task done, try to find new one
+                return None
+
+            if self.running is False:  # Worker no longer needed or got killed
+                self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
+                raise WorkerStop("Running got disabled")
+
+            write_lock = self.getTaskLock(task)
+            write_lock.acquire()
+            if task["site"].content_manager.verifyFile(task["inner_path"], buff) is None:
+                is_same = True
+            else:
+                is_same = False
+            is_valid = True
+        except (WorkerDownloadError, VerifyError) as err:
+            download_err = err
+            is_valid = False
+            is_same = False
+
+        if is_valid and not is_same:
+            if self.manager.started_task_num < 50 or config.verbose:
+                self.manager.log.debug("%s: Verify correct: %s" % (self.key, task["inner_path"]))
+            try:
+                self.writeTask(task, buff)
+            except WorkerIOError as err:
+                write_err = err
+
+        if not task["done"]:
+            if write_err:
+                self.manager.failTask(task)
+                self.num_failed += 1
+                self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err))
+            elif is_valid:
+                self.manager.doneTask(task)
+                self.num_downloaded += 1
+
+        if write_lock is not None and write_lock.locked():
+            write_lock.release()
+
+        if not is_valid:
+            self.onTaskVerifyFail(task, download_err)
+            time.sleep(1)
+            return False
+
+        return True
+
     def downloader(self):
         self.peer.hash_failed = 0  # Reset hash error counter
         while self.running:
             # Try to pickup free file download task
-            task = self.manager.getTask(self.peer)
-            if not task:  # No more task
-                time.sleep(0.1)  # Wait a bit for new tasks
-                task = self.manager.getTask(self.peer)
-                if not task:  # Still no task, stop it
-                    stats = "downloaded files: %s, failed: %s" % (self.num_downloaded, self.num_failed)
-                    self.manager.log.debug("%s: No task found, stopping (%s)" % (self.key, stats))
-                    break
-            if not task["time_started"]:
-                task["time_started"] = time.time()  # Task started now
-            if task["workers_num"] > 0:  # Wait a bit if someone already working on it
-                if task["peers"]:  # It's an update
-                    timeout = 3
-                else:
-                    timeout = 1
-                if task["size"] > 100 * 1024 * 1024:
-                    timeout = timeout * 2
-                if config.verbose:
-                    self.manager.log.debug("%s: Someone already working on %s (pri: %s), sleeping %s sec..." % (
-                        self.key, task["inner_path"], task["priority"], timeout
-                    ))
-                for sleep_i in range(1, timeout * 10):
-                    time.sleep(0.1)
-                    if task["done"] or task["workers_num"] == 0:
-                        if config.verbose:
-                            self.manager.log.debug("%s: %s, picked task free after %ss sleep. (done: %s)" % (
-                                self.key, task["inner_path"], 0.1 * sleep_i, task["done"]
-                            ))
-                        break
-                    if sleep_i % 10 == 0:
-                        workers = self.manager.findWorkers(task)
-                        if not workers or not workers[0].peer.connection:
-                            break
-                        worker_idle = time.time() - workers[0].peer.connection.last_recv_time
-                        if worker_idle > 1:
-                            if config.verbose:
-                                self.manager.log.debug("%s: %s, worker %s seems idle, picked up task after %ss sleep. (done: %s)" % (
-                                    self.key, task["inner_path"], workers[0].key, 0.1 * sleep_i, task["done"]
-                                ))
-                            break
+            task = self.pickTask()
+
+            if not task:
+                break
+
             if task["done"]:
                 continue
+
             self.task = task
-            site = task["site"]
+
             self.manager.addTaskWorker(task, self)
-            error_message = "Unknown error"
+
             try:
-                buff = self.peer.getFile(site.address, task["inner_path"], task["size"])
-            except Exception as err:
-                self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
-                error_message = str(err)
-                buff = None
-            if self.running is False:  # Worker no longer needed or got killed
-                self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
-                break
-            if task["done"] is True:  # Task done, try to find new one
-                continue
-            if buff:  # Download ok
-                try:
-                    correct = site.content_manager.verifyFile(task["inner_path"], buff)
-                except Exception as err:
-                    error_message = str(err)
-                    correct = False
-            else:  # Download error
-                error_message = "Download failed"
-                correct = False
-            if correct is True or correct is None:  # Verify ok or same file
-                if self.manager.started_task_num < 50 or config.verbose:
-                    self.manager.log.debug("%s: Verify correct: %s" % (self.key, task["inner_path"]))
-                write_error = None
-                task_finished = False
-                if correct is True and task["locked"] is False:  # Save if changed and task not done yet
-                    task["locked"] = True
-                    buff.seek(0)
-                    try:
-                        site.storage.write(task["inner_path"], buff)
-                        write_error = False
-                    except Exception as err:
-                        if type(err) == Debug.Notify:
-                            self.manager.log.debug("%s: Write aborted: %s (%s)" % (self.key, task["inner_path"], err))
-                        else:
-                            self.manager.log.error("%s: Error writing: %s (%s)" % (self.key, task["inner_path"], err))
-                        write_error = err
-                    task_finished = True
-                if correct is None and task["locked"] is False:  # Mark as done if same file
-                    task["locked"] = True
-                    task_finished = True
-                if task_finished and not task["done"]:
-                    if write_error:
-                        self.manager.failTask(task)
-                        self.num_failed += 1
-                    else:
-                        self.manager.doneTask(task)
-                        self.num_downloaded += 1
-                self.manager.removeTaskWorker(task, self)
-            else:  # Verify failed
-                self.num_failed += 1
-                self.manager.removeTaskWorker(task, self)
-                if self.manager.started_task_num < 50 or config.verbose:
-                    self.manager.log.debug(
-                        "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" %
-                        (self.key, task["inner_path"], error_message, len(task["failed"]), task["workers_num"])
-                    )
-                task["failed"].append(self.peer)
-                self.peer.hash_failed += 1
-                if self.peer.hash_failed >= max(len(self.manager.tasks), 3) or self.peer.connection_error > 10:
-                    # Broken peer: More fails than tasks number but atleast 3
-                    break
-                if task["inner_path"] not in site.bad_files:
-                    # Don't need this file anymore
-                    break
-                time.sleep(1)
+                success = self.handleTask(task)
+            except WorkerStop as err:
+                self.manager.log.debug("%s: Worker stopped: %s" % (self.key, err))
+
+                self.manager.removeTaskWorker(task, self)
+                break
+
+            self.manager.removeTaskWorker(task, self)
+
         self.peer.onWorkerDone()
         self.running = False
         self.manager.removeWorker(self)
@@ -175,4 +236,4 @@ class Worker(object):
         if self.thread:
             self.thread.kill(exception=Debug.Notify("Worker stopped"))
         del self.thread
         self.manager.removeWorker(self)
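
The other half of the refactor replaces the old flag bookkeeping in downloader() (error_message, correct, write_error, task_finished) with exceptions: downloadTask() raises WorkerDownloadError, writeTask() raises WorkerIOError, and onTaskVerifyFail() raises WorkerStop once a peer has failed too often, which unwinds out of handleTask() so downloader() can log it and stop the worker. A toy sketch of that control-flow shape, using hypothetical names rather than ZeroNet code:

class DownloadError(Exception):
    pass


class StopWorker(Exception):
    pass


def fetch(path, fail=False):
    # Stand-in for a download helper that raises instead of returning None
    if fail:
        raise DownloadError("no response for %s" % path)
    return b"data"


def handle(path, fail=False):
    try:
        fetch(path, fail)
    except DownloadError as err:
        print("download/verify failed: %s" % err)
        raise StopWorker("too many errors")   # analogous to onTaskVerifyFail()
    return True


def worker_loop(tasks):
    for path, fail in tasks:
        try:
            handle(path, fail)
        except StopWorker as err:
            print("worker stopped: %s" % err)  # analogous to downloader()'s except WorkerStop
            break


worker_loop([("a.txt", False), ("b.txt", True), ("c.txt", False)])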

src/Worker/WorkerManager.py

@@ -21,7 +21,7 @@ class WorkerManager(object):
         self.tasks = WorkerTaskManager()
         self.next_task_id = 1
         # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
-        # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "locked": False}
+        # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock}
         self.started_task_num = 0  # Last added task num
         self.asked_peers = []
         self.running = True
@@ -500,7 +500,7 @@ class WorkerManager(object):
         task = {
             "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
-            "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "locked": False,
+            "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None,
             "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
         }
@@ -575,4 +575,4 @@ class WorkerManager(object):
         self.site.onFileFail(task["inner_path"])
         task["evt"].set(False)
         if not self.tasks:
             self.started_task_num = 0