D6492.id23587.diff
D6492: Add support for pathslicing in seaweedfs backend
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -19,3 +19,6 @@
[mypy-rados.*]
ignore_missing_imports = True
+
+[mypy-requests_toolbelt.*]
+ignore_missing_imports = True
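For context, this new stanza keeps mypy from failing on the requests_toolbelt import that the test suite below introduces; a minimal illustration of what it suppresses (the error text is approximate):

    # Without the [mypy-requests_toolbelt.*] stanza above, mypy reports an error
    # along the lines of: "Cannot find implementation or library stub for module
    # named 'requests_toolbelt.multipart'" on this import, used by the tests below.
    from requests_toolbelt.multipart import decoder  # noqa: F401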
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,5 +2,7 @@
azure-storage-blob >= 12.0, != 12.9.0 # version 12.9.0 breaks mypy https://github.com/Azure/azure-sdk-for-python/pull/20891
pytest
python-cephlibs
+requests_mock[fixtures] >= 1.9
+requests_toolbelt
types-pyyaml
types-requests
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@
# remote storage API server
aiohttp >= 3
click
+requests
# optional dependencies
# apache-libcloud
diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweed.py
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweed.py
@@ -4,12 +4,15 @@
# See top-level LICENSE file for more information
import io
+from itertools import islice
import logging
+from typing import Iterator, Optional
from urllib.parse import urljoin, urlparse
import requests
from swh.model import hashutil
+from swh.objstorage.backends.pathslicing import PathSlicer
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
@@ -30,47 +33,102 @@
"""
def __init__(self, url):
+ if url.endswith("/"):
+ url = url[:-1]
self.url = url
+ self.baseurl = urljoin(url, "/")
+ self.basepath = urlparse(url).path
+
+ self.session = requests.Session()
+ self.session.headers["Accept"] = "application/json"
+
+ self.batchsize = DEFAULT_LIMIT
+
+ def build_url(self, path):
+ assert path == self.basepath or path.startswith(self.basepath)
+ return urljoin(self.baseurl, path)
def get(self, remote_path):
- url = urljoin(self.url, remote_path)
+ url = self.build_url(remote_path)
LOGGER.debug("Get file %s", url)
- return requests.get(url).content
+ resp = self.session.get(url)
+ resp.raise_for_status()
+ return resp.content
def exists(self, remote_path):
- url = urljoin(self.url, remote_path)
+ url = self.build_url(remote_path)
LOGGER.debug("Check file %s", url)
- return requests.head(url).status_code == 200
+ return self.session.head(url).status_code == 200
def put(self, fp, remote_path):
- url = urljoin(self.url, remote_path)
+ url = self.build_url(remote_path)
LOGGER.debug("Put file %s", url)
- return requests.post(url, files={"file": fp})
+ return self.session.post(url, files={"file": fp})
def delete(self, remote_path):
- url = urljoin(self.url, remote_path)
+ url = self.build_url(remote_path)
LOGGER.debug("Delete file %s", url)
- return requests.delete(url)
+ return self.session.delete(url)
+
+ def iterfiles(
+ self, dir: str, last_file_name: Optional[str] = None
+ ) -> Iterator[str]:
+ """Recursively yield absolute file names
- def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
- """list sub folders and files of @dir. show a better look if you turn on
+ Args:
+ dir (str): retrieve file names starting from this directory; must
+ be an absolute path.
+ last_file_name (str): if given, start listing from the file just after
+ this one; must be a bare file name (basename), not a path.
- returns a dict of "sub-folders and files"
+ Yields:
+ absolute file names
"""
- d = dir if dir.endswith("/") else (dir + "/")
- url = urljoin(self.url, d)
- headers = {"Accept": "application/json"}
- params = {"limit": limit}
+ if dir.endswith("/"):
+ dir = dir[:-1]
+
+ # first, generate file names going "down" from the start directory
+ yield from self._iter_files(dir, last_file_name)
+
+ # then, continue iterating up the tree, one parent directory at a time
+ while True:
+ dir, last = dir.rsplit("/", 1)
+ if not (dir + "/").startswith(self.basepath):
+ # we are done
+ break
+ yield from self._iter_files(dir, last_file_name=last)
+
+ def _iter_files(
+ self, dir: str, last_file_name: Optional[str] = None
+ ) -> Iterator[str]:
+ for entry in self._iter_one_dir(dir, last_file_name):
+ fullpath = entry["FullPath"]
+ if entry["Mode"] & 1 << 31: # it's a directory, recurse
+ yield from self._iter_files(fullpath)
+ else:
+ yield fullpath
+
+ def _iter_one_dir(self, remote_path, last_file_name=None):
+ url = self.build_url(remote_path)
+ params = {"limit": self.batchsize}
if last_file_name:
params["lastFileName"] = last_file_name
LOGGER.debug("List directory %s", url)
- rsp = requests.get(url, params=params, headers=headers)
- if rsp.ok:
- return rsp.json()
- else:
- LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
+ while True:
+ rsp = self.session.get(url, params=params)
+ if rsp.ok:
+ dircontent = rsp.json()
+ if dircontent["Entries"]:
+ yield from dircontent["Entries"]
+ if not dircontent["ShouldDisplayLoadMore"]:
+ break
+ params["lastFileName"] = dircontent["LastFileName"]
+
+ else:
+ LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
+ break
class WeedObjStorage(ObjStorage):
@@ -79,10 +137,15 @@
https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
"""
- def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
+ def __init__(
+ self, url="http://127.0.0.1:8888/swh", compression=None, slicing="", **kwargs
+ ):
super().__init__(**kwargs)
self.wf = WeedFiler(url)
self.root_path = urlparse(url).path
+ if not self.root_path.endswith("/"):
+ self.root_path += "/"
+ self.slicer = PathSlicer(self.root_path, slicing)
self.compression = compression
def check_config(self, *, check_write):
@@ -176,15 +239,19 @@
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
if last_obj_id:
- last_obj_id = hashutil.hash_to_hex(last_obj_id)
- resp = self.wf.list(self.root_path, last_obj_id, limit)
- if resp is not None:
- entries = resp["Entries"]
- if entries:
- for obj in entries:
- if obj is not None:
- bytehex = obj["FullPath"].rsplit("/", 1)[-1]
- yield hashutil.bytehex_to_hash(bytehex.encode())
+ objid = hashutil.hash_to_hex(last_obj_id)
+ objpath = self._path(objid)
+ startdir, lastfilename = objpath.rsplit("/", 1)
+ else:
+ startdir = self.root_path
+ lastfilename = None
+
+ for fname in islice(
+ self.wf.iterfiles(startdir, last_file_name=lastfilename), limit
+ ):
+ bytehex = fname.rsplit("/", 1)[-1]
+ yield hashutil.bytehex_to_hash(bytehex.encode())
# internal methods
def _put_object(self, content, obj_id):
@@ -206,4 +273,4 @@
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
def _path(self, obj_id):
- return hashutil.hash_to_hex(obj_id)
+ return self.slicer.get_path(hashutil.hash_to_hex(obj_id))
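To illustrate what the new slicer changes, here is a short sketch of how an object id now maps to a filer path; the hex id is made up, and the "0:2/2:4" spec is the one exercised by the tests below:

    from swh.objstorage.backends.pathslicing import PathSlicer

    # Root the slicer at the objstorage URL path, as WeedObjStorage.__init__ does.
    # The spec "0:2/2:4" inserts hex_id[0:2] and hex_id[2:4] as intermediate
    # directories between the root and the object file.
    slicer = PathSlicer("/swh/", "0:2/2:4")
    hex_id = "972c0c78f9a9f74ef0b4dcb17b22e65c5a7200bb"  # made-up sha1 hex
    print(slicer.get_path(hex_id))
    # -> /swh/97/2c/972c0c78f9a9f74ef0b4dcb17b22e65c5a7200bb

With the default empty slicing spec (""), get_path should reduce to placing every object directly under the root, preserving the previous flat layout.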
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
--- a/swh/objstorage/tests/test_objstorage_seaweedfs.py
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -3,50 +3,218 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from itertools import dropwhile, islice
+import json
+import os
import unittest
-from swh.objstorage.backends.seaweed import DEFAULT_LIMIT, WeedObjStorage
+from requests.utils import get_encoding_from_headers
+import requests_mock
+from requests_mock.contrib import fixture
+
+from swh.objstorage.backends.pathslicing import PathSlicer
+from swh.objstorage.backends.seaweed import WeedObjStorage
from swh.objstorage.exc import Error
from swh.objstorage.objstorage import decompressors
from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
-class MockWeedFiler:
- """ WeedFiler mock that replicates its API """
+class PathDict:
+ """A dict-like object that handles "path-like" keys in a recursive dict
+ structure.
+
+ For example:
+
+ >>> a = PathDict()
+ >>> a['path/to/file'] = 'some file content'
+
+ will create a dict structure (in self.data) like:
+
+ >>> print(a.data)
+ {'path': {'to': {'file': 'some file content'}}}
+ >>> 'path/to/file' in a
+ True
+
+ This is a helper class for the FilerRequestsMock below.
+ """
+
+ def __init__(self):
+ self.data = {}
+
+ def __setitem__(self, key, value):
+ if key.endswith("/"):
+ raise ValueError(f"invalid key {key!r}: trailing '/' not allowed")
+ if key.startswith("/"):
+ key = key[1:]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ resu[path[-1]] = value
+
+ def __getitem__(self, key):
+ assert isinstance(key, str)
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+
+ path = key.split("/")
+ resu = self.data
+ for p in path:
+ resu = resu[p]
+ return resu
+
+ def __delitem__(self, key):
+ if key.startswith("/"):
+ key = key[1:]
+ if key.endswith("/"):
+ key = key[:-1]
+ path = key.split("/")
+ resu = self.data
+ for p in path[:-1]:
+ resu = resu.setdefault(p, {})
+ del resu[path[-1]]
+
+ def __contains__(self, key):
+ try:
+ self[key]
+ return True
+ except KeyError:
+ return False
+
+ def flat(self):
+ def go(d):
+ for k, v in d.items():
+ if isinstance(v, dict):
+ yield from go(v)
+ else:
+ yield k
+
+ yield from go(self.data)
+
+
+class FilerRequestsMock:
+ """This is a requests_mock based mock for the seaweedfs Filer API
+
+ It does not implement the whole API, only the parts required to make the
+ WeedFiler (used by WeedObjStorage) work.
+
+ It stores the files in a dict-based structure, e.g. the file
+ '0a/32/0a3245983255' will be stored in a dict like:
+
+ {'0a': {'32': {'0a3245983255': b'content'}}}
- def __init__(self, url):
- self.url = url
- self.content = {}
+ It uses the PathDict helper class to make it a bit easier to handle this
+ dict structure.
- def get(self, remote_path):
- return self.content[remote_path]
+ """
- def put(self, fp, remote_path):
- self.content[remote_path] = fp.read()
+ # stat-like mode bits as returned by the filer API; 1 << 31 flags a directory
+ MODE_DIR = 0o20000000771
+ MODE_FILE = 0o660
- def exists(self, remote_path):
- return remote_path in self.content
+ def __init__(self):
+ self.content = PathDict()
+ self.requests_mock = fixture.Fixture()
+ self.requests_mock.setUp()
+ self.requests_mock.register_uri(
+ requests_mock.GET, requests_mock.ANY, content=self.get_cb
+ )
+ self.requests_mock.register_uri(
+ requests_mock.POST, requests_mock.ANY, content=self.post_cb
+ )
+ self.requests_mock.register_uri(
+ requests_mock.HEAD, requests_mock.ANY, content=self.head_cb
+ )
+ self.requests_mock.register_uri(
+ requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
+ )
- def delete(self, remote_path):
- del self.content[remote_path]
+ def head_cb(self, request, context):
+ if request.path not in self.content:
+ context.status_code = 404
- def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
- keys = sorted(self.content.keys())
- if last_file_name is None:
- idx = 0
+ def get_cb(self, request, context):
+ content = None
+ if request.path not in self.content:
+ context.status_code = 404
else:
- idx = keys.index(last_file_name) + 1
- return {"Entries": [{"FullPath": x} for x in keys[idx : idx + limit]]}
+ content = self.content[request.path]
+ if isinstance(content, dict):
+ if "limit" in request.qs:
+ limit = int(request.qs["limit"][0])
+ assert limit > 0
+ else:
+ limit = None
+
+ items = sorted(content.items())
+ if items and "lastfilename" in request.qs:
+ lastfilename = request.qs["lastfilename"][0]
+ # exclude all filenames up to lastfilename
+ items = dropwhile(
+ lambda kv: kv[0].split("/")[-1] <= lastfilename, items
+ )
+
+ if limit:
+ # +1 to easily detect if there are more
+ items = islice(items, limit + 1)
+
+ entries = [
+ {
+ "FullPath": os.path.join(request.path, fname),
+ "Mode": self.MODE_DIR
+ if isinstance(obj, dict)
+ else self.MODE_FILE,
+ }
+ for fname, obj in items
+ ]
+
+ thereismore = False
+ if limit and len(entries) > limit:
+ entries = entries[:limit]
+ thereismore = True
+
+ if entries:
+ lastfilename = entries[-1]["FullPath"].split("/")[-1]
+ else:
+ lastfilename = None
+ text = json.dumps(
+ {
+ "Path": request.path,
+ "Limit": limit,
+ "LastFileName": lastfilename,
+ "ShouldDisplayLoadMore": thereismore,
+ "Entries": entries,
+ }
+ )
+ encoding = get_encoding_from_headers(request.headers) or "utf-8"
+ content = text.encode(encoding)
+ return content
+
+ def post_cb(self, request, context):
+ from requests_toolbelt.multipart import decoder
+
+ multipart_data = decoder.MultipartDecoder(
+ request.body, request.headers["content-type"]
+ )
+ part = multipart_data.parts[0]
+ self.content[request.path] = part.content
+
+ def delete_cb(self, request, context):
+ del self.content[request.path]
class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
compression = "none"
+ slicing = ""
def setUp(self):
super().setUp()
self.url = "http://127.0.0.1/test"
- self.storage = WeedObjStorage(url=self.url, compression=self.compression)
- self.storage.wf = MockWeedFiler(self.url)
+ self.storage = WeedObjStorage(
+ url=self.url, compression=self.compression, slicing=self.slicing
+ )
+ self.mock = FilerRequestsMock()
def test_compression(self):
content, obj_id = self.hash_content(b"test compression")
@@ -63,7 +231,7 @@
self.storage.add(content, obj_id=obj_id)
path = self.storage._path(obj_id)
- self.storage.wf.content[path] += b"trailing garbage"
+ self.mock.content[path] += b"trailing garbage"
if self.compression == "none":
with self.assertRaises(Error) as e:
@@ -73,6 +241,13 @@
self.storage.get(obj_id)
assert "trailing data" in e.exception.args[0]
+ def test_slicing(self):
+ content, obj_id = self.hash_content(b"test slicing")
+ self.storage.add(content, obj_id=obj_id)
+
+ slicer = PathSlicer("/test", self.slicing)
+ assert slicer.get_path(obj_id.hex()) in self.mock.content
+
class TestWeedObjStorageBz2(TestWeedObjStorage):
compression = "bz2"
@@ -88,3 +263,15 @@
class TestWeedObjStorageZlib(TestWeedObjStorage):
compression = "zlib"
+
+
+class TestWeedObjStorageWithSlicing(TestWeedObjStorage):
+ slicing = "0:2/2:4"
+
+
+class TestWeedObjStorageWithSlicingAndSmallBatch(TestWeedObjStorage):
+ slicing = "0:2/2:4"
+
+ def setUp(self):
+ super().setUp()
+ self.storage.wf.batchsize = 1
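Finally, for reviewers, a self-contained sketch of the traversal order that iterfiles implements: list everything below the start directory, then repeatedly pop up one level and continue with the parent's remaining entries. The toy tree and helper here are hypothetical stand-ins for the filer API, not part of the patch:

    from itertools import dropwhile

    # Hypothetical in-memory tree standing in for the filer's listing:
    # dicts are directories, strings are file contents.
    tree = {
        "0a": {"32": {"0a32deadbeef": "..."}},
        "0b": {"17": {"0b17cafebabe": "..."}},
    }

    def iterfiles(node, path=""):
        # Yield absolute file names below `node`, in sorted (listing) order.
        for name, child in sorted(node.items()):
            full = f"{path}/{name}"
            if isinstance(child, dict):
                yield from iterfiles(child, full)
            else:
                yield full

    # Resuming after a known object amounts to skipping every path up to and
    # including it; list_content's startdir/lastfilename split achieves the
    # same effect against the real filer without listing everything first.
    last = "/0a/32/0a32deadbeef"
    print(list(dropwhile(lambda p: p <= last, iterfiles(tree))))
    # -> ['/0b/17/0b17cafebabe']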