Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/pathslicing.py
Show First 20 Lines • Show All 221 Lines • ▼ Show 20 Lines | def __len__(self) -> int: | ||||
""" | """ | ||||
return sum(1 for i in self) | return sum(1 for i in self) | ||||
def add( | def add( | ||||
self, | self, | ||||
content: bytes, | content: bytes, | ||||
obj_id: ObjId, | obj_id: ObjId, | ||||
check_presence: bool = True, | check_presence: bool = True, | ||||
) -> ObjId: | ) -> None: | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
# If the object is already present, return immediately. | # If the object is already present, return immediately. | ||||
return obj_id | return | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | ||||
with self._write_obj_file(hex_obj_id) as f: | with self._write_obj_file(hex_obj_id) as f: | ||||
f.write(compressor.compress(content)) | f.write(compressor.compress(content)) | ||||
f.write(compressor.flush()) | f.write(compressor.flush()) | ||||
return obj_id | |||||
def get(self, obj_id: ObjId) -> bytes: | def get(self, obj_id: ObjId) -> bytes: | ||||
if obj_id not in self: | if obj_id not in self: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
# Open the file and return its content as bytes | # Open the file and return its content as bytes | ||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | ||||
d = decompressors[self.compression]() | d = decompressors[self.compression]() | ||||
with open(self.slicer.get_path(hex_obj_id), "rb") as f: | with open(self.slicer.get_path(hex_obj_id), "rb") as f: | ||||
▲ Show 20 Lines • Show All 151 Lines • Show Last 20 Lines |