Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/multiplexer/striping_objstorage.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import queue | import queue | ||||
from typing import Dict | from typing import Dict | ||||
from typing_extensions import Literal | |||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.multiplexer.multiplexer_objstorage import ( | from swh.objstorage.multiplexer.multiplexer_objstorage import ( | ||||
MultiplexerObjStorage, | MultiplexerObjStorage, | ||||
ObjStorageThread, | ObjStorageThread, | ||||
) | ) | ||||
class StripingObjStorage(MultiplexerObjStorage): | class StripingObjStorage(MultiplexerObjStorage): | ||||
"""Stripes objects across multiple objstorages | """Stripes objects across multiple objstorages | ||||
This objstorage implementation will write objects to objstorages in a | This objstorage implementation will write objects to objstorages in a | ||||
predictable way: it takes the modulo of the last 8 bytes of the object | predictable way: it takes the modulo of the last 8 bytes of the object | ||||
identifier with the number of object storages passed, which will yield an | identifier with the number of object storages passed, which will yield an | ||||
(almost) even distribution. | (almost) even distribution. | ||||
Objects are read from all storages in turn until it succeeds. | Objects are read from all storages in turn until it succeeds. | ||||
""" | """ | ||||
MOD_BYTES = 8 | MOD_BYTES = 8 | ||||
PRIMARY_HASH: Literal["sha1"] = "sha1" | |||||
def __init__(self, storages, **kwargs): | def __init__(self, storages, **kwargs): | ||||
super().__init__(storages, **kwargs) | super().__init__(storages, **kwargs) | ||||
self.num_storages = len(storages) | self.num_storages = len(storages) | ||||
def get_storage_index(self, obj_id): | def get_storage_index(self, obj_id: ObjId): | ||||
if obj_id is None: | if obj_id is None: | ||||
raise ValueError("StripingObjStorage always needs obj_id to be set") | raise ValueError("StripingObjStorage always needs obj_id to be set") | ||||
if isinstance(obj_id, dict): | |||||
obj_id = obj_id[self.PRIMARY_HASH] | |||||
index = int.from_bytes(obj_id[: -self.MOD_BYTES], "big") | index = int.from_bytes(obj_id[: -self.MOD_BYTES], "big") | ||||
return index % self.num_storages | return index % self.num_storages | ||||
def get_write_threads(self, obj_id): | def get_write_threads(self, obj_id): | ||||
idx = self.get_storage_index(obj_id) | idx = self.get_storage_index(obj_id) | ||||
yield self.storage_threads[idx] | yield self.storage_threads[idx] | ||||
Show All 32 Lines |