Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345820
D8029.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
22 KB
Subscribers
None
D8029.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,6 +11,9 @@
[mypy-libcloud.*]
ignore_missing_imports = True
+[mypy-msgpack.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+msgpack
typing-extensions >= 3.7.4
# seaweedfs backend
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -5,12 +5,13 @@
from typing import Iterator, Optional
+import msgpack
+
from swh.core.api import RPCClient
-from swh.core.utils import iter_chunks
from swh.model import hashutil
-from swh.objstorage.constants import DEFAULT_LIMIT, ID_DIGEST_LENGTH
+from swh.objstorage.constants import DEFAULT_LIMIT
from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError
-from swh.objstorage.interface import ObjId, ObjStorageInterface
+from swh.objstorage.interface import CompositeObjId, ObjId, ObjStorageInterface
class RemoteObjStorage(RPCClient):
@@ -33,17 +34,22 @@
def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId) -> None:
return self.add(content, obj_id, check_presence=False)
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
yield from self.list_content()
def list_content(
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
params = {"limit": limit}
if last_obj_id:
params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id)
- yield from iter_chunks(
- self._get_stream("content", params=params), chunk_size=ID_DIGEST_LENGTH
+ response = self.raw_verb(
+ "get",
+ "content",
+ headers={"accept": "application/x-msgpack"},
+ params=params,
+ stream=True,
)
+ yield from msgpack.Unpacker(response.raw, raw=True)
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -7,8 +7,10 @@
import functools
import logging
import os
+from typing import Iterator
from flask import request
+import msgpack
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
@@ -98,9 +100,11 @@
last_obj_id = bytes.fromhex(last_obj_id)
limit = int(request.args.get("limit", DEFAULT_LIMIT))
- def generate():
+ def generate() -> Iterator[bytes]:
with timed_context("list_content"):
- yield from get_objstorage().list_content(last_obj_id, limit=limit)
+ packer = msgpack.Packer(use_bin_type=True)
+ for obj in get_objstorage().list_content(last_obj_id, limit=limit):
+ yield packer.pack(obj)
return app.response_class(generate())
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -21,7 +21,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -180,7 +180,7 @@
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
"""Does the storage contains the obj_id."""
hex_obj_id = self._internal_id(obj_id)
client = self.get_blob_client(hex_obj_id)
@@ -191,7 +191,7 @@
else:
return True
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage."""
for client in self.get_all_container_clients():
for obj in client.list_blobs():
diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py
--- a/swh/objstorage/backends/generator.py
+++ b/swh/objstorage/backends/generator.py
@@ -1,9 +1,10 @@
from itertools import count, islice, repeat
import logging
import random
-from typing import Iterator, Optional
+from typing import Generator, Iterator, Optional, cast
-from swh.objstorage.interface import ObjId
+from swh.objstorage.constants import ID_HASH_ALGO
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage
logger = logging.getLogger(__name__)
@@ -189,10 +190,10 @@
def __contains__(self, obj_id, *args, **kwargs):
return False
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
i = 1
while True:
- j = yield (b"%d" % i)
+ j = yield {ID_HASH_ALGO: b"%d" % i}
if self.total and i >= self.total:
logger.debug("DONE")
break
@@ -217,8 +218,10 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
- it = iter(self)
+ ) -> Iterator[CompositeObjId]:
+ if isinstance(last_obj_id, dict):
+ last_obj_id = last_obj_id[ID_HASH_ALGO]
+ it = cast(Generator[CompositeObjId, int, None], iter(self))
if last_obj_id:
next(it)
it.send(int(last_obj_id))
diff --git a/swh/objstorage/backends/http.py b/swh/objstorage/backends/http.py
--- a/swh/objstorage/backends/http.py
+++ b/swh/objstorage/backends/http.py
@@ -11,7 +11,7 @@
from swh.model import hashutil
from swh.objstorage import exc
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
@@ -45,11 +45,11 @@
"""Check the configuration for this object storage"""
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
resp = self.session.head(self._path(obj_id))
return resp.status_code == 200
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
raise exc.NonIterableObjStorage("__iter__")
def __len__(self):
@@ -68,7 +68,7 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
raise exc.NonIterableObjStorage("__len__")
def get(self, obj_id: ObjId) -> bytes:
diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py
--- a/swh/objstorage/backends/in_memory.py
+++ b/swh/objstorage/backends/in_memory.py
@@ -3,9 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterator
+
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
-from swh.objstorage.objstorage import ObjStorage, compute_hash
+from swh.objstorage.interface import CompositeObjId, ObjId
+from swh.objstorage.objstorage import ObjStorage, compute_hash, objid_to_default_hex
class InMemoryObjStorage(ObjStorage):
@@ -22,10 +24,10 @@
def check_config(self, *, check_write):
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
return obj_id in self.state
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
return iter(sorted(self.state))
def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
@@ -44,7 +46,7 @@
if obj_id not in self:
raise ObjNotFoundError(obj_id)
if compute_hash(self.state[obj_id]) != obj_id:
- raise Error("Corrupt object %s" % obj_id.hex())
+ raise Error("Corrupt object %s" % objid_to_default_hex(obj_id))
def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
diff --git a/swh/objstorage/backends/libcloud.py b/swh/objstorage/backends/libcloud.py
--- a/swh/objstorage/backends/libcloud.py
+++ b/swh/objstorage/backends/libcloud.py
@@ -5,8 +5,7 @@
import abc
from collections import OrderedDict
-from collections.abc import Iterator
-from typing import Optional
+from typing import Iterator, Optional
from urllib.parse import urlencode
from libcloud.storage import providers
@@ -15,7 +14,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -116,7 +115,7 @@
# FIXME: hopefully this blew up during instantiation
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
try:
self._get_object(obj_id)
except ObjNotFoundError:
@@ -124,7 +123,7 @@
else:
return True
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage
Warning: Iteration over the contents of a cloud-based object storage
diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py
--- a/swh/objstorage/backends/pathslicing.py
+++ b/swh/objstorage/backends/pathslicing.py
@@ -12,8 +12,13 @@
from swh.model import hashutil
from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
-from swh.objstorage.objstorage import ObjStorage, compressors, decompressors
+from swh.objstorage.interface import CompositeObjId, ObjId
+from swh.objstorage.objstorage import (
+ ObjStorage,
+ compressors,
+ decompressors,
+ objid_to_default_hex,
+)
BUFSIZ = 1048576
@@ -182,10 +187,10 @@
return True
def __contains__(self, obj_id: ObjId) -> bool:
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
return os.path.isfile(self.slicer.get_path(hex_obj_id))
- def __iter__(self) -> Iterator[bytes]:
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the object identifiers currently available in the
storage.
@@ -231,7 +236,7 @@
# If the object is already present, return immediately.
return
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
compressor = compressors[self.compression]()
with self._write_obj_file(hex_obj_id) as f:
f.write(compressor.compress(content))
@@ -242,7 +247,7 @@
raise ObjNotFoundError(obj_id)
# Open the file and return its content as bytes
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
d = decompressors[self.compression]()
with open(self.slicer.get_path(hex_obj_id), "rb") as f:
out = d.decompress(f.read())
@@ -257,7 +262,7 @@
try:
data = self.get(obj_id)
except OSError:
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
raise Error(
"Corrupt object %s: not a proper compressed file" % hex_obj_id,
)
@@ -266,13 +271,13 @@
data, hash_names=[ID_HASH_ALGO]
).digest()
- actual_obj_id = checksums[ID_HASH_ALGO]
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ actual_obj_sha1 = checksums[ID_HASH_ALGO]
+ hex_obj_id = objid_to_default_hex(obj_id)
- if hex_obj_id != hashutil.hash_to_hex(actual_obj_id):
+ if hex_obj_id != hashutil.hash_to_hex(actual_obj_sha1):
raise Error(
"Corrupt object %s should have id %s"
- % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id))
+ % (objid_to_default_hex(obj_id), hashutil.hash_to_hex(actual_obj_sha1))
)
def delete(self, obj_id: ObjId):
@@ -280,7 +285,7 @@
if obj_id not in self:
raise ObjNotFoundError(obj_id)
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
try:
os.remove(self.slicer.get_path(hex_obj_id))
except FileNotFoundError:
@@ -291,7 +296,7 @@
@contextmanager
def chunk_writer(self, obj_id):
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
compressor = compressors[self.compression]()
with self._write_obj_file(hex_obj_id) as f:
yield lambda c: f.write(compressor.compress(c))
@@ -299,7 +304,7 @@
def list_content(
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
if last_obj_id:
it = self.iter_from(last_obj_id)
else:
@@ -307,7 +312,7 @@
return islice(it, limit)
def iter_from(self, obj_id, n_leaf=False):
- hex_obj_id = hashutil.hash_to_hex(obj_id)
+ hex_obj_id = objid_to_default_hex(obj_id)
slices = self.slicer.get_slices(hex_obj_id)
rlen = len(self.root.split("/"))
diff --git a/swh/objstorage/backends/seaweedfs/objstorage.py b/swh/objstorage/backends/seaweedfs/objstorage.py
--- a/swh/objstorage/backends/seaweedfs/objstorage.py
+++ b/swh/objstorage/backends/seaweedfs/objstorage.py
@@ -11,13 +11,14 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
compressors,
compute_hash,
decompressors,
+ objid_to_default_hex,
)
from .http import HttpFiler
@@ -41,10 +42,10 @@
# FIXME: hopefully this blew up during instantiation
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
return self.wf.exists(self._path(obj_id))
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterate over the objects present in the storage
Warning: Iteration over the contents of a cloud-based object storage
@@ -123,9 +124,9 @@
self,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
if last_obj_id:
- objid = hashutil.hash_to_hex(last_obj_id)
+ objid = objid_to_default_hex(last_obj_id)
lastfilename = objid
else:
lastfilename = None
diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py
--- a/swh/objstorage/interface.py
+++ b/swh/objstorage/interface.py
@@ -3,15 +3,24 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict, Iterator, List, Optional
+from typing import Dict, Iterator, List, Optional, Union
-from typing_extensions import Protocol, runtime_checkable
+from typing_extensions import Protocol, TypedDict, runtime_checkable
from swh.core.api import remote_api_endpoint
from swh.objstorage.constants import DEFAULT_LIMIT
-ObjId = bytes
-"""Type of object ids, which should be a sha1 hash."""
+
+class CompositeObjId(TypedDict, total=False):
+ sha1: bytes
+ sha1_git: bytes
+ sha256: bytes
+ blake2s256: bytes
+
+
+ObjId = Union[bytes, CompositeObjId]
+"""Type of object ids, which should be ``{hash: value for hash in SUPPORTED_HASHES}``;
+but single sha1 hashes are supported for legacy clients"""
@runtime_checkable
@@ -172,12 +181,12 @@
"""
...
- def __iter__(self) -> Iterator[ObjId]:
+ def __iter__(self) -> Iterator[CompositeObjId]:
...
def list_content(
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
"""Generates known object ids.
Args:
diff --git a/swh/objstorage/multiplexer/filter/filter.py b/swh/objstorage/multiplexer/filter/filter.py
--- a/swh/objstorage/multiplexer/filter/filter.py
+++ b/swh/objstorage/multiplexer/filter/filter.py
@@ -3,6 +3,9 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterator
+
+from swh.objstorage.interface import CompositeObjId
from swh.objstorage.objstorage import ObjStorage
@@ -39,7 +42,7 @@
def __contains__(self, *args, **kwargs):
return self.storage.__contains__(*args, **kwargs)
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
"""Iterates over the content of each storages
Warning: The `__iter__` methods frequently have bad performance. You
diff --git a/swh/objstorage/multiplexer/filter/id_filter.py b/swh/objstorage/multiplexer/filter/id_filter.py
--- a/swh/objstorage/multiplexer/filter/id_filter.py
+++ b/swh/objstorage/multiplexer/filter/id_filter.py
@@ -5,9 +5,11 @@
import abc
import re
+from typing import Iterator
from swh.model import hashutil
from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.interface import CompositeObjId
from swh.objstorage.multiplexer.filter.filter import ObjStorageFilter
@@ -33,7 +35,7 @@
def __len__(self):
return sum(1 for i in [id for id in self.storage if self.is_valid(id)])
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
yield from filter(lambda id: self.is_valid(id), iter(self.storage))
def add(self, content, obj_id, check_presence=True, *args, **kwargs):
diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py
--- a/swh/objstorage/multiplexer/multiplexer_objstorage.py
+++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py
@@ -5,10 +5,10 @@
import queue
import threading
-from typing import Dict
+from typing import Dict, Iterator
from swh.objstorage.exc import ObjNotFoundError
-from swh.objstorage.interface import ObjId
+from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import ObjStorage
@@ -199,7 +199,7 @@
)
)
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
"""Indicate if the given object is present in the storage.
Args:
@@ -215,7 +215,7 @@
return True
return False
- def __iter__(self):
+ def __iter__(self) -> Iterator[CompositeObjId]:
def obj_iterator():
for storage in self.storages:
yield from storage
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -14,7 +14,18 @@
from .constants import DEFAULT_LIMIT, ID_HASH_ALGO
from .exc import ObjNotFoundError
-from .interface import ObjId, ObjStorageInterface
+from .interface import CompositeObjId, ObjId, ObjStorageInterface
+
+
+def objid_to_default_hex(obj_id: ObjId) -> str:
+ """Converts SHA1 hashes and multi-hashes to the hexadecimal representation
+ of the SHA1."""
+ if isinstance(obj_id, bytes):
+ return hashutil.hash_to_hex(obj_id)
+ elif isinstance(obj_id, str):
+ return obj_id
+ else:
+ return hashutil.hash_to_hex(obj_id[ID_HASH_ALGO])
def compute_hash(content, algo=ID_HASH_ALGO):
@@ -125,8 +136,9 @@
self: ObjStorageInterface,
last_obj_id: Optional[ObjId] = None,
limit: int = DEFAULT_LIMIT,
- ) -> Iterator[ObjId]:
+ ) -> Iterator[CompositeObjId]:
it = iter(self)
- if last_obj_id is not None:
- it = dropwhile(last_obj_id.__ge__, it)
+ if last_obj_id:
+ last_obj_id_hex = objid_to_default_hex(last_obj_id)
+ it = dropwhile(lambda x: objid_to_default_hex(x) <= last_obj_id_hex, it)
return islice(it, limit)
diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py
--- a/swh/objstorage/tests/objstorage_testing.py
+++ b/swh/objstorage/tests/objstorage_testing.py
@@ -23,7 +23,10 @@
missing_methods = []
for meth_name in dir(interface):
- if meth_name.startswith("_"):
+ if meth_name.startswith("_") and meth_name not in (
+ "__iter__",
+ "__contains__",
+ ):
continue
interface_meth = getattr(interface, meth_name)
concrete_meth = getattr(self.storage, meth_name)
diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py
--- a/swh/objstorage/tests/test_objstorage_random_generator.py
+++ b/swh/objstorage/tests/test_objstorage_random_generator.py
@@ -21,10 +21,12 @@
sto = get_objstorage("random", total=100)
assert isinstance(sto.list_content(), Iterator)
- assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)]
- assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)]
+ assert list(sto.list_content()) == [{"sha1": b"%d" % i} for i in range(1, 101)]
+ assert list(sto.list_content(limit=10)) == [
+ {"sha1": b"%d" % i} for i in range(1, 11)
+ ]
assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [
- b"%d" % i for i in range(11, 21)
+ {"sha1": b"%d" % i} for i in range(11, 21)
]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:32 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217075
Attached To
D8029: Start introducing composite ObjId in the interface
Event Timeline
Log In to Comment