diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -11,6 +11,9 @@ [mypy-libcloud.*] ignore_missing_imports = True +[mypy-msgpack.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click +msgpack typing-extensions >= 3.7.4 # seaweedfs backend diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -5,12 +5,13 @@ from typing import Iterator, Optional +import msgpack + from swh.core.api import RPCClient -from swh.core.utils import iter_chunks from swh.model import hashutil -from swh.objstorage.constants import DEFAULT_LIMIT, ID_DIGEST_LENGTH +from swh.objstorage.constants import DEFAULT_LIMIT from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError -from swh.objstorage.interface import ObjId, ObjStorageInterface +from swh.objstorage.interface import CompositeObjId, ObjId, ObjStorageInterface class RemoteObjStorage(RPCClient): @@ -33,17 +34,22 @@ def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId) -> None: return self.add(content, obj_id, check_presence=False) - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: yield from self.list_content() def list_content( self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT, - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: params = {"limit": limit} if last_obj_id: params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id) - yield from iter_chunks( - self._get_stream("content", params=params), chunk_size=ID_DIGEST_LENGTH + response = self.raw_verb( + "get", + "content", + headers={"accept": "application/x-msgpack"}, + params=params, + stream=True, ) + yield from msgpack.Unpacker(response.raw, raw=True) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -7,8 +7,10 @@ import functools import logging import os +from typing import Iterator from flask import request +import msgpack from swh.core.api import RPCServerApp from swh.core.api import encode_data_server as encode_data @@ -98,9 +100,11 @@ last_obj_id = bytes.fromhex(last_obj_id) limit = int(request.args.get("limit", DEFAULT_LIMIT)) - def generate(): + def generate() -> Iterator[bytes]: with timed_context("list_content"): - yield from get_objstorage().list_content(last_obj_id, limit=limit) + packer = msgpack.Packer(use_bin_type=True) + for obj in get_objstorage().list_content(last_obj_id, limit=limit): + yield packer.pack(obj) return app.response_class(generate()) diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py --- a/swh/objstorage/backends/azure.py +++ b/swh/objstorage/backends/azure.py @@ -21,7 +21,7 @@ from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.interface import ObjId +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import ( ObjStorage, compressors, @@ -180,7 +180,7 @@ return True - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: """Does the storage contains the obj_id.""" hex_obj_id = self._internal_id(obj_id) client = self.get_blob_client(hex_obj_id) @@ -191,7 +191,7 @@ else: return True - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: """Iterate over the objects present in the storage.""" for client in self.get_all_container_clients(): for obj in client.list_blobs(): diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py --- a/swh/objstorage/backends/generator.py +++ b/swh/objstorage/backends/generator.py @@ -1,9 +1,10 @@ from itertools import count, islice, repeat import logging import random -from typing import Iterator, Optional +from typing import Generator, Iterator, Optional, cast -from swh.objstorage.interface import ObjId +from swh.objstorage.constants import ID_HASH_ALGO +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage logger = logging.getLogger(__name__) @@ -189,10 +190,10 @@ def __contains__(self, obj_id, *args, **kwargs): return False - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: i = 1 while True: - j = yield (b"%d" % i) + j = yield {ID_HASH_ALGO: b"%d" % i} if self.total and i >= self.total: logger.debug("DONE") break @@ -217,8 +218,10 @@ self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT, - ) -> Iterator[ObjId]: - it = iter(self) + ) -> Iterator[CompositeObjId]: + if isinstance(last_obj_id, dict): + last_obj_id = last_obj_id[ID_HASH_ALGO] + it = cast(Generator[CompositeObjId, int, None], iter(self)) if last_obj_id: next(it) it.send(int(last_obj_id)) diff --git a/swh/objstorage/backends/http.py b/swh/objstorage/backends/http.py --- a/swh/objstorage/backends/http.py +++ b/swh/objstorage/backends/http.py @@ -11,7 +11,7 @@ from swh.model import hashutil from swh.objstorage import exc -from swh.objstorage.interface import ObjId +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import ( DEFAULT_LIMIT, ObjStorage, @@ -45,11 +45,11 @@ """Check the configuration for this object storage""" return True - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: resp = self.session.head(self._path(obj_id)) return resp.status_code == 200 - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: raise exc.NonIterableObjStorage("__iter__") def __len__(self): @@ -68,7 +68,7 @@ self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT, - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: raise exc.NonIterableObjStorage("__len__") def get(self, obj_id: ObjId) -> bytes: diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py --- a/swh/objstorage/backends/in_memory.py +++ b/swh/objstorage/backends/in_memory.py @@ -3,9 +3,11 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Iterator + from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.interface import ObjId -from swh.objstorage.objstorage import ObjStorage, compute_hash +from swh.objstorage.interface import CompositeObjId, ObjId +from swh.objstorage.objstorage import ObjStorage, compute_hash, objid_to_default_hex class InMemoryObjStorage(ObjStorage): @@ -22,10 +24,10 @@ def check_config(self, *, check_write): return True - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: return obj_id in self.state - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: return iter(sorted(self.state)) def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: @@ -44,7 +46,7 @@ if obj_id not in self: raise ObjNotFoundError(obj_id) if compute_hash(self.state[obj_id]) != obj_id: - raise Error("Corrupt object %s" % obj_id.hex()) + raise Error("Corrupt object %s" % objid_to_default_hex(obj_id)) def delete(self, obj_id: ObjId): super().delete(obj_id) # Check delete permission diff --git a/swh/objstorage/backends/libcloud.py b/swh/objstorage/backends/libcloud.py --- a/swh/objstorage/backends/libcloud.py +++ b/swh/objstorage/backends/libcloud.py @@ -5,8 +5,7 @@ import abc from collections import OrderedDict -from collections.abc import Iterator -from typing import Optional +from typing import Iterator, Optional from urllib.parse import urlencode from libcloud.storage import providers @@ -15,7 +14,7 @@ from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.interface import ObjId +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import ( ObjStorage, compressors, @@ -116,7 +115,7 @@ # FIXME: hopefully this blew up during instantiation return True - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: try: self._get_object(obj_id) except ObjNotFoundError: @@ -124,7 +123,7 @@ else: return True - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: """Iterate over the objects present in the storage Warning: Iteration over the contents of a cloud-based object storage diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py --- a/swh/objstorage/backends/pathslicing.py +++ b/swh/objstorage/backends/pathslicing.py @@ -12,8 +12,13 @@ from swh.model import hashutil from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.interface import ObjId -from swh.objstorage.objstorage import ObjStorage, compressors, decompressors +from swh.objstorage.interface import CompositeObjId, ObjId +from swh.objstorage.objstorage import ( + ObjStorage, + compressors, + decompressors, + objid_to_default_hex, +) BUFSIZ = 1048576 @@ -182,10 +187,10 @@ return True def __contains__(self, obj_id: ObjId) -> bool: - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) return os.path.isfile(self.slicer.get_path(hex_obj_id)) - def __iter__(self) -> Iterator[bytes]: + def __iter__(self) -> Iterator[CompositeObjId]: """Iterate over the object identifiers currently available in the storage. @@ -231,7 +236,7 @@ # If the object is already present, return immediately. return - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) compressor = compressors[self.compression]() with self._write_obj_file(hex_obj_id) as f: f.write(compressor.compress(content)) @@ -242,7 +247,7 @@ raise ObjNotFoundError(obj_id) # Open the file and return its content as bytes - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) d = decompressors[self.compression]() with open(self.slicer.get_path(hex_obj_id), "rb") as f: out = d.decompress(f.read()) @@ -257,7 +262,7 @@ try: data = self.get(obj_id) except OSError: - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) raise Error( "Corrupt object %s: not a proper compressed file" % hex_obj_id, ) @@ -266,13 +271,13 @@ data, hash_names=[ID_HASH_ALGO] ).digest() - actual_obj_id = checksums[ID_HASH_ALGO] - hex_obj_id = hashutil.hash_to_hex(obj_id) + actual_obj_sha1 = checksums[ID_HASH_ALGO] + hex_obj_id = objid_to_default_hex(obj_id) - if hex_obj_id != hashutil.hash_to_hex(actual_obj_id): + if hex_obj_id != hashutil.hash_to_hex(actual_obj_sha1): raise Error( "Corrupt object %s should have id %s" - % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id)) + % (objid_to_default_hex(obj_id), hashutil.hash_to_hex(actual_obj_sha1)) ) def delete(self, obj_id: ObjId): @@ -280,7 +285,7 @@ if obj_id not in self: raise ObjNotFoundError(obj_id) - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) try: os.remove(self.slicer.get_path(hex_obj_id)) except FileNotFoundError: @@ -291,7 +296,7 @@ @contextmanager def chunk_writer(self, obj_id): - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) compressor = compressors[self.compression]() with self._write_obj_file(hex_obj_id) as f: yield lambda c: f.write(compressor.compress(c)) @@ -299,7 +304,7 @@ def list_content( self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: if last_obj_id: it = self.iter_from(last_obj_id) else: @@ -307,7 +312,7 @@ return islice(it, limit) def iter_from(self, obj_id, n_leaf=False): - hex_obj_id = hashutil.hash_to_hex(obj_id) + hex_obj_id = objid_to_default_hex(obj_id) slices = self.slicer.get_slices(hex_obj_id) rlen = len(self.root.split("/")) diff --git a/swh/objstorage/backends/seaweedfs/objstorage.py b/swh/objstorage/backends/seaweedfs/objstorage.py --- a/swh/objstorage/backends/seaweedfs/objstorage.py +++ b/swh/objstorage/backends/seaweedfs/objstorage.py @@ -11,13 +11,14 @@ from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.interface import ObjId +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import ( DEFAULT_LIMIT, ObjStorage, compressors, compute_hash, decompressors, + objid_to_default_hex, ) from .http import HttpFiler @@ -41,10 +42,10 @@ # FIXME: hopefully this blew up during instantiation return True - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: return self.wf.exists(self._path(obj_id)) - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: """Iterate over the objects present in the storage Warning: Iteration over the contents of a cloud-based object storage @@ -123,9 +124,9 @@ self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT, - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: if last_obj_id: - objid = hashutil.hash_to_hex(last_obj_id) + objid = objid_to_default_hex(last_obj_id) lastfilename = objid else: lastfilename = None diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py --- a/swh/objstorage/interface.py +++ b/swh/objstorage/interface.py @@ -3,15 +3,24 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, Iterator, List, Optional +from typing import Dict, Iterator, List, Optional, Union -from typing_extensions import Protocol, runtime_checkable +from typing_extensions import Protocol, TypedDict, runtime_checkable from swh.core.api import remote_api_endpoint from swh.objstorage.constants import DEFAULT_LIMIT -ObjId = bytes -"""Type of object ids, which should be a sha1 hash.""" + +class CompositeObjId(TypedDict, total=False): + sha1: bytes + sha1_git: bytes + sha256: bytes + blake2s256: bytes + + +ObjId = Union[bytes, CompositeObjId] +"""Type of object ids, which should be ``{hash: value for hash in SUPPORTED_HASHES}``; +but single sha1 hashes are supported for legacy clients""" @runtime_checkable @@ -172,12 +181,12 @@ """ ... - def __iter__(self) -> Iterator[ObjId]: + def __iter__(self) -> Iterator[CompositeObjId]: ... def list_content( self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: """Generates known object ids. Args: diff --git a/swh/objstorage/multiplexer/filter/filter.py b/swh/objstorage/multiplexer/filter/filter.py --- a/swh/objstorage/multiplexer/filter/filter.py +++ b/swh/objstorage/multiplexer/filter/filter.py @@ -3,6 +3,9 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Iterator + +from swh.objstorage.interface import CompositeObjId from swh.objstorage.objstorage import ObjStorage @@ -39,7 +42,7 @@ def __contains__(self, *args, **kwargs): return self.storage.__contains__(*args, **kwargs) - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: """Iterates over the content of each storages Warning: The `__iter__` methods frequently have bad performance. You diff --git a/swh/objstorage/multiplexer/filter/id_filter.py b/swh/objstorage/multiplexer/filter/id_filter.py --- a/swh/objstorage/multiplexer/filter/id_filter.py +++ b/swh/objstorage/multiplexer/filter/id_filter.py @@ -5,9 +5,11 @@ import abc import re +from typing import Iterator from swh.model import hashutil from swh.objstorage.exc import ObjNotFoundError +from swh.objstorage.interface import CompositeObjId from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter @@ -33,7 +35,7 @@ def __len__(self): return sum(1 for i in [id for id in self.storage if self.is_valid(id)]) - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: yield from filter(lambda id: self.is_valid(id), iter(self.storage)) def add(self, content, obj_id, check_presence=True, *args, **kwargs): diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py --- a/swh/objstorage/multiplexer/multiplexer_objstorage.py +++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py @@ -5,10 +5,10 @@ import queue import threading -from typing import Dict +from typing import Dict, Iterator from swh.objstorage.exc import ObjNotFoundError -from swh.objstorage.interface import ObjId +from swh.objstorage.interface import CompositeObjId, ObjId from swh.objstorage.objstorage import ObjStorage @@ -199,7 +199,7 @@ ) ) - def __contains__(self, obj_id): + def __contains__(self, obj_id: ObjId) -> bool: """Indicate if the given object is present in the storage. Args: @@ -215,7 +215,7 @@ return True return False - def __iter__(self): + def __iter__(self) -> Iterator[CompositeObjId]: def obj_iterator(): for storage in self.storages: yield from storage diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -14,7 +14,18 @@ from .constants import DEFAULT_LIMIT, ID_HASH_ALGO from .exc import ObjNotFoundError -from .interface import ObjId, ObjStorageInterface +from .interface import CompositeObjId, ObjId, ObjStorageInterface + + +def objid_to_default_hex(obj_id: ObjId) -> str: + """Converts SHA1 hashes and multi-hashes to the hexadecimal representation + of the SHA1.""" + if isinstance(obj_id, bytes): + return hashutil.hash_to_hex(obj_id) + elif isinstance(obj_id, str): + return obj_id + else: + return hashutil.hash_to_hex(obj_id[ID_HASH_ALGO]) def compute_hash(content, algo=ID_HASH_ALGO): @@ -125,8 +136,9 @@ self: ObjStorageInterface, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT, - ) -> Iterator[ObjId]: + ) -> Iterator[CompositeObjId]: it = iter(self) - if last_obj_id is not None: - it = dropwhile(last_obj_id.__ge__, it) + if last_obj_id: + last_obj_id_hex = objid_to_default_hex(last_obj_id) + it = dropwhile(lambda x: objid_to_default_hex(x) <= last_obj_id_hex, it) return islice(it, limit) diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -23,7 +23,10 @@ missing_methods = [] for meth_name in dir(interface): - if meth_name.startswith("_"): + if meth_name.startswith("_") and meth_name not in ( + "__iter__", + "__contains__", + ): continue interface_meth = getattr(interface, meth_name) concrete_meth = getattr(self.storage, meth_name) diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py --- a/swh/objstorage/tests/test_objstorage_random_generator.py +++ b/swh/objstorage/tests/test_objstorage_random_generator.py @@ -21,10 +21,12 @@ sto = get_objstorage("random", total=100) assert isinstance(sto.list_content(), Iterator) - assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)] - assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)] + assert list(sto.list_content()) == [{"sha1": b"%d" % i} for i in range(1, 101)] + assert list(sto.list_content(limit=10)) == [ + {"sha1": b"%d" % i} for i in range(1, 11) + ] assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [ - b"%d" % i for i in range(11, 21) + {"sha1": b"%d" % i} for i in range(11, 21) ]