Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/multiplexer/multiplexer_objstorage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import queue | import queue | ||||
import random | import random | ||||
import threading | import threading | ||||
from typing import Dict | from typing import Dict, Iterable | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.objstorage import ObjStorage | from swh.objstorage.objstorage import ObjStorage | ||||
class ObjStorageThread(threading.Thread): | class ObjStorageThread(threading.Thread): | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
super().__init__(daemon=True) | super().__init__(daemon=True) | ||||
self.storage = storage | self.storage = storage | ||||
self.commands = queue.Queue() | self.commands = queue.Queue() | ||||
▲ Show 20 Lines • Show All 197 Lines • ▼ Show 20 Lines | class MultiplexerObjStorage(ObjStorage): | ||||
def __iter__(self):
    """Iterate over the objects of every backend storage, one after another.

    Each storage in ``self.storages`` is iterated in turn and its items are
    yielded unchanged. An item that several storages yield is therefore
    yielded once per storage. Iteration is lazy: nothing is read until the
    returned generator is advanced.
    """
    for backend in self.storages:
        yield from backend
def add(self, content, obj_id, check_presence=True): | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | ||||
"""Add a new object to the object storage. | """Add a new object to the object storage. | ||||
If the adding step works in all the storages that accept this content, | If the adding step works in all the storages that accept this content, | ||||
this is a success. Otherwise, the full adding step is an error even if | this is a success. Otherwise, the full adding step is an error even if | ||||
it succeed in some of the storages. | it succeed in some of the storages. | ||||
Args: | Args: | ||||
content: content of the object to be added to the storage. | content: content of the object to be added to the storage. | ||||
Show All 16 Lines | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | ||||
check_presence=check_presence, | check_presence=check_presence, | ||||
) | ) | ||||
for result in results: | for result in results: | ||||
if not result: | if not result: | ||||
continue | continue | ||||
return result | return result | ||||
assert False, "No backend objstorage configured" | |||||
def add_batch(self, contents, check_presence=True) -> Dict:
    """Add a batch of new objects to the object storage.

    The batch is forwarded, through ``self.wrap_call``, to the ``add_batch``
    method of every write thread returned by ``self.get_write_threads()``.
    The per-backend summaries are then averaged with integer division so
    that the returned counters describe the batch once, not once per backend.

    Args:
        contents: batch of objects, passed unchanged to each backend.
        check_presence: forwarded unchanged to each backend.

    Returns:
        A dict with the keys ``"object:add"`` and ``"object:add:bytes"``.
        Both are 0 when no backend produced a result.
    """
    write_threads = list(self.get_write_threads())
    results = self.wrap_call(
        write_threads,
        "add_batch",
        contents,
        check_presence=check_presence,
    )
    nb_results = len(results)
    if nb_results == 0:
        # Without any result the average below would divide by zero;
        # nothing was written, so report an empty summary instead.
        return {"object:add": 0, "object:add:bytes": 0}
    return {
        key: sum(result[key] for result in results) // nb_results
        for key in ("object:add", "object:add:bytes")
    }
def restore(self, content: bytes, obj_id: ObjId):
    """Restore an object on the write backends selected for ``obj_id``.

    The call is dispatched with ``self.wrap_call`` to every thread returned
    by ``self.get_write_threads(obj_id)``.

    Args:
        content: content of the object, passed unchanged to each backend.
        obj_id: identifier of the object to restore.

    Returns:
        The value popped from the end of the collected results
        (presumably a list; ``wrap_call`` is not visible here).
    """
    targets = self.get_write_threads(obj_id)
    outcomes = self.wrap_call(targets, "restore", content, obj_id=obj_id)
    return outcomes.pop()
def get(self, obj_id: ObjId) -> bytes:
    """Return the content of ``obj_id`` from the first backend that has it.

    The backends returned by ``self.get_read_threads(obj_id)`` are tried in
    order; a backend that raises ``ObjNotFoundError`` is skipped.

    Args:
        obj_id: identifier of the object to fetch.

    Returns:
        Whatever the first successful backend ``get`` returns.

    Raises:
        ObjNotFoundError: if every backend raised ``ObjNotFoundError``.
    """
    for reader in self.get_read_threads(obj_id):
        try:
            content = reader.get(obj_id)
        except ObjNotFoundError:
            # Not in this backend: try the next one.
            pass
        else:
            return content
    # If no storage contains this content, raise the error
    raise ObjNotFoundError(obj_id)
def check(self, obj_id: ObjId) -> None:
    """Check ``obj_id`` on every backend selected for reading.

    Each backend returned by ``self.get_read_threads(obj_id)`` has its own
    ``check`` called. A backend raising ``ObjNotFoundError`` is ignored;
    any other exception (e.g. for a corrupted object) propagates
    immediately.

    Args:
        obj_id: identifier of the object to check.

    Raises:
        ObjNotFoundError: if no backend passed the check without raising
            ``ObjNotFoundError``.
    """
    found_somewhere = False
    for reader in self.get_read_threads(obj_id):
        try:
            reader.check(obj_id)
        except ObjNotFoundError:
            continue
        found_somewhere = True
    # If there is an Error because of a corrupted file, then let it pass.
    # Raise the ObjNotFoundError only if the content couldn't be found in
    # all the storages.
    if not found_somewhere:
        raise ObjNotFoundError(obj_id)
def delete(self, obj_id: ObjId):
    """Delete ``obj_id`` from the write backends selected for it.

    Args:
        obj_id: identifier of the object to delete.

    Returns:
        True if every result collected from the backends is truthy,
        False otherwise.
    """
    super().delete(obj_id)  # Check delete permission
    targets = self.get_write_threads(obj_id)
    outcomes = self.wrap_call(targets, "delete", obj_id)
    return all(outcomes)
def get_random(self, batch_size: int) -> Iterable[ObjId]:
    """Return random object ids from one randomly chosen backend.

    Only backends with a non-zero length are considered. One is picked at
    random and its ``get_random`` result is returned. A backend raising
    ``NotImplementedError`` is dropped and another one is picked.

    Args:
        batch_size: forwarded unchanged to the backend's ``get_random``.

    Returns:
        The chosen backend's ``get_random`` result, or an empty list when
        no backend has a non-zero length.

    Raises:
        NotImplementedError: if every non-empty backend raised
            ``NotImplementedError``.
    """
    candidates = [backend for backend in self.storages if len(backend) > 0]
    if not candidates:
        return []

    while candidates:
        picked = random.choice(candidates)
        try:
            return picked.get_random(batch_size)
        except NotImplementedError:
            candidates.remove(picked)

    # There is no storage that allow the get_random operation
    raise NotImplementedError(
        "There is no storage implementation into the multiplexer that "
        "support the 'get_random' operation"
    )