diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -51,6 +51,12 @@
     _STORAGE_CLASSES_MISSING['s3'] = e.args[0]
     _STORAGE_CLASSES_MISSING['swift'] = e.args[0]
 
+try:
+    from swh.objstorage.objstorage_weed import WeedObjStorage
+    _STORAGE_CLASSES['weed'] = WeedObjStorage
+except ImportError as e:
+    _STORAGE_CLASSES_MISSING['weed'] = e.args[0]
+
 
 def get_objstorage(cls, args):
     """ Create an ObjStorage using the given implementation class.
diff --git a/swh/objstorage/objstorage_weed.py b/swh/objstorage/objstorage_weed.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/objstorage_weed.py
@@ -0,0 +1,211 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+import io
+import logging
+from urllib.parse import urljoin, urlparse
+import functools
+
+import requests
+
+from swh.model import hashutil
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.objstorage import compressors, decompressors
+from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE
+
+from swh.objstorage.exc import ObjNotFoundError, Error
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.CRITICAL)
+
+DEFAULT_LIMIT = 1000
+
+
+class WeedFiler(object):
+    """Thin wrapper around the HTTP API of a seaweedfs filer service."""
+
+    def __init__(self, url):
+        self.url = url
+
+    def get(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Get file %s', url)
+        resp = requests.get(url)
+        resp.raise_for_status()
+        return resp.content
+
+    def put(self, fp, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Put file %s', url)
+        return requests.post(url, files={'file': fp})
+
+    def delete(self, remote_path):
+        url = urljoin(self.url, remote_path)
+        LOGGER.debug('Delete file %s', url)
+        return requests.delete(url)
+
+    def list(self, dir, last_file_name=None, limit=DEFAULT_LIMIT):
+        '''List the sub-folders and files of the directory `dir`.
+
+        Returns the parsed JSON response of the filer (a dict whose
+        'Entries' key holds the listed entries), or None on error.
+        '''
+        d = dir if dir.endswith('/') else (dir + '/')
+        url = urljoin(self.url, d)
+        headers = {'Accept': 'application/json'}
+        params = {'limit': limit}
+        if last_file_name:
+            params['lastFileName'] = last_file_name
+
+        LOGGER.debug('List directory %s', url)
+        rsp = requests.get(url, params=params, headers=headers)
+        if rsp.ok:
+            return rsp.json()
+        else:
+            LOGGER.error('Error listing "%s". [HTTP %d]',
+                         url, rsp.status_code)
+
+
+class WeedObjStorage(ObjStorage):
+    """ObjStorage backed by a seaweedfs filer service."""
+
+    def __init__(self, url='http://127.0.0.1:8888/swh',
+                 compression=None, **kwargs):
+        super().__init__(**kwargs)
+        self.wf = WeedFiler(url)
+        self.root_path = urlparse(url).path
+        self.compression = compression
+
+    def check_config(self, *, check_write):
+        """Check the configuration for this object storage"""
+        # FIXME: hopefully this blew up during instantiation
+        return True
+
+    def __contains__(self, obj_id):
+        try:
+            self._get_object(obj_id)
+        except ObjNotFoundError:
+            return False
+        else:
+            return True
+
+    def __iter__(self):
+        """Iterate over the objects present in the storage.
+
+        Warning: iterating over the contents of a remote object
+        storage can be very inefficient: because of the large
+        number of objects it may hold, retrieving the whole
+        listing results in a lot of network requests.
+
+        You almost certainly don't want to use this method in
+        production.
+        """
+        last_file_name = None
+        while True:
+            # Paginate through the filer listing, using the name
+            # of the last entry of a page as the cursor for the
+            # next one.
+            resp = self.wf.list(self.root_path, last_file_name)
+            if resp is None:
+                break
+            entries = resp.get('Entries') or []
+            for obj in entries:
+                last_file_name = obj['FullPath'].rsplit('/', 1)[-1]
+                yield hashutil.bytehex_to_hash(last_file_name.encode())
+            if len(entries) < DEFAULT_LIMIT:
+                break
+
+    def __len__(self):
+        """Compute the number of objects in the current object storage.
+
+        Warning: this uses `__iter__` under the hood, so the
+        performance warning above applies here as well.
+
+        Returns:
+            number of objects contained in the storage.
+
+        """
+        return sum(1 for i in self)
+
+    def add(self, content, obj_id=None, check_presence=True):
+        if obj_id is None:
+            # Checksum is missing, compute it on the fly.
+            obj_id = compute_hash(content)
+
+        if check_presence and obj_id in self:
+            return obj_id
+
+        self._put_object(content, obj_id)
+        return obj_id
+
+    def restore(self, content, obj_id=None):
+        return self.add(content, obj_id, check_presence=False)
+
+    def get(self, obj_id):
+        return self._get_object(obj_id)
+
+    def check(self, obj_id):
+        # _get_object raises ObjNotFoundError if the object is missing
+        obj_content = self._get_object(obj_id)
+        # Check the content integrity
+        content_obj_id = compute_hash(obj_content)
+        if content_obj_id != obj_id:
+            raise Error(obj_id)
+
+    def delete(self, obj_id):
+        super().delete(obj_id)  # Check delete permission
+        self.wf.delete(self.path(obj_id))
+
+    def _get_object(self, obj_id):
+        """Retrieve the content of an object from the filer,
+        decompressing it if needed.
+
+        Raises:
+            ObjNotFoundError: if the object could not be fetched.
+
+        """
+        try:
+            obj = self.wf.get(self.path(obj_id))
+        except Exception:
+            raise ObjNotFoundError(obj_id)
+        return decompressors[self.compression]().decompress(obj)
+
+    def _put_object(self, content, obj_id):
+        """Upload an object to the filer.
+
+        The created object will contain the (optionally
+        compressed) content and be referenced by the given id.
+
+        """
+        def compressor(data):
+            comp = compressors[self.compression]()
+            for chunk in data:
+                yield comp.compress(chunk)
+            yield comp.flush()
+
+        if isinstance(content, bytes):
+            content = [content]
+        self.wf.put(io.BytesIO(b''.join(compressor(content))),
+                    self.path(obj_id))
+
+    def path(self, obj_id):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        return os.path.join(self.root_path, hex_obj_id)
+
+    def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
+        data = io.BytesIO(self.get(obj_id))
+        reader = functools.partial(data.read, chunk_size)
+        yield from iter(reader, b'')
+
+    def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+        if last_obj_id:
+            last_obj_id = hashutil.hash_to_hex(last_obj_id)
+        resp = self.wf.list(self.root_path, last_obj_id, limit)
+        if resp is None:
+            return ()
+        return (hashutil.bytehex_to_hash(
+            obj['FullPath'].rsplit('/', 1)[-1].encode())
+            for obj in resp.get('Entries') or [])
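
Note for reviewers: a minimal usage sketch of the new backend. It assumes a SeaweedFS filer listening on http://127.0.0.1:8888 (the default `url` above), that `get_objstorage` expands its `args` dict into constructor keyword arguments as it does for the other backends, and that `allow_delete` is the usual `ObjStorage` flag.

    from swh.objstorage import get_objstorage
    from swh.objstorage.objstorage import compute_hash

    storage = get_objstorage('weed', {
        'url': 'http://127.0.0.1:8888/swh',  # filer endpoint; its path becomes root_path
        'compression': 'gzip',               # or None to store the raw bytes
        'allow_delete': True,                # required for storage.delete() below
    })

    content = b'some content'
    obj_id = storage.add(content)            # returns compute_hash(content)
    assert obj_id == compute_hash(content)
    assert obj_id in storage                 # __contains__ fetches the object
    assert storage.get(obj_id) == content    # transparently decompressed
    storage.delete(obj_id)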
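
The filer traffic this generates can also be exercised through `WeedFiler` directly; a sketch against the same local filer (the path mirrors what `path(obj_id)` would produce, and the hex name is a made-up placeholder, not a real object id):

    import io
    from swh.objstorage.objstorage_weed import WeedFiler

    wf = WeedFiler('http://127.0.0.1:8888/swh')
    wf.put(io.BytesIO(b'hello'), '/swh/0123abcd')  # POST, multipart/form-data upload
    assert wf.get('/swh/0123abcd') == b'hello'     # GET returns the stored bytes
    listing = wf.list('/swh', limit=10)            # GET with Accept: application/json
    print([e['FullPath'] for e in (listing or {}).get('Entries') or []])
    wf.delete('/swh/0123abcd')                     # DELETE removes the file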