diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -10,6 +10,7 @@
 from .multiplexer import MultiplexerObjStorage, StripingObjStorage
 from .multiplexer.filter import add_filters
+from swh.objstorage.objstorage_weed import WeedObjStorage
 
 
 __all__ = ['get_objstorage', 'ObjStorage']
 
@@ -18,6 +19,7 @@
     'pathslicing': PathSlicingObjStorage,
     'remote': RemoteObjStorage,
     'memory': InMemoryObjStorage,
+    'weed': WeedObjStorage,
 }
 
 _STORAGE_CLASSES_MISSING = {
diff --git a/swh/objstorage/objstorage_weed.py b/swh/objstorage/objstorage_weed.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/objstorage_weed.py
@@ -0,0 +1,202 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import io
+import logging
+from urllib.parse import urljoin, urlparse
+
+import requests
+
+from swh.model import hashutil
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import compressors, decompressors
+from swh.objstorage.objstorage import DEFAULT_LIMIT
+
+from swh.objstorage.exc import ObjNotFoundError, Error
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+
+class WeedFiler(object):
+    """Simple class that encapsulates access to a seaweedfs filer service.
+
+    TODO: handle errors
+    """
+
+    def __init__(self, url):
+        self.url = url
+
+    def get(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Get file %s', url)
+        return requests.get(url).content
+
+    def exists(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Check file %s', url)
+        return requests.head(url).status_code == 200
+
+    def put(self, fp, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Put file %s', url)
+        return requests.post(url, files={'file': fp})
+
+    def delete(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Delete file %s', url)
+        return requests.delete(url)
+
+    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
+        """List the sub-folders and files of `dir`.
+
+        Returns:
+            a dict describing the directory entries, as returned by the
+            filer API, or None if the request failed.
+
+        """
+        d = dir if dir.endswith('/') else (dir + '/')
+        url = urljoin(self.url, d)
+        headers = {'Accept': 'application/json'}
+        params = {'limit': limit}
+        if last_file_name:
+            params['lastFileName'] = last_file_name
+
+        LOGGER.debug('List directory %s', url)
+        rsp = requests.get(url, params=params, headers=headers)
+        if rsp.ok:
+            return rsp.json()
+        else:
+            LOGGER.error('Error listing "%s". [HTTP %d]',
+                         url, rsp.status_code)
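+
+# Illustration only: a rough sketch of the filer HTTP calls WeedFiler
+# issues, assuming a seaweedfs filer listening on 127.0.0.1:8888 (the
+# URL and paths below are examples, not requirements):
+#
+#   wf = WeedFiler('http://127.0.0.1:8888/swh/')
+#   wf.put(io.BytesIO(b'some content'), 'swh/0123abcd')  # POST (upload)
+#   wf.exists('swh/0123abcd')   # HEAD, True on status 200
+#   wf.get('swh/0123abcd')      # GET, returns the raw bytes
+#   wf.list('swh/')             # GET with Accept: application/json
+#   wf.delete('swh/0123abcd')   # DELETE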
+
+
+class WeedObjStorage(ObjStorage):
+    """ObjStorage backend for seaweedfs, using the Filer API:
+
+    https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
+    """
+    def __init__(self, url='http://127.0.0.1:8888/swh',
+                 compression=None, **kwargs):
+        super().__init__(**kwargs)
+        self.wf = WeedFiler(url)
+        self.root_path = urlparse(url).path
+        self.compression = compression
+
+    def check_config(self, *, check_write):
+        """Check the configuration for this object storage"""
+        # FIXME: hopefully this blew up during instantiation
+        return True
+
+    def __contains__(self, obj_id):
+        return self.wf.exists(self._path(obj_id))
+
+    def __iter__(self):
+        """Iterate over the objects present in the storage.
+
+        Warning: iterating over the contents of a remote object storage
+        can be very inefficient: the storage may hold a very large
+        number of objects, so retrieving them all requires many network
+        requests.
+
+        You almost certainly don't want to use this method in
+        production.
+        """
+        obj_id = last_obj_id = None
+        while True:
+            for obj_id in self.list_content(last_obj_id=last_obj_id):
+                yield obj_id
+            if last_obj_id == obj_id:
+                break
+            last_obj_id = obj_id
+
+    def __len__(self):
+        """Compute the number of objects in the current object storage.
+
+        Warning: this uses `__iter__`, so its warning about performance
+        applies here too.
+
+        Returns:
+            number of objects contained in the storage.
+
+        """
+        return sum(1 for i in self)
+
+    def add(self, content, obj_id=None, check_presence=True):
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            obj_id = compute_hash(content)
+
+        if check_presence and obj_id in self:
+            return obj_id
+
+        self._put_object(content, obj_id)
+        return obj_id
+
+    def restore(self, content, obj_id=None):
+        return self.add(content, obj_id, check_presence=False)
+
+    def get(self, obj_id):
+        try:
+            obj = self.wf.get(self._path(obj_id))
+        except Exception:
+            raise ObjNotFoundError(obj_id)
+        # Decompress outside the try block: a failure there means a
+        # corrupt object, not a missing one.
+        return decompressors[self.compression]().decompress(obj)
+
+    def check(self, obj_id):
+        # Check the content integrity
+        obj_content = self.get(obj_id)
+        content_obj_id = compute_hash(obj_content)
+        if content_obj_id != obj_id:
+            raise Error(obj_id)
+
+    def delete(self, obj_id):
+        super().delete(obj_id)  # Check delete permission
+        if obj_id not in self:
+            raise ObjNotFoundError(obj_id)
+        self.wf.delete(self._path(obj_id))
+        return True
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        if last_obj_id:
+            last_obj_id = hashutil.hash_to_hex(last_obj_id)
+        resp = self.wf.list(self.root_path, last_obj_id, limit)
+        if resp is not None:
+            entries = resp['Entries']
+            if entries:
+                for obj in entries:
+                    if obj is not None:
+                        bytehex = obj['FullPath'].rsplit('/', 1)[-1]
+                        yield hashutil.bytehex_to_hash(bytehex.encode())
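+
+    # Illustration only: the round-trip contract between `add` and
+    # `get`, assuming 'zlib' is among the keys of the shared
+    # compressors/decompressors registries (the URL is an example):
+    #
+    #   storage = WeedObjStorage(url='http://127.0.0.1:8888/swh',
+    #                            compression='zlib')
+    #   obj_id = storage.add(b'some content')
+    #   assert storage.get(obj_id) == b'some content'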
+
+    # internal methods
+    def _put_object(self, content, obj_id):
+        """Create an object in the remote storage.
+
+        The created object will contain the content and be referenced
+        by the given id.
+
+        """
+        def compressor(data):
+            comp = compressors[self.compression]()
+            for chunk in data:
+                yield comp.compress(chunk)
+            yield comp.flush()
+
+        if isinstance(content, bytes):
+            content = [content]
+
+        # XXX should handle streaming correctly...
+        self.wf.put(io.BytesIO(b''.join(compressor(content))),
+                    self._path(obj_id))
+
+    def _path(self, obj_id):
+        return hashutil.hash_to_hex(obj_id)
diff --git a/swh/objstorage/tests/test_objstorage_seaweedfs.py b/swh/objstorage/tests/test_objstorage_seaweedfs.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_seaweedfs.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+from swh.objstorage.objstorage import DEFAULT_LIMIT
+from swh.objstorage.objstorage_weed import WeedObjStorage
+from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+
+
+class MockWeedFiler:
+    """WeedFiler mock that replicates its API."""
+    def __init__(self, url):
+        self.url = url
+        self.content = {}
+
+    def get(self, remote_path):
+        return self.content[remote_path]
+
+    def put(self, fp, remote_path):
+        self.content[remote_path] = fp.read()
+
+    def exists(self, remote_path):
+        return remote_path in self.content
+
+    def delete(self, remote_path):
+        del self.content[remote_path]
+
+    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
+        keys = sorted(self.content.keys())
+        if last_file_name is None:
+            idx = 0
+        else:
+            idx = keys.index(last_file_name) + 1
+        return {'Entries': [{'FullPath': x} for x in keys[idx:idx+limit]]}
+
+
+class TestWeedObjStorage(ObjStorageTestFixture, unittest.TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.url = 'http://127.0.0.1/test'
+        self.storage = WeedObjStorage(url=self.url)
+        self.storage.wf = MockWeedFiler(self.url)
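+
+    def test_iter_paginates(self):
+        # Sketch beyond the shared fixture tests: __iter__ should keep
+        # asking the (mock) filer for pages, via lastFileName, until no
+        # new entry comes back. The payloads below are arbitrary.
+        ids = {self.storage.add(c) for c in (b'content-a', b'content-b')}
+        self.assertEqual(set(self.storage), ids)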