Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/pathslicing.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from itertools import islice | from itertools import islice | ||||
import os | import os | ||||
import tempfile | import tempfile | ||||
from typing import Iterator, List, Optional | from typing import Iterator, List, Optional | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH | from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | from swh.objstorage.interface import CompositeObjId, ObjId | ||||
from swh.objstorage.objstorage import ObjStorage, compressors, decompressors | from swh.objstorage.objstorage import ( | ||||
ObjStorage, | |||||
compressors, | |||||
decompressors, | |||||
objid_to_default_hex, | |||||
) | |||||
BUFSIZ = 1048576 | BUFSIZ = 1048576 | ||||
DIR_MODE = 0o755 | DIR_MODE = 0o755 | ||||
FILE_MODE = 0o644 | FILE_MODE = 0o644 | ||||
class PathSlicer: | class PathSlicer: | ||||
▲ Show 20 Lines • Show All 152 Lines • ▼ Show 20 Lines | def check_config(self, *, check_write): | ||||
raise ValueError( | raise ValueError( | ||||
'Unknown compression algorithm "%s" for ' | 'Unknown compression algorithm "%s" for ' | ||||
"PathSlicingObjStorage" % self.compression | "PathSlicingObjStorage" % self.compression | ||||
) | ) | ||||
return True | return True | ||||
def __contains__(self, obj_id: ObjId) -> bool: | def __contains__(self, obj_id: ObjId) -> bool: | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = objid_to_default_hex(obj_id) | ||||
return os.path.isfile(self.slicer.get_path(hex_obj_id)) | return os.path.isfile(self.slicer.get_path(hex_obj_id)) | ||||
def __iter__(self) -> Iterator[bytes]: | def __iter__(self) -> Iterator[CompositeObjId]: | ||||
"""Iterate over the object identifiers currently available in the | """Iterate over the object identifiers currently available in the | ||||
storage. | storage. | ||||
Warning: with the current implementation of the object | Warning: with the current implementation of the object | ||||
storage, this method will walk the filesystem to list objects, | storage, this method will walk the filesystem to list objects, | ||||
meaning that listing all objects will be very slow for large | meaning that listing all objects will be very slow for large | ||||
storages. You almost certainly don't want to use this method | storages. You almost certainly don't want to use this method | ||||
in production. | in production. | ||||
Show All 29 Lines | def add( | ||||
content: bytes, | content: bytes, | ||||
obj_id: ObjId, | obj_id: ObjId, | ||||
check_presence: bool = True, | check_presence: bool = True, | ||||
) -> None: | ) -> None: | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
# If the object is already present, return immediately. | # If the object is already present, return immediately. | ||||
return | return | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = objid_to_default_hex(obj_id) | ||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
with self._write_obj_file(hex_obj_id) as f: | with self._write_obj_file(hex_obj_id) as f: | ||||
f.write(compressor.compress(content)) | f.write(compressor.compress(content)) | ||||
f.write(compressor.flush()) | f.write(compressor.flush()) | ||||
def get(self, obj_id: ObjId) -> bytes: | def get(self, obj_id: ObjId) -> bytes: | ||||
if obj_id not in self: | if obj_id not in self: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
# Open the file and return its content as bytes | # Open the file and return its content as bytes | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = objid_to_default_hex(obj_id) | ||||
d = decompressors[self.compression]() | d = decompressors[self.compression]() | ||||
with open(self.slicer.get_path(hex_obj_id), "rb") as f: | with open(self.slicer.get_path(hex_obj_id), "rb") as f: | ||||
out = d.decompress(f.read()) | out = d.decompress(f.read()) | ||||
if d.unused_data: | if d.unused_data: | ||||
raise Error( | raise Error( | ||||
"Corrupt object %s: trailing data found" % hex_obj_id, | "Corrupt object %s: trailing data found" % hex_obj_id, | ||||
) | ) | ||||
return out | return out | ||||
    def check(self, obj_id: ObjId) -> None:
        """Verify the integrity of the stored object *obj_id*.

        The object is read back, decompressed, re-hashed with ID_HASH_ALGO,
        and the resulting digest is compared to *obj_id*.

        Raises:
            Error: if the file cannot be read/decompressed, or if the
                recomputed hash does not match *obj_id*.
        """
        try:
            data = self.get(obj_id)
        except OSError:
            # presumably raised by the decompression layer on a truncated
            # or non-compressed file — TODO confirm which backends raise
            # OSError here.
            hex_obj_id = objid_to_default_hex(obj_id)
            raise Error(
                "Corrupt object %s: not a proper compressed file" % hex_obj_id,
            )

        # Recompute the content hash and compare it with the stored id.
        checksums = hashutil.MultiHash.from_data(
            data, hash_names=[ID_HASH_ALGO]
        ).digest()
        actual_obj_sha1 = checksums[ID_HASH_ALGO]
        hex_obj_id = objid_to_default_hex(obj_id)

        if hex_obj_id != hashutil.hash_to_hex(actual_obj_sha1):
            raise Error(
                "Corrupt object %s should have id %s"
                % (objid_to_default_hex(obj_id), hashutil.hash_to_hex(actual_obj_sha1))
            )
def delete(self, obj_id: ObjId): | def delete(self, obj_id: ObjId): | ||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | ||||
if obj_id not in self: | if obj_id not in self: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = objid_to_default_hex(obj_id) | ||||
try: | try: | ||||
os.remove(self.slicer.get_path(hex_obj_id)) | os.remove(self.slicer.get_path(hex_obj_id)) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
return True | return True | ||||
# Streaming methods | # Streaming methods | ||||
    @contextmanager
    def chunk_writer(self, obj_id):
        """Context manager for writing the object *obj_id* in chunks.

        Yields a callable taking one chunk of bytes; each call compresses
        the chunk with the configured algorithm and appends it to the
        object file.
        """
        hex_obj_id = objid_to_default_hex(obj_id)
        compressor = compressors[self.compression]()
        with self._write_obj_file(hex_obj_id) as f:
            yield lambda c: f.write(compressor.compress(c))
            # Executed after the caller's `with` body finishes: flush the
            # compressor's trailing data so the stream is complete.
            f.write(compressor.flush())
def list_content( | def list_content( | ||||
self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT | self, last_obj_id: Optional[ObjId] = None, limit: int = DEFAULT_LIMIT | ||||
) -> Iterator[ObjId]: | ) -> Iterator[CompositeObjId]: | ||||
if last_obj_id: | if last_obj_id: | ||||
it = self.iter_from(last_obj_id) | it = self.iter_from(last_obj_id) | ||||
else: | else: | ||||
it = iter(self) | it = iter(self) | ||||
return islice(it, limit) | return islice(it, limit) | ||||
def iter_from(self, obj_id, n_leaf=False): | def iter_from(self, obj_id, n_leaf=False): | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = objid_to_default_hex(obj_id) | ||||
slices = self.slicer.get_slices(hex_obj_id) | slices = self.slicer.get_slices(hex_obj_id) | ||||
rlen = len(self.root.split("/")) | rlen = len(self.root.split("/")) | ||||
i = 0 | i = 0 | ||||
for root, dirs, files in os.walk(self.root): | for root, dirs, files in os.walk(self.root): | ||||
if not dirs: | if not dirs: | ||||
i += 1 | i += 1 | ||||
level = len(root.split("/")) - rlen | level = len(root.split("/")) - rlen | ||||
▲ Show 20 Lines • Show All 50 Lines • Show Last 20 Lines |