Test msgpack streaming with binary data

This commit is contained in:
shortcutme 2019-03-16 00:56:25 +01:00
parent d95da7372a
commit 48b6c81b36
No known key found for this signature in database
GPG key ID: 5B63BAE6CB9613AE

View file

@ -36,16 +36,25 @@ class TestMsgpack:
assert messages[0] == messages[1]
def testStreaming(self):
f = StreamingMsgpack.FilePart("%s/users.json" % config.data_dir)
f.read_bytes = 10
bin_data = os.urandom(20)
f = Msgpack.FilePart("%s/users.json" % config.data_dir, "rb")
f.read_bytes = 30
data = {"cmd": "response", "params": f}
data = {"cmd": "response", "body": f, "bin": bin_data}
out_buff = io.BytesIO()
Msgpack.stream(data, out_buff.write)
out_buff.seek(0)
data_packb = {"cmd": "response", "params": open("%s/users.json" % config.data_dir).read(10)}
data_packb = {
"cmd": "response",
"body": open("%s/users.json" % config.data_dir, "rb").read(30),
"bin": bin_data
}
out_buff.seek(0)
assert msgpack.unpackb(out_buff.read()) == data_packb
data_unpacked = Msgpack.unpack(out_buff.read())
assert data_unpacked == data_packb
assert data_unpacked["cmd"] == "response"
assert type(data_unpacked["body"]) == bytes