From f4cdc3178894e0fd7b774aae9eea6d47e64d15f8 Mon Sep 17 00:00:00 2001 From: shortcutme Date: Tue, 3 Oct 2017 14:27:29 +0200 Subject: [PATCH] Lock socket during sending data --- plugins/FilePack/FilePackPlugin.py- | 113 ++++++++++++++++++++++++++++ src/Connection/Connection.py | 22 ++++-- 2 files changed, 128 insertions(+), 7 deletions(-) create mode 100644 plugins/FilePack/FilePackPlugin.py- diff --git a/plugins/FilePack/FilePackPlugin.py- b/plugins/FilePack/FilePackPlugin.py- new file mode 100644 index 00000000..8bcbeea2 --- /dev/null +++ b/plugins/FilePack/FilePackPlugin.py- @@ -0,0 +1,113 @@ +import os +import re +import itertools + +from Plugin import PluginManager +from Config import config +from util import helper + + +# Keep archive open for faster reponse times for large sites +archive_cache = {} + + +def closeArchive(archive_path): + if archive_path in archive_cache: + del archive_cache[archive_path] + + +def openArchive(archive_path): + if archive_path not in archive_cache: + if archive_path.endswith("tar.gz"): + import tarfile + archive_cache[archive_path] = tarfile.open(archive_path, "r:gz") + elif archive_path.endswith("tar.bz2"): + import tarfile + archive_cache[archive_path] = tarfile.open(archive_path, "r:bz2") + else: + import zipfile + archive_cache[archive_path] = zipfile.ZipFile(archive_path) + helper.timer(5, lambda: closeArchive(archive_path)) # Close after 5 sec + + archive = archive_cache[archive_path] + return archive + +def openArchiveFile(archive_path, path_within): + archive = openArchive(archive_path) + if archive_path.endswith(".zip"): + return archive.open(path_within) + else: + return archive.extractfile(path_within.encode("utf8")) + + +@PluginManager.registerTo("UiRequest") +class UiRequestPlugin(object): + def actionSiteMedia(self, path, **kwargs): + if ".zip/" in path or ".tar.gz/" in path: + path_parts = self.parsePath(path) + file_path = u"%s/%s/%s" % (config.data_dir, path_parts["address"], path_parts["inner_path"].decode("utf8")) + match = re.match("^(.*\.(?:tar.gz|tar.bz2|zip))/(.*)", file_path) + archive_path, path_within = match.groups() + if not os.path.isfile(archive_path): + site = self.server.site_manager.get(path_parts["address"]) + if not site: + self.error404(path) + # Wait until file downloads + result = site.needFile(site.storage.getInnerPath(archive_path), priority=10) + # Send virutal file path download finished event to remove loading screen + site.updateWebsocket(file_done=site.storage.getInnerPath(file_path)) + if not result: + return self.error404(path) + try: + file = openArchiveFile(archive_path, path_within) + content_type = self.getContentType(file_path) + self.sendHeader(200, content_type=content_type, noscript=kwargs.get("header_noscript", False)) + return self.streamFile(file) + except Exception, err: + self.log.debug("Error opening archive file: %s" % err) + return self.error404(path) + + return super(UiRequestPlugin, self).actionSiteMedia(path, **kwargs) + + def streamFile(self, file): + while 1: + try: + block = file.read(60 * 1024) + if block: + yield block + else: + raise StopIteration + except StopIteration: + file.close() + break + + +@PluginManager.registerTo("SiteStorage") +class SiteStoragePlugin(object): + def isFile(self, inner_path): + if ".zip/" in inner_path or ".tar.gz/" in inner_path or ".tar.bz2/" in inner_path: + match = re.match("^(.*\.(?:tar.gz|tar.bz2|zip))/(.*)", inner_path) + inner_archive_path, path_within = match.groups() + return super(SiteStoragePlugin, self).isFile(inner_archive_path) + else: + return super(SiteStoragePlugin, self).isFile(inner_path) + + def getDbFiles(self): + for item in super(SiteStoragePlugin, self).getDbFiles(): + yield item + + # Search for archive files + for content_inner_path in self.site.content_manager.listContents(): + content = self.site.content_manager.contents[content_inner_path] + if not content: + merged_site.log.error("[MISSING] %s" % content_inner_path) + continue + + file_relative_paths = itertools.chain( + content.get("files", {}).iteritems(), + content.get("files_optional", {}).iteritems() + ) + + for file_relative_path, node in file_relative_paths: + if "zeronet-archive" in file_relative_path: + print node diff --git a/src/Connection/Connection.py b/src/Connection/Connection.py index 2cfdb27d..57ccebec 100644 --- a/src/Connection/Connection.py +++ b/src/Connection/Connection.py @@ -4,6 +4,10 @@ import time import gevent import msgpack import msgpack.fallback +try: + from gevent.coros import RLock +except: + from gevent.lock import RLock from Config import config from Debug import Debug @@ -15,7 +19,7 @@ class Connection(object): __slots__ = ( "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "req_id", "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time", - "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", + "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock", "last_ping_delay", "last_req_time", "last_cmd", "bad_actions", "sites", "name", "updateName", "waiting_requests", "waiting_streams" ) @@ -58,6 +62,7 @@ class Connection(object): self.bad_actions = 0 self.sites = 0 self.cpu_time = 0.0 + self.send_lock = RLock() self.name = None self.updateName() @@ -351,6 +356,7 @@ class Connection(object): # Send data to connection def send(self, message, streaming=False): + self.last_send_time = time.time() if config.debug_socket: self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % ( message.get("cmd"), message.get("to"), streaming, @@ -362,10 +368,10 @@ class Connection(object): self.log("Send error: missing socket") return False - self.last_send_time = time.time() try: if streaming: - bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall) + with self.send_lock: + bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall) message = None self.bytes_sent += bytes_sent self.server.bytes_sent += bytes_sent @@ -374,7 +380,8 @@ class Connection(object): message = None self.bytes_sent += len(data) self.server.bytes_sent += len(data) - self.sock.sendall(data) + with self.send_lock: + self.sock.sendall(data) except Exception, err: self.close("Send error: %s" % err) return False @@ -387,9 +394,10 @@ class Connection(object): bytes_left = read_bytes while True: self.last_send_time = time.time() - self.sock.sendall( - file.read(min(bytes_left, buff)) - ) + with self.send_lock: + self.sock.sendall( + file.read(min(bytes_left, buff)) + ) bytes_left -= buff if bytes_left <= 0: break