D8014.id28876.diff
Attached to D8014: Add type annotations to all objstorage backends (text/plain, 33 KB)
diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -3,12 +3,14 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Iterator, Optional
+
from swh.core.api import RPCClient
from swh.core.utils import iter_chunks
from swh.model import hashutil
+from swh.objstorage.constants import DEFAULT_LIMIT, ID_DIGEST_LENGTH
from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError
-from swh.objstorage.interface import ObjStorageInterface
-from swh.objstorage.objstorage import DEFAULT_LIMIT, ID_DIGEST_LENGTH
+from swh.objstorage.interface import ObjId, ObjStorageInterface
class RemoteObjStorage(RPCClient):
@@ -28,13 +30,17 @@
reraise_exceptions = [ObjNotFoundError, Error]
backend_class = ObjStorageInterface
- def restore(self, content, obj_id):
+ def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId):
return self.add(content, obj_id, check_presence=False)
def __iter__(self):
yield from self.list_content()
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self,
+ last_obj_id: Optional[ObjId] = None,
+ limit: int = DEFAULT_LIMIT,
+ ) -> Iterator[ObjId]:
params = {"limit": limit}
if last_obj_id:
params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id)
diff --git a/swh/objstorage/backends/azure.py b/swh/objstorage/backends/azure.py
--- a/swh/objstorage/backends/azure.py
+++ b/swh/objstorage/backends/azure.py
@@ -8,7 +8,7 @@
import datetime
from itertools import product
import string
-from typing import Dict, Optional, Union
+from typing import Dict, Iterator, List, Optional, Union
import warnings
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
@@ -21,6 +21,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -205,7 +206,7 @@
"""
return sum(1 for i in self)
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
"""Add an obj in storage if it's not there already."""
if check_presence and obj_id in self:
return obj_id
@@ -229,14 +230,14 @@
return obj_id
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
"""Restore a content."""
if obj_id in self:
self.delete(obj_id)
return self.add(content, obj_id, check_presence=False)
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
"""retrieve blob's content if found."""
return call_async(self._get_async, obj_id)
@@ -286,18 +287,18 @@
]
)
- def get_batch(self, obj_ids):
+ def get_batch(self, obj_ids: List[ObjId]) -> Iterator[Optional[bytes]]:
"""Retrieve objects' raw content in bulk from storage, concurrently."""
return call_async(self._get_batch_async, obj_ids)
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
"""Check the content integrity."""
obj_content = self.get(obj_id)
content_obj_id = compute_hash(obj_content)
if content_obj_id != obj_id:
raise Error(obj_id)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
"""Delete an object."""
super().delete(obj_id) # Check delete permission
hex_obj_id = self._internal_id(obj_id)
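The check() pattern is the same across backends: fetch the stored bytes, re-hash them, and compare with the id. A standalone sketch of the invariant (integrity_check is a hypothetical helper; compute_hash is the function these backends import):

    from swh.objstorage.exc import Error
    from swh.objstorage.objstorage import compute_hash

    def integrity_check(storage, obj_id: bytes) -> None:
        content = storage.get(obj_id)        # raises ObjNotFoundError if absent
        if compute_hash(content) != obj_id:  # sha1 of the bytes must equal the id
            raise Error(obj_id)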
diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py
--- a/swh/objstorage/backends/generator.py
+++ b/swh/objstorage/backends/generator.py
@@ -1,7 +1,9 @@
from itertools import count, islice, repeat
import logging
import random
+from typing import Iterator, Optional
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage
logger = logging.getLogger(__name__)
@@ -211,7 +213,11 @@
def delete(self, obj_id, *args, **kwargs):
return True
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self,
+ last_obj_id: Optional[ObjId] = None,
+ limit: int = DEFAULT_LIMIT,
+ ) -> Iterator[ObjId]:
it = iter(self)
if last_obj_id:
next(it)
diff --git a/swh/objstorage/backends/http.py b/swh/objstorage/backends/http.py
--- a/swh/objstorage/backends/http.py
+++ b/swh/objstorage/backends/http.py
@@ -4,12 +4,14 @@
# See top-level LICENSE file for more information
import logging
+from typing import Iterator, Optional
from urllib.parse import urljoin
import requests
from swh.model import hashutil
from swh.objstorage import exc
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
@@ -53,19 +55,23 @@
def __len__(self):
raise exc.NonIterableObjStorage("__len__")
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
raise exc.ReadOnlyObjStorage("add")
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
raise exc.ReadOnlyObjStorage("delete")
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
raise exc.ReadOnlyObjStorage("restore")
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self,
+ last_obj_id: Optional[ObjId] = None,
+ limit: int = DEFAULT_LIMIT,
+ ) -> Iterator[ObjId]:
raise exc.NonIterableObjStorage("__len__")
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
try:
resp = self.session.get(self._path(obj_id))
resp.raise_for_status()
@@ -81,7 +87,7 @@
raise exc.Error("Corrupt object %s: trailing data found" % hex_obj_id)
return ret
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
# Check the content integrity
obj_content = self.get(obj_id)
content_obj_id = compute_hash(obj_content)
diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py
--- a/swh/objstorage/backends/in_memory.py
+++ b/swh/objstorage/backends/in_memory.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
from swh.objstorage.exc import Error, ObjNotFoundError
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import ObjStorage, compute_hash
@@ -27,7 +28,7 @@
def __iter__(self):
return iter(sorted(self.state))
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
if check_presence and obj_id in self:
return obj_id
@@ -35,20 +36,19 @@
return obj_id
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
if obj_id not in self:
raise ObjNotFoundError(obj_id)
return self.state[obj_id]
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
if obj_id not in self:
raise ObjNotFoundError(obj_id)
if compute_hash(self.state[obj_id]) != obj_id:
- raise Error("Corrupt object %s" % obj_id)
- return True
+ raise Error("Corrupt object %s" % obj_id.hex())
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
if obj_id not in self:
raise ObjNotFoundError(obj_id)
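The message change in check() is small but real: since obj_id is bytes, interpolating it directly produces a bytes repr, while .hex() yields the usual hexadecimal form. For illustration:

    obj_id = bytes.fromhex("8a9d6c62ec6b7b2b2bd32c76cbdcb6b29db53a33")

    "Corrupt object %s" % obj_id        # "Corrupt object b'\x8a\x9dlb...'" (bytes repr)
    "Corrupt object %s" % obj_id.hex()  # "Corrupt object 8a9d6c62ec6b..."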
diff --git a/swh/objstorage/backends/libcloud.py b/swh/objstorage/backends/libcloud.py
--- a/swh/objstorage/backends/libcloud.py
+++ b/swh/objstorage/backends/libcloud.py
@@ -15,6 +15,7 @@
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import (
ObjStorage,
compressors,
@@ -61,7 +62,7 @@
def __init__(
self,
container_name: str,
- compression: Optional[str] = None,
+ compression: str = "gzip",
path_prefix: Optional[str] = None,
**kwargs,
):
@@ -156,17 +157,17 @@
"""
return sum(1 for i in self)
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
if check_presence and obj_id in self:
return obj_id
self._put_object(content, obj_id)
return obj_id
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
return self.add(content, obj_id, check_presence=False)
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
obj = b"".join(self._get_object(obj_id).as_stream())
d = decompressors[self.compression]()
ret = d.decompress(obj)
@@ -175,7 +176,7 @@
raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
return ret
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
# Check that the file exists, as _get_object raises ObjNotFoundError
self._get_object(obj_id)
# Check the content integrity
@@ -184,7 +185,7 @@
if content_obj_id != obj_id:
raise Error(obj_id)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
obj = self._get_object(obj_id)
return self.driver.delete_object(obj)
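One behavioral change hides in this hunk: the libcloud backend's compression default moves from None to "gzip". Deployments that relied on the old implicit default may want to pin the option explicitly. A sketch with placeholder settings ("s3" is assumed to be the factory name of a libcloud-based backend; credential argument names depend on the libcloud driver):

    from swh.objstorage.factory import get_objstorage

    storage = get_objstorage(
        cls="s3",                    # assumed libcloud-based backend name
        container_name="my-bucket",  # placeholder
        key="...", secret="...",     # placeholder credentials
        compression="none",          # pin explicitly; the default is now "gzip"
    )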
diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py
--- a/swh/objstorage/backends/pathslicing.py
+++ b/swh/objstorage/backends/pathslicing.py
@@ -1,26 +1,20 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from collections.abc import Iterator
from contextlib import contextmanager
from itertools import islice
import os
import random
import tempfile
-from typing import List
+from typing import Iterable, Iterator, List, Optional
from swh.model import hashutil
+from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH
from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.objstorage.objstorage import (
- DEFAULT_LIMIT,
- ID_HASH_ALGO,
- ID_HEXDIGEST_LENGTH,
- ObjStorage,
- compressors,
- decompressors,
-)
+from swh.objstorage.interface import ObjId
+from swh.objstorage.objstorage import ObjStorage, compressors, decompressors
BUFSIZ = 1048576
@@ -188,11 +182,11 @@
return True
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
hex_obj_id = hashutil.hash_to_hex(obj_id)
return os.path.isfile(self.slicer.get_path(hex_obj_id))
- def __iter__(self):
+ def __iter__(self) -> Iterator[bytes]:
"""Iterate over the object identifiers currently available in the
storage.
@@ -217,7 +211,7 @@
return obj_iterator()
- def __len__(self):
+ def __len__(self) -> int:
"""Compute the number of objects available in the storage.
Warning: this currently uses `__iter__`, its warning about bad
@@ -228,23 +222,25 @@
"""
return sum(1 for i in self)
- def add(self, content, obj_id, check_presence=True):
+ def add(
+ self,
+ content: bytes,
+ obj_id: ObjId,
+ check_presence: bool = True,
+ ) -> ObjId:
if check_presence and obj_id in self:
# If the object is already present, return immediately.
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
- if not isinstance(content, Iterator):
- content = [content]
compressor = compressors[self.compression]()
with self._write_obj_file(hex_obj_id) as f:
- for chunk in content:
- f.write(compressor.compress(chunk))
+ f.write(compressor.compress(content))
f.write(compressor.flush())
return obj_id
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
if obj_id not in self:
raise ObjNotFoundError(obj_id)
@@ -260,7 +256,7 @@
return out
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
try:
data = self.get(obj_id)
except OSError:
@@ -282,7 +278,7 @@
% (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id))
)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
if obj_id not in self:
raise ObjNotFoundError(obj_id)
@@ -296,7 +292,7 @@
# Management methods
- def get_random(self, batch_size):
+ def get_random(self, batch_size: int) -> Iterable[ObjId]:
def get_random_content(self, batch_size):
"""Get a batch of content inside a single directory.
@@ -334,7 +330,9 @@
yield lambda c: f.write(compressor.compress(c))
f.write(compressor.flush())
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
+ ) -> Iterator[ObjId]:
if last_obj_id:
it = self.iter_from(last_obj_id)
else:
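This hunk also tightens add(): it now takes a single bytes object, where it previously accepted an iterator of chunks. Callers that streamed chunks would join them first; a sketch, assuming `storage` is any configured backend:

    from swh.objstorage.objstorage import compute_hash

    chunks = [b"hello ", b"world"]
    content = b"".join(chunks)           # add() now expects bytes, not chunks
    obj_id = compute_hash(content)       # sha1 digest, per ID_HASH_ALGO
    storage.add(content, obj_id=obj_id)  # storage: any ObjStorageInterface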
diff --git a/swh/objstorage/backends/seaweedfs/objstorage.py b/swh/objstorage/backends/seaweedfs/objstorage.py
--- a/swh/objstorage/backends/seaweedfs/objstorage.py
+++ b/swh/objstorage/backends/seaweedfs/objstorage.py
@@ -7,9 +7,11 @@
from itertools import islice
import logging
import os
+from typing import Iterator, Optional
from swh.model import hashutil
from swh.objstorage.exc import Error, ObjNotFoundError
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import (
DEFAULT_LIMIT,
ObjStorage,
@@ -72,27 +74,26 @@
"""
return sum(1 for i in self)
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
if check_presence and obj_id in self:
return obj_id
def compressor(data):
comp = compressors[self.compression]()
- for chunk in data:
- yield comp.compress(chunk)
+ yield comp.compress(data)
yield comp.flush()
- if isinstance(content, bytes):
- content = [content]
+ assert isinstance(
+ content, bytes
+ ), "list of content chunks is not supported anymore"
- # XXX should handle streaming correctly...
self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
return obj_id
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
return self.add(content, obj_id, check_presence=False)
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
try:
obj = self.wf.get(self._path(obj_id))
except Exception:
@@ -105,21 +106,25 @@
raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
return ret
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
# Check the content integrity
obj_content = self.get(obj_id)
content_obj_id = compute_hash(obj_content)
if content_obj_id != obj_id:
raise Error(obj_id)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
if obj_id not in self:
raise ObjNotFoundError(obj_id)
self.wf.delete(self._path(obj_id))
return True
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self,
+ last_obj_id: Optional[ObjId] = None,
+ limit: int = DEFAULT_LIMIT,
+ ) -> Iterator[ObjId]:
if last_obj_id:
objid = hashutil.hash_to_hex(last_obj_id)
lastfilename = objid
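The compressor helper above follows the standard compressobj pattern: compress() may buffer internally, so flush() must be called to emit the tail. A standalone illustration with zlib in gzip mode (wbits=31), matching compressors["gzip"] in objstorage.py:

    import zlib

    def gzip_compress(data: bytes) -> bytes:
        comp = zlib.compressobj(wbits=31)   # gzip container
        return comp.compress(data) + comp.flush()

    def gzip_decompress(blob: bytes) -> bytes:
        d = zlib.decompressobj(wbits=31)
        out = d.decompress(blob)
        assert not d.unused_data            # mirrors the "trailing data found" check
        return out

    assert gzip_decompress(gzip_compress(b"hello")) == b"hello"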
diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py
--- a/swh/objstorage/backends/winery/objstorage.py
+++ b/swh/objstorage/backends/winery/objstorage.py
@@ -7,6 +7,7 @@
from multiprocessing import Process
from swh.objstorage import exc
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import ObjStorage
from .roshard import ROShard
@@ -28,7 +29,7 @@
def uninit(self):
self.winery.uninit()
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
return self.winery.get(obj_id)
def check_config(self, *, check_write):
@@ -37,13 +38,13 @@
def __contains__(self, obj_id):
return obj_id in self.winery
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
return self.winery.add(content, obj_id, check_presence)
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
return self.winery.check(obj_id)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
raise PermissionError("Delete is not allowed.")
@@ -74,7 +75,7 @@
self.shards[name] = shard
return self.shards[name]
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
shard_info = self.base.get(obj_id)
if shard_info is None:
raise exc.ObjNotFoundError(obj_id)
@@ -140,7 +141,7 @@
self.shard.uninit()
super().uninit()
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
if check_presence and obj_id in self:
return obj_id
@@ -157,7 +158,7 @@
return obj_id
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
# load all shards packing == True and not locked (i.e. packer
# was interrupted for whatever reason) run pack for each of them
pass
diff --git a/swh/objstorage/constants.py b/swh/objstorage/constants.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/constants.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2015-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing_extensions import Literal
+
+ID_HASH_ALGO: Literal["sha1"] = "sha1"
+
+ID_HEXDIGEST_LENGTH = 40
+"""Size in bytes of the hash hexadecimal representation."""
+
+ID_DIGEST_LENGTH = 20
+"""Size in bytes of the hash"""
+
+DEFAULT_LIMIT = 10000
+"""Default number of results of ``list_content``."""
diff --git a/swh/objstorage/factory.py b/swh/objstorage/factory.py
--- a/swh/objstorage/factory.py
+++ b/swh/objstorage/factory.py
@@ -15,7 +15,7 @@
from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage
from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage
from swh.objstorage.multiplexer.filter import add_filters
-from swh.objstorage.objstorage import ID_HEXDIGEST_LENGTH, ObjStorage # noqa
+from swh.objstorage.objstorage import ObjStorage
__all__ = ["get_objstorage", "ObjStorage"]
diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py
--- a/swh/objstorage/interface.py
+++ b/swh/objstorage/interface.py
@@ -3,12 +3,15 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Dict
+from typing import Dict, Iterable, Iterator, List, Optional
from typing_extensions import Protocol, runtime_checkable
from swh.core.api import remote_api_endpoint
-from swh.objstorage.objstorage import DEFAULT_LIMIT
+from swh.objstorage.constants import DEFAULT_LIMIT
+
+ObjId = bytes
+"""Type of object ids, which should be a sha1 hash."""
@runtime_checkable
@@ -48,11 +51,11 @@
...
@remote_api_endpoint("content/contains")
- def __contains__(self, obj_id):
+ def __contains__(self, obj_id: ObjId) -> bool:
"""Indicate if the given object is present in the storage.
Args:
- obj_id (bytes): object identifier.
+ obj_id: object identifier.
Returns:
True if and only if the object is present in the current object
@@ -62,12 +65,12 @@
...
@remote_api_endpoint("content/add")
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
"""Add a new object to the object storage.
Args:
- content (bytes): object's raw content to add in storage.
- obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO]
+ content: object's raw content to add in storage.
+ obj_id: checksum of [bytes] using [ID_HASH_ALGO]
algorithm. It is trusted to match the bytes.
check_presence (bool): indicate if the presence of the
content should be verified before adding the file.
@@ -92,7 +95,7 @@
"""
...
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
"""Restore a content that have been corrupted.
This function is identical to add but does not check if
@@ -101,21 +104,17 @@
suitable for most cases.
Args:
- content (bytes): object's raw content to add in storage
- obj_id (bytes): checksum of `bytes` as computed by
- ID_HASH_ALGO. When given, obj_id will be trusted to
- match bytes. If missing, obj_id will be computed on
- the fly.
-
+ content: object's raw content to add in storage
+ obj_id: checksum of ``content`` computed with ID_HASH_ALGO (see ``ObjId``); trusted to match the content
"""
...
@remote_api_endpoint("content/get")
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
"""Retrieve the content of a given object.
Args:
- obj_id (bytes): object id.
+ obj_id: object id.
Returns:
the content of the requested object as bytes.
@@ -127,7 +126,7 @@
...
@remote_api_endpoint("content/get/batch")
- def get_batch(self, obj_ids):
+ def get_batch(self, obj_ids: List[ObjId]) -> Iterator[Optional[bytes]]:
"""Retrieve objects' raw content in bulk from storage.
Note: This function does have a default implementation in
@@ -138,7 +137,7 @@
can be overridden to perform a more efficient operation.
Args:
- obj_ids ([bytes]: list of object ids.
+ obj_ids: list of object ids.
Returns:
list of resulting contents, or None if the content could
@@ -149,14 +148,14 @@
...
@remote_api_endpoint("content/check")
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
"""Perform an integrity check for a given object.
Verify that the file object is in place and that the content matches
the object id.
Args:
- obj_id (bytes): object identifier.
+ obj_id: object identifier.
Raises:
ObjNotFoundError: if the requested object is missing.
@@ -166,11 +165,11 @@
...
@remote_api_endpoint("content/delete")
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
"""Delete an object.
Args:
- obj_id (bytes): object identifier.
+ obj_id: object identifier.
Raises:
ObjNotFoundError: if the requested object is missing.
@@ -181,34 +180,35 @@
# Management methods
@remote_api_endpoint("content/get/random")
- def get_random(self, batch_size):
+ def get_random(self, batch_size: int) -> Iterable[ObjId]:
"""Get random ids of existing contents.
This method is used in order to get random ids to perform
content integrity verifications on random contents.
Args:
- batch_size (int): Number of ids that will be given
+ batch_size: Number of ids that will be given
Yields:
- An iterable of ids (bytes) of contents that are in the
- current object storage.
+ ids of contents that are in the current object storage.
"""
...
- def __iter__(self):
+ def __iter__(self) -> Iterator[ObjId]:
...
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT
+ ) -> Iterator[ObjId]:
"""Generates known object ids.
Args:
- last_obj_id (bytes): object id from which to iterate from
+ last_obj_id: object id from which to start iterating
(excluded).
limit (int): max number of object ids to generate.
Generates:
- obj_id (bytes): object ids.
+ obj_id: object ids.
"""
...
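Because ObjStorageInterface is a runtime_checkable typing Protocol, backends conform to it structurally, without inheriting from it, and callers can be typed against the interface alone. A minimal sketch (mirror is a hypothetical helper, not part of this diff):

    from swh.objstorage.interface import ObjId, ObjStorageInterface

    def mirror(src: ObjStorageInterface, dst: ObjStorageInterface) -> int:
        # Copy every object from src to dst; return how many were copied.
        copied = 0
        for obj_id in src.list_content():  # each obj_id is an ObjId (20-byte sha1)
            dst.add(src.get(obj_id), obj_id)
            copied += 1
        return copied

    # runtime_checkable also allows: isinstance(storage, ObjStorageInterface)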
diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py
--- a/swh/objstorage/multiplexer/multiplexer_objstorage.py
+++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py
@@ -6,9 +6,10 @@
import queue
import random
import threading
-from typing import Dict
+from typing import Dict, Iterable
from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import ObjStorage
@@ -222,7 +223,7 @@
return obj_iterator()
- def add(self, content, obj_id, check_presence=True):
+ def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
"""Add a new object to the object storage.
If the adding step works in all the storages that accept this content,
@@ -255,6 +256,8 @@
continue
return result
+ assert False, "No backend objstorage configured"
+
def add_batch(self, contents, check_presence=True) -> Dict:
"""Add a batch of new objects to the object storage."""
write_threads = list(self.get_write_threads())
@@ -275,7 +278,7 @@
"object:add:bytes": summed["object:add:bytes"] // len(results),
}
- def restore(self, content, obj_id):
+ def restore(self, content: bytes, obj_id: ObjId):
return self.wrap_call(
self.get_write_threads(obj_id),
"restore",
@@ -283,7 +286,7 @@
obj_id=obj_id,
).pop()
- def get(self, obj_id):
+ def get(self, obj_id: ObjId) -> bytes:
for storage in self.get_read_threads(obj_id):
try:
return storage.get(obj_id)
@@ -292,7 +295,7 @@
# If no storage contains this content, raise the error
raise ObjNotFoundError(obj_id)
- def check(self, obj_id):
+ def check(self, obj_id: ObjId) -> None:
nb_present = 0
for storage in self.get_read_threads(obj_id):
try:
@@ -308,11 +311,11 @@
if nb_present == 0:
raise ObjNotFoundError(obj_id)
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
super().delete(obj_id) # Check delete permission
return all(self.wrap_call(self.get_write_threads(obj_id), "delete", obj_id))
- def get_random(self, batch_size):
+ def get_random(self, batch_size: int) -> Iterable[ObjId]:
storages_set = [storage for storage in self.storages if len(storage) > 0]
if len(storages_set) <= 0:
return []
diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py
--- a/swh/objstorage/objstorage.py
+++ b/swh/objstorage/objstorage.py
@@ -7,23 +7,14 @@
import bz2
from itertools import dropwhile, islice
import lzma
-from typing import Dict
+from typing import Callable, Dict, Iterable, Iterator, List, Optional
import zlib
from swh.model import hashutil
+from .constants import DEFAULT_LIMIT, ID_HASH_ALGO
from .exc import ObjNotFoundError
-
-ID_HASH_ALGO = "sha1"
-
-ID_HEXDIGEST_LENGTH = 40
-"""Size in bytes of the hash hexadecimal representation."""
-
-ID_DIGEST_LENGTH = 20
-"""Size in bytes of the hash"""
-
-DEFAULT_LIMIT = 10000
-"""Default number of results of ``list_content``."""
+from .interface import ObjId, ObjStorageInterface
def compute_hash(content, algo=ID_HASH_ALGO):
@@ -56,28 +47,43 @@
class NullDecompressor:
- def decompress(self, data):
+ def decompress(self, data: bytes) -> bytes:
return data
@property
- def unused_data(self):
+ def unused_data(self) -> bytes:
return b""
-decompressors = {
- "bz2": bz2.BZ2Decompressor,
- "lzma": lzma.LZMADecompressor,
- "gzip": lambda: zlib.decompressobj(wbits=31),
- "zlib": zlib.decompressobj,
- "none": NullDecompressor,
+class _CompressorProtocol:
+ def compress(self, data: bytes) -> bytes:
+ ...
+
+ def flush(self) -> bytes:
+ ...
+
+
+class _DecompressorProtocol:
+ def decompress(self, data: bytes) -> bytes:
+ ...
+
+ unused_data: bytes
+
+
+decompressors: Dict[str, Callable[[], _DecompressorProtocol]] = {
+ "bz2": bz2.BZ2Decompressor, # type: ignore
+ "lzma": lzma.LZMADecompressor, # type: ignore
+ "gzip": lambda: zlib.decompressobj(wbits=31), # type: ignore
+ "zlib": zlib.decompressobj, # type: ignore
+ "none": NullDecompressor, # type: ignore
}
-compressors = {
- "bz2": bz2.BZ2Compressor,
- "lzma": lzma.LZMACompressor,
- "gzip": lambda: zlib.compressobj(wbits=31),
- "zlib": zlib.compressobj,
- "none": NullCompressor,
+compressors: Dict[str, Callable[[], _CompressorProtocol]] = {
+ "bz2": bz2.BZ2Compressor, # type: ignore
+ "lzma": lzma.LZMACompressor, # type: ignore
+ "gzip": lambda: zlib.compressobj(wbits=31), # type: ignore
+ "zlib": zlib.compressobj, # type: ignore
+ "none": NullCompressor, # type: ignore
}
@@ -87,19 +93,7 @@
# it becomes needed
self.allow_delete = allow_delete
- @abc.abstractmethod
- def check_config(self, *, check_write):
- pass
-
- @abc.abstractmethod
- def __contains__(self, obj_id):
- pass
-
- @abc.abstractmethod
- def add(self, content, obj_id, check_presence=True):
- pass
-
- def add_batch(self, contents, check_presence=True) -> Dict:
+ def add_batch(self: ObjStorageInterface, contents, check_presence=True) -> Dict:
summary = {"object:add": 0, "object:add:bytes": 0}
for obj_id, content in contents.items():
if check_presence and obj_id in self:
@@ -109,15 +103,13 @@
summary["object:add:bytes"] += len(content)
return summary
- def restore(self, content, obj_id):
+ def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId):
# check_presence to false will erase the potential previous content.
return self.add(content, obj_id, check_presence=False)
- @abc.abstractmethod
- def get(self, obj_id):
- pass
-
- def get_batch(self, obj_ids):
+ def get_batch(
+ self: ObjStorageInterface, obj_ids: List[ObjId]
+ ) -> Iterator[Optional[bytes]]:
for obj_id in obj_ids:
try:
yield self.get(obj_id)
@@ -125,23 +117,19 @@
yield None
@abc.abstractmethod
- def check(self, obj_id):
- pass
-
- @abc.abstractmethod
- def delete(self, obj_id):
+ def delete(self, obj_id: ObjId):
if not self.allow_delete:
raise PermissionError("Delete is not allowed.")
- # Management methods
-
- def get_random(self, batch_size):
+ def get_random(self, batch_size: int) -> Iterable[ObjId]:
pass
- # Streaming methods
-
- def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
+ def list_content(
+ self: ObjStorageInterface,
+ last_obj_id: Optional[ObjId] = None,
+ limit: int = DEFAULT_LIMIT,
+ ) -> Iterator[ObjId]:
it = iter(self)
- if last_obj_id:
- it = dropwhile(lambda x: x <= last_obj_id, it)
+ if last_obj_id is not None:
+ it = dropwhile(last_obj_id.__ge__, it)
return islice(it, limit)
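The rewritten default list_content deserves a second look: dropwhile(last_obj_id.__ge__, it) drops ids x while last_obj_id >= x, i.e. while x <= last_obj_id, exactly what the old lambda spelled out, and the switch to `is not None` makes the sentinel explicit. A check of the equivalence:

    from itertools import dropwhile

    ids = [b"\x01", b"\x02", b"\x03", b"\x04"]
    last = b"\x02"

    # Both spellings drop ids up to and including `last`:
    assert list(dropwhile(last.__ge__, ids)) == [b"\x03", b"\x04"]
    assert list(dropwhile(lambda x: x <= last, ids)) == [b"\x03", b"\x04"]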
diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py
--- a/swh/objstorage/tests/test_objstorage_pathslicing.py
+++ b/swh/objstorage/tests/test_objstorage_pathslicing.py
@@ -10,8 +10,8 @@
from swh.model import hashutil
from swh.objstorage import exc
+from swh.objstorage.constants import ID_DIGEST_LENGTH
from swh.objstorage.factory import get_objstorage
-from swh.objstorage.objstorage import ID_DIGEST_LENGTH
from .objstorage_testing import ObjStorageTestFixture