diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py
index bc4dd77..d1c37e4 100644
--- a/swh/objstorage/__init__.py
+++ b/swh/objstorage/__init__.py
@@ -1,67 +1,73 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from .objstorage import ObjStorage
 from .objstorage_pathslicing import PathSlicingObjStorage
 from .objstorage_in_memory import InMemoryObjStorage
 from .api.client import RemoteObjStorage
 from .multiplexer import MultiplexerObjStorage
 from .multiplexer.filter import add_filters
 
 
 __all__ = ['get_objstorage', 'ObjStorage']
 
 
 _STORAGE_CLASSES = {
     'pathslicing': PathSlicingObjStorage,
     'remote': RemoteObjStorage,
     'in-memory': InMemoryObjStorage,
 }
 
 try:
     from swh.objstorage.cloud.objstorage_azure import AzureCloudObjStorage
     _STORAGE_CLASSES['azure-storage'] = AzureCloudObjStorage
 except ImportError:
     pass
 
+try:
+    from swh.objstorage.objstorage_rados import RADOSObjStorage
+    _STORAGE_CLASSES['rados'] = RADOSObjStorage
+except ImportError:
+    pass
+
 
 def get_objstorage(cls, args):
     """ Create an ObjStorage using the given implementation class.
 
     Args:
         cls (str): objstorage class unique key contained in the
             _STORAGE_CLASSES dict.
         args (dict): arguments for the required class of objstorage
             that must match exactly the one in the `__init__` method of the
             class.
     Returns:
         subclass of ObjStorage that match the given `storage_class` argument.
     Raises:
         ValueError: if the given storage class is not a valid objstorage
             key.
     """
     try:
         return _STORAGE_CLASSES[cls](**args)
     except KeyError:
         raise ValueError('Storage class %s does not exist' % cls)
 
 
 def _construct_filtered_objstorage(storage_conf, filters_conf):
     return add_filters(
         get_objstorage(**storage_conf),
         filters_conf
     )
 
 
 _STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage
 
 
 def _construct_multiplexer_objstorage(objstorages):
     storages = [get_objstorage(**conf)
                 for conf in objstorages]
     return MultiplexerObjStorage(storages)
 
 
 _STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage
diff --git a/swh/objstorage/objstorage_rados.py b/swh/objstorage/objstorage_rados.py
new file mode 100644
index 0000000..82e41ca
--- /dev/null
+++ b/swh/objstorage/objstorage_rados.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import rados
+
+from swh.model import hashutil
+
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage import objstorage
+
+READ_SIZE = 8192
+
+
+class RADOSObjStorage(objstorage.ObjStorage):
+    """Object storage implemented with RADOS"""
+
+    def __init__(self, *, rados_id, pool_name, ceph_config,
+                 allow_delete=False):
+        super().__init__(allow_delete=allow_delete)
+        self.pool_name = pool_name
+        self.cluster = rados.Rados(rados_id=rados_id, conf=ceph_config)
+        self.cluster.connect()
+        self.__ioctx = None  # opened lazily by the ioctx property
+
+    def check_config(self, *, check_write):
+        """Raise ValueError if the configured pool does not exist"""
+        if self.pool_name not in self.cluster.list_pools():
+            raise ValueError('Pool %s does not exist' % self.pool_name)
+
+    @staticmethod
+    def _to_rados_obj_id(obj_id):
+        """Convert to a RADOS object identifier"""
+        return hashutil.hash_to_hex(obj_id)
+
+    @property
+    def ioctx(self):
+        """I/O context on the pool, opened on first access"""
+        if self.__ioctx is None:
+            self.__ioctx = self.cluster.open_ioctx(self.pool_name)
+        return self.__ioctx
+
+    def __contains__(self, obj_id):
+        """Tell whether an object with this id exists in the pool"""
+        try:
+            self.ioctx.stat(self._to_rados_obj_id(obj_id))
+        except rados.ObjectNotFound:
+            return False
+        else:
+            return True
+
+    def add(self, content, obj_id=None, check_presence=True):
+        """Write content under obj_id and return obj_id.
+
+        With check_presence set, an object that already exists is left
+        untouched.
+
+        Raises:
+            ValueError: if no obj_id is given.
+        """
+        if not obj_id:
+            raise ValueError('add needs an obj_id')
+
+        _obj_id = self._to_rados_obj_id(obj_id)
+
+        if check_presence:
+            try:
+                self.ioctx.stat(_obj_id)
+            except rados.ObjectNotFound:
+                pass
+            else:
+                return obj_id
+        self.ioctx.write_full(_obj_id, content)
+
+        return obj_id
+
+    def get(self, obj_id):
+        """Return the content of obj_id, read in READ_SIZE chunks.
+
+        Raises:
+            ObjNotFoundError: if the object is not in the pool.
+        """
+        chunks = []
+        _obj_id = self._to_rados_obj_id(obj_id)
+        try:
+            length, _ = self.ioctx.stat(_obj_id)
+        except rados.ObjectNotFound:
+            raise ObjNotFoundError(obj_id) from None
+        offset = 0
+        while offset < length:
+            # Ioctx.read takes (key, length, offset): pass both by keyword
+            # so that the two integers cannot be swapped.
+            chunk = self.ioctx.read(_obj_id,
+                                    length=min(READ_SIZE, length - offset),
+                                    offset=offset)
+            if not chunk:
+                # Object is shorter than stat reported: stop, don't spin.
+                break
+            chunks.append(chunk)
+            offset += len(chunk)
+
+        return b''.join(chunks)
+
+    def check(self, obj_id):
+        # NOTE(review): no presence or integrity verification is done here.
+        return True
+
+    def delete(self, obj_id):
+        """Remove obj_id from the pool.
+
+        Raises:
+            ObjNotFoundError: if the object is not in the pool.
+        """
+        super().delete(obj_id)  # check delete permission
+        try:
+            self.ioctx.remove_object(self._to_rados_obj_id(obj_id))
+        except rados.ObjectNotFound:
+            raise ObjNotFoundError(obj_id) from None
+        return True