Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_seaweedfs.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from itertools import dropwhile, islice | |||||
import json | |||||
import os | |||||
import unittest | import unittest | ||||
from urllib.parse import urlparse | |||||
from swh.objstorage.backends.seaweed import DEFAULT_LIMIT, WeedObjStorage | from requests.utils import get_encoding_from_headers | ||||
import requests_mock | |||||
from requests_mock.contrib import fixture | |||||
from swh.objstorage.backends.seaweed import WeedObjStorage | |||||
from swh.objstorage.exc import Error | from swh.objstorage.exc import Error | ||||
from swh.objstorage.objstorage import decompressors | from swh.objstorage.objstorage import decompressors | ||||
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | ||||
class MockWeedFiler: | class FilerRequestsMock: | ||||
""" WeedFiler mock that replicates its API """ | """This is a requests_mock based mock for the seaweedfs Filer API | ||||
def __init__(self, url): | It does not implement the whole API, only the parts required to make the | ||||
self.url = url | WeedFiler (used by WeedObjStorage) work. | ||||
self.content = {} | |||||
def get(self, remote_path): | It stores the files in a dict. | ||||
return self.content[remote_path] | """ | ||||
def put(self, fp, remote_path): | MODE_DIR = 0o20000000771 | ||||
self.content[remote_path] = fp.read() | MODE_FILE = 0o660 | ||||
def __init__(self, baseurl): | |||||
self.baseurl = baseurl | |||||
self.basepath = urlparse(baseurl).path | |||||
self.content = {} | |||||
self.requests_mock = fixture.Fixture() | |||||
self.requests_mock.setUp() | |||||
self.requests_mock.register_uri( | |||||
requests_mock.GET, requests_mock.ANY, content=self.get_cb | |||||
) | |||||
self.requests_mock.register_uri( | |||||
requests_mock.POST, requests_mock.ANY, content=self.post_cb | |||||
) | |||||
self.requests_mock.register_uri( | |||||
requests_mock.HEAD, requests_mock.ANY, content=self.head_cb | |||||
) | |||||
self.requests_mock.register_uri( | |||||
requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb | |||||
) | |||||
def relpath(self, path): | |||||
if path.startswith(self.basepath): | |||||
return os.path.relpath(path, self.basepath) | |||||
def head_cb(self, request, context): | |||||
relpath = self.relpath(request.path) | |||||
if relpath == "." or relpath in self.content: | |||||
return b"Found" # ok, found it | |||||
context.status_code = 404 | |||||
return b"Not Found" | |||||
def get_cb(self, request, context): | |||||
if self.head_cb(request, context) == b"Not Found": | |||||
return | |||||
relpath = self.relpath(request.path) | |||||
if relpath == ".": | |||||
if "limit" in request.qs: | |||||
limit = int(request.qs["limit"][0]) | |||||
assert limit > 0 | |||||
else: | |||||
limit = None | |||||
def exists(self, remote_path): | items = sorted(self.content.items()) | ||||
return remote_path in self.content | if items and "lastfilename" in request.qs: | ||||
lastfilename = request.qs["lastfilename"][0] | |||||
if lastfilename: | |||||
# exclude all filenames up to lastfilename | |||||
items = dropwhile(lambda kv: kv[0] <= lastfilename, items) | |||||
if limit: | |||||
# +1 to easily detect if there are more | |||||
items = islice(items, limit + 1) | |||||
entries = [ | |||||
{"FullPath": os.path.join(request.path, fname), "Mode": self.MODE_FILE,} | |||||
for fname, obj in items | |||||
] | |||||
thereismore = False | |||||
if limit and len(entries) > limit: | |||||
entries = entries[:limit] | |||||
thereismore = True | |||||
def delete(self, remote_path): | if entries: | ||||
del self.content[remote_path] | lastfilename = entries[-1]["FullPath"].split("/")[-1] | ||||
else: | |||||
def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT): | lastfilename = None | ||||
keys = sorted(self.content.keys()) | text = json.dumps( | ||||
if last_file_name is None: | { | ||||
idx = 0 | "Path": request.path, | ||||
"Limit": limit, | |||||
"LastFileName": lastfilename, | |||||
"ShouldDisplayLoadMore": thereismore, | |||||
"Entries": entries, | |||||
} | |||||
) | |||||
encoding = get_encoding_from_headers(request.headers) or "utf-8" | |||||
return text.encode(encoding) | |||||
else: | else: | ||||
idx = keys.index(last_file_name) + 1 | return self.content[relpath] | ||||
return {"Entries": [{"FullPath": x} for x in keys[idx : idx + limit]]} | |||||
def post_cb(self, request, context): | |||||
from requests_toolbelt.multipart import decoder | |||||
multipart_data = decoder.MultipartDecoder( | |||||
request.body, request.headers["content-type"] | |||||
) | |||||
part = multipart_data.parts[0] | |||||
self.content[self.relpath(request.path)] = part.content | |||||
def delete_cb(self, request, context): | |||||
del self.content[self.relpath(request.path)] | |||||
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase): | class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase): | ||||
compression = "none" | compression = "none" | ||||
url = "http://127.0.0.1/test/" | |||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.url = "http://127.0.0.1/test" | |||||
self.storage = WeedObjStorage(url=self.url, compression=self.compression) | self.storage = WeedObjStorage(url=self.url, compression=self.compression) | ||||
self.storage.wf = MockWeedFiler(self.url) | self.mock = FilerRequestsMock(baseurl=self.url) | ||||
def test_compression(self): | def test_compression(self): | ||||
content, obj_id = self.hash_content(b"test compression") | content, obj_id = self.hash_content(b"test compression") | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
raw_content = self.storage.wf.get(self.storage._path(obj_id)) | raw_content = self.storage.wf.get(self.storage._path(obj_id)) | ||||
d = decompressors[self.compression]() | d = decompressors[self.compression]() | ||||
assert d.decompress(raw_content) == content | assert d.decompress(raw_content) == content | ||||
assert d.unused_data == b"" | assert d.unused_data == b"" | ||||
def test_trailing_data_on_stored_blob(self): | def test_trailing_data_on_stored_blob(self): | ||||
content, obj_id = self.hash_content(b"test content without garbage") | content, obj_id = self.hash_content(b"test content without garbage") | ||||
self.storage.add(content, obj_id=obj_id) | self.storage.add(content, obj_id=obj_id) | ||||
path = self.storage._path(obj_id) | self.mock.content[obj_id.hex()] += b"trailing garbage" | ||||
self.storage.wf.content[path] += b"trailing garbage" | |||||
if self.compression == "none": | if self.compression == "none": | ||||
with self.assertRaises(Error) as e: | with self.assertRaises(Error) as e: | ||||
self.storage.check(obj_id) | self.storage.check(obj_id) | ||||
else: | else: | ||||
with self.assertRaises(Error) as e: | with self.assertRaises(Error) as e: | ||||
self.storage.get(obj_id) | self.storage.get(obj_id) | ||||
assert "trailing data" in e.exception.args[0] | assert "trailing data" in e.exception.args[0] | ||||
class TestWeedObjStorageBz2(TestWeedObjStorage): | class TestWeedObjStorageWithCompression(TestWeedObjStorage): | ||||
compression = "bz2" | compression = "lzma" | ||||
class TestWeedObjStorageGzip(TestWeedObjStorage): | |||||
compression = "gzip" | |||||
class TestWeedObjStorageLzma(TestWeedObjStorage): | class TestWeedObjStorageWithSmallBatch(TestWeedObjStorage): | ||||
compression = "lzma" | def setUp(self): | ||||
super().setUp() | |||||
self.storage.wf.batchsize = 1 | |||||
class TestWeedObjStorageZlib(TestWeedObjStorage): | class TestWeedObjStorageWithNoPath(TestWeedObjStorage): | ||||
compression = "zlib" | url = "http://127.0.0.1/" |