Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/pathslicing.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from itertools import islice | from itertools import islice | ||||
import os | import os | ||||
import random | |||||
import tempfile | import tempfile | ||||
from typing import Iterable, Iterator, List, Optional | from typing import Iterator, List, Optional | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH | from swh.objstorage.constants import DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH | ||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | from swh.objstorage.interface import ObjId | ||||
from swh.objstorage.objstorage import ObjStorage, compressors, decompressors | from swh.objstorage.objstorage import ObjStorage, compressors, decompressors | ||||
BUFSIZ = 1048576 | BUFSIZ = 1048576 | ||||
▲ Show 20 Lines • Show All 263 Lines • ▼ Show 20 Lines | def delete(self, obj_id: ObjId): | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
try: | try: | ||||
os.remove(self.slicer.get_path(hex_obj_id)) | os.remove(self.slicer.get_path(hex_obj_id)) | ||||
except FileNotFoundError: | except FileNotFoundError: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
return True | return True | ||||
# Management methods | |||||
def get_random(self, batch_size: int) -> Iterable[ObjId]: | |||||
def get_random_content(self, batch_size): | |||||
"""Get a batch of content inside a single directory. | |||||
Returns: | |||||
a tuple (batch size, batch). | |||||
""" | |||||
dirs = [] | |||||
for level in range(len(self.slicer)): | |||||
path = os.path.join(self.root, *dirs) | |||||
dir_list = next(os.walk(path))[1] | |||||
if "tmp" in dir_list: | |||||
dir_list.remove("tmp") | |||||
dirs.append(random.choice(dir_list)) | |||||
path = os.path.join(self.root, *dirs) | |||||
content_list = next(os.walk(path))[2] | |||||
length = min(batch_size, len(content_list)) | |||||
return ( | |||||
length, | |||||
map(hashutil.hash_to_bytes, random.sample(content_list, length)), | |||||
) | |||||
while batch_size: | |||||
length, it = get_random_content(self, batch_size) | |||||
batch_size = batch_size - length | |||||
yield from it | |||||
# Streaming methods | # Streaming methods | ||||
@contextmanager | @contextmanager | ||||
def chunk_writer(self, obj_id): | def chunk_writer(self, obj_id): | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
with self._write_obj_file(hex_obj_id) as f: | with self._write_obj_file(hex_obj_id) as f: | ||||
yield lambda c: f.write(compressor.compress(c)) | yield lambda c: f.write(compressor.compress(c)) | ||||
▲ Show 20 Lines • Show All 71 Lines • Show Last 20 Lines |