Lock task adding to avoid race condition when getFileInfo switches
This commit is contained in:
parent
c08d266822
commit
c0639fef75
1 changed files with 4 additions and 3 deletions
|
@ -20,6 +20,7 @@ class WorkerManager(object):
|
|||
self.workers = {} # Key: ip:port, Value: Worker.Worker
|
||||
self.tasks = WorkerTaskManager()
|
||||
self.next_task_id = 1
|
||||
self.lock_add_task = gevent.lock.Semaphore(1)
|
||||
# {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
|
||||
# "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock}
|
||||
self.started_task_num = 0 # Last added task num
|
||||
|
@ -198,7 +199,6 @@ class WorkerManager(object):
|
|||
if type(peers) is set:
|
||||
peers = list(peers)
|
||||
|
||||
|
||||
# Sort by ping
|
||||
peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)
|
||||
|
||||
|
@ -463,6 +463,7 @@ class WorkerManager(object):
|
|||
|
||||
# Create new task and return asyncresult
|
||||
def addTask(self, inner_path, peer=None, priority=0, file_info=None):
|
||||
self.lock_add_task.acquire()
|
||||
self.site.onFileStart(inner_path) # First task, trigger site download started
|
||||
task = self.tasks.findTask(inner_path)
|
||||
if task: # Already has task for that file
|
||||
|
@ -476,7 +477,6 @@ class WorkerManager(object):
|
|||
task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
|
||||
self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
|
||||
self.startWorkers([peer], reason="Added new task (peer failed before)")
|
||||
return task
|
||||
else: # No task for that file yet
|
||||
evt = gevent.event.AsyncResult()
|
||||
if peer:
|
||||
|
@ -526,6 +526,7 @@ class WorkerManager(object):
|
|||
|
||||
else:
|
||||
self.startWorkers(peers, reason="Added new task")
|
||||
self.lock_add_task.release()
|
||||
return task
|
||||
|
||||
def addTaskWorker(self, task, worker):
|
||||
|
|
Loading…
Reference in a new issue