Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/multiplexer/multiplexer_objstorage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import queue | import queue | ||||
import random | |||||
import threading | import threading | ||||
from typing import Dict, Iterable | from typing import Dict | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | from swh.objstorage.interface import ObjId | ||||
from swh.objstorage.objstorage import ObjStorage | from swh.objstorage.objstorage import ObjStorage | ||||
class ObjStorageThread(threading.Thread): | class ObjStorageThread(threading.Thread): | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
▲ Show 20 Lines • Show All 284 Lines • ▼ Show 20 Lines | def check(self, obj_id: ObjId) -> None: | ||||
# Raise the ObjNotFoundError only if the content couldn't be found in | # Raise the ObjNotFoundError only if the content couldn't be found in | ||||
# all the storages. | # all the storages. | ||||
if nb_present == 0: | if nb_present == 0: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
def delete(self, obj_id: ObjId): | def delete(self, obj_id: ObjId): | ||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | ||||
return all(self.wrap_call(self.get_write_threads(obj_id), "delete", obj_id)) | return all(self.wrap_call(self.get_write_threads(obj_id), "delete", obj_id)) | ||||
def get_random(self, batch_size: int) -> Iterable[ObjId]: | |||||
storages_set = [storage for storage in self.storages if len(storage) > 0] | |||||
if len(storages_set) <= 0: | |||||
return [] | |||||
while storages_set: | |||||
storage = random.choice(storages_set) | |||||
try: | |||||
return storage.get_random(batch_size) | |||||
except NotImplementedError: | |||||
storages_set.remove(storage) | |||||
# There is no storage that allow the get_random operation | |||||
raise NotImplementedError( | |||||
"There is no storage implementation into the multiplexer that " | |||||
"support the 'get_random' operation" | |||||
) |