Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/tests/test_objstorage_seaweedfs.py
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from itertools import dropwhile, islice | from itertools import dropwhile, islice | ||||
import json | import json | ||||
import os | import os | ||||
import unittest | import unittest | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
from requests.utils import get_encoding_from_headers | from requests.utils import get_encoding_from_headers | ||||
import requests_mock | import requests_mock | ||||
from requests_mock.contrib import fixture | from requests_mock.contrib import fixture | ||||
from swh.objstorage.backends.pathslicing import PathSlicer | |||||
from swh.objstorage.backends.seaweed import WeedObjStorage | from swh.objstorage.backends.seaweed import WeedObjStorage | ||||
from swh.objstorage.exc import Error | from swh.objstorage.exc import Error | ||||
from swh.objstorage.objstorage import decompressors | from swh.objstorage.objstorage import decompressors | ||||
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture | ||||
class PathDict:
    """A dict-like object that handles "path-like" keys in a recursive dict
    structure.

    For example:

    >>> a = PathDict()
    >>> a['path/to/file'] = 'some file content'

    will create a dict structure (in self.data) like:

    >>> print(a.data)
    {'path': {'to': {'file': 'some file content'}}}
    >>> 'path/to/file' in a
    True

    One leading and one trailing slash are ignored on lookup and deletion, so
    '/path/to/' designates the same node as 'path/to'. Nested dicts play the
    role of directories; any other value is a file.

    This is a helper class for the FilerRequestsMock below.
    """

    def __init__(self):
        # Root "directory": maps a path component to a nested dict
        # (directory) or to an arbitrary value (file content).
        self.data = {}

    @staticmethod
    def _split(key):
        """Return the components of ``key``, dropping at most one leading and
        one trailing slash.

        Raises:
            TypeError: if ``key`` is not a string.
        """
        if not isinstance(key, str):
            raise TypeError(f"path must be a str, not {type(key).__name__}")
        if key.startswith("/"):
            key = key[1:]
        if key.endswith("/"):
            key = key[:-1]
        return key.split("/")

    def _walk(self, parts):
        """Follow ``parts`` from the root and return the node reached.

        Raises:
            KeyError: if a component is missing, or if the walk would have to
                descend into a file (a non-dict value).
        """
        node = self.data
        for part in parts:
            # A file has no children: report "not found" (KeyError) rather
            # than letting e.g. ``bytes[str]`` fail with a TypeError, which
            # ``__contains__`` would not catch.
            if not isinstance(node, dict):
                raise KeyError(part)
            node = node[part]
        return node

    def __setitem__(self, key, value):
        """Store ``value`` at ``key``, creating intermediate directories.

        Raises:
            ValueError: if ``key`` ends with a slash (it would name a
                directory, not a file).
        """
        if key.endswith("/"):
            raise ValueError(f"cannot store a value at directory path {key!r}")
        *parents, name = self._split(key)
        node = self.data
        for part in parents:
            node = node.setdefault(part, {})
        node[name] = value

    def __getitem__(self, key):
        """Return the node stored at ``key``: a dict for a directory, the
        stored value for a file. ``"/"`` designates the root dict.

        Raises:
            KeyError: if the path does not exist.
        """
        if key == "/":
            return self.data
        return self._walk(self._split(key))

    def __delitem__(self, key):
        """Remove the entry at ``key`` (file or whole directory).

        The structure is left untouched when the path does not exist.

        Raises:
            KeyError: if the path does not exist.
        """
        *parents, name = self._split(key)
        # Plain lookups (no setdefault) so that a failed deletion does not
        # leave freshly created empty directories behind.
        parent = self._walk(parents)
        if not isinstance(parent, dict):
            raise KeyError(name)
        del parent[name]

    def __contains__(self, key):
        """Tell whether ``key`` designates an existing file or directory."""
        if key == "/":
            # always consider we have the 'root' directory
            return True
        try:
            self[key]
        except KeyError:
            return False
        return True

    def flat(self):
        """Iterate over the names (last path component only, not the full
        path) of all the files stored in the structure, depth first."""

        def go(d):
            for k, v in d.items():
                if isinstance(v, dict):
                    yield from go(v)
                else:
                    yield k

        yield from go(self.data)
class FilerRequestsMock:
    """This is a requests_mock based mock for the seaweedfs Filer API

    It does not implement the whole API, only the parts required to make the
    WeedFiler (used by WeedObjStorage) work.

    It stores the files in a dict-based structure, eg. the file
    '0a/32/0a3245983255' will be stored in a dict like:

      {'0a': {'32': {'0a3245983255': b'content'}}}

    It uses the PathDict helper class to make it a bit easier to handle this
    dict structure.
    """

    # "Mode" values reported in directory listings for directories and files.
    MODE_DIR = 0o20000000771
    MODE_FILE = 0o660

    def __init__(self):
        # Stored files, keyed by the path part of the request URL.
        self.content = PathDict()
        # The fixture is started right away; GET/POST/HEAD/DELETE requests to
        # any URL are routed to the callbacks below.
        self.requests_mock = fixture.Fixture()
        self.requests_mock.setUp()
        self.requests_mock.register_uri(
            requests_mock.GET, requests_mock.ANY, content=self.get_cb
        )
        self.requests_mock.register_uri(
            requests_mock.POST, requests_mock.ANY, content=self.post_cb
        )
        self.requests_mock.register_uri(
            requests_mock.HEAD, requests_mock.ANY, content=self.head_cb
        )
        self.requests_mock.register_uri(
            requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
        )

    def head_cb(self, request, context):
        """HEAD: answer with a 404 status when the path is not stored.

        Nothing is returned: the status code is the only information set.
        """
        if request.path not in self.content:
            context.status_code = 404

    def get_cb(self, request, context):
        """GET: return a file's content or a JSON directory listing.

        Returns:
            the stored value when the path is a file, the encoded JSON
            listing when it is a directory, and None (with a 404 status set
            on ``context``) when the path is not stored.
        """
        if request.path not in self.content:
            context.status_code = 404
            return None
        content = self.content[request.path]
        if isinstance(content, dict):
            return self._directory_listing(request, content)
        return content

    def _directory_listing(self, request, directory):
        """Build the encoded JSON listing of ``directory``.

        Honors the "limit" and "lastfilename" query parameters, read from
        ``request.qs``, to paginate the entries.
        """
        if "limit" in request.qs:
            limit = int(request.qs["limit"][0])
            assert limit > 0
        else:
            limit = None

        items = sorted(directory.items())
        if items and "lastfilename" in request.qs:
            lastfilename = request.qs["lastfilename"][0]
            if lastfilename:
                # exclude all filenames up to lastfilename
                items = dropwhile(
                    lambda kv: kv[0].split("/")[-1] <= lastfilename, items
                )

        if limit:
            # +1 to easily detect if there are more
            items = islice(items, limit + 1)

        entries = [
            {
                "FullPath": os.path.join(request.path, fname),
                "Mode": self.MODE_DIR if isinstance(obj, dict) else self.MODE_FILE,
            }
            for fname, obj in items
        ]

        thereismore = False
        if limit and len(entries) > limit:
            entries = entries[:limit]
            thereismore = True

        if entries:
            lastfilename = entries[-1]["FullPath"].split("/")[-1]
        else:
            lastfilename = None

        text = json.dumps(
            {
                "Path": request.path,
                "Limit": limit,
                "LastFileName": lastfilename,
                "ShouldDisplayLoadMore": thereismore,
                "Entries": entries,
            }
        )
        encoding = get_encoding_from_headers(request.headers) or "utf-8"
        return text.encode(encoding)

    def post_cb(self, request, context):
        """POST: store the first part of the multipart body at the request
        path."""
        from requests_toolbelt.multipart import decoder

        multipart_data = decoder.MultipartDecoder(
            request.body, request.headers["content-type"]
        )
        part = multipart_data.parts[0]
        self.content[request.path] = part.content

    def delete_cb(self, request, context):
        """DELETE: remove the entry stored at the request path."""
        del self.content[request.path]
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
    """Run the generic objstorage test suite, plus a few seaweedfs specific
    checks, against a WeedObjStorage talking to a mocked Filer API.

    Subclasses override the class attributes below to exercise other
    configurations.
    """

    compression = "none"
    url = "http://127.0.0.1/test/"
    slicing = ""

    def setUp(self):
        super().setUp()
        self.storage = WeedObjStorage(
            url=self.url, compression=self.compression, slicing=self.slicing
        )
        self.mock = FilerRequestsMock()
        # The mock starts its requests_mock fixture on creation; stop it at
        # the end of each test so the patching does not outlive the test.
        self.addCleanup(self.mock.requests_mock.cleanUp)

    def test_compression(self):
        """The blob stored in the backend decompresses to the added content,
        with no data left over."""
        content, obj_id = self.hash_content(b"test compression")
        self.storage.add(content, obj_id=obj_id)

        raw_content = self.storage.wf.get(self.storage._path(obj_id))

        d = decompressors[self.compression]()
        assert d.decompress(raw_content) == content
        assert d.unused_data == b""

    def test_trailing_data_on_stored_blob(self):
        """Garbage appended to a stored blob is reported as an Error."""
        content, obj_id = self.hash_content(b"test content without garbage")
        self.storage.add(content, obj_id=obj_id)

        # Corrupt the stored blob directly in the mock's storage.
        path = self.storage._path(obj_id)
        self.mock.content[path] += b"trailing garbage"

        if self.compression == "none":
            with self.assertRaises(Error) as e:
                self.storage.check(obj_id)
        else:
            with self.assertRaises(Error) as e:
                self.storage.get(obj_id)
            assert "trailing data" in e.exception.args[0]

    def test_slicing(self):
        """The object is stored at the path computed by a PathSlicer built
        from the URL path and the configured slicing."""
        content, obj_id = self.hash_content(b"test compression")
        self.storage.add(content, obj_id=obj_id)

        slicer = PathSlicer(urlparse(self.url).path, self.slicing)
        assert slicer.get_path(obj_id.hex()) in self.mock.content
class TestWeedObjStorageWithCompression(TestWeedObjStorage):
    """Same test suite, with lzma compression of the stored blobs."""

    compression = "lzma"
class TestWeedObjStorageWithSmallBatch(TestWeedObjStorage):
    """Same test suite, with the filer client batch size forced to 1."""

    def setUp(self):
        super().setUp()
        self.storage.wf.batchsize = 1
class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
    """Same test suite, with a "0:2/2:4" slicing of the object paths and the
    filer client batch size forced to 1."""

    slicing = "0:2/2:4"

    def setUp(self):
        super().setUp()
        self.storage.wf.batchsize = 1
class TestWeedObjStorageWithNoPath(TestWeedObjStorage):
    """Same test suite, with a base URL whose path is just "/"."""

    url = "http://127.0.0.1/"