Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/seaweedfs/objstorage.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
from itertools import islice | from itertools import islice | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Iterator, Optional | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | ||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | ||||
ObjStorage, | ObjStorage, | ||||
compressors, | compressors, | ||||
compute_hash, | compute_hash, | ||||
decompressors, | decompressors, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def __len__(self): | ||||
performance applies. | performance applies. | ||||
Returns: | Returns: | ||||
number of objects contained in the storage. | number of objects contained in the storage. | ||||
""" | """ | ||||
return sum(1 for i in self) | return sum(1 for i in self) | ||||
def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId:
    """Store *content* under the key *obj_id*.

    Args:
        content: raw object payload, as a single ``bytes`` value
            (iterables of chunks are no longer supported).
        obj_id: identifier (content hash) used as the storage key.
        check_presence: when True, skip the upload if an object with
            this id is already stored.

    Returns:
        The ``obj_id`` of the (possibly pre-existing) object.

    Raises:
        TypeError: if ``content`` is not a ``bytes`` object.
    """
    if check_presence and obj_id in self:
        return obj_id

    # Explicit raise instead of `assert`: asserts are stripped when
    # running under `python -O`, which would silently let non-bytes
    # payloads (the old chunk-list API) through.
    if not isinstance(content, bytes):
        raise TypeError("list of content chunks is not supported anymore")

    def compressor(data):
        # One-shot compression of the whole payload: emit the compressed
        # stream, then the compressor's trailing bytes.
        comp = compressors[self.compression]()
        yield comp.compress(data)
        yield comp.flush()

    # XXX should handle streaming correctly...
    self.wf.put(io.BytesIO(b"".join(compressor(content))), self._path(obj_id))
    return obj_id
def restore(self, content: bytes, obj_id: ObjId):
    """Re-add an object unconditionally.

    Same as ``add()`` but with the presence check disabled, so the
    stored object is (re)written even when ``obj_id`` already exists.
    """
    return self.add(content, obj_id, check_presence=False)
def get(self, obj_id: ObjId) -> bytes:
    """Fetch and decompress the object stored under *obj_id*.

    Args:
        obj_id: identifier (content hash) of the object to retrieve.

    Returns:
        The decompressed object payload.

    Raises:
        ObjNotFoundError: if the backend fails to return the object
            (missing object, network error, ...).
        Error: if trailing data is found after the compressed stream,
            indicating a corrupt object.
    """
    try:
        obj = self.wf.get(self._path(obj_id))
    except Exception as exc:
        # The backend client may fail for many reasons; surface them
        # all as "not found", but keep the original exception chained
        # (`from exc`) so the root cause stays visible when debugging.
        raise ObjNotFoundError(obj_id) from exc
    d = decompressors[self.compression]()
    ret = d.decompress(obj)
    if d.unused_data:
        hex_obj_id = hashutil.hash_to_hex(obj_id)
        raise Error("Corrupt object %s: trailing data found" % hex_obj_id)
    return ret
def check(self, obj_id: ObjId) -> None:
    """Verify the integrity of the object stored under *obj_id*.

    Raises:
        Error: if the hash of the stored content does not match its id.
    """
    # Recompute the hash over the decompressed payload and compare it
    # with the key the object is stored under.
    actual_hash = compute_hash(self.get(obj_id))
    if actual_hash != obj_id:
        raise Error(obj_id)
def delete(self, obj_id: ObjId):
    """Remove the object stored under *obj_id*.

    Raises:
        ObjNotFoundError: if no object with this id is stored.
    """
    # The base class is consulted first; it enforces the delete
    # permission before anything is touched.
    super().delete(obj_id)
    if obj_id not in self:
        raise ObjNotFoundError(obj_id)
    self.wf.delete(self._path(obj_id))
    return True
def list_content(
    self,
    last_obj_id: Optional[ObjId] = None,
    limit: int = DEFAULT_LIMIT,
) -> Iterator[ObjId]:
    """Iterate over at most *limit* object ids stored in the backend,
    starting after *last_obj_id* when one is given.
    """
    start_after = hashutil.hash_to_hex(last_obj_id) if last_obj_id else None
    file_names = self.wf.iterfiles(last_file_name=start_after)
    for file_name in islice(file_names, limit):
        # File names end with the hex object id; keep only the basename.
        hex_id = file_name.rsplit("/", 1)[-1]
        yield hashutil.bytehex_to_hash(hex_id.encode())
Show All 22 Lines |