Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/objstorage.py
Show First 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | def add_batch(self: ObjStorageInterface, contents, check_presence=True) -> Dict: | ||||
for obj_id, content in contents.items(): | for obj_id, content in contents.items(): | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
continue | continue | ||||
self.add(content, obj_id, check_presence=False) | self.add(content, obj_id, check_presence=False) | ||||
summary["object:add"] += 1 | summary["object:add"] += 1 | ||||
summary["object:add:bytes"] += len(content) | summary["object:add:bytes"] += len(content) | ||||
return summary | return summary | ||||
def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId): | def restore(self: ObjStorageInterface, content: bytes, obj_id: ObjId) -> None: | ||||
# check_presence to false will erase the potential previous content. | # check_presence to false will erase the potential previous content. | ||||
return self.add(content, obj_id, check_presence=False) | self.add(content, obj_id, check_presence=False) | ||||
def get_batch( | def get_batch( | ||||
self: ObjStorageInterface, obj_ids: List[ObjId] | self: ObjStorageInterface, obj_ids: List[ObjId] | ||||
) -> Iterator[Optional[bytes]]: | ) -> Iterator[Optional[bytes]]: | ||||
for obj_id in obj_ids: | for obj_id in obj_ids: | ||||
try: | try: | ||||
yield self.get(obj_id) | yield self.get(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
Show All 19 Lines |