diff --git a/src/Worker/WorkerManager.py b/src/Worker/WorkerManager.py index 252d34ca..2a71b88e 100644 --- a/src/Worker/WorkerManager.py +++ b/src/Worker/WorkerManager.py @@ -20,6 +20,7 @@ class WorkerManager(object): self.workers = {} # Key: ip:port, Value: Worker.Worker self.tasks = WorkerTaskManager() self.next_task_id = 1 + self.lock_add_task = gevent.lock.Semaphore(1) # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None, # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock} self.started_task_num = 0 # Last added task num @@ -198,7 +199,6 @@ class WorkerManager(object): if type(peers) is set: peers = list(peers) - # Sort by ping peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999) @@ -463,6 +463,7 @@ class WorkerManager(object): # Create new task and return asyncresult def addTask(self, inner_path, peer=None, priority=0, file_info=None): + self.lock_add_task.acquire() self.site.onFileStart(inner_path) # First task, trigger site download started task = self.tasks.findTask(inner_path) if task: # Already has task for that file @@ -476,7 +477,6 @@ class WorkerManager(object): task["failed"].remove(peer) # New update arrived, remove the peer from failed peers self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) self.startWorkers([peer], reason="Added new task (peer failed before)") - return task else: # No task for that file yet evt = gevent.event.AsyncResult() if peer: @@ -526,7 +526,8 @@ class WorkerManager(object): else: self.startWorkers(peers, reason="Added new task") - return task + self.lock_add_task.release() + return task def addTaskWorker(self, task, worker): if task in self.tasks: