Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/multiplexer/multiplexer_objstorage.py
Show First 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | Example 2: | ||||
If a content starting with 0, 1 or 2 is present in the storage_v1, it | If a content starting with 0, 1 or 2 is present in the storage_v1, it | ||||
would be ignored anyway. | would be ignored anyway. | ||||
""" | """ | ||||
def __init__(self, storages): | def __init__(self, storages): | ||||
self.storages = storages | self.storages = storages | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
""" Indicates if the given object is present in the storage | |||||
See base class [ObjStorage]. | |||||
""" | |||||
for storage in self.storages: | for storage in self.storages: | ||||
if obj_id in storage: | if obj_id in storage: | ||||
return True | return True | ||||
return False | return False | ||||
def __iter__(self): | def __iter__(self): | ||||
""" Iterates over the content of each storages | |||||
Due to the demultiplexer nature, same content can be in multiple | |||||
storages and may be yielded multiple times. | |||||
Warning: The `__iter__` methods frequently have bad performance. You | |||||
almost certainly don't want to use this method in production. | |||||
""" | |||||
for storage in self.storages: | for storage in self.storages: | ||||
yield from storage | yield from storage | ||||
def __len__(self): | def __len__(self): | ||||
""" Returns the number of files in the storage. | """ Compute the number of objects in the current object storage. | ||||
Identical objects present in multiple storages will be counted as | |||||
multiple objects. | |||||
Warning: this currently uses `__iter__`, its warning about bad | |||||
performance applies. | |||||
Warning: Multiple files may represent the same content, so this method | Returns: | ||||
does not indicate how many different contents are in the storage. | number of objects contained in the storage. | ||||
""" | """ | ||||
return sum(map(len, self.storages)) | return sum(map(len, self.storages)) | ||||
def add(self, content, obj_id=None, check_presence=True): | def add(self, content, obj_id=None, check_presence=True): | ||||
""" Add a new object to the object storage. | """ Add a new object to the object storage. | ||||
If the adding step works in all the storages that accept this content, | If the adding step works in all the storages that accept this content, | ||||
this is a success. Otherwise, the full adding step is an error even if | this is a success. Otherwise, the full adding step is an error even if | ||||
Show All 13 Lines | def add(self, content, obj_id=None, check_presence=True): | ||||
content. | content. | ||||
""" | """ | ||||
return [storage.add(content, obj_id, check_presence) | return [storage.add(content, obj_id, check_presence) | ||||
for storage in self.storages].pop() | for storage in self.storages].pop() | ||||
def restore(self, content, obj_id=None): | def restore(self, content, obj_id=None): | ||||
""" Restore a content that have been corrupted. | """ Restore a content that have been corrupted. | ||||
This function is identical to add_bytes but does not check if | See base class [ObjStorage] & self.add() method. | ||||
the object id is already in the file system. | |||||
(see "add" method) | |||||
Args: | |||||
content: content of the object to be added to the storage | |||||
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When | |||||
given, obj_id will be trusted to match bytes. If missing, | |||||
obj_id will be computed on the fly. | |||||
Returns: | |||||
an id of the object into the storage. As the write-storages are | |||||
always readable as well, any id will be valid to retrieve a | |||||
content. | |||||
""" | """ | ||||
return [storage.restore(content, obj_id) | return [storage.restore(content, obj_id) | ||||
for storage in self.storages].pop() | for storage in self.storages].pop() | ||||
def get(self, obj_id): | def get(self, obj_id): | ||||
""" Retrieve the content of a given object. | """ Retrieve the content of a given object. | ||||
Args: | See base class [ObjStorage]. | ||||
obj_id: object id. | |||||
Returns: | |||||
the content of the requested object as bytes. | |||||
Raises: | |||||
ObjNotFoundError: if the requested object is missing. | |||||
""" | """ | ||||
for storage in self.storages: | for storage in self.storages: | ||||
try: | try: | ||||
return storage.get(obj_id) | return storage.get(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
continue | continue | ||||
# If no storage contains this content, raise the error | # If no storage contains this content, raise the error | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
def check(self, obj_id): | def check(self, obj_id): | ||||
""" Perform an integrity check for a given object. | """ Perform an integrity check for a given object. | ||||
Verify that the file object is in place and that the gziped content | See base class [ObjStorage]. | ||||
matches the object id. | |||||
Args: | |||||
obj_id: object id. | |||||
Raises: | |||||
ObjNotFoundError: if the requested object is missing. | |||||
Error: if the request object is corrupted. | |||||
""" | """ | ||||
nb_present = 0 | nb_present = 0 | ||||
for storage in self.storages: | for storage in self.storages: | ||||
try: | try: | ||||
storage.check(obj_id) | storage.check(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
continue | continue | ||||
else: | else: | ||||
nb_present += 1 | nb_present += 1 | ||||
# If there is an Error because of a corrupted file, then let it pass. | # If there is an Error because of a corrupted file, then let it pass. | ||||
# Raise the ObjNotFoundError only if the content coulnd't be found in | # Raise the ObjNotFoundError only if the content coulnd't be found in | ||||
# all the storages. | # all the storages. | ||||
if nb_present == 0: | if nb_present == 0: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
def get_random(self, batch_size): | def get_random(self, batch_size): | ||||
""" Get random ids of existing contents | """ Get random ids of existing contents | ||||
This method is used in order to get random ids to perform | See base class [ObjStorage]. | ||||
content integrity verifications on random contents. | |||||
Attributes: | |||||
batch_size (int): Number of ids that will be given | |||||
Yields: | |||||
An iterable of ids of contents that are in the current object | |||||
storage. | |||||
""" | """ | ||||
storages_set = [storage for storage in self.storages | storages_set = [storage for storage in self.storages | ||||
if len(storage) > 0] | if len(storage) > 0] | ||||
if len(storages_set) <= 0: | if len(storages_set) <= 0: | ||||
return [] | return [] | ||||
while storages_set: | while storages_set: | ||||
storage = random.choice(storages_set) | storage = random.choice(storages_set) | ||||
Show All 9 Lines |