Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/storage/archive.py
from typing import List | from typing import Any, Dict, List | ||||
# from functools import lru_cache | # from functools import lru_cache | ||||
from methodtools import lru_cache | from methodtools import lru_cache | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from ..archive import ArchiveInterface | |||||
class ArchiveStorage: | |||||
class ArchiveStorage(ArchiveInterface): | |||||
def __init__(self, cls: str, **kwargs): | def __init__(self, cls: str, **kwargs): | ||||
self.storage = get_storage(cls, **kwargs) | self.storage = get_storage(cls, **kwargs) | ||||
@lru_cache(maxsize=1000000) | @lru_cache(maxsize=1000000) | ||||
def directory_ls(self, id: bytes): | def directory_ls(self, id: bytes) -> List[Dict[str, Any]]: | ||||
# TODO: filter unused fields | # TODO: filter unused fields | ||||
return [entry for entry in self.storage.directory_ls(id)] | return [entry for entry in self.storage.directory_ls(id)] | ||||
def iter_origins(self): | def iter_origins(self): | ||||
from swh.storage.algos.origin import iter_origins | from swh.storage.algos.origin import iter_origins | ||||
yield from iter_origins(self.storage) | yield from iter_origins(self.storage) | ||||
Show All 25 Lines |