Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/in_memory.py
Show All 22 Lines | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
return obj_id in self.state | return obj_id in self.state | ||||
def __iter__(self): | def __iter__(self): | ||||
return iter(sorted(self.state)) | return iter(sorted(self.state)) | ||||
def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None: | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
return obj_id | return | ||||
self.state[obj_id] = content | self.state[obj_id] = content | ||||
return obj_id | |||||
def get(self, obj_id: ObjId) -> bytes: | def get(self, obj_id: ObjId) -> bytes: | ||||
if obj_id not in self: | if obj_id not in self: | ||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | ||||
return self.state[obj_id] | return self.state[obj_id] | ||||
def check(self, obj_id: ObjId) -> None: | def check(self, obj_id: ObjId) -> None: | ||||
if obj_id not in self: | if obj_id not in self: | ||||
Show All 11 Lines |