Page Menu · Home · Software Heritage

D6492.id23692.diff
No OneTemporary

D6492.id23692.diff

diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweed.py
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweed.py
@@ -13,6 +13,7 @@
import requests
from swh.model import hashutil
+from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
@@ -29,7 +30,6 @@
class WeedFiler(object):
"""Simple class that encapsulates access to a seaweedfs filer service.
- Objects are expected to be in a single directory.
TODO: handle errors
"""
@@ -71,10 +71,14 @@
LOGGER.debug("Delete file %s", url)
return self.session.delete(url)
- def iterfiles(self, last_file_name: str = "") -> Iterator[str]:
- """yield absolute file names
+ def iterfiles(
+ self, dir: str, last_file_name: str = ""
+ ) -> Iterator[str]:
+ """Recursively yield absolute file names
Args:
+ dir: retrieve file names starting from this directory; must
+ be an absolute path.
last_file_name: if given, starts from the file just after; must
be basename.
@@ -82,22 +86,38 @@
absolute file names
"""
- for entry in self._iter_dir(last_file_name):
+ if not dir.endswith("/"):
+ dir = dir + "/"
+
+    # first, generate file names going "down" the tree
+ yield from self._iter_files(dir, last_file_name)
+
+    # then, continue iterating, going back up the tree
+ while dir != self.basepath:
+ dir, last = dir[:-1].rsplit("/", 1)
+ dir += "/"
+ yield from self._iter_files(dir, last_file_name=last)
+
+ def _iter_files(
+ self, dir: str, last_file_name: str = ""
+ ) -> Iterator[str]:
+ for entry in self._iter_one_dir(dir, last_file_name):
fullpath = entry["FullPath"]
if entry["Mode"] & 1 << 31: # it's a directory, recurse
# see https://pkg.go.dev/io/fs#FileMode
- yield from self.iterfiles(fullpath)
+ yield from self._iter_files(fullpath)
else:
yield fullpath
- def _iter_dir(self, last_file_name: str = ""):
+ def _iter_one_dir(self, remote_path: str, last_file_name: str = ""):
+ url = self.build_url(remote_path)
params = {"limit": self.batchsize}
if last_file_name:
params["lastFileName"] = last_file_name
- LOGGER.debug("List directory %s", self.url)
+ LOGGER.debug("List directory %s", url)
while True:
- rsp = self.session.get(self.url, params=params)
+ rsp = self.session.get(url, params=params)
if rsp.ok:
dircontent = rsp.json()
if dircontent["Entries"]:
@@ -107,9 +127,7 @@
params["lastFileName"] = dircontent["LastFileName"]
else:
- LOGGER.error(
- 'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code)
- )
+ LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
break
@@ -119,12 +137,15 @@
https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
"""
- def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
+ def __init__(
+ self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs
+ ):
super().__init__(**kwargs)
self.wf = WeedFiler(url)
self.root_path = urlparse(url).path
if not self.root_path.endswith("/"):
self.root_path += "/"
+ self.slicer = PathSlicer(self.root_path, slicing)
self.compression = compression
def check_config(self, *, check_write):
@@ -219,12 +240,16 @@
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
if last_obj_id:
objid = hashutil.hash_to_hex(last_obj_id)
- lastfilename = objid
+ objpath = self._path(objid)
+ startdir, lastfilename = objpath.rsplit("/", 1)
else:
+ startdir = self.root_path
lastfilename = None
# startdir = self.wf.build_url(startdir)
- for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit):
+ for fname in islice(
+ self.wf.iterfiles(startdir, last_file_name=lastfilename), limit
+ ):
bytehex = fname.rsplit("/", 1)[-1]
yield hashutil.bytehex_to_hash(bytehex.encode())
@@ -248,4 +273,4 @@
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
def _path(self, obj_id):
- return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id))
+ return self.slicer.get_path(hashutil.hash_to_hex(obj_id))
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
--- a/swh/objstorage/tests/test_objstorage_seaweedfs.py
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -13,28 +13,115 @@
import requests_mock
from requests_mock.contrib import fixture
+from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.backends.seaweed import WeedObjStorage
from swh.objstorage.exc import Error
from swh.objstorage.objstorage import decompressors
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+class PathDict:
+ """A dict-like object that handles "path-like" keys in a recursive dict
+ structure.
+
+ For example:
+
+ >>> a = PathDict()
+ >>> a['path/to/file'] = 'some file content'
+
+ will create a dict structure (in self.data) like:
+
+ >>> print(a.data)
+ {'path': {'to': {'file': 'some file content'}}}
+ >>> 'path/to/file' in a
+ True
+
+ This is a helper class for the FilerRequestsMock below.
+ """
+
+ def __init__(self):
+ self.data = {}
+
+ def __setitem__(self, key, value):
+ if key.endswith("/"):
+ raise ValueError("Nope")
+ if key.startswith("/"):
+ key = key[1:]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ resu[path[-1]] = value
+
+ def __getitem__(self, key):
+ assert isinstance(key, str)
+ if key == "/":
+ return self.data
+
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+
+ path = key.split("/")
+ resu = self.data
+ for p in path:
+ resu = resu[p]
+ return resu
+
+ def __delitem__(self, key):
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ del resu[path[-1]]
+
+ def __contains__(self, key):
+ if key == "/":
+        # always consider that the 'root' directory exists
+ return True
+ try:
+ self[key]
+ return True
+ except KeyError:
+ return False
+
+ def flat(self):
+ def go(d):
+ for k, v in d.items():
+ if isinstance(v, dict):
+ yield from go(v)
+ else:
+ yield k
+
+ yield from go(self.data)
+
+
class FilerRequestsMock:
"""This is a requests_mock based mock for the seaweedfs Filer API
It does not implement the whole API, only the parts required to make the
WeedFiler (used by WeedObjStorage) work.
- It stores the files in a dict.
+    It stores the files in a dict-based structure, e.g. the file
+ '0a/32/0a3245983255' will be stored in a dict like:
+
+ {'0a': {'32': {'0a3245983255': b'content'}}}
+
+ It uses the PathDict helper class to make it a bit easier to handle this
+ dict structure.
+
"""
MODE_DIR = 0o20000000771
MODE_FILE = 0o660
- def __init__(self, baseurl):
- self.baseurl = baseurl
- self.basepath = urlparse(baseurl).path
- self.content = {}
+ def __init__(self):
+ self.content = PathDict()
self.requests_mock = fixture.Fixture()
self.requests_mock.setUp()
self.requests_mock.register_uri(
@@ -50,66 +137,66 @@
requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
)
- def relpath(self, path):
- if path.startswith(self.basepath):
- return os.path.relpath(path, self.basepath)
-
def head_cb(self, request, context):
- relpath = self.relpath(request.path)
- if relpath == "." or relpath in self.content:
- return b"Found" # ok, found it
- context.status_code = 404
- return b"Not Found"
+ if request.path not in self.content:
+ context.status_code = 404
def get_cb(self, request, context):
- if self.head_cb(request, context) == b"Not Found":
- return
- relpath = self.relpath(request.path)
- if relpath == ".":
- if "limit" in request.qs:
- limit = int(request.qs["limit"][0])
- assert limit > 0
- else:
- limit = None
-
- items = sorted(self.content.items())
- if items and "lastfilename" in request.qs:
- lastfilename = request.qs["lastfilename"][0]
- if lastfilename:
- # exclude all filenames up to lastfilename
- items = dropwhile(lambda kv: kv[0] <= lastfilename, items)
-
- if limit:
- # +1 to easily detect if there are more
- items = islice(items, limit + 1)
-
- entries = [
- {"FullPath": os.path.join(request.path, fname), "Mode": self.MODE_FILE,}
- for fname, obj in items
- ]
-
- thereismore = False
- if limit and len(entries) > limit:
- entries = entries[:limit]
- thereismore = True
-
- if entries:
- lastfilename = entries[-1]["FullPath"].split("/")[-1]
- else:
- lastfilename = None
- text = json.dumps(
- {
- "Path": request.path,
- "Limit": limit,
- "LastFileName": lastfilename,
- "ShouldDisplayLoadMore": thereismore,
- "Entries": entries,
- }
- )
- encoding = get_encoding_from_headers(request.headers) or "utf-8"
- return text.encode(encoding)
+ content = None
+ if request.path not in self.content:
+ context.status_code = 404
else:
- return self.content[relpath]
+ content = self.content[request.path]
+ if isinstance(content, dict):
+ if "limit" in request.qs:
+ limit = int(request.qs["limit"][0])
+ assert limit > 0
+ else:
+ limit = None
+
+ items = sorted(content.items())
+ if items and "lastfilename" in request.qs:
+ lastfilename = request.qs["lastfilename"][0]
+ # exclude all filenames up to lastfilename
+ items = dropwhile(
+ lambda kv: kv[0].split("/")[-1] <= lastfilename, items
+ )
+
+ if limit:
+ # +1 to easily detect if there are more
+ items = islice(items, limit + 1)
+
+ entries = [
+ {
+ "FullPath": os.path.join(request.path, fname),
+ "Mode": self.MODE_DIR
+ if isinstance(obj, dict)
+ else self.MODE_FILE,
+ }
+ for fname, obj in items
+ ]
+
+ thereismore = False
+ if limit and len(entries) > limit:
+ entries = entries[:limit]
+ thereismore = True
+
+ if entries:
+ lastfilename = entries[-1]["FullPath"].split("/")[-1]
+ else:
+ lastfilename = None
+ text = json.dumps(
+ {
+ "Path": request.path,
+ "Limit": limit,
+ "LastFileName": lastfilename,
+ "ShouldDisplayLoadMore": thereismore,
+ "Entries": entries,
+ }
+ )
+ encoding = get_encoding_from_headers(request.headers) or "utf-8"
+ content = text.encode(encoding)
+ return content
def post_cb(self, request, context):
from requests_toolbelt.multipart import decoder
@@ -118,20 +205,23 @@
request.body, request.headers["content-type"]
)
part = multipart_data.parts[0]
- self.content[self.relpath(request.path)] = part.content
+ self.content[request.path] = part.content
def delete_cb(self, request, context):
- del self.content[self.relpath(request.path)]
+ del self.content[request.path]
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
url = "http://127.0.0.1/test/"
+ slicing = ""
def setUp(self):
super().setUp()
- self.storage = WeedObjStorage(url=self.url, compression=self.compression)
- self.mock = FilerRequestsMock(baseurl=self.url)
+ self.storage = WeedObjStorage(
+ url=self.url, compression=self.compression, slicing=self.slicing
+ )
+ self.mock = FilerRequestsMock()
def test_compression(self):
content, obj_id = self.hash_content(b"test compression")
@@ -147,7 +237,8 @@
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
- self.mock.content[obj_id.hex()] += b"trailing garbage"
+ path = self.storage._path(obj_id)
+ self.mock.content[path] += b"trailing garbage"
if self.compression == "none":
with self.assertRaises(Error) as e:
@@ -157,6 +248,13 @@
self.storage.get(obj_id)
assert "trailing data" in e.exception.args[0]
+ def test_slicing(self):
+ content, obj_id = self.hash_content(b"test compression")
+ self.storage.add(content, obj_id=obj_id)
+
+ slicer = PathSlicer(urlparse(self.url).path, self.slicing)
+ assert slicer.get_path(obj_id.hex()) in self.mock.content
+
class TestWeedObjStorageWithCompression(TestWeedObjStorage):
compression = "lzma"
@@ -168,5 +266,12 @@
self.storage.wf.batchsize = 1
+class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
+ slicing = "0:2/2:4"
+ def setUp(self):
+ super().setUp()
+ self.storage.wf.batchsize = 1
+
+
class TestWeedObjStorageWithNoPath(TestWeedObjStorage):
url = "http://127.0.0.1/"

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 6:55 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3230804

Event Timeline