Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/objstorage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import bz2 | import bz2 | ||||
from itertools import dropwhile, islice | from itertools import dropwhile, islice | ||||
import lzma | import lzma | ||||
from typing import Dict | from typing import Callable, Dict, Iterable, Iterator, List, Optional | ||||
import zlib | import zlib | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from .constants import DEFAULT_LIMIT, ID_HASH_ALGO | |||||
from .exc import ObjNotFoundError | from .exc import ObjNotFoundError | ||||
from .interface import ObjId, ObjStorageInterface | |||||
ID_HASH_ALGO = "sha1" | |||||
ID_HEXDIGEST_LENGTH = 40 | |||||
"""Size in bytes of the hash hexadecimal representation.""" | |||||
ID_DIGEST_LENGTH = 20 | |||||
"""Size in bytes of the hash""" | |||||
DEFAULT_LIMIT = 10000 | |||||
"""Default number of results of ``list_content``.""" | |||||
def compute_hash(content, algo=ID_HASH_ALGO): | def compute_hash(content, algo=ID_HASH_ALGO): | ||||
"""Compute the content's hash. | """Compute the content's hash. | ||||
Args: | Args: | ||||
content (bytes): The raw content to hash | content (bytes): The raw content to hash | ||||
hash_name (str): Hash's name (default to ID_HASH_ALGO) | hash_name (str): Hash's name (default to ID_HASH_ALGO) | ||||
Show All 16 Lines | class NullCompressor: | ||||
def compress(self, data): | def compress(self, data): | ||||
return data | return data | ||||
def flush(self): | def flush(self): | ||||
return b"" | return b"" | ||||
class NullDecompressor: | class NullDecompressor: | ||||
def decompress(self, data): | def decompress(self, data: bytes) -> bytes: | ||||
return data | return data | ||||
@property | @property | ||||
def unused_data(self): | def unused_data(self) -> bytes: | ||||
return b"" | return b"" | ||||
decompressors = { | class _CompressorProtocol: | ||||
"bz2": bz2.BZ2Decompressor, | def compress(self, data: bytes) -> bytes: | ||||
"lzma": lzma.LZMADecompressor, | ... | ||||
"gzip": lambda: zlib.decompressobj(wbits=31), | |||||
"zlib": zlib.decompressobj, | def flush(self) -> bytes: | ||||
"none": NullDecompressor, | ... | ||||
class _DecompressorProtocol: | |||||
def decompress(self, data: bytes) -> bytes: | |||||
... | |||||
unused_data: bytes | |||||
decompressors: Dict[str, Callable[[], _DecompressorProtocol]] = { | |||||
"bz2": bz2.BZ2Decompressor, # type: ignore | |||||
"lzma": lzma.LZMADecompressor, # type: ignore | |||||
"gzip": lambda: zlib.decompressobj(wbits=31), # type: ignore | |||||
"zlib": zlib.decompressobj, # type: ignore | |||||
"none": NullDecompressor, # type: ignore | |||||
} | } | ||||
compressors = { | compressors: Dict[str, Callable[[], _CompressorProtocol]] = { | ||||
"bz2": bz2.BZ2Compressor, | "bz2": bz2.BZ2Compressor, # type: ignore | ||||
"lzma": lzma.LZMACompressor, | "lzma": lzma.LZMACompressor, # type: ignore | ||||
"gzip": lambda: zlib.compressobj(wbits=31), | "gzip": lambda: zlib.compressobj(wbits=31), # type: ignore | ||||
"zlib": zlib.compressobj, | "zlib": zlib.compressobj, # type: ignore | ||||
"none": NullCompressor, | "none": NullCompressor, # type: ignore | ||||
} | } | ||||
class ObjStorage(metaclass=abc.ABCMeta): | class ObjStorage(metaclass=abc.ABCMeta): | ||||
def __init__(self, *, allow_delete=False, **kwargs): | def __init__(self, *, allow_delete=False, **kwargs): | ||||
# A more complete permission system could be used in place of that if | # A more complete permission system could be used in place of that if | ||||
# it becomes needed | # it becomes needed | ||||
self.allow_delete = allow_delete | self.allow_delete = allow_delete | ||||
@abc.abstractmethod | def add_batch(self: ObjStorageInterface, contents, check_presence=True) -> Dict: | ||||
def check_config(self, *, check_write): | |||||
pass | |||||
@abc.abstractmethod | |||||
def __contains__(self, obj_id): | |||||
pass | |||||
@abc.abstractmethod | |||||
def add(self, content, obj_id, check_presence=True): | |||||
pass | |||||
def add_batch(self, contents, check_presence=True) -> Dict: | |||||
summary = {"object:add": 0, "object:add:bytes": 0} | summary = {"object:add": 0, "object:add:bytes": 0} | ||||
for obj_id, content in contents.items(): | for obj_id, content in contents.items(): | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
continue | continue | ||||
self.add(content, obj_id, check_presence=False) | self.add(content, obj_id, check_presence=False) | ||||
summary["object:add"] += 1 | summary["object:add"] += 1 | ||||
summary["object:add:bytes"] += len(content) | summary["object:add:bytes"] += len(content) | ||||
return summary | return summary | ||||
def restore(self, content, obj_id): | def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId): | ||||
# check_presence to false will erase the potential previous content. | # check_presence to false will erase the potential previous content. | ||||
return self.add(content, obj_id, check_presence=False) | return self.add(content, obj_id, check_presence=False) | ||||
@abc.abstractmethod | def get_batch( | ||||
def get(self, obj_id): | self: ObjStorageInterface, obj_ids: List[ObjId] | ||||
pass | ) -> Iterator[Optional[bytes]]: | ||||
def get_batch(self, obj_ids): | |||||
for obj_id in obj_ids: | for obj_id in obj_ids: | ||||
try: | try: | ||||
yield self.get(obj_id) | yield self.get(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
yield None | yield None | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def check(self, obj_id): | def delete(self, obj_id: ObjId): | ||||
pass | |||||
@abc.abstractmethod | |||||
def delete(self, obj_id): | |||||
if not self.allow_delete: | if not self.allow_delete: | ||||
raise PermissionError("Delete is not allowed.") | raise PermissionError("Delete is not allowed.") | ||||
# Management methods | def get_random(self, batch_size: int) -> Iterable[ObjId]: | ||||
def get_random(self, batch_size): | |||||
pass | pass | ||||
# Streaming methods | def list_content( | ||||
self: ObjStorageInterface, | |||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | last_obj_id: Optional[ObjId] = None, | ||||
limit: int = DEFAULT_LIMIT, | |||||
) -> Iterator[ObjId]: | |||||
it = iter(self) | it = iter(self) | ||||
if last_obj_id: | if last_obj_id is not None: | ||||
it = dropwhile(lambda x: x <= last_obj_id, it) | it = dropwhile(last_obj_id.__ge__, it) | ||||
return islice(it, limit) | return islice(it, limit) |