WorkerManager quickfix

This commit is contained in:
shortcutme 2016-08-15 15:59:49 +02:00
parent 686a2de570
commit e9009c5c75
2 changed files with 16 additions and 8 deletions

View file

@ -8,7 +8,7 @@ class Config(object):
def __init__(self, argv): def __init__(self, argv):
self.version = "0.4.0" self.version = "0.4.0"
self.rev = 1411 self.rev = 1413
self.argv = argv self.argv = argv
self.action = None self.action = None
self.config_file = "zeronet.conf" self.config_file = "zeronet.conf"

View file

@ -6,6 +6,7 @@ import collections
import gevent import gevent
from Worker import Worker from Worker import Worker
from Config import config
from util import helper from util import helper
import util import util
@ -46,8 +47,8 @@ class WorkerManager:
tasks = self.tasks[:] # Copy it so removing elements wont cause any problem tasks = self.tasks[:] # Copy it so removing elements wont cause any problem
for task in tasks: for task in tasks:
size_extra_time = task["size"] / (1024 * 100) # 1 second for every 100k size_extra_time = task["size"] / (1024 * 100) # 1 second for every 100k
if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time: # Task taking too long time, skip it if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time:
self.log.debug("Timeout, Skipping: %s" % task) self.log.debug("Timeout, Skipping: %s" % task) # Task taking too long time, skip it
# Skip to next file workers # Skip to next file workers
workers = self.findWorkers(task) workers = self.findWorkers(task)
if workers: if workers:
@ -81,7 +82,9 @@ class WorkerManager:
# Returns the next free or less worked task # Returns the next free or less worked task
def getTask(self, peer): def getTask(self, peer):
self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 5, reverse=True) # Sort tasks by priority and worker numbers # Sort tasks by priority and worker numbers
self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 5, reverse=True)
for task in self.tasks: # Find a task for task in self.tasks: # Find a task
if task["peers"] and peer not in task["peers"]: if task["peers"] and peer not in task["peers"]:
continue # This peer not allowed to pick this task continue # This peer not allowed to pick this task
@ -233,7 +236,9 @@ class WorkerManager:
gevent.joinall(threads, timeout=5) gevent.joinall(threads, timeout=5)
found = self.findOptionalTasks(optional_tasks) found = self.findOptionalTasks(optional_tasks)
self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (len(found), len(optional_hash_ids))) self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
len(found), len(optional_hash_ids)
))
if found: if found:
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers]) found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
@ -259,7 +264,9 @@ class WorkerManager:
found_ips = helper.mergeDicts(thread_values) found_ips = helper.mergeDicts(thread_values)
found = self.addOptionalPeers(found_ips) found = self.addOptionalPeers(found_ips)
self.log.debug("Found optional files after findhash connected peers: %s/%s" % (len(found), len(optional_hash_ids))) self.log.debug("Found optional files after findhash connected peers: %s/%s" % (
len(found), len(optional_hash_ids)
))
if found: if found:
found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers]) found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
@ -368,8 +375,9 @@ class WorkerManager:
size = 0 size = 0
priority += self.getPriorityBoost(inner_path) priority += self.getPriorityBoost(inner_path)
task = { task = {
"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": optional_hash_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
"time_added": time.time(), "time_started": None, "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None,
"time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
} }
self.tasks.append(task) self.tasks.append(task)