Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/generator.py
from itertools import count, islice, repeat | from itertools import count, islice, repeat | ||||
import logging | import logging | ||||
import random | import random | ||||
from typing import Iterator, Optional | |||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage | from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class Randomizer: | class Randomizer: | ||||
def __init__(self): | def __init__(self): | ||||
self.size = 0 | self.size = 0 | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | def add(self, content, obj_id, check_presence=True, *args, **kwargs): | ||||
pass | pass | ||||
def check(self, obj_id, *args, **kwargs): | def check(self, obj_id, *args, **kwargs): | ||||
return True | return True | ||||
def delete(self, obj_id, *args, **kwargs): | def delete(self, obj_id, *args, **kwargs): | ||||
return True | return True | ||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content( | ||||
self, | |||||
last_obj_id: Optional[ObjId] = None, | |||||
limit: int = DEFAULT_LIMIT, | |||||
) -> Iterator[ObjId]: | |||||
it = iter(self) | it = iter(self) | ||||
if last_obj_id: | if last_obj_id: | ||||
next(it) | next(it) | ||||
it.send(int(last_obj_id)) | it.send(int(last_obj_id)) | ||||
return islice(it, limit) | return islice(it, limit) |