diff --git a/swh/objstorage/backends/seaweedfs/__init__.py b/swh/objstorage/backends/seaweedfs/__init__.py
new file mode 100644
index 0000000..7619e40
--- /dev/null
+++ b/swh/objstorage/backends/seaweedfs/__init__.py
@@ -0,0 +1 @@
+from .objstorage import SeaweedFilerObjStorage  # noqa: F401
diff --git a/swh/objstorage/backends/seaweedfs/http.py b/swh/objstorage/backends/seaweedfs/http.py
new file mode 100644
index 0000000..df027ec
--- /dev/null
+++ b/swh/objstorage/backends/seaweedfs/http.py
@@ -0,0 +1,101 @@
+# Copyright (C) 2019-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+from typing import Iterator
+from urllib.parse import urljoin, urlparse
+
+import requests
+
+from swh.objstorage.objstorage import DEFAULT_LIMIT
+
+LOGGER = logging.getLogger(__name__)
+
+
+class HttpFiler:
+    """Simple class that encapsulates access to a seaweedfs filer service.
+
+    Objects are expected to be in a single directory.
+    TODO: handle errors
+    """
+
+    def __init__(self, url):
+        if not url.endswith("/"):
+            url = url + "/"
+        self.url = url
+        self.baseurl = urljoin(url, "/")
+        self.basepath = urlparse(url).path
+
+        self.session = requests.Session()
+        self.session.headers["Accept"] = "application/json"
+
+        self.batchsize = DEFAULT_LIMIT
+
+    def build_url(self, path):
+        assert path == self.basepath or path.startswith(self.basepath)
+        return urljoin(self.baseurl, path)
+
+    def get(self, remote_path):
+        url = self.build_url(remote_path)
+        LOGGER.debug("Get file %s", url)
+        resp = self.session.get(url)
+        resp.raise_for_status()
+        return resp.content
+
+    def exists(self, remote_path):
+        url = self.build_url(remote_path)
+        LOGGER.debug("Check file %s", url)
+        return self.session.head(url).status_code == 200
+
+    def put(self, fp, remote_path):
+        url = self.build_url(remote_path)
+        LOGGER.debug("Put file %s", url)
+        return self.session.post(url, files={"file": fp})
+
+    def delete(self, remote_path):
+        url = self.build_url(remote_path)
+        LOGGER.debug("Delete file %s", url)
+        return self.session.delete(url)
+
+    def iterfiles(self, last_file_name: str = "") -> Iterator[str]:
+        """Yield absolute file names.
+
+        Args:
+            last_file_name: if given, start from the file just after it;
+                must be a base name.
+
+        Yields:
+            absolute file names
+
+        """
+        for entry in self._iter_dir(last_file_name):
+            fullpath = entry["FullPath"]
+            if entry["Mode"] & 1 << 31:  # it's a directory, recurse
+                # see https://pkg.go.dev/io/fs#FileMode
+                yield from self.iterfiles(fullpath)
+            else:
+                yield fullpath
+
+    def _iter_dir(self, last_file_name: str = ""):
+        params = {"limit": self.batchsize}
+        if last_file_name:
+            params["lastFileName"] = last_file_name
+
+        LOGGER.debug("List directory %s", self.url)
+        while True:
+            rsp = self.session.get(self.url, params=params)
+            if rsp.ok:
+                dircontent = rsp.json()
+                if dircontent["Entries"]:
+                    yield from dircontent["Entries"]
+                if not dircontent["ShouldDisplayLoadMore"]:
+                    break
+                params["lastFileName"] = dircontent["LastFileName"]
+
+            else:
+                LOGGER.error(
+                    'Error listing "%s". [HTTP %d]', self.url, rsp.status_code
+                )
+                break
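For reviewers unfamiliar with the filer API: _iter_dir above wraps the filer's directory-listing endpoint, which pages through entries via the `limit` and `lastFileName` query parameters and reports whether more pages remain in `ShouldDisplayLoadMore`. The following standalone sketch (not part of the patch) replays that pagination loop against a filer assumed, for illustration only, to listen on http://127.0.0.1:8888/swh/:

    import requests

    session = requests.Session()
    session.headers["Accept"] = "application/json"  # ask the filer for JSON listings
    url = "http://127.0.0.1:8888/swh/"  # illustrative directory URL

    params = {"limit": 100}
    while True:
        rsp = session.get(url, params=params)
        rsp.raise_for_status()
        listing = rsp.json()
        for entry in listing["Entries"] or []:
            print(entry["FullPath"])
        if not listing["ShouldDisplayLoadMore"]:
            break
        # resume right after the last entry seen, as HttpFiler._iter_dir does
        params["lastFileName"] = listing["LastFileName"]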
diff --git a/swh/objstorage/backends/seaweed.py b/swh/objstorage/backends/seaweedfs/objstorage.py
similarity index 59%
rename from swh/objstorage/backends/seaweed.py
rename to swh/objstorage/backends/seaweedfs/objstorage.py
index 4720063..fa1efb1 100644
--- a/swh/objstorage/backends/seaweed.py
+++ b/swh/objstorage/backends/seaweedfs/objstorage.py
@@ -1,249 +1,156 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import io
 from itertools import islice
 import logging
 import os
-from typing import Iterator
-from urllib.parse import urljoin, urlparse
-
-import requests
 
 from swh.model import hashutil
 from swh.objstorage.exc import Error, ObjNotFoundError
 from swh.objstorage.objstorage import (
     DEFAULT_LIMIT,
     ObjStorage,
     compressors,
     compute_hash,
     decompressors,
 )
 
-LOGGER = logging.getLogger(__name__)
-LOGGER.setLevel(logging.ERROR)
-
-
-class WeedFiler(object):
-    """Simple class that encapsulates access to a seaweedfs filer service.
-
-    Objects are expected to be in a single directory.
-    TODO: handle errors
-    """
-
-    def __init__(self, url):
-        if not url.endswith("/"):
-            url = url + "/"
-        self.url = url
-        self.baseurl = urljoin(url, "/")
-        self.basepath = urlparse(url).path
-
-        self.session = requests.Session()
-        self.session.headers["Accept"] = "application/json"
-
-        self.batchsize = DEFAULT_LIMIT
-
-    def build_url(self, path):
-        assert path == self.basepath or path.startswith(self.basepath)
-        return urljoin(self.baseurl, path)
-
-    def get(self, remote_path):
-        url = self.build_url(remote_path)
-        LOGGER.debug("Get file %s", url)
-        resp = self.session.get(url)
-        resp.raise_for_status()
-        return resp.content
+from .http import HttpFiler
 
-    def exists(self, remote_path):
-        url = self.build_url(remote_path)
-        LOGGER.debug("Check file %s", url)
-        return self.session.head(url).status_code == 200
-
-    def put(self, fp, remote_path):
-        url = self.build_url(remote_path)
-        LOGGER.debug("Put file %s", url)
-        return self.session.post(url, files={"file": fp})
-
-    def delete(self, remote_path):
-        url = self.build_url(remote_path)
-        LOGGER.debug("Delete file %s", url)
-        return self.session.delete(url)
-
-    def iterfiles(self, last_file_name: str = "") -> Iterator[str]:
-        """yield absolute file names
-
-        Args:
-            last_file_name: if given, starts from the file just after; must
-            be basename.
-
-        Yields:
-            absolute file names
-
-        """
-        for entry in self._iter_dir(last_file_name):
-            fullpath = entry["FullPath"]
-            if entry["Mode"] & 1 << 31:  # it's a directory, recurse
-                # see https://pkg.go.dev/io/fs#FileMode
-                yield from self.iterfiles(fullpath)
-            else:
-                yield fullpath
-
-    def _iter_dir(self, last_file_name: str = ""):
-        params = {"limit": self.batchsize}
-        if last_file_name:
-            params["lastFileName"] = last_file_name
-
-        LOGGER.debug("List directory %s", self.url)
-        while True:
-            rsp = self.session.get(self.url, params=params)
-            if rsp.ok:
-                dircontent = rsp.json()
-                if dircontent["Entries"]:
-                    yield from dircontent["Entries"]
-                if not dircontent["ShouldDisplayLoadMore"]:
-                    break
-                params["lastFileName"] = dircontent["LastFileName"]
-
-            else:
-                LOGGER.error(
-                    'Error listing "%s". [HTTP %d]' % (self.url, rsp.status_code)
-                )
-                break
+LOGGER = logging.getLogger(__name__)
 
 
-class WeedObjStorage(ObjStorage):
+class SeaweedFilerObjStorage(ObjStorage):
     """ObjStorage with seaweedfs abilities, using the Filer API.
 
     https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API
     """
 
-    def __init__(self, url="http://127.0.0.1:8888/swh", compression=None, **kwargs):
+    def __init__(self, url, compression=None, **kwargs):
         super().__init__(**kwargs)
-        self.wf = WeedFiler(url)
-        self.root_path = urlparse(url).path
-        if not self.root_path.endswith("/"):
-            self.root_path += "/"
+        self.wf = HttpFiler(url)
         self.compression = compression
 
     def check_config(self, *, check_write):
         """Check the configuration for this object storage"""
         # FIXME: hopefully this blew up during instantiation
         return True
 
     def __contains__(self, obj_id):
         return self.wf.exists(self._path(obj_id))
 
     def __iter__(self):
         """ Iterate over the objects present in the storage
 
         Warning: Iteration over the contents of a cloud-based object storage
         may have bad efficiency: due to the very high amount of objects in it
         and the fact that it is remote, get all the contents of the current
         object storage may result in a lot of network requests.
 
         You almost certainly don't want to use this method in production.
         """
         obj_id = last_obj_id = None
         while True:
             for obj_id in self.list_content(last_obj_id=last_obj_id):
                 yield obj_id
             if last_obj_id == obj_id:
                 break
             last_obj_id = obj_id
 
     def __len__(self):
         """Compute the number of objects in the current object storage.
 
         Warning: this currently uses `__iter__`, its warning about bad
         performance applies.
 
         Returns:
             number of objects contained in the storage.
 
         """
         return sum(1 for i in self)
 
     def add(self, content, obj_id=None, check_presence=True):
         if obj_id is None:
             # Checksum is missing, compute it on the fly.
             obj_id = compute_hash(content)
 
         if check_presence and obj_id in self:
             return obj_id
 
         def compressor(data):
             comp = compressors[self.compression]()
             for chunk in data:
                 yield comp.compress(chunk)
             yield comp.flush()
 
         if isinstance(content, bytes):
             content = [content]
 
         # XXX should handle streaming correctly...
         self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
         return obj_id
 
     def restore(self, content, obj_id=None):
         return self.add(content, obj_id, check_presence=False)
 
     def get(self, obj_id):
         try:
             obj = self.wf.get(self._path(obj_id))
         except Exception:
             raise ObjNotFoundError(obj_id)
 
         d = decompressors[self.compression]()
         ret = d.decompress(obj)
         if d.unused_data:
             hex_obj_id = hashutil.hash_to_hex(obj_id)
             raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
         return ret
 
     def check(self, obj_id):
         # Check the content integrity
         obj_content = self.get(obj_id)
         content_obj_id = compute_hash(obj_content)
         if content_obj_id != obj_id:
             raise Error(obj_id)
 
     def delete(self, obj_id):
         super().delete(obj_id)  # Check delete permission
         if obj_id not in self:
             raise ObjNotFoundError(obj_id)
         self.wf.delete(self._path(obj_id))
         return True
 
     def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
         if last_obj_id:
             objid = hashutil.hash_to_hex(last_obj_id)
             lastfilename = objid
         else:
             lastfilename = None
         for fname in islice(self.wf.iterfiles(last_file_name=lastfilename), limit):
             bytehex = fname.rsplit("/", 1)[-1]
             yield hashutil.bytehex_to_hash(bytehex.encode())
 
     # internal methods
     def _put_object(self, content, obj_id):
         """Create an object in the cloud storage.
 
         Created object will contain the content and be referenced by
         the given id.
         """
 
         def compressor(data):
             comp = compressors[self.compression]()
             for chunk in data:
                 yield comp.compress(chunk)
             yield comp.flush()
 
         if isinstance(content, bytes):
             content = [content]
 
         self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
 
     def _path(self, obj_id):
         return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id))
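With the HTTP plumbing extracted into HttpFiler, the renamed backend keeps the usual ObjStorage contract: add() compresses the content with the configured codec and POSTs it under the hex object id, get() decompresses and rejects trailing data, and membership tests become HEAD requests. A minimal usage sketch, not part of the patch; it assumes a running filer at an illustrative URL and the base class's `allow_delete` flag, which lives in swh.objstorage.objstorage outside this diff:

    from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage

    storage = SeaweedFilerObjStorage(
        url="http://127.0.0.1:8888/swh/",  # illustrative filer directory
        allow_delete=True,  # assumption: permission checked by super().delete()
    )

    obj_id = storage.add(b"some content")  # hash computed on the fly if not given
    assert storage.get(obj_id) == b"some content"  # GET + decompression round trip
    assert obj_id in storage  # HEAD request via HttpFiler.exists()
    storage.delete(obj_id)  # DELETE on the filer path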
""" def compressor(data): comp = compressors[self.compression]() for chunk in data: yield comp.compress(chunk) yield comp.flush() if isinstance(content, bytes): content = [content] self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id)) def _path(self, obj_id): return os.path.join(self.wf.basepath, hashutil.hash_to_hex(obj_id)) diff --git a/swh/objstorage/factory.py b/swh/objstorage/factory.py index 316a084..df8bcf6 100644 --- a/swh/objstorage/factory.py +++ b/swh/objstorage/factory.py @@ -1,125 +1,125 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Callable, Dict, Union import warnings from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.backends.generator import RandomGeneratorObjStorage from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.objstorage.backends.pathslicing import PathSlicingObjStorage -from swh.objstorage.backends.seaweed import WeedObjStorage +from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage from swh.objstorage.multiplexer.filter import add_filters from swh.objstorage.objstorage import ID_HASH_LENGTH, ObjStorage # noqa __all__ = ["get_objstorage", "ObjStorage"] _STORAGE_CLASSES: Dict[str, Union[type, Callable[..., type]]] = { "pathslicing": PathSlicingObjStorage, "remote": RemoteObjStorage, "memory": InMemoryObjStorage, - "seaweedfs": WeedObjStorage, + "seaweedfs": SeaweedFilerObjStorage, "random": RandomGeneratorObjStorage, } _STORAGE_CLASSES_MISSING = {} _STORAGE_CLASSES_DEPRECATED = {"weed": "seaweedfs"} try: from swh.objstorage.backends.azure import ( AzureCloudObjStorage, PrefixedAzureCloudObjStorage, ) _STORAGE_CLASSES["azure"] = AzureCloudObjStorage _STORAGE_CLASSES["azure-prefixed"] = PrefixedAzureCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING["azure"] = e.args[0] _STORAGE_CLASSES_MISSING["azure-prefixed"] = e.args[0] try: from swh.objstorage.backends.rados import RADOSObjStorage _STORAGE_CLASSES["rados"] = RADOSObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING["rados"] = e.args[0] try: from swh.objstorage.backends.libcloud import ( AwsCloudObjStorage, OpenStackCloudObjStorage, ) _STORAGE_CLASSES["s3"] = AwsCloudObjStorage _STORAGE_CLASSES["swift"] = OpenStackCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING["s3"] = e.args[0] _STORAGE_CLASSES_MISSING["swift"] = e.args[0] def get_objstorage(cls: str, args=None, **kwargs): """ Create an ObjStorage using the given implementation class. Args: cls: objstorage class unique key contained in the _STORAGE_CLASSES dict. kwargs: arguments for the required class of objstorage that must match exactly the one in the `__init__` method of the class. Returns: subclass of ObjStorage that match the given `storage_class` argument. Raises: ValueError: if the given storage class is not a valid objstorage key. 
""" if cls in _STORAGE_CLASSES_DEPRECATED: warnings.warn( f"{cls} objstorage class is deprecated, " f"use {_STORAGE_CLASSES_DEPRECATED[cls]} class instead.", DeprecationWarning, ) cls = _STORAGE_CLASSES_DEPRECATED[cls] if cls in _STORAGE_CLASSES: if args is not None: warnings.warn( 'Explicit "args" key is deprecated for objstorage initialization, ' "use class arguments keys directly instead.", DeprecationWarning, ) # TODO: when removing this, drop the "args" backwards compatibility # from swh.objstorage.api.server configuration checker kwargs = args return _STORAGE_CLASSES[cls](**kwargs) else: raise ValueError( "Storage class {} is not available: {}".format( cls, _STORAGE_CLASSES_MISSING.get(cls, "unknown name") ) ) def _construct_filtered_objstorage(storage_conf, filters_conf): return add_filters(get_objstorage(**storage_conf), filters_conf) _STORAGE_CLASSES["filtered"] = _construct_filtered_objstorage def _construct_multiplexer_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return MultiplexerObjStorage(storages) _STORAGE_CLASSES["multiplexer"] = _construct_multiplexer_objstorage def _construct_striping_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return StripingObjStorage(storages) _STORAGE_CLASSES["striping"] = _construct_striping_objstorage