Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/multiplexer/multiplexer_objstorage.py
Show First 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | class MultiplexerObjStorage(ObjStorage): | ||||
def __iter__(self): | def __iter__(self): | ||||
def obj_iterator(): | def obj_iterator(): | ||||
for storage in self.storages: | for storage in self.storages: | ||||
yield from storage | yield from storage | ||||
return obj_iterator() | return obj_iterator() | ||||
def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: | ||||
"""Add a new object to the object storage. | """Add a new object to the object storage. | ||||
If the adding step works in all the storages that accept this content, | If the adding step works in all the storages that accept this content, | ||||
this is a success. Otherwise, the full adding step is an error even if | this is a success. Otherwise, the full adding step is an error even if | ||||
it succeed in some of the storages. | it succeed in some of the storages. | ||||
Args: | Args: | ||||
content: content of the object to be added to the storage. | content: content of the object to be added to the storage. | ||||
obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When | obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When | ||||
given, obj_id will be trusted to match the bytes. If missing, | given, obj_id will be trusted to match the bytes. If missing, | ||||
obj_id will be computed on the fly. | obj_id will be computed on the fly. | ||||
check_presence: indicate if the presence of the content should be | check_presence: indicate if the presence of the content should be | ||||
verified before adding the file. | verified before adding the file. | ||||
Returns: | Returns: | ||||
an id of the object into the storage. As the write-storages are | an id of the object into the storage. As the write-storages are | ||||
always readable as well, any id will be valid to retrieve a | always readable as well, any id will be valid to retrieve a | ||||
content. | content. | ||||
""" | """ | ||||
results = self.wrap_call( | self.wrap_call( | ||||
self.get_write_threads(obj_id), | self.get_write_threads(obj_id), | ||||
"add", | "add", | ||||
content, | content, | ||||
obj_id=obj_id, | obj_id=obj_id, | ||||
check_presence=check_presence, | check_presence=check_presence, | ||||
) | ) | ||||
for result in results: | |||||
if not result: | |||||
continue | |||||
return result | |||||
assert False, "No backend objstorage configured" | |||||
def add_batch(self, contents, check_presence=True) -> Dict: | def add_batch(self, contents, check_presence=True) -> Dict: | ||||
"""Add a batch of new objects to the object storage.""" | """Add a batch of new objects to the object storage.""" | ||||
write_threads = list(self.get_write_threads()) | write_threads = list(self.get_write_threads()) | ||||
results = self.wrap_call( | results = self.wrap_call( | ||||
write_threads, | write_threads, | ||||
"add_batch", | "add_batch", | ||||
contents, | contents, | ||||
check_presence=check_presence, | check_presence=check_presence, | ||||
) | ) | ||||
summed = {"object:add": 0, "object:add:bytes": 0} | summed = {"object:add": 0, "object:add:bytes": 0} | ||||
for result in results: | for result in results: | ||||
summed["object:add"] += result["object:add"] | summed["object:add"] += result["object:add"] | ||||
summed["object:add:bytes"] += result["object:add:bytes"] | summed["object:add:bytes"] += result["object:add:bytes"] | ||||
return { | return { | ||||
"object:add": summed["object:add"] // len(results), | "object:add": summed["object:add"] // len(results), | ||||
"object:add:bytes": summed["object:add:bytes"] // len(results), | "object:add:bytes": summed["object:add:bytes"] // len(results), | ||||
} | } | ||||
def restore(self, content: bytes, obj_id: ObjId): | def restore(self, content: bytes, obj_id: ObjId) -> None: | ||||
return self.wrap_call( | return self.wrap_call( | ||||
self.get_write_threads(obj_id), | self.get_write_threads(obj_id), | ||||
"restore", | "restore", | ||||
content, | content, | ||||
obj_id=obj_id, | obj_id=obj_id, | ||||
).pop() | ).pop() | ||||
def get(self, obj_id: ObjId) -> bytes: | def get(self, obj_id: ObjId) -> bytes: | ||||
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |