Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_seaweedfs.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from itertools import dropwhile, islice | |||||
import json | |||||
import os | |||||
import unittest | import unittest | ||||
from swh.objstorage.backends.seaweed import DEFAULT_LIMIT, WeedObjStorage | from requests.utils import get_encoding_from_headers | ||||
import requests_mock | |||||
from requests_mock.contrib import fixture | |||||
from swh.objstorage.backends.pathslicing import PathSlicer | |||||
from swh.objstorage.backends.seaweed import WeedObjStorage | |||||
from swh.objstorage.exc import Error | from swh.objstorage.exc import Error | ||||
from swh.objstorage.objstorage import decompressors | from swh.objstorage.objstorage import decompressors | ||||
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | ||||
class PathDict:
    """A dict-like object that handles "path-like" keys in a recursive dict
    structure.

    For example:

    >>> a = PathDict()
    >>> a['path/to/file'] = 'some file content'

    will create a dict structure (in self.data) like:

    >>> print(a.data)
    {'path': {'to': {'file': 'some file content'}}}
    >>> 'path/to/file' in a
    True

    This is a helper class for the FilerRequestsMock below.
    """

    def __init__(self):
        # nested dicts represent directories; any other value is file content
        self.data = {}

    @staticmethod
    def _split(key):
        """Strip leading/trailing '/' from *key* and split it into path
        components."""
        if key.startswith("/"):
            key = key[1:]
        if key.endswith("/"):
            key = key[:-1]
        return key.split("/")

    def __setitem__(self, key, value):
        # a trailing '/' would denote a directory; only files can be stored
        if key.endswith("/"):
            raise ValueError("Nope")
        if key.startswith("/"):
            key = key[1:]
        path = key.split("/")
        resu = self.data
        # create intermediate directories as needed
        for p in path[:-1]:
            resu = resu.setdefault(p, {})
        resu[path[-1]] = value

    def __getitem__(self, key):
        """Return the file content (or directory dict) at *key*; raises
        KeyError if any path component is missing."""
        assert isinstance(key, str)
        resu = self.data
        for p in self._split(key):
            resu = resu[p]
        return resu

    def __delitem__(self, key):
        """Delete the entry at *key*.

        Traverses with plain indexing (not setdefault) so that deleting a
        missing path raises KeyError without creating empty intermediate
        directories as a side effect (the original setdefault-based walk
        polluted self.data on failed deletes).
        """
        path = self._split(key)
        resu = self.data
        for p in path[:-1]:
            resu = resu[p]
        del resu[path[-1]]

    def __contains__(self, key):
        # EAFP: a lookup failure on any component means the path is absent
        try:
            self[key]
            return True
        except KeyError:
            return False

    def flat(self):
        """Iterate over the leaf (file) names stored anywhere in the
        structure, depth-first. Note: yields bare names, not full paths."""

        def go(d):
            for k, v in d.items():
                if isinstance(v, dict):
                    yield from go(v)
                else:
                    yield k

        yield from go(self.data)
self.url = url | |||||
self.content = {} | |||||
def get(self, remote_path): | |||||
return self.content[remote_path] | |||||
class FilerRequestsMock:
    """This is a requests_mock based mock for the seaweedfs Filer API

    It does not implement the whole API, only the parts required to make the
    WeedFiler (used by WeedObjStorage) work.

    It stores the files in a dict-based structure, eg. the file
    '0a/32/0a3245983255' will be stored in a dict like:

      {'0a': {'32': {'0a3245983255': b'content'}}}

    It uses the PathDict helper class to make it a bit easier to handle this
    dict structure.
    """

    # Mode values reported in listing entries: MODE_DIR carries the high
    # directory bits, MODE_FILE only plain permission bits, so clients can
    # tell directories from files in a listing.
    MODE_DIR = 0o20000000771
    MODE_FILE = 0o660

    def __init__(self):
        # the virtual filesystem, keyed by URL path (see PathDict)
        self.content = PathDict()
        # intercept ALL requests for the 4 verbs the filer client uses;
        # each callback gets (request, context) per the requests_mock API
        self.requests_mock = fixture.Fixture()
        self.requests_mock.setUp()
        self.requests_mock.register_uri(
            requests_mock.GET, requests_mock.ANY, content=self.get_cb
        )
        self.requests_mock.register_uri(
            requests_mock.POST, requests_mock.ANY, content=self.post_cb
        )
        self.requests_mock.register_uri(
            requests_mock.HEAD, requests_mock.ANY, content=self.head_cb
        )
        self.requests_mock.register_uri(
            requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
        )

    def head_cb(self, request, context):
        # existence check: 404 when the path is not stored, 200 (default)
        # otherwise; no body is returned
        if request.path not in self.content:
            context.status_code = 404

    def get_cb(self, request, context):
        """GET handler: returns raw file content, or — when the path is a
        directory — a JSON directory listing mimicking the filer API,
        honoring the 'limit' and 'lastfilename' pagination query params."""
        content = None
        if request.path not in self.content:
            context.status_code = 404
        else:
            content = self.content[request.path]
            if isinstance(content, dict):
                # directory: build a paginated JSON listing instead
                if "limit" in request.qs:
                    limit = int(request.qs["limit"][0])
                    assert limit > 0
                else:
                    limit = None

                # deterministic listing order
                items = sorted(content.items())
                if items and "lastfilename" in request.qs:
                    lastfilename = request.qs["lastfilename"][0]
                    # exclude all filenames up to lastfilename
                    items = dropwhile(
                        lambda kv: kv[0].split("/")[-1] <= lastfilename, items
                    )
                if limit:
                    # +1 to easily detect if there are more
                    items = islice(items, limit + 1)
                entries = [
                    {
                        "FullPath": os.path.join(request.path, fname),
                        "Mode": self.MODE_DIR
                        if isinstance(obj, dict)
                        else self.MODE_FILE,
                    }
                    for fname, obj in items
                ]
                thereismore = False
                if limit and len(entries) > limit:
                    # drop the probe entry; flag that another page exists
                    entries = entries[:limit]
                    thereismore = True

                if entries:
                    # pagination cursor: bare name of the last entry returned
                    lastfilename = entries[-1]["FullPath"].split("/")[-1]
                else:
                    lastfilename = None
                text = json.dumps(
                    {
                        "Path": request.path,
                        "Limit": limit,
                        "LastFileName": lastfilename,
                        "ShouldDisplayLoadMore": thereismore,
                        "Entries": entries,
                    }
                )
                # honor the client's requested charset, defaulting to utf-8
                encoding = get_encoding_from_headers(request.headers) or "utf-8"
                content = text.encode(encoding)
        return content

    def post_cb(self, request, context):
        """POST handler: store the first part of the multipart upload body
        as the file content at the request path."""
        # local import: requests_toolbelt is only needed by this callback
        from requests_toolbelt.multipart import decoder

        multipart_data = decoder.MultipartDecoder(
            request.body, request.headers["content-type"]
        )
        part = multipart_data.parts[0]
        self.content[request.path] = part.content

    def delete_cb(self, request, context):
        # DELETE handler: remove the stored entry (KeyError if absent)
        del self.content[request.path]
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Run the shared objstorage test suite against a WeedObjStorage backed
    by the FilerRequestsMock HTTP mock (no real seaweedfs server needed).

    Subclasses override `compression` / `slicing` to cover the backend's
    configuration matrix.
    """

    # compression codec under test; overridden by subclasses
    compression = "none"
    # path-slicing spec under test; "" means flat storage
    slicing = ""

    def setUp(self):
        super().setUp()
        self.url = "http://127.0.0.1/test"
        self.storage = WeedObjStorage(
            url=self.url, compression=self.compression, slicing=self.slicing
        )
        # installing the mock intercepts all HTTP traffic from self.storage
        self.mock = FilerRequestsMock()

    def test_compression(self):
        # hash_content comes from ObjStorageTestFixture — presumably returns
        # (content, obj_id); verify what is stored is the compressed form
        content, obj_id = self.hash_content(b"test compression")
        self.storage.add(content, obj_id=obj_id)
        raw_content = self.storage.wf.get(self.storage._path(obj_id))
        d = decompressors[self.compression]()
        assert d.decompress(raw_content) == content
        # no bytes may remain after the compressed stream ends
        assert d.unused_data == b""

    def test_trailing_data_on_stored_blob(self):
        content, obj_id = self.hash_content(b"test content without garbage")
        self.storage.add(content, obj_id=obj_id)
        path = self.storage._path(obj_id)
        # corrupt the stored blob directly in the mock's backing store
        self.mock.content[path] += b"trailing garbage"
        if self.compression == "none":
            # uncompressed: only an explicit check() detects the corruption
            with self.assertRaises(Error) as e:
                self.storage.check(obj_id)
        else:
            # compressed: get() itself must notice the trailing bytes
            with self.assertRaises(Error) as e:
                self.storage.get(obj_id)
        assert "trailing data" in e.exception.args[0]

    def test_slicing(self):
        content, obj_id = self.hash_content(b"test compression")
        self.storage.add(content, obj_id=obj_id)
        # the object must be stored at the path the slicer computes
        slicer = PathSlicer("/test", self.slicing)
        assert slicer.get_path(obj_id.hex()) in self.mock.content
class TestWeedObjStorageBz2(TestWeedObjStorage):
    # re-run the whole suite with bz2 compression
    compression = "bz2"
class TestWeedObjStorageGzip(TestWeedObjStorage):
    # re-run the whole suite with gzip compression
    compression = "gzip"
class TestWeedObjStorageLzma(TestWeedObjStorage):
    # re-run the whole suite with lzma compression
    compression = "lzma"
class TestWeedObjStorageZlib(TestWeedObjStorage):
    # re-run the whole suite with zlib compression
    compression = "zlib"
class TestWeedObjStorageWithSlicing(TestWeedObjStorage):
    # re-run the whole suite with a 2-level path-slicing layout
    slicing = "0:2/2:4"
class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
    # same sliced layout, but force the filer client to paginate listings
    # one entry at a time to exercise the pagination code paths
    slicing = "0:2/2:4"

    def setUp(self):
        super().setUp()
        # batchsize is an attribute of the WeedFiler client (self.storage.wf);
        # 1 makes every directory listing require multiple round-trips
        self.storage.wf.batchsize = 1