D6492: Add support for pathslicing in seaweedfs backend
D6492.id23692.diff
diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweed.py
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweed.py
@@ -13,6 +13,7 @@
import requests
from swh.model import hashutil
+from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
@@ -29,7 +30,6 @@
class WeedFiler(object):
"""Simple class that encapsulates access to a seaweedfs filer service.
- Objects are expected to be in a single directory.
TODO: handle errors
"""
@@ -71,10 +71,14 @@
LOGGER.debug("Delete file %s", url)
return self.session.delete(url)
- def iterfiles(self, last_file_name: str = "") -> Iterator[str]:
- """yield absolute file names
+ def iterfiles(
+ self, dir: str, last_file_name: str = ""
+ ) -> Iterator[str]:
+ """Recursively yield absolute file names
Args:
+ dir: retrieve file names starting from this directory; must
+ be an absolute path.
last_file_name: if given, starts from the file just after; must
be a basename.
@@ -82,22 +86,38 @@
absolute file names
"""
- for entry in self._iter_dir(last_file_name):
+ if not dir.endswith("/"):
+ dir = dir + "/"
+
+ # first, yield files going "down" the tree
+ yield from self._iter_files(dir, last_file_name)
+
+ # then, continue iterating up the tree
+ while dir != self.basepath:
+ dir, last = dir[:-1].rsplit("/", 1)
+ dir += "/"
+ yield from self._iter_files(dir, last_file_name=last)
+
+ def _iter_files(
+ self, dir: str, last_file_name: str = ""
+ ) -> Iterator[str]:
+ for entry in self._iter_one_dir(dir, last_file_name):
fullpath = entry["FullPath"]
if entry["Mode"] & 1 << 31: # it's a directory, recurse
# see https://pkg.go.dev/io/fs#FileMode
- yield from self.iterfiles(fullpath)
+ yield from self._iter_files(fullpath)
else:
yield fullpath
- def _iter_dir(self, last_file_name: str = ""):
+ def _iter_one_dir(self, remote_path: str, last_file_name: str = ""):
+ url = self.build_url(remote_path)
params = {"limit": self.batchsize}
if last_file_name:
params["lastFileName"] = last_file_name
- LOGGER.debug("List directory %s", self.url)
+ LOGGER.debug("List directory %s", url)
while True:
- rsp = self.session.get(self.url, params=params)
+ rsp = self.session.get(url, params=params)
if rsp.ok:
dircontent = rsp.json()
if dircontent["Entries"]:
@@ -107,9 +127,7 @@
params["lastFileName"] = dircontent["LastFileName"]
else:
- LOGGER.error(
- 'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code)
- )
+ LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
break
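The rewritten iterfiles() implements a resumable depth-first walk: first list everything below the starting directory, then climb back toward the filer's base path, re-listing each ancestor just after the child that was descended into. A self-contained sketch of that traversal order over a nested dict (names here are hypothetical, not the WeedFiler API):

    from typing import Dict, Iterator, Union

    Tree = Dict[str, Union["Tree", bytes]]  # directories are dicts, files are bytes

    def _files_below(tree: Tree, dir: str, last: str = "") -> Iterator[str]:
        node = tree
        for part in dir.strip("/").split("/"):
            if part:
                node = node[part]
        for name, child in sorted(node.items()):
            if name <= last:  # mimics lastFileName: skip entries up to `last`
                continue
            if isinstance(child, dict):
                yield from _files_below(tree, dir + name + "/")
            else:
                yield dir + name

    def iterfiles(tree: Tree, base: str, dir: str, last: str = "") -> Iterator[str]:
        yield from _files_below(tree, dir, last)  # first, go "down"
        while dir != base:  # then resume each parent listing, up to the base path
            dir, last = dir[:-1].rsplit("/", 1)
            dir += "/"
            yield from _files_below(tree, dir, last)

    tree = {"swh": {"0a": {"32": {"0a32aa": b""}}, "ff": {"00": {"ff00bb": b""}}}}
    assert list(iterfiles(tree, "/swh/", "/swh/0a/32/", "0a32aa")) == ["/swh/ff/00/ff00bb"]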
@@ -119,12 +137,15 @@
https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
"""
- def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
+ def __init__(
+ self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs
+ ):
super().__init__(**kwargs)
self.wf = WeedFiler(url)
self.root_path = urlparse(url).path
if not self.root_path.endswith("/"):
self.root_path += "/"
+ self.slicer = PathSlicer(self.root_path, slicing)
self.compression = compression
def check_config(self, *, check_write):
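The new slicing argument reuses the spec syntax of the pathslicing backend: "/"-separated character ranges of the hex object id, one directory level per range. A minimal sketch, assuming PathSlicer behaves as the tests below exercise it:

    from swh.objstorage.backends.pathslicing import PathSlicer

    # "0:2/2:4" maps hex id chars [0:2] and [2:4] to two directory levels
    slicer = PathSlicer("/swh/", "0:2/2:4")
    # slicer.get_path("0a3245983255") -> "/swh/0a/32/0a3245983255"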
@@ -219,12 +240,16 @@
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
if last_obj_id:
objid = hashutil.hash_to_hex(last_obj_id)
- lastfilename = objid
+ objpath = self._path(objid)
+ startdir, lastfilename = objpath.rsplit("/", 1)
else:
+ startdir = self.root_path
lastfilename = None
# startdir = self.wf.build_url(startdir)
- for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit):
+ for fname in islice(
+ self.wf.iterfiles(startdir, last_file_name=lastfilename), limit
+ ):
bytehex = fname.rsplit("/", 1)[-1]
yield hashutil.bytehex_to_hash(bytehex.encode())
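Resuming a listing from last_obj_id now goes through the slicer: the sliced object path splits into the directory to start from and the basename to skip past. For example, under "0:2/2:4" (values illustrative):

    objpath = "/swh/0a/32/0a3245983255"  # what self._path(last_obj_id) would yield
    startdir, lastfilename = objpath.rsplit("/", 1)
    assert (startdir, lastfilename) == ("/swh/0a/32", "0a3245983255")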
@@ -248,4 +273,4 @@
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
def _path(self, obj_id):
- return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id))
+ return self.slicer.get_path(hashutil.hash_to_hex(obj_id))
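With the default slicing="", the slicer should collapse to the old flat layout, so existing deployments keep their paths; the unmodified default tests below rely on this. A sketch, assuming an empty spec means "no slicing":

    flat = PathSlicer("/swh/", "")
    # flat.get_path("0a3245983255") -> "/swh/0a3245983255"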
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
--- a/swh/objstorage/tests/test_objstorage_seaweedfs.py
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -13,28 +13,115 @@
import requests_mock
from requests_mock.contrib import fixture
+from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.backends.seaweed import WeedObjStorage
from swh.objstorage.exc import Error
from swh.objstorage.objstorage import decompressors
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+class PathDict:
+ """A dict-like object that handles "path-like" keys in a recursive dict
+ structure.
+
+ For example:
+
+ >>> a = PathDict()
+ >>> a['path/to/file'] = 'some file content'
+
+ will create a dict structure (in self.data) like:
+
+ >>> print(a.data)
+ {'path': {'to': {'file': 'some file content'}}}
+ >>> 'path/to/file' in a
+ True
+
+ This is a helper class for the FilerRequestsMock below.
+ """
+
+ def __init__(self):
+ self.data = {}
+
+ def __setitem__(self, key, value):
+ if key.endswith("/"):
+ raise ValueError("path keys must not end with '/'")
+ if key.startswith("/"):
+ key = key[1:]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ resu[path[-1]] = value
+
+ def __getitem__(self, key):
+ assert isinstance(key, str)
+ if key == "/":
+ return self.data
+
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+
+ path = key.split("/")
+ resu = self.data
+ for p in path:
+ resu = resu[p]
+ return resu
+
+ def __delitem__(self, key):
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ del resu[path[-1]]
+
+ def __contains__(self, key):
+ if key == "/":
+ # always consider the 'root' directory to exist
+ return True
+ try:
+ self[key]
+ return True
+ except KeyError:
+ return False
+
+ def flat(self):
+ def go(d):
+ for k, v in d.items():
+ if isinstance(v, dict):
+ yield from go(v)
+ else:
+ yield k
+
+ yield from go(self.data)
+
+
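A short usage sketch of PathDict as defined above; note that flat() yields basenames only, which is all the mock needs for listings:

    d = PathDict()
    d["/swh/0a/32/0a3245983255"] = b"content"
    assert "/swh/0a" in d  # intermediate directories exist implicitly
    assert d["/swh/0a/32/"] == {"0a3245983255": b"content"}
    assert list(d.flat()) == ["0a3245983255"]
    del d["/swh/0a/32/0a3245983255"]
    assert "/swh/0a/32/0a3245983255" not in d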
class FilerRequestsMock:
"""This is a requests_mock based mock for the seaweedfs Filer API
It does not implement the whole API, only the parts required to make the
WeedFiler (used by WeedObjStorage) work.
- It stores the files in a dict.
+ It stores the files in a dict-based structure, e.g. the file
+ '0a/32/0a3245983255' will be stored in a dict like:
+
+ {'0a': {'32': {'0a3245983255': b'content'}}}
+
+ It uses the PathDict helper class to make it a bit easier to handle this
+ dict structure.
+
"""
MODE_DIR = 0o20000000771
MODE_FILE = 0o660
- def __init__(self, baseurl):
- self.baseurl = baseurl
- self.basepath = urlparse(baseurl).path
- self.content = {}
+ def __init__(self):
+ self.content = PathDict()
self.requests_mock = fixture.Fixture()
self.requests_mock.setUp()
self.requests_mock.register_uri(
@@ -50,66 +137,66 @@
requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
)
- def relpath(self, path):
- if path.startswith(self.basepath):
- return os.path.relpath(path, self.basepath)
-
def head_cb(self, request, context):
- relpath = self.relpath(request.path)
- if relpath == "." or relpath in self.content:
- return b"Found" # ok, found it
- context.status_code = 404
- return b"Not Found"
+ if request.path not in self.content:
+ context.status_code = 404
def get_cb(self, request, context):
- if self.head_cb(request, context) == b"Not Found":
- return
- relpath = self.relpath(request.path)
- if relpath == ".":
- if "limit" in request.qs:
- limit = int(request.qs["limit"][0])
- assert limit > 0
- else:
- limit = None
-
- items = sorted(self.content.items())
- if items and "lastfilename" in request.qs:
- lastfilename = request.qs["lastfilename"][0]
- if lastfilename:
- # exclude all filenames up to lastfilename
- items = dropwhile(lambda kv: kv[0] <= lastfilename, items)
-
- if limit:
- # +1 to easily detect if there are more
- items = islice(items, limit + 1)
-
- entries = [
- {"FullPath": os.path.join(request.path, fname), "Mode": self.MODE_FILE,}
- for fname, obj in items
- ]
-
- thereismore = False
- if limit and len(entries) > limit:
- entries = entries[:limit]
- thereismore = True
-
- if entries:
- lastfilename = entries[-1]["FullPath"].split("/")[-1]
- else:
- lastfilename = None
- text = json.dumps(
- {
- "Path": request.path,
- "Limit": limit,
- "LastFileName": lastfilename,
- "ShouldDisplayLoadMore": thereismore,
- "Entries": entries,
- }
- )
- encoding = get_encoding_from_headers(request.headers) or "utf-8"
- return text.encode(encoding)
+ content = None
+ if request.path not in self.content:
+ context.status_code = 404
else:
- return self.content[relpath]
+ content = self.content[request.path]
+ if isinstance(content, dict):
+ if "limit" in request.qs:
+ limit = int(request.qs["limit"][0])
+ assert limit > 0
+ else:
+ limit = None
+
+ items = sorted(content.items())
+ if items and "lastfilename" in request.qs:
+ lastfilename = request.qs["lastfilename"][0]
+ # exclude all filenames up to lastfilename
+ items = dropwhile(
+ lambda kv: kv[0].split("/")[-1] <= lastfilename, items
+ )
+
+ if limit:
+ # +1 to easily detect if there are more
+ items = islice(items, limit + 1)
+
+ entries = [
+ {
+ "FullPath": os.path.join(request.path, fname),
+ "Mode": self.MODE_DIR
+ if isinstance(obj, dict)
+ else self.MODE_FILE,
+ }
+ for fname, obj in items
+ ]
+
+ thereismore = False
+ if limit and len(entries) > limit:
+ entries = entries[:limit]
+ thereismore = True
+
+ if entries:
+ lastfilename = entries[-1]["FullPath"].split("/")[-1]
+ else:
+ lastfilename = None
+ text = json.dumps(
+ {
+ "Path": request.path,
+ "Limit": limit,
+ "LastFileName": lastfilename,
+ "ShouldDisplayLoadMore": thereismore,
+ "Entries": entries,
+ }
+ )
+ encoding = get_encoding_from_headers(request.headers) or "utf-8"
+ content = text.encode(encoding)
+ return content
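For reference, a directory listing emitted by get_cb above has the same shape as a seaweedfs Filer listing; the values here are illustrative:

    listing = {
        "Path": "/swh/0a/32",
        "Limit": 1,
        "LastFileName": "0a3245983255",
        "ShouldDisplayLoadMore": False,
        "Entries": [{"FullPath": "/swh/0a/32/0a3245983255", "Mode": 0o660}],
    }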
def post_cb(self, request, context):
from requests_toolbelt.multipart import decoder
@@ -118,20 +205,23 @@
request.body, request.headers["content-type"]
)
part = multipart_data.parts[0]
- self.content[self.relpath(request.path)] = part.content
+ self.content[request.path] = part.content
def delete_cb(self, request, context):
- del self.content[self.relpath(request.path)]
+ del self.content[request.path]
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
url = "http://127.0.0.1/test/"
+ slicing = ""
def setUp(self):
super().setUp()
- self.storage = WeedObjStorage(url=self.url, compression=self.compression)
- self.mock = FilerRequestsMock(baseurl=self.url)
+ self.storage = WeedObjStorage(
+ url=self.url, compression=self.compression, slicing=self.slicing
+ )
+ self.mock = FilerRequestsMock()
def test_compression(self):
content, obj_id = self.hash_content(b"test compression")
@@ -147,7 +237,8 @@
content, obj_id = self.hash_content(b"test content without garbage")
self.storage.add(content, obj_id=obj_id)
- self.mock.content[obj_id.hex()] += b"trailing garbage"
+ path = self.storage._path(obj_id)
+ self.mock.content[path] += b"trailing garbage"
if self.compression == "none":
with self.assertRaises(Error) as e:
@@ -157,6 +248,13 @@
self.storage.get(obj_id)
assert "trailing data" in e.exception.args[0]
+ def test_slicing(self):
+ content, obj_id = self.hash_content(b"test slicing")
+ self.storage.add(content, obj_id=obj_id)
+
+ slicer = PathSlicer(urlparse(self.url).path, self.slicing)
+ assert slicer.get_path(obj_id.hex()) in self.mock.content
+
class TestWeedObjStorageWithCompression(TestWeedObjStorage):
compression = "lzma"
@@ -168,5 +266,12 @@
self.storage.wf.batchsize = 1
+class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
+ slicing = "0:2/2:4"
+ def setUp(self):
+ super().setUp()
+ self.storage.wf.batchsize = 1
+
+
class TestWeedObjStorageWithNoPath(TestWeedObjStorage):
url = "http://127.0.0.1/"