Page MenuHomeSoftware Heritage

D8029.diff
No OneTemporary

D8029.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,6 +11,9 @@
[mypy-libcloud.*]
ignore_missing_imports = True
+[mypy-msgpack.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+msgpack
typing-extensions >= 3.7.4
# seaweedfs backend
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -5,12 +5,13 @@
from typing import Iterator, Optional
+import msgpack
+
from swh.core.api import RPCClient
-from swh.core.utils import iter_chunks
from swh.model import hashutil
-from swh.objstorage.constants import DEFAULT_LIMIT, ID_DIGEST_LENGTH
+from swh.objstorage.constants import DEFAULT_LIMIT
from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError
-from swh.objstorage.interface import ObjId, ObjStorageInterface
+from swh.objstorage.interface import CompositeObjId, ObjId, ObjStorageInterface
class RemoteObjStorage(RPCClient):
@@ -33,17 +34,22 @@
def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId) -> None:
return self.add(content, obj_id, check_presence=False)
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
yield from self.list_content()
def list_content(
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
params = {"limit": limit}
if last_obj_id:
params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id)
- yield from iter_chunks(
- self._get_stream("content", params=params), chunk_size=ID_DIGEST_LENGTH
+ response = self.raw_verb(
+ "get",
+ "content",
+ headers={"accept": "application/x-msgpack"},
+ params=params,
+ stream=True,
)
+ yield from msgpack.Unpacker(response.raw, raw=True)
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -7,8 +7,10 @@
import functools
import logging
import os
+from typing import Iterator
from flask import request
+import msgpack
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
@@ -98,9 +100,11 @@
last_obj_id = bytes.fromhex(last_obj_id)
limit = int(request.args.get("limit", DEFAULT_LIMIT))
- def generate():
+ def generate() -> Iterator[bytes]:
with timed_context("list_content"):
- yield from get_objstorage().list_content(last_obj_id, limit=limit)
+ packer = msgpack.Packer(use_bin_type=True)
+ for obj in get_objstorage().list_content(last_obj_id, limit=limit):
+ yield packer.pack(obj)
return app.response_class(generate())
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -21,7 +21,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -180,7 +180,7 @@
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
"""Does the storage contains the obj_id."""
hex_obj_id = self._internal_id(obj_id)
client = self.get_blob_client(hex_obj_id)
@@ -191,7 +191,7 @@
else:
return True
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage."""
for client in self.get_all_container_clients():
for obj in client.list_blobs():
diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py
--- a/swh/objstorage/backends/generator.py
+++ b/swh/objstorage/backends/generator.py
@@ -1,9 +1,10 @@
from itertools import count, islice, repeat
import logging
import random
-from typing import Iterator, Optional
+from typing import Generator, Iterator, Optional, cast
-from swh.objstorage.interface import ObjId
+from swh.objstorage.constants import ID_HASH_ALGO
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage
logger = logging.getLogger(__name__)
@@ -189,10 +190,10 @@
def __contains__(self, obj_id, *args, **kwargs):
return False
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
i = 1
while True:
- j = yield (b"%d" % i)
+ j = yield {ID_HASH_ALGO: b"%d" % i}
if self.total and i >= self.total:
logger.debug("DONE")
break
@@ -217,8 +218,10 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
- it = iter(self)
+ ) -> Iterator[CompositeObjId]:
+ if isinstance(last_obj_id, dict):
+ last_obj_id = last_obj_id[ID_HASH_ALGO]
+ it = cast(Generator[CompositeObjId, int, None], iter(self))
if last_obj_id:
next(it)
it.send(int(last_obj_id))
diff --git a/swh/objstorage/backends/http.py b/swh/objstorage/backends/http.py
--- a/swh/objstorage/backends/http.py
+++ b/swh/objstorage/backends/http.py
@@ -11,7 +11,7 @@
from swh.model import hashutil
from swh.objstorage import exc
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
@@ -45,11 +45,11 @@
"""Check the configuration for this object storage"""
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
resp = self.session.head(self._path(obj_id))
return resp.status_code == 200
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
raise exc.NonIterableObjStorage("__iter__")
def __len__(self):
@@ -68,7 +68,7 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
raise exc.NonIterableObjStorage("__len__")
def get(self, obj_id: ObjId) -> bytes:
diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py
--- a/swh/objstorage/backends/in_memory.py
+++ b/swh/objstorage/backends/in_memory.py
@@ -3,9 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterator
+
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
-from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.interface import CompositeObjId, ObjId
+from swh.objstorage.objstorage import ObjStorage, compute_hash, objid_to_default_hex
class InMemoryObjStorage(ObjStorage):
@@ -22,10 +24,10 @@
def check_config(self, *, check_write):
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
return obj_id in self.state
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
return iter(sorted(self.state))
def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
@@ -44,7 +46,7 @@
if obj_id not in self:
raise ObjNotFoundError(obj_id)
if compute_hash(self.state[obj_id]) != obj_id:
- raise Error("Corrupt object %s" % obj_id.hex())
+ raise Error("Corrupt object %s" % objid_to_default_hex(obj_id))
def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
diff --git a/swh/objstorage/backends/libcloud.py b/swh/objstorage/backends/libcloud.py
--- a/swh/objstorage/backends/libcloud.py
+++ b/swh/objstorage/backends/libcloud.py
@@ -5,8 +5,7 @@
import abc
from collections import OrderedDict
-from collections.abc import Iterator
-from typing import Optional
+from typing import Iterator, Optional
from urllib.parse import urlencode
from libcloud.storage import providers
@@ -15,7 +14,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -116,7 +115,7 @@
# FIXME: hopefully this blew up during instantiation
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
try:
self._get_object(obj_id)
except ObjNotFoundError:
@@ -124,7 +123,7 @@
else:
return True
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage
Warning: Iteration over the contents of a cloud-based object storage
diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py
--- a/swh/objstorage/backends/pathslicing.py
+++ b/swh/objstorage/backends/pathslicing.py
@@ -12,8 +12,13 @@
from swh.model import hashutil
from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
-from swh.objstorage.objstorage import ObjStorage, compressors, decompressors
+from swh.objstorage.interface import CompositeObjId, ObjId
+from swh.objstorage.objstorage import (
+ ObjStorage,
+ compressors,
+ decompressors,
+ objid_to_default_hex,
+)
BUFSIZ = 1048576
@@ -182,10 +187,10 @@
return True
def __contains__(self, obj_id: ObjId) -> bool:
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
return os.path.isfile(self.slicer.get_path(hex_obj_id))
- def __iter__(self) -> Iterator[bytes]:
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the object identifiers currently available in the
storage.
@@ -231,7 +236,7 @@
# If the object is already present, return immediately.
return
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
compressor = compressors[self.compression]()
with self._write_obj_file(hex_obj_id) as f:
f.write(compressor.compress(content))
@@ -242,7 +247,7 @@
raise ObjNotFoundError(obj_id)
# Open the file and return its content as bytes
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
d = decompressors[self.compression]()
with open(self.slicer.get_path(hex_obj_id), "rb") as f:
out = d.decompress(f.read())
@@ -257,7 +262,7 @@
try:
data = self.get(obj_id)
except OSError:
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
raise Error(
"Corrupt object %s: not a proper compressed file" % hex_obj_id,
)
@@ -266,13 +271,13 @@
data, hash_names=[ID_HASH_ALGO]
).digest()
- actual_obj_id = checksums[ID_HASH_ALGO]
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ actual_obj_sha1 = checksums[ID_HASH_ALGO]
+ hex_obj_id = objid_to_default_hex(obj_id)
- if hex_obj_id != hashutil.hash_to_hex(actual_obj_id):
+ if hex_obj_id != hashutil.hash_to_hex(actual_obj_sha1):
raise Error(
"Corrupt object %s should have id %s"
- % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id))
+ % (objid_to_default_hex(obj_id), hashutil.hash_to_hex(actual_obj_sha1))
)
def delete(self, obj_id: ObjId):
@@ -280,7 +285,7 @@
if obj_id not in self:
raise ObjNotFoundError(obj_id)
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
try:
os.remove(self.slicer.get_path(hex_obj_id))
except FileNotFoundError:
@@ -291,7 +296,7 @@
@contextmanager
def chunk_writer(self, obj_id):
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
compressor = compressors[self.compression]()
with self._write_obj_file(hex_obj_id) as f:
yield lambda c: f.write(compressor.compress(c))
@@ -299,7 +304,7 @@
def list_content(
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
if last_obj_id:
it = self.iter_from(last_obj_id)
else:
@@ -307,7 +312,7 @@
return islice(it, limit)
def iter_from(self, obj_id, n_leaf=False):
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
slices = self.slicer.get_slices(hex_obj_id)
rlen = len(self.root.split("/"))
diff --git a/swh/objstorage/backends/seaweedfs/objstorage.py b/swh/objstorage/backends/seaweedfs/objstorage.py
--- a/swh/objstorage/backends/seaweedfs/objstorage.py
+++ b/swh/objstorage/backends/seaweedfs/objstorage.py
@@ -11,13 +11,14 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
compressors,
compute_hash,
decompressors,
+ objid_to_default_hex,
)
from .http import HttpFiler
@@ -41,10 +42,10 @@
# FIXME: hopefully this blew up during instantiation
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
return self.wf.exists(self._path(obj_id))
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage
Warning: Iteration over the contents of a cloud-based object storage
@@ -123,9 +124,9 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
if last_obj_id:
- objid = hashutil.hash_to_hex(last_obj_id)
+ objid = objid_to_default_hex(last_obj_id)
lastfilename = objid
else:
lastfilename = None
diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py
--- a/swh/objstorage/interface.py
+++ b/swh/objstorage/interface.py
@@ -3,15 +3,24 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict, Iterator, List, Optional
+from typing import Dict, Iterator, List, Optional, Union
-from typing_extensions import Protocol, runtime_checkable
+from typing_extensions import Protocol, TypedDict, runtime_checkable
from swh.core.api import remote_api_endpoint
from swh.objstorage.constants import DEFAULT_LIMIT
-ObjId = bytes
-"""Type of object ids, which should be a sha1 hash."""
+
+class CompositeObjId(TypedDict, total=False):
+ sha1: bytes
+ sha1_git: bytes
+ sha256: bytes
+ blake2s256: bytes
+
+
+ObjId = Union[bytes, CompositeObjId]
+"""Type of object ids, which should be ``{hash: value for hash in SUPPORTED_HASHES}``;
+bare sha1 digests (``bytes``) are still accepted for legacy clients."""
@runtime_checkable
@@ -172,12 +181,12 @@
"""
...
- def __iter__(self) -> Iterator[ObjId]:
+ def __iter__(self) -> Iterator[CompositeObjId]:
...
def list_content(
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
"""Generates known object ids.
Args:
diff --git a/swh/objstorage/multiplexer/filter/filter.py b/swh/objstorage/multiplexer/filter/filter.py
--- a/swh/objstorage/multiplexer/filter/filter.py
+++ b/swh/objstorage/multiplexer/filter/filter.py
@@ -3,6 +3,9 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterator
+
+from swh.objstorage.interface import CompositeObjId
from swh.objstorage.objstorage import ObjStorage
@@ -39,7 +42,7 @@
def __contains__(self, *args, **kwargs):
return self.storage.__contains__(*args, **kwargs)
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterates over the content of each storages
Warning: The `__iter__` methods frequently have bad performance. You
diff --git a/swh/objstorage/multiplexer/filter/id_filter.py b/swh/objstorage/multiplexer/filter/id_filter.py
--- a/swh/objstorage/multiplexer/filter/id_filter.py
+++ b/swh/objstorage/multiplexer/filter/id_filter.py
@@ -5,9 +5,11 @@
import abc
import re
+from typing import Iterator
from swh.model import hashutil
from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.interface import CompositeObjId
from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter
@@ -33,7 +35,7 @@
def __len__(self):
return sum(1 for i in [id for id in self.storage if self.is_valid(id)])
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
yield from filter(lambda id: self.is_valid(id), iter(self.storage))
def add(self, content, obj_id, check_presence=True, *args, **kwargs):
diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py
--- a/swh/objstorage/multiplexer/multiplexer_objstorage.py
+++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py
@@ -5,10 +5,10 @@
import queue
import threading
-from typing import Dict
+from typing import Dict, Iterator
from swh.objstorage.exc import ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import ObjStorage
@@ -199,7 +199,7 @@
)
)
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
"""Indicate if the given object is present in the storage.
Args:
@@ -215,7 +215,7 @@
return True
return False
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
def obj_iterator():
for storage in self.storages:
yield from storage
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -14,7 +14,18 @@
from .constants import DEFAULT_LIMIT, ID_HASH_ALGO
from .exc import ObjNotFoundError
-from .interface import ObjId, ObjStorageInterface
+from .interface import CompositeObjId, ObjId, ObjStorageInterface
+
+
+def objid_to_default_hex(obj_id: ObjId) -> str:
+ """Converts SHA1 hashes and multi-hashes to the hexadecimal representation
+ of the SHA1; ``str`` inputs are returned unchanged."""
+ if isinstance(obj_id, bytes):
+ return hashutil.hash_to_hex(obj_id)
+ elif isinstance(obj_id, str):
+ return obj_id
+ else:
+ return hashutil.hash_to_hex(obj_id[ID_HASH_ALGO])
def compute_hash(content, algo=ID_HASH_ALGO):
@@ -125,8 +136,9 @@
self: ObjStorageInterface,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
it = iter(self)
- if last_obj_id is not None:
- it = dropwhile(last_obj_id.__ge__, it)
+ if last_obj_id:
+ last_obj_id_hex = objid_to_default_hex(last_obj_id)
+ it = dropwhile(lambda x: objid_to_default_hex(x) <= last_obj_id_hex, it)
return islice(it, limit)
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -23,7 +23,10 @@
missing_methods = []
for meth_name in dir(interface):
- if meth_name.startswith("_"):
+ if meth_name.startswith("_") and meth_name not in (
+ "__iter__",
+ "__contains__",
+ ):
continue
interface_meth = getattr(interface, meth_name)
concrete_meth = getattr(self.storage, meth_name)
diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py
--- a/swh/objstorage/tests/test_objstorage_random_generator.py
+++ b/swh/objstorage/tests/test_objstorage_random_generator.py
@@ -21,10 +21,12 @@
sto = get_objstorage("random", total=100)
assert isinstance(sto.list_content(), Iterator)
- assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)]
- assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)]
+ assert list(sto.list_content()) == [{"sha1": b"%d" % i} for i in range(1, 101)]
+ assert list(sto.list_content(limit=10)) == [
+ {"sha1": b"%d" % i} for i in range(1, 11)
+ ]
assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [
- b"%d" % i for i in range(11, 21)
+ {"sha1": b"%d" % i} for i in range(11, 21)
]

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:32 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217075

Event Timeline