diff --git a/mypy.ini b/mypy.ini
index 6535cfa..ac539ab 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,21 +1,24 @@
 [mypy]
 namespace_packages = True
 warn_unused_ignores = True
 
 
 # 3rd party libraries without stubs (yet)
 
 [mypy-azure.*]
 ignore_missing_imports = True
 
 [mypy-libcloud.*]
 ignore_missing_imports = True
 
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
 [mypy-pytest.*]
 ignore_missing_imports = True
 
 [mypy-rados.*]
 ignore_missing_imports = True
+
+[mypy-requests_toolbelt.*]
+ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
index 424fc71..afd3efc 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,6 +1,8 @@
 apache-libcloud
 azure-storage-blob >= 12.0, != 12.9.0  # version 12.9.0 breaks mypy https://github.com/Azure/azure-sdk-for-python/pull/20891
 pytest
 python-cephlibs
+requests_mock[fixture] >= 1.9
+requests_toolbelt
 types-pyyaml
 types-requests
diff --git a/requirements.txt b/requirements.txt
index 0f20a59..e546330 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,12 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 
 # remote storage API server
 aiohttp >= 3
 click
+requests
 
 # optional dependencies
 # apache-libcloud
 # azure-storage-blob >= 12.0
diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweed.py
index 381a105..e905bff 100644
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweed.py
@@ -1,209 +1,211 @@
 # Copyright (C) 2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import io
 import logging
 from urllib.parse import urljoin, urlparse
 
 import requests
 
 from swh.model import hashutil
 from swh.objstorage.exc import Error, ObjNotFoundError
 from swh.objstorage.objstorage import (
     DEFAULT_LIMIT,
     ObjStorage,
     compressors,
     compute_hash,
     decompressors,
 )
 
 LOGGER = logging.getLogger(__name__)
 LOGGER.setLevel(logging.ERROR)
 
 
 class WeedFiler(object):
     """Simple class that encapsulates access to a seaweedfs filer service.
 
     TODO: handle errors
     """
 
     def __init__(self, url):
         self.url = url
 
     def get(self, remote_path):
         url = urljoin(self.url, remote_path)
         LOGGER.debug("Get file %s", url)
-        return requests.get(url).content
+        resp = requests.get(url)
+        resp.raise_for_status()
+        return resp.content
 
     def exists(self, remote_path):
         url = urljoin(self.url, remote_path)
         LOGGER.debug("Check file %s", url)
         return requests.head(url).status_code == 200
 
     def put(self, fp, remote_path):
         url = urljoin(self.url, remote_path)
         LOGGER.debug("Put file %s", url)
         return requests.post(url, files={"file": fp})
 
     def delete(self, remote_path):
         url = urljoin(self.url, remote_path)
         LOGGER.debug("Delete file %s", url)
         return requests.delete(url)
 
     def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
         """list sub folders and files of @dir. show a better look if you turn on
 
         returns a dict of "sub-folders and files"
 
         """
         d = dir if dir.endswith("/") else (dir + "/")
         url = urljoin(self.url, d)
         headers = {"Accept": "application/json"}
         params = {"limit": limit}
         if last_file_name:
             params["lastFileName"] = last_file_name
 
         LOGGER.debug("List directory %s", url)
         rsp = requests.get(url, params=params, headers=headers)
         if rsp.ok:
             return rsp.json()
         else:
             LOGGER.error('Error listing "%s". [HTTP %d]' % (url, rsp.status_code))
 
 
 class WeedObjStorage(ObjStorage):
     """ObjStorage with seaweedfs abilities, using the Filer API.
 
     https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
 
     """
 
     def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
         super().__init__(**kwargs)
         self.wf = WeedFiler(url)
         self.root_path = urlparse(url).path
         self.compression = compression
 
     def check_config(self, *, check_write):
         """Check the configuration for this object storage"""
         # FIXME: hopefully this blew up during instantiation
         return True
 
     def __contains__(self, obj_id):
         return self.wf.exists(self._path(obj_id))
 
     def __iter__(self):
         """Iterate over the objects present in the storage
 
         Warning: Iteration over the contents of a cloud-based object storage
         may have bad efficiency: due to the very high amount of objects in it
         and the fact that it is remote, get all the contents of the current
         object storage may result in a lot of network requests.
 
         You almost certainly don't want to use this method in production.
         """
         obj_id = last_obj_id = None
         while True:
             for obj_id in self.list_content(last_obj_id=last_obj_id):
                 yield obj_id
             if last_obj_id == obj_id:
                 break
             last_obj_id = obj_id
 
     def __len__(self):
         """Compute the number of objects in the current object storage.
 
         Warning: this currently uses `__iter__`, its warning about bad
         performance applies.
 
         Returns:
             number of objects contained in the storage.
 
         """
         return sum(1 for i in self)
 
     def add(self, content, obj_id=None, check_presence=True):
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)
 
         if check_presence and obj_id in self:
             return obj_id
 
         def compressor(data):
             comp = compressors[self.compression]()
             for chunk in data:
                 yield comp.compress(chunk)
             yield comp.flush()
 
         if isinstance(content, bytes):
             content = [content]
 
         # XXX should handle streaming correctly...
         self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
         return obj_id
 
     def restore(self, content, obj_id=None):
         return self.add(content, obj_id, check_presence=False)
 
     def get(self, obj_id):
         try:
             obj = self.wf.get(self._path(obj_id))
         except Exception:
             raise ObjNotFoundError(obj_id)
 
         d = decompressors[self.compression]()
         ret = d.decompress(obj)
         if d.unused_data:
             hex_obj_id = hashutil.hash_to_hex(obj_id)
             raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
         return ret
 
     def check(self, obj_id):
         # Check the content integrity
         obj_content = self.get(obj_id)
         content_obj_id = compute_hash(obj_content)
         if content_obj_id != obj_id:
             raise Error(obj_id)
 
     def delete(self, obj_id):
         super().delete(obj_id)  # Check delete permission
         if obj_id not in self:
             raise ObjNotFoundError(obj_id)
         self.wf.delete(self._path(obj_id))
         return True
 
     def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
         if last_obj_id:
             last_obj_id = hashutil.hash_to_hex(last_obj_id)
         resp = self.wf.list(self.root_path, last_obj_id, limit)
         if resp is not None:
             entries = resp["Entries"]
             if entries:
                 for obj in entries:
                     if obj is not None:
                         bytehex = obj["FullPath"].rsplit("/", 1)[-1]
                         yield hashutil.bytehex_to_hash(bytehex.encode())
 
     # internal methods
     def _put_object(self, content, obj_id):
         """Create an object in the cloud storage.
 
         Created object will contain the content and be referenced by
         the given id.
 
         """
 
         def compressor(data):
             comp = compressors[self.compression]()
             for chunk in data:
                 yield comp.compress(chunk)
             yield comp.flush()
 
         if isinstance(content, bytes):
             content = [content]
         self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
 
     def _path(self, obj_id):
         return hashutil.hash_to_hex(obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
index ef04beb..6183955 100644
--- a/swh/objstorage/tests/test_objstorage_seaweedfs.py
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -1,90 +1,172 @@
 # Copyright (C) 2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from itertools import dropwhile, islice
+import json
+import os
 import unittest
+from urllib.parse import urlparse
 
-from swh.objstorage.backends.seaweed import DEFAULT_LIMIT, WeedObjStorage
+from requests.utils import get_encoding_from_headers
+import requests_mock
+from requests_mock.contrib import fixture
+
+from swh.objstorage.backends.seaweed import WeedObjStorage
 from swh.objstorage.exc import Error
 from swh.objstorage.objstorage import decompressors
 from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
 
 
-class MockWeedFiler:
-    """ WeedFiler mock that replicates its API """
+class FilerRequestsMock:
+    """This is a requests_mock based mock for the seaweedfs Filer API
 
-    def __init__(self, url):
-        self.url = url
-        self.content = {}
+    It does not implement the whole API, only the parts required to make the
+    WeedFiler (used by WeedObjStorage) work.
+
+    It stores the files in a dict.
+
+    """
 
-    def get(self, remote_path):
-        return self.content[remote_path]
+    MODE_DIR = 0o20000000771
+    MODE_FILE = 0o660
 
-    def put(self, fp, remote_path):
-        self.content[remote_path] = fp.read()
+    def __init__(self, baseurl):
+        self.baseurl = baseurl
+        self.basepath = urlparse(baseurl).path
+        self.content = {}
+        self.requests_mock = fixture.Fixture()
+        self.requests_mock.setUp()
+        self.requests_mock.register_uri(
+            requests_mock.GET, requests_mock.ANY, content=self.get_cb
+        )
+        self.requests_mock.register_uri(
+            requests_mock.POST, requests_mock.ANY, content=self.post_cb
+        )
+        self.requests_mock.register_uri(
+            requests_mock.HEAD, requests_mock.ANY, content=self.head_cb
+        )
+        self.requests_mock.register_uri(
+            requests_mock.DELETE, requests_mock.ANY, content=self.delete_cb
+        )
+
+    def relpath(self, path):
+        if path.startswith(self.basepath):
+            return os.path.relpath(path, self.basepath)
+
+    def head_cb(self, request, context):
+        relpath = self.relpath(request.path)
+        if relpath == "." or relpath in self.content:
+            return b"Found"  # ok, found it
+        context.status_code = 404
+        return b"Not Found"
+
+    def get_cb(self, request, context):
+        if self.head_cb(request, context) == b"Not Found":
+            return
+        relpath = self.relpath(request.path)
+        if relpath == ".":
+            if "limit" in request.qs:
+                limit = int(request.qs["limit"][0])
+                assert limit > 0
+            else:
+                limit = None
+
+            items = sorted(self.content.items())
+            if items and "lastfilename" in request.qs:
+                lastfilename = request.qs["lastfilename"][0]
+                if lastfilename:
+                    # exclude all filenames up to lastfilename
+                    items = dropwhile(lambda kv: kv[0] <= lastfilename, items)
+
+            if limit:
+                # +1 to easily detect if there are more
+                items = islice(items, limit + 1)
+
+            entries = [
+                {"FullPath": os.path.join(request.path, fname), "Mode": self.MODE_FILE,}
+                for fname, obj in items
+            ]
+
+            thereismore = False
+            if limit and len(entries) > limit:
+                entries = entries[:limit]
+                thereismore = True
+
+            if entries:
+                lastfilename = entries[-1]["FullPath"].split("/")[-1]
+            else:
+                lastfilename = None
+            text = json.dumps(
+                {
+                    "Path": request.path,
+                    "Limit": limit,
+                    "LastFileName": lastfilename,
+                    "ShouldDisplayLoadMore": thereismore,
+                    "Entries": entries,
+                }
+            )
+            encoding = get_encoding_from_headers(request.headers) or "utf-8"
+            return text.encode(encoding)
+        else:
+            return self.content[relpath]
 
-    def exists(self, remote_path):
-        return remote_path in self.content
+    def post_cb(self, request, context):
+        from requests_toolbelt.multipart import decoder
 
-    def delete(self, remote_path):
-        del self.content[remote_path]
+        multipart_data = decoder.MultipartDecoder(
+            request.body, request.headers["content-type"]
+        )
+        part = multipart_data.parts[0]
+        self.content[self.relpath(request.path)] = part.content
 
-    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
-        keys = sorted(self.content.keys())
-        if last_file_name is None:
-            idx = 0
-        else:
-            idx = keys.index(last_file_name) + 1
-        return {"Entries": [{"FullPath": x} for x in keys[idx : idx + limit]]}
+    def delete_cb(self, request, context):
+        del self.content[self.relpath(request.path)]
 
 
 class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
     compression = "none"
+    url = "http://127.0.0.1/test/"
 
     def setUp(self):
         super().setUp()
-        self.url = "http://127.0.0.1/test"
         self.storage = WeedObjStorage(url=self.url, compression=self.compression)
-        self.storage.wf = MockWeedFiler(self.url)
+        self.mock = FilerRequestsMock(baseurl=self.url)
 
     def test_compression(self):
         content, obj_id = self.hash_content(b"test compression")
         self.storage.add(content, obj_id=obj_id)
 
         raw_content = self.storage.wf.get(self.storage._path(obj_id))
 
         d = decompressors[self.compression]()
         assert d.decompress(raw_content) == content
         assert d.unused_data == b""
 
     def test_trailing_data_on_stored_blob(self):
         content, obj_id = self.hash_content(b"test content without garbage")
         self.storage.add(content, obj_id=obj_id)
 
-        path = self.storage._path(obj_id)
-        self.storage.wf.content[path] += b"trailing garbage"
+        self.mock.content[obj_id.hex()] += b"trailing garbage"
 
         if self.compression == "none":
             with self.assertRaises(Error) as e:
                 self.storage.check(obj_id)
         else:
             with self.assertRaises(Error) as e:
                 self.storage.get(obj_id)
             assert "trailing data" in e.exception.args[0]
 
 
-class TestWeedObjStorageBz2(TestWeedObjStorage):
-    compression = "bz2"
-
-
-class TestWeedObjStorageGzip(TestWeedObjStorage):
-    compression = "gzip"
+class TestWeedObjStorageWithCompression(TestWeedObjStorage):
+    compression = "lzma"
 
 
-class TestWeedObjStorageLzma(TestWeedObjStorage):
-    compression = "lzma"
+class TestWeedObjStorageWithSmallBatch(TestWeedObjStorage):
+    def setUp(self):
+        super().setUp()
+        self.storage.wf.batchsize = 1
 
 
-class TestWeedObjStorageZlib(TestWeedObjStorage):
-    compression = "zlib"
+class TestWeedObjStorageWithNoPath(TestWeedObjStorage):
+    url = "http://127.0.0.1/"
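
Note on the testing approach: FilerRequestsMock replaces the old MockWeedFiler
by intercepting HTTP traffic itself, using requests_mock's callback-based URI
matching, so the real WeedFiler code is exercised. A minimal sketch of that
pattern in isolation (the store dict, URL, and payload below are illustrative,
not part of the patch):

    import requests
    import requests_mock

    store = {}  # path -> bytes, standing in for the filer's file tree

    def get_cb(request, context):
        # Serve stored bytes, or report 404 like a real filer would
        if request.path not in store:
            context.status_code = 404
            return b"Not Found"
        return store[request.path]

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.GET, requests_mock.ANY, content=get_cb)
        store["/swh/0123"] = b"hello"
        assert requests.get("http://127.0.0.1/swh/0123").content == b"hello"
        assert requests.get("http://127.0.0.1/swh/missing").status_code == 404

Registering requests_mock.ANY routes every request for a given verb to one
callback, which is what lets the mock emulate the filer's whole path space
with a single dict.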
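Uploads need one extra step: requests serializes files={...} as
multipart/form-data, so post_cb uses requests_toolbelt's MultipartDecoder to
recover the raw file bytes; this is why requests_toolbelt is added to
requirements-test.txt and mypy.ini. A similar minimal sketch (again with
illustrative names only):

    import requests
    import requests_mock
    from requests_toolbelt.multipart import decoder

    def post_cb(request, context):
        # Decode the multipart body that requests builds for files={...}
        parts = decoder.MultipartDecoder(
            request.body, request.headers["content-type"]
        ).parts
        return parts[0].content  # echo the uploaded file content back

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST, requests_mock.ANY, content=post_cb)
        resp = requests.post("http://127.0.0.1/swh/0123", files={"file": b"payload"})
        assert resp.content == b"payload"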