diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweed.py
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweed.py
@@ -6,13 +6,13 @@
 import io
 from itertools import islice
 import logging
-import os
 from typing import Iterator, Optional
 from urllib.parse import urljoin, urlparse
 
 import requests
 
 from swh.model import hashutil
+from swh.objstorage.backends.pathslicing import PathSlicer
 from swh.objstorage.exc import Error, ObjNotFoundError
 from swh.objstorage.objstorage import (
     DEFAULT_LIMIT,
@@ -29,7 +29,6 @@
 class WeedFiler(object):
     """Simple class that encapsulates access to a seaweedfs filer service.
 
-    Objects are expected to be in a single directory.
     TODO: handle errors
     """
 
@@ -71,10 +70,14 @@
         LOGGER.debug("Delete file %s", url)
         return self.session.delete(url)
 
-    def iterfiles(self, last_file_name: Optional[str] = None) -> Iterator[str]:
-        """yield absolute file names
+    def iterfiles(
+        self, dir: str, last_file_name: Optional[str] = None
+    ) -> Iterator[str]:
+        """Recursively yield absolute file names
 
         Args:
+            dir: retrieve file names starting from this directory; must
+                be an absolute path.
             last_file_name: if given, starts from the file just after; must
                 be basename.
 
@@ -82,22 +85,38 @@
             absolute file names
         """
-        for entry in self._iter_dir(last_file_name):
+        if not dir.endswith("/"):
+            dir = dir + "/"
+
+        # first, generate file names going "down" the tree
+        yield from self._iter_files(dir, last_file_name)
+
+        # then, continue iterating while going back up the tree
+        while dir != self.basepath:
+            dir, last = dir[:-1].rsplit("/", 1)
+            dir += "/"
+            yield from self._iter_files(dir, last_file_name=last)
+
+    def _iter_files(
+        self, dir: str, last_file_name: Optional[str] = None
+    ) -> Iterator[str]:
+        for entry in self._iter_one_dir(dir, last_file_name):
             fullpath = entry["FullPath"]
             if entry["Mode"] & 1 << 31:
                 # it's a directory, recurse
                 # see https://pkg.go.dev/io/fs#FileMode
-                yield from self.iterfiles(fullpath)
+                yield from self._iter_files(fullpath)
             else:
                 yield fullpath
 
-    def _iter_dir(self, last_file_name=None):
+    def _iter_one_dir(self, remote_path, last_file_name=None):
+        url = self.build_url(remote_path)
         params = {"limit": self.batchsize}
         if last_file_name:
             params["lastFileName"] = last_file_name
-        LOGGER.debug("List directory %s", self.url)
+        LOGGER.debug("List directory %s", url)
         while True:
-            rsp = self.session.get(self.url, params=params)
+            rsp = self.session.get(url, params=params)
             if rsp.ok:
                 dircontent = rsp.json()
                 if dircontent["Entries"]:
@@ -107,9 +126,7 @@
                     params["lastFileName"] = dircontent["LastFileName"]
             else:
-                LOGGER.error(
-                    'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code)
-                )
+                LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
                 break
 
@@ -119,12 +136,15 @@
     https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
     """
 
-    def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
+    def __init__(
+        self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs
+    ):
         super().__init__(**kwargs)
         self.wf = WeedFiler(url)
         self.root_path = urlparse(url).path
         if not self.root_path.endswith("/"):
             self.root_path += "/"
+        self.slicer = PathSlicer(self.root_path, slicing)
         self.compression = compression
 
     def check_config(self, *, check_write):
@@ -219,12 +239,15 @@
     def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
         if last_obj_id:
             objid = hashutil.hash_to_hex(last_obj_id)
-            lastfilename = objid
+            objpath = self._path(objid)
+            startdir, lastfilename = objpath.rsplit("/", 1)
         else:
+            startdir = self.root_path
             lastfilename = None
 
-        for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit):
+        for fname in islice(
+            self.wf.iterfiles(startdir, last_file_name=lastfilename), limit
+        ):
             bytehex = fname.rsplit("/", 1)[-1]
             yield hashutil.bytehex_to_hash(bytehex.encode())
 
@@ -248,4 +272,4 @@
         self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
 
     def _path(self, obj_id):
-        return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id))
+        return self.slicer.get_path(hashutil.hash_to_hex(obj_id))
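Note (illustration, not part of the patch): with the new `slicing` parameter, objects are no longer written flat into the filer root; `PathSlicer` shards them into subdirectories named after slices of the object id's hex digest, and the reworked `iterfiles` walks down into a shard and then back up the tree so that a listing can resume from any object. A minimal sketch of the resulting layout, assuming the `PathSlicer` semantics exercised by `test_slicing` below, with an arbitrary example id:

```python
from swh.objstorage.backends.pathslicing import PathSlicer

# slicing "0:2/2:4" shards on the first two hex-digit pairs of the object id
slicer = PathSlicer("/swh/", "0:2/2:4")
hex_id = "34973274ccef6ab4dfaaf86599792fa9c3fe4689"  # arbitrary example

# expected: /swh/34/97/34973274ccef6ab4dfaaf86599792fa9c3fe4689
print(slicer.get_path(hex_id))

# Resuming a listing from this object then means iterating "/swh/34/97/"
# past the file name, and climbing back up through "/swh/34/" and "/swh/"
# to reach the remaining shards; that is the traversal iterfiles() performs.
```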
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
--- a/swh/objstorage/tests/test_objstorage_seaweedfs.py
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -13,28 +13,115 @@
 import requests_mock
 from requests_mock.contrib import fixture
 
+from swh.objstorage.backends.pathslicing import PathSlicer
 from swh.objstorage.backends.seaweed import WeedObjStorage
 from swh.objstorage.exc import Error
 from swh.objstorage.objstorage import decompressors
 from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
 
 
+class PathDict:
+    """A dict-like object that handles "path-like" keys in a recursive dict
+    structure.
+
+    For example:
+
+    >>> a = PathDict()
+    >>> a['path/to/file'] = 'some file content'
+
+    will create a dict structure (in self.data) like:
+
+    >>> print(a.data)
+    {'path': {'to': {'file': 'some file content'}}}
+    >>> 'path/to/file' in a
+    True
+
+    This is a helper class for the FilerRequestsMock below.
+    """
+
+    def __init__(self):
+        self.data = {}
+
+    def __setitem__(self, key, value):
+        if key.endswith("/"):
+            raise ValueError("file keys must not end with '/'")
+        if key.startswith("/"):
+            key = key[1:]
+        path = key.split("/")
+        resu = self.data
+        for p in path[:-1]:
+            resu = resu.setdefault(p, {})
+        resu[path[-1]] = value
+
+    def __getitem__(self, key):
+        assert isinstance(key, str)
+        if key == "/":
+            return self.data
+
+        if key.startswith("/"):
+            key = key[1:]
+        if key.endswith("/"):
+            key = key[:-1]
+
+        path = key.split("/")
+        resu = self.data
+        for p in path:
+            resu = resu[p]
+        return resu
+
+    def __delitem__(self, key):
+        if key.startswith("/"):
+            key = key[1:]
+        if key.endswith("/"):
+            key = key[:-1]
+        path = key.split("/")
+        resu = self.data
+        for p in path[:-1]:
+            resu = resu[p]
+        del resu[path[-1]]
+
+    def __contains__(self, key):
+        if key == "/":
+            # always consider we have the 'root' directory
+            return True
+        try:
+            self[key]
+            return True
+        except KeyError:
+            return False
+
+    def flat(self):
+        def go(d):
+            for k, v in d.items():
+                if isinstance(v, dict):
+                    yield from go(v)
+                else:
+                    yield k
+
+        yield from go(self.data)
+
+
 class FilerRequestsMock:
     """This is a requests_mock based mock for the seaweedfs Filer API
 
     It does not implement the whole API, only the parts required to make the
     WeedFiler (used by WeedObjStorage) work.
 
-    It stores the files in a dict.
+    It stores the files in a dict-based structure, e.g. the file
+    '0a/32/0a3245983255' will be stored in a dict like:
+
+    {'0a': {'32': {'0a3245983255': b'content'}}}
+
+    It uses the PathDict helper class to make it a bit easier to handle this
+    dict structure.
     """
 
     MODE_DIR = 0o20000000771
     MODE_FILE = 0o660
 
-    def __init__(self, baseurl):
-        self.baseurl = baseurl
-        self.basepath = urlparse(baseurl).path
-        self.content = {}
+    def __init__(self):
+        self.content = PathDict()
         self.requests_mock = fixture.Fixture()
        self.requests_mock.setUp()
         self.requests_mock.register_uri(
@@ -50,66 +137,66 @@
             requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
         )
 
-    def relpath(self, path):
-        if path.startswith(self.basepath):
-            return os.path.relpath(path, self.basepath)
-
     def head_cb(self, request, context):
-        relpath = self.relpath(request.path)
-        if relpath == "." or relpath in self.content:
-            return b"Found"  # ok, found it
-        context.status_code = 404
-        return b"Not Found"
+        if request.path not in self.content:
+            context.status_code = 404
 
     def get_cb(self, request, context):
-        if self.head_cb(request, context) == b"Not Found":
-            return
-        relpath = self.relpath(request.path)
-        if relpath == ".":
-            if "limit" in request.qs:
-                limit = int(request.qs["limit"][0])
-                assert limit > 0
-            else:
-                limit = None
-
-            items = sorted(self.content.items())
-            if items and "lastfilename" in request.qs:
-                lastfilename = request.qs["lastfilename"][0]
-                if lastfilename:
-                    # exclude all filenames up to lastfilename
-                    items = dropwhile(lambda kv: kv[0] <= lastfilename, items)
-
-            if limit:
-                # +1 to easily detect if there are more
-                items = islice(items, limit + 1)
-
-            entries = [
-                {"FullPath": os.path.join(request.path, fname), "Mode": self.MODE_FILE,}
-                for fname, obj in items
-            ]
-
-            thereismore = False
-            if limit and len(entries) > limit:
-                entries = entries[:limit]
-                thereismore = True
-
-            if entries:
-                lastfilename = entries[-1]["FullPath"].split("/")[-1]
-            else:
-                lastfilename = None
-            text = json.dumps(
-                {
-                    "Path": request.path,
-                    "Limit": limit,
-                    "LastFileName": lastfilename,
-                    "ShouldDisplayLoadMore": thereismore,
-                    "Entries": entries,
-                }
-            )
-            encoding = get_encoding_from_headers(request.headers) or "utf-8"
-            return text.encode(encoding)
+        content = None
+        if request.path not in self.content:
+            context.status_code = 404
         else:
-            return self.content[relpath]
+            content = self.content[request.path]
+            if isinstance(content, dict):
+                if "limit" in request.qs:
+                    limit = int(request.qs["limit"][0])
+                    assert limit > 0
+                else:
+                    limit = None
+
+                items = sorted(content.items())
+                if items and "lastfilename" in request.qs:
+                    lastfilename = request.qs["lastfilename"][0]
+                    # exclude all filenames up to lastfilename
+                    items = dropwhile(
+                        lambda kv: kv[0].split("/")[-1] <= lastfilename, items
+                    )
+
+                if limit:
+                    # +1 to easily detect if there are more
+                    items = islice(items, limit + 1)
+
+                entries = [
+                    {
+                        "FullPath": os.path.join(request.path, fname),
+                        "Mode": self.MODE_DIR
+                        if isinstance(obj, dict)
+                        else self.MODE_FILE,
+                    }
+                    for fname, obj in items
+                ]
+
+                thereismore = False
+                if limit and len(entries) > limit:
+                    entries = entries[:limit]
+                    thereismore = True
+
+                if entries:
+                    lastfilename = entries[-1]["FullPath"].split("/")[-1]
+                else:
+                    lastfilename = None
+                text = json.dumps(
+                    {
+                        "Path": request.path,
+                        "Limit": limit,
+                        "LastFileName": lastfilename,
+                        "ShouldDisplayLoadMore": thereismore,
+                        "Entries": entries,
+                    }
+                )
+                encoding = get_encoding_from_headers(request.headers) or "utf-8"
+                content = text.encode(encoding)
+        return content
 
     def post_cb(self, request, context):
         from requests_toolbelt.multipart import decoder
@@ -118,20 +205,23 @@
             request.body, request.headers["content-type"]
         )
         part = multipart_data.parts[0]
-        self.content[self.relpath(request.path)] = part.content
+        self.content[request.path] = part.content
 
     def delete_cb(self, request, context):
-        del self.content[self.relpath(request.path)]
+        del self.content[request.path]
 
 
 class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
     compression = "none"
     url = "http://127.0.0.1/test/"
+    slicing = ""
 
     def setUp(self):
         super().setUp()
-        self.storage = WeedObjStorage(url=self.url, compression=self.compression)
-        self.mock = FilerRequestsMock(baseurl=self.url)
+        self.storage = WeedObjStorage(
+            url=self.url, compression=self.compression, slicing=self.slicing
+        )
+        self.mock = FilerRequestsMock()
 
     def test_compression(self):
         content, obj_id = self.hash_content(b"test compression")
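Reviewer note (illustration only): the mock now keys files by their full request path, so directory URLs resolve to nested dicts and file URLs to bytes. A quick sketch of how the `PathDict` helper above behaves, using hypothetical paths:

```python
pd = PathDict()
pd["/test/0a/32/0a32aaaa"] = b"x"
pd["/test/0a/33/0a33bbbb"] = b"y"

assert isinstance(pd["/test/0a"], dict)    # intermediate dirs are dicts
assert pd["/test/0a/32/0a32aaaa"] == b"x"  # leaves hold the file content
assert sorted(pd.flat()) == ["0a32aaaa", "0a33bbbb"]  # all file basenames
```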
@@ -147,7 +237,8 @@
         content, obj_id = self.hash_content(b"test content without garbage")
         self.storage.add(content, obj_id=obj_id)
 
-        self.mock.content[obj_id.hex()] += b"trailing garbage"
+        path = self.storage._path(obj_id)
+        self.mock.content[path] += b"trailing garbage"
 
         if self.compression == "none":
             with self.assertRaises(Error) as e:
@@ -157,6 +248,13 @@
                 self.storage.get(obj_id)
             assert "trailing data" in e.exception.args[0]
 
+    def test_slicing(self):
+        content, obj_id = self.hash_content(b"test slicing")
+        self.storage.add(content, obj_id=obj_id)
+
+        slicer = PathSlicer(urlparse(self.url).path, self.slicing)
+        assert slicer.get_path(obj_id.hex()) in self.mock.content
+
 
 class TestWeedObjStorageWithCompression(TestWeedObjStorage):
     compression = "lzma"
@@ -168,5 +266,13 @@
         self.storage.wf.batchsize = 1
 
 
+class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
+    slicing = "0:2/2:4"
+
+    def setUp(self):
+        super().setUp()
+        self.storage.wf.batchsize = 1
+
+
 class TestWeedObjStorageWithNoPath(TestWeedObjStorage):
     url = "http://127.0.0.1/"
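Usage note (a sketch, not part of the patch): `slicing` defaults to the empty string, which keeps the previous flat layout (this is what the base `TestWeedObjStorage` exercises), so existing deployments are unaffected unless they opt in. Instantiating the backend directly might look like:

```python
from swh.objstorage.backends.seaweed import WeedObjStorage

storage = WeedObjStorage(
    url="http://127.0.0.1:8888/swh",
    compression="lzma",   # e.g. "none" or "lzma", as in the tests above
    slicing="0:2/2:4",    # shard objects two directory levels deep
)
```

Changing `slicing` on an already-populated storage would presumably orphan existing objects, since `_path` would then compute different locations for them; the new layout is best enabled only on fresh storages.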