Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/pathslicing.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections.abc import Iterator | |||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from itertools import islice | from itertools import islice | ||||
import os | import os | ||||
import random | import random | ||||
import tempfile | import tempfile | ||||
from typing import List | from typing import Iterable, Iterator, List, Optional | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH | |||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.objstorage import ( | from swh.objstorage.interface import ObjId | ||||
DEFAULT_LIMIT, | from swh.objstorage.objstorage import ObjStorage, compressors, decompressors | ||||
ID_HASH_ALGO, | |||||
ID_HEXDIGEST_LENGTH, | |||||
ObjStorage, | |||||
compressors, | |||||
decompressors, | |||||
) | |||||
# Buffer size (1 MiB) used when streaming object contents.
BUFSIZ = 1048576

# Filesystem permission bits applied to directories and files
# created by the storage.
DIR_MODE = 0o755
FILE_MODE = 0o644
class PathSlicer: | class PathSlicer: | ||||
▲ Show 20 Lines • Show All 151 Lines • ▼ Show 20 Lines | def check_config(self, *, check_write): | ||||
if self.compression not in compressors: | if self.compression not in compressors: | ||||
raise ValueError( | raise ValueError( | ||||
'Unknown compression algorithm "%s" for ' | 'Unknown compression algorithm "%s" for ' | ||||
"PathSlicingObjStorage" % self.compression | "PathSlicingObjStorage" % self.compression | ||||
) | ) | ||||
return True | return True | ||||
def __contains__(self, obj_id: ObjId) -> bool:
    """Return True if the object is present on disk.

    Presence is defined as the existence of the sliced file computed
    from the hex representation of ``obj_id``.
    """
    return os.path.isfile(self.slicer.get_path(hashutil.hash_to_hex(obj_id)))
def __iter__(self) -> Iterator[bytes]:
    """Iterate over the object identifiers currently available in the
    storage.

    Warning: with the current implementation of the object
    storage, this method will walk the filesystem to list objects,
    meaning that listing all objects will be very slow for large
    storages. You almost certainly don't want to use this method
    in production.

    Return:
        Iterator over object IDs
    """

    def walk_object_ids():
        # XXX hackish: the depth of the files found is not checked
        # against the slicing depth of the storage.
        for dirpath, subdirs, filenames in os.walk(self.root):
            # Sorting subdirs in place makes os.walk (topdown) visit
            # directories in deterministic, sorted order.
            subdirs.sort()
            for filename in sorted(filenames):
                yield bytes.fromhex(filename)

    return walk_object_ids()
def __len__(self) -> int:
    """Compute the number of objects available in the storage.

    Warning: this walks the whole storage via ``__iter__``; its
    warning about bad performance applies here as well.

    Return:
        number of objects contained in the storage
    """
    count = 0
    for _ in self:
        count += 1
    return count
def add(self, content, obj_id, check_presence=True): | def add( | ||||
self, | |||||
content: bytes, | |||||
obj_id: ObjId, | |||||
check_presence: bool = True, | |||||
) -> ObjId: | |||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
# If the object is already present, return immediately. | # If the object is already present, return immediately. | ||||
return obj_id | return obj_id | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
if not isinstance(content, Iterator): | |||||
content = [content] | |||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
with self._write_obj_file(hex_obj_id) as f: | with self._write_obj_file(hex_obj_id) as f: | ||||
for chunk in content: | f.write(compressor.compress(content)) | ||||
f.write(compressor.compress(chunk)) | |||||
f.write(compressor.flush()) | f.write(compressor.flush()) | ||||
return obj_id | return obj_id | ||||
def get(self, obj_id: ObjId) -> bytes:
    """Return the decompressed content of the object.

    Raises:
        ObjNotFoundError: if no object with this id is stored
        Error: if trailing data follows the compressed payload,
            which indicates a corrupt object file
    """
    if obj_id not in self:
        raise ObjNotFoundError(obj_id)

    hex_obj_id = hashutil.hash_to_hex(obj_id)
    path = self.slicer.get_path(hex_obj_id)
    with open(path, "rb") as f:
        raw = f.read()

    decompressor = decompressors[self.compression]()
    data = decompressor.decompress(raw)
    if decompressor.unused_data:
        # Bytes left over after the compressed stream ended: the
        # file does not contain exactly one compressed object.
        raise Error(
            "Corrupt object %s: trailing data found" % hex_obj_id,
        )
    return data
def check(self, obj_id: ObjId) -> None:
    """Verify that the stored object decompresses cleanly and that its
    content hashes back to ``obj_id``.

    Raises:
        Error: if the file cannot be read as a compressed stream, or
            if the recomputed hash differs from ``obj_id``
    """
    try:
        data = self.get(obj_id)
    except OSError:
        hex_obj_id = hashutil.hash_to_hex(obj_id)
        raise Error(
            "Corrupt object %s: not a proper compressed file" % hex_obj_id,
        )

    digests = hashutil.MultiHash.from_data(data, hash_names=[ID_HASH_ALGO]).digest()
    actual_obj_id = digests[ID_HASH_ALGO]
    if hashutil.hash_to_hex(obj_id) != hashutil.hash_to_hex(actual_obj_id):
        raise Error(
            "Corrupt object %s should have id %s"
            % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id))
        )
def delete(self, obj_id: ObjId):
    """Remove the object file from disk.

    Raises:
        ObjNotFoundError: if the object is not present (including a
            race where the file vanishes between the check and the
            removal)
    """
    super().delete(obj_id)  # Check delete permission
    if obj_id not in self:
        raise ObjNotFoundError(obj_id)

    path = self.slicer.get_path(hashutil.hash_to_hex(obj_id))
    try:
        os.remove(path)
    except FileNotFoundError:
        # File disappeared between the presence check and removal.
        raise ObjNotFoundError(obj_id)
    return True
# Management methods | # Management methods | ||||
def get_random(self, batch_size): | def get_random(self, batch_size: int) -> Iterable[ObjId]: | ||||
def get_random_content(self, batch_size): | def get_random_content(self, batch_size): | ||||
"""Get a batch of content inside a single directory. | """Get a batch of content inside a single directory. | ||||
Returns: | Returns: | ||||
a tuple (batch size, batch). | a tuple (batch size, batch). | ||||
""" | """ | ||||
dirs = [] | dirs = [] | ||||
for level in range(len(self.slicer)): | for level in range(len(self.slicer)): | ||||
Show All 21 Lines | class PathSlicingObjStorage(ObjStorage): | ||||
@contextmanager
def chunk_writer(self, obj_id):
    """Context manager yielding a callable that compresses and writes
    each chunk passed to it into the object file for ``obj_id``.

    The compressor is flushed into the file when the ``with`` block
    exits, so the file is only complete after the context closes.
    """
    compressor = compressors[self.compression]()
    hex_obj_id = hashutil.hash_to_hex(obj_id)
    with self._write_obj_file(hex_obj_id) as f:

        def write_chunk(chunk):
            f.write(compressor.compress(chunk))

        yield write_chunk
        # Resumed after the caller's with-block: finalize the stream.
        f.write(compressor.flush())
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content( | ||||
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT | |||||
) -> Iterator[ObjId]: | |||||
if last_obj_id: | if last_obj_id: | ||||
it = self.iter_from(last_obj_id) | it = self.iter_from(last_obj_id) | ||||
else: | else: | ||||
it = iter(self) | it = iter(self) | ||||
return islice(it, limit) | return islice(it, limit) | ||||
def iter_from(self, obj_id, n_leaf=False): | def iter_from(self, obj_id, n_leaf=False): | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
▲ Show 20 Lines • Show All 58 Lines • Show Last 20 Lines |